5.5 从Job.submit()开始的第二段流程
如前所述,提交作业的三种方法都汇聚到了Job.submit()。到了Job.submit(),提交作业的流程就进入了“地方”上的第二个阶段,要把作业提交到“中央”去了。如果把Hadoop集群中的每一个节点都看作一个岛屿,那么这就是要“出海”,涉及跨节点的操作了。
我们在前面做过Job类的摘要,但是那其实只是一部分,现在我们需要从另一个视角再做一个摘要了:
class Job extends JobContextImpl implements JobContext {} ]submit() ]setUseNewAPI() ]connect() ]getJobSubmitter(FileSystem fs, ClientProtocol submitClient) > new JobSubmitter(fs, submitClient) ]isUber() //是否“拼车”模式(MapTask与ReduceTask在同一节点上) ]setPartitionerClass()//Mapper的输出可能要由Partitioner按某种规则分发给多个Reducer ]setMapSpeculativeExecution() //是否需要有 Speculative的Mapper起预备队的作用 ]setReduceSpeculativeExecution()//是否需要有 Speculative的Reducer起预备队的作用 ]setCacheFiles()
]…
我们看Job.submit()的代码。
[Job.submit()] public void submit(){ ensureState(JobState.DEFINE); //确认没有重复提交 setUseNewAPI(); //根据配置信息确定是否采用新API connect(); //建立与集群的连接,创建Cluster对象cluster final JobSubmitter submitter= getJobSubmitter(cluster.getFileSystem(), cluster.getClient()); status=ugi.doAs(new PrivilegedExceptionAction<JobStatus>(){ publ ic JobStatus run(){ return submitter.submitJobInternal(Job.this, cluster); } }); state=JobState.RUNNING; LOG.info("The url to track the j ob:"+getTrackingURL()); }
这里先说明一下Job.setUseNewAPI()。这个函数根据配置文件中的若干配置项确定本作业所采用的是新API还是老API,并生成显式的配置项“mapred.mapper.new-api”和“mapred.reducer.new-api”,并将之写入内存中的配置块,即Job.conf对象(这是个org.apache.hadoop.mapred.JobConf对象,见JobContextImpl类的定义)。如前所述,提交作业的API有新老之分,最后都汇聚到Job.submit(),都已转换成新API。这里的Job是定义于新API的org.apache.hadoop.mapreduce.Job。但是这API不仅仅是作业提交的API,同时也是MapReduce计算框架的API。更具体地说,Mapper和Reducer的API也有新老之分。现在是时候进一步加以说明了。
在早期的Hadoop中,Mapper和Reducer都定义为interface。当然,界面Mapper上定义了函数map(),界面Reducer上定义了函数reduce()。而具体的应用,则须实现这两个界面,提供实际的mapper和reducer。以前述示例一的采用老API的ValueAggregator为例,则先定义一个抽象类ValueAggregatorJobBase:
abstract class ValueAggregatorJobBase<K1 extends WritableComparable, V1 extends Writable> implements Mapper<K1, V1, Text, Text>, Reducer<Text, Text, Text, Text> {}
这个抽象类虽说是实现了Mapper和Reducer两个界面,实际上却并未提供map()和reduce()这两种操作,那是由扩充了这个ValueAggregatorJobBase的ValueAggregatorMapper和ValueAggregatorReducer提供的,例如:
class ValueAggregatorMapper<K1 extends WritableComparable, V1 extends Writable> extends ValueAggregatorJobBase<K1, V1> {} ]map()
ValueAggregatorReducer也是一样。这就是采用老API的mapper和reducer。
而新API就不一样了。在新API上,Mapper和Reducer都是class:
class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {} ]map() class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {} ]reduce()
而具体的应用,例如WordCount,则有:
class WordCount {} ]static class TokenizerMapper extendsMapper<Object, Text, Text, IntWritable>{} ]static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable>{}
所以,在新老两种API中,连mapper和reducer的类型都是不一样的。尽管作业提交的流程都汇入了新API上的Job.submit(),但是当然应该告诉将来的执行者,这个具体的应用所采取的是新API还是老API。
这个信息通过Job.setUseNewAPI()设置在Job.conf中,这个Job类是新API上的Job类,这是对JobContextImpl的扩充,其成分conf是从后者继承来的。注意不要把这Job类跟老API上的Job类混淆,那是对ControlledJob的扩充。
之所以会如此地弯弯绕绕,应该是出于版本更新的需要。具体的代码就留给读者自己阅读和研究了。再次提醒读者,遇有同名的类型定义时,一定要注意是在什么package中定义,而加以引用的代码所import的又是哪一个package。
回到上面的代码,既然要跨节点操作,就得建立起对外的联系,上面对connect()的调用就起着这个作用。
然后,就像要找一个懂得怎样跟“中央”打交道提交作业的办事员一样,要创建一个专门干这活儿的JobSubmitter,它的submitJobInternal()方法就是专做这件事情的。当程序从submitJobInternal()返回的时候,作业的提交已经完成,所以就把作业的状态改成RUNNING。至于ugi.doAs()的作用,前面在示例一中已经介绍过了。
初看之下Job.submit()似乎没几行程序,但实际上却是相当复杂的一个过程。我们先看Job.connect()的代码。
[Job.submit()> Job.connect()] private synchronized void connect() throws IOException, InterruptedException, ClassNotFoundException { if (cluster==null){ //如果cluster尚未创建 cluster=ugi.doAs(new PrivilegedExceptionAction<Cluster>(){ publ ic Cluster run(){ return new Cluster(getConfiguration()); } });
} }
可见connect()的作用就是保证节点上有个Cluster类对象,如果还没有,就创建一个。顾名思义,Cluster类对象中应该存有与集群有关的信息,知道如何与集群打交道,其中也包括与“中央”打交道。但是这些信息从哪里来呢?这不是来自集群,因为此刻我们还不知道怎么跟集群打交道。所以这些信息只能来自配置文件。我们看Cluster类的摘要:
class Cluster{} ]ClientProtocolProvider clientProtocolProvider //集群条件下为YarnClientProtocolProvider ]Cl ientProtocol client //在集群条件下,这是与外界通信的渠道和规则 ]static ServiceLoader<ClientProtocolProvider> frameworkLoader= ServiceLoader.load(ClientProtocolProvider.class) ]… //其他数据 ]static {ConfigUtil.loadResources(); } //类的静态初始化 > addDeprecatedKeys(); //为保持与老API兼容而设 > Configuration.addDefaultResource("mapred-default.xml"); > Configuration.addDefaultResource("mapred-site.xml"); > Configuration.addDefaultResource("yarn-default.xml"); > Configuration.addDefaultResource("yarn-site.xml"); ]Cluster(Configuration conf) //构造方法1 > this(null, conf) ]Cluster(InetSocketAddress jobTrackAddr, Configuration conf)//构造方法2,给定主节点地址 > this.conf=conf > this.ugi=UserGroupInformation.getCurrentUser() > initialize(jobTrackAddr, conf) ]initialize(InetSocketAddress jobTrackAddr, Configuration conf) > for (ClientProtocolProvider provider:frameworkLoader){ >> ClientProtocol clientProtocol=null >> if (jobTrackAddr==null){ >>> clientProtocol=provider.create(conf) >> }else { >>> clientProtocol=provider.create(jobTrackAddr, conf) >> } >> clientProtocolProvider=provider >> client=cl ientProtocol > } //end for ]getFileSystem() //获取集群的文件系统 ]getCl ient() > return cl ient ]…
这里有个静态初始化过程,在此过程中通过ConfigUtil.loadResources()装载了一批“资源”,具体都是.xml配置文件,主要有mapred-default.xml、mapred-site.xml和yarn-default.xml、yarn-site.xml。其中前两个配置文件用于老通信机制的实现,后两个用于新机制的实现。此外还有一些现在已不再使用,只是为保持与老版本兼容所需的资源。前面讲过,静态初始化是针对类的,而不是针对具体对象的,所以只执行一次。当然,这些配置文件都存在于本节点的磁盘上,其中例如mapred-default.xml是全局性的,所有节点上的这个文件应保持一致,而mapred-site.xml是只针对所在节点的,不同节点上的配置必然有所不同。
创建对象的时候要对有初始赋值的变量成分进行赋值,所以这里的frameworkLoader会得到赋值。这个变量的类型是ServiceLoader<ClientProtocolProvider>,就是针对ClientProtocolProvider类的ServiceLoader,而且这就是通过ServiceLoader.load()装载的。
这里首先要解释一下什么是ClientProtocolProvider和ClientProtocol。用户向RM节点提交作业,是要RM为其安排运行,所以RM起着服务提供者的作用,而用户则处于客户的位置。既然如此,双方就得有个协议,对于双方怎么交互,乃至服务怎么提供,都得有个规定。在Hadoop的代码中,这所谓Protocol甚至被“上纲上线”到了计算框架的高度,连是否采用YARN框架也被纳入了这个范畴。实际上ClientProtocol就起着这样的作用,而ClientProtocolProvider顾名思义是ClientProtocol的提供者,起着有点像是Factory的作用。至于ServiceLoader<ClientProtocolProvider>,那是用来装载ClientProtocolProvider的。
ServiceLoader是由JDK提供的一个类,在源文件Cluster.java的头部有一行“import java.util.ServiceLoader; ”说明运行时需要导入这个工具性质的类。在OpenJDK的代码中,这个类的摘要是这样的:
class ServiceLoader<S> implements Iterable<S>{} ]String PREFIX="META-INF/services/" //关于这个 service的信息在某个目录下面 ]Class<S> service ]ClassLoader loader ]LinkedHashMap<String, S> providers ]Iterator<S> iterator() ]load(Class<S> service) ]parse(Class service, URL u) //URL可以是文件路径
就是说,ServiceLoader是针对某种服务S的,在这里是ClientProtocolProvider。关于这种服务的“元信息”在Hadoop源码某个分支上的“META-INF/services/”目录下。对于ClientProtocolProvider应有名为org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider的文件存在。事实上,在Hadoop的源码包中有两个这样的文件,一个是在mapreduce-project/mapreduce-client/hadoop-mapreduce-client-jobclient分支上,另一个是在mapreduce-proj ect/mapreduce-client/hadoop-mapreduce-client-common分支上。前者的内容(除注释之外)是路径org.apache.hadoop.mapred.YarnClientProtocolProvider;后者的内容则为org.apache.hadoop.mapred.LocalClientProtocolProvider。
ServiceLoader实现了Iterable界面,提供一个iterator()函数,因而可以用在for循环中。它还提供了一个load()方法,可以通过ClassLoader加载Class。此外,它还提供解析文件内容的功能。
装载了作为ServiceLoader对象的frameworkLoader,其LinkedHashMap中就有了上述的两个路径,这样就可以通过其iterator()函数依次引用这两个路径了。
然后,在Cluster类的构造函数中就会调用其initialize (),目的是要创建ClientProtocolProvider和ClientProtocol。
但是ClientProtocolProvider是个抽象类,这意味着只有继承和扩充了这个抽象类的具体类才能被实体化成对象。Hadoop的源码中一共只有两个类扩充和落实了这个抽象类,那就是LocalClientProtocolProvider和YarnClientProtocolProvider。
class LocalClientProtocolProvider extends ClientProtocolProvider{} class YarnClientProtocolProvider extends ClientProtocolProvider{}
可想而知,由这两种ClientProtocolProvider提供的ClientProtocol也是不一样的。事实上ClientProtocol是个界面,实现了这个界面的类也有两个,分别为LocalJobRunner和YARNRunner。但是实际使用的只能是其中之一。
那么究竟应该是哪一个呢?可以通过配置项加以设定。
配置文件mapred-default.xml中有个配置项“mapreduce.framework.name”,用来设置所用的计算框架。配置项的值可以是local、classic或yarn。其中classic是指2.0版以前老的框架,那时候还没有YARN;而yarn当然是指在集群上运行YARN框架;至于local,那是在单机上进行MapRduce计算,严格说来那就不是YARN框架了。当然,在单机上运行比在集群上运行要简单得多,所以这两种情况下所创建的ClientProtocol是不一样的,一个是LocalJobRunner,另一个是YARNRunner。至于classic则已经不用了。
<property> <name>mapreduce.framework.name</name> <value>local</value> <description>The runtime framework for executingMapReduce jobs. Can be one of local, classic or yarn. </description> </property>
注意,这里设置的是local。这是因为,人们把Hadoop下载过来,第一步总是先在单机上试试,稍微熟悉一下,然后才会跑到集群上去。所以,把Hadoop安装到集群上去的时候要把这个配置项改成yarn。Hadoop的文档hadoop-mapreduce-project/INSTALL中讲了这个问题:
Step 8)Modify mapred-site.xml to use yarn framework
<property>
<name> mapreduce.framework.name</name>
<value> yarn</value>
</property>
除来自配置文件以外,也可以在命令行中通过参数加以指定,例如“-jobconf mapreduce. framework.name=yarn”。
有了这些背景,我们再看Cluster的Initialize()操作:
[Job.submit()> Job.connect()> Cluster.Cluster()> Cluster.Initialize()] private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)throws … { synchronized (frameworkLoader){ //不允许多个线程同时进入此段代码,需要加锁 for (ClientProtocolProvider provider:frameworkLoader){ LOG.debug("Trying ClientProtocolProvider:"+provider.getClass().getName()); ClientProtocol clientProtocol=null; try{ //试图创建Cl ientProtocol,即LocalJobRunner或YARNRunner,视配置而定 if (jobTrackAddr==null){ cl ientProtocol=provider.create(conf); }else { cl ientProtocol= provider.create(j obTrackAddr, conf); } if (clientProtocol! =null){ //已经创建成功 clientProtocolProvider=provider; client=cl ientProtocol; //已经创建了Cl ientProtocol 对象,YARNRunner或LocalJobRunner LOG.debug("Picked"+provider.getClass().getName() +"as the ClientProtocolProvider"); break; //装载provider成功并创建clientProtocol 成功,跳出 for循环 }else { LOG.debug("Cannot pick"+provider.getClass().getName() +"as the ClientProtocolProvider-returned null protocol"); } }catch(Exception e){ //创建失败,记入日志 LOG.info("Failed to use"+provider.getClass().getName() +"due to error:"+e.getMessage()); } }//end for //本轮循环中未能创建Cl ientProtocol,继续下一轮 for循环 }//end synchronized if (null==clientProtocolProvider||null==client){ //未能创建所需的对象 throw new IOException( "Cannot initialize Cluster.Please check your configuration for" +MRConfig.FRAMEWORK_NAME+"and the correspond server addresses."); } }
这里的for循环,是基于前述ServiceLoader中iterator()的循环。实际上也就是对两个ClientProtocolProvider的循环,目的是要通过ClientProtocolProvider.create()创建用户所要求的ClientProtocol,也无非就是LocalJobRunner或YARNRunner。只要有一次创建成功,循环就没有必要继续了,因为只能有一种选择;但是,如果两次都失败,程序就无法继续了,因为不知道该怎样让RM提供计算服务。而能否成功创建,则取决于前述配置项的设置。不过ClientProtocolProvider是抽象类,实际上依次进行尝试的是LocalClientProtocolProvider和YarnClientProtocolProvider。假定第一轮循环时进行尝试的是前者,那么:
[Job.submit()> Job.connect()> Cluster.Cluster()> Cluster.Initialize()> LocalClientProtocolProvider.create()] public ClientProtocol create(Configuration conf)throws IOException { String framework=conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME); //从配置块中获取配置项“mapreduce.framework.name”,默认“local” if (! MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)){ return null; //如果不是“local”就失败 } conf.setInt(JobContext.NUM_MAPS,1); //既然是 local 就没有必要用多个Mapper return new LocalJobRunner(conf); //实际创建LocalJobRunner对象 }
这里涉及的几个字符串常量定义于界面MRConfig:
public static final String FRAMEWORK_NAME ="mapreduce.framework.name"; public static final String CLASSIC_FRAMEWORK_NAME ="classic"; public static final String YARN_FRAMEWORK_NAME ="yarn"; public static final String LOCAL_FRAMEWORK_NAME="local";
如果配置项的值是“local”,或者干脆就没有这个配置项,那就创建LocalJobRunner,并把作业单中的NUM_MAPS设置成1;因为既然是local,在单机上计算就没有必要使用多个Mapper,那样并没有性能上的好处。创建了LocalJobRunner,循环就结束了。
如果有这么个配置项,但所设置的值不是“local”,那就失败了,那就试试别的,应该就是YarnClientProtocolProvider:
[Job.submit()> Job.connect()> Cluster.Cluster()> Cluster.Initialize()> YarnClientProtocolProvider.create()] public ClientProtocol create(Configuration conf)throws IOException { if (MRConfig.YARN_FRAMEWORK_NAME.equals( conf.get(MRConfig.FRAMEWORK_NAME))){ return new YARNRunner(conf); //实际创建YARNRunner对象 } return null; }
如果配置项的值是“yarn”,那就创建YARNRunner。
我们关心的是Hadoop在集群上的运行,所以假定选用YarnClientProtocolProvider,所创建的ClientProtocol是YARNRunner。后面我们将看到这个YARNRunner的作用。
注意,YarnClientProtocolProvider有两个create(),上面代码中根据是否提供jobTrackAddr而分别加以调用,但是现在Hadoop代码中对这两个create()的调用殊途同归,最后归结到同一个create(),所以j obTrackAddr其实是不起作用的,这应该是老版本留下的残余。
另外,frameworkLoader是Cluster类的静态成分,因而只是在JVM装载Cluster类的时候装载一次,但是LocalJobRunner或YARNRunner则是在Cluster类的Initialize()中创建,那就是每创建一个Cluster对象时都会被执行一次。然而每一个作业的提交都是由一个独立的JVM进程完成的,都在一个独立的JVM上,所以实际上每个作业都会有自己的frameworkLoader,当然就也有自己的LocalJobRunner或YARNRunner。
回到前面Job.submit()的代码,下一步是对getJobSubmitter()的调用。这个函数创建一个JobSubmitter类对象,然后Job.submit()就调用它的submitJobInternal()方法,完成作业的提交。创建JobSubmitter对象时的两个参数就是调用getJobSubmitter()时的两个参数,就是cluster.getFileSystem()和cluster.getClient()。其中cluster.getClient()返回的就是YARNRunner或LocalJobRunner;而cluster.getFileSystem()的返回结果,对于YARNRunner是RM节点上文件系统的URL,对于LocalJobRunner则是本节点上的一个相对路径为“mapred/system”的目录。
创建了JobSubmitter对象后,调用其submitJobInternal()的两个参数也仍旧是这两个参数。
下面是JobSubmitter类的摘要:
class JobSubmitter{} ]FileSystem jtFs ]ClientProtocol submitClient//来自Job.submit(),在集群条件下是YARNRunner ]String submitHostName ]String submitHostAddress ]JobSubmitter(FileSystem submitFs, ClientProtocol submitClient)//构建方法 > this.submitClient=submitClient //将参数 submitClient复制到JobSubmitter中 //这是在Job.submit()中通过cluster.getClient()获取的,在集群条件下是YARNRunner > this.j tFs=submitFs//j t是JobTracker的缩写,2.0版之后的Hadoop已改成RM ]compareFs(FileSystem srcFs, FileSystem destFs)//比较两个文件系统是否相同 ]getPathURI() ]checkSpecs() ]copyRemoteFiles() ]copyAndConfigureFiles() ]copyJar(Path originalJarPath, Path submitJarFile, short replication) ]addMRFrameworkToDistributedCache() ]submitJobInternal(Job j ob, Cluster cluster)//将作业提交给集群 ]writeNewSplits(JobContext job, Path jobSubmitDir)
]…
此刻我们关心的焦点就是这个类所提供的方法submitJobInternal(),因为Job.submit()就是通过这个方法将作业提交到集群的。这个方法的代码有点长,我已对其稍作整理。
[Job.submit()> JobSubmitter.submitJobInternal()] JobStatus submitJobInternal(Job job, Cluster cluster)throws ClassNotFoundException, …{ checkSpecs(job); //validate the jobs output specs,检查输出格式等配置的合理性 Configuration conf=job.getConfiguration(); addMRFrameworkToDistributedCache(conf); Path jobStagingArea=JobSubmissionFiles.getStagingDir(cluster, conf); //获取目录路径 //configure the command line options correctly on the submitting dfs InetAddress ip=InetAddress.getLocalHost(); //获取本节点(主机)的 IP地址 if (ip! =null){ submitHostAddress=ip.getHostAddress(); //本节点 IP地址的字符串形式 submitHostName=ip.getHostName(); //本节点名称 conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName); conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress); } JobID j obId=submitClient.getNewJobID(); //生成一个作业 ID号 job.setJobID(jobId); //将作业 ID号写入Job对象 Path submitJobDir=new Path(jobStagingArea, jobId.toString()); //本作业的临时子目录名中包含着作业 ID号码 JobStatus status=null; try{ conf.set(MRJobConfig.USER_NAME, //用户名 UserGroupInformation.getCurrentUser().getShortUserName()); conf.set("hadoop.http.filter.initializers", //准备用于Http接口的过滤器初始化 "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer"); conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString()); LOG.debug("Configuring job"+jobId+"with"+submitJobDir+"as the submit dir"); //get delegation token for the dir /* 准备好与访问权限有关的证件(token)*/ TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[]{submitJobDir }, conf); //获取与NameNode打交道所需证件 populateTokenCache(conf, job.getCredentials()); //generate a secret to authenticate shuffle transfers if (TokenCache.getShuffleSecretKey(job.getCredentials())==null){ //需要生成Mapper与Reducer之间的数据流动所用的密码
KeyGenerator keyGen; try{ keyGen=KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM); keyGen.init(SHUFFLE_KEY_LENGTH); }catch(NoSuchAlgorithmException e){ throw new IOException("Error generating shuffle secret key", e); } SecretKey shuffleKey=keyGen.generateKey(); TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(), job.getCredentials()); } //end if copyAndConfigureFiles(j ob, submitJobDir); //将可执行文件之类拷贝到HDFS中 Path submitJobFile=JobSubmissionFiles.getJobConfPath(submitJobDir); //配置文件路径 //Create the spl its for the j ob /* 将输入数据文件切片,并写入临时目录 */ LOG.debug("Creating splits at"+jtFs.makeQualified(submitJobDir)); int maps=writeSplits(j ob, submitJobDir); //生成切片,以切片数量决定Mapper数量 conf.setInt(MRJobConfig.NUM_MAPS, maps); //"mapreduce.j ob.maps" LOG.info("number of spl its:"+maps); //write"queue admins of the queue to which job is being submitted"to job file. String queue=conf.get(MRJobConfig.QUEUE_NAME, JobConf.DEFAULT_QUEUE_NAME); //默认作业调度队列名为“default” AccessControlList acl=submitClient.getQueueAdmins(queue); conf.set(toFullPropertyName(queue, QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString()); //removing jobtoken referrals before copying the jobconf to HDFS //as the tasks don't need this setting, actually they may break //because of it if present as the referral will point to a different job. TokenCache.cleanUpTokenReferral(conf); if (conf.getBoolean(MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED, MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)){ //Add HDFS tracking ids,如果启用了跟踪机制的话 ArrayList<String> trackingIds=new ArrayList<String>(); for(Token<? extends TokenIdentifier> t:job.getCredentials().getAllTokens()){ trackingIds.add(t.decodeIdentifier().getTrackingId()); } conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
trackingIds.toArray(new String[trackingIds.size()])); }//end if //Write job file to submit dir writeConf(conf, submitJobFile); //将conf的内容写入一个.xml文件 //Now, actually submit the job(using the submit name)/* 万事俱备,只欠提交了 */ printTokens(jobId, job.getCredentials()); status=submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials()); //提交作业,通过YarnRunner.submitJob()或LocalJobRunner.submitJob() if (status! =null){ return status; //提交成功 }else { throw new IOException("Could not launch job"); //提交失败 } }finally{ if (status==null){ //如果失败就需要善后 LOG.info("Cleaning up the staging area"+submitJobDir); if(jtFs! =null&&submitJobDir! =null)jtFs.delete(submitJobDir, true); //删除目录 } } //end try-finally }
代码中已经加了一些注释供读者自己阅读,我们在这里集中关注几个事,那就是copyAndConfigureFiles()、writeSplits()、writeConf()及最后的submitClient.submitJob()。
先看copyAndConfigureFiles()。以前,我们只是笼统地讲提交作业时要将有关的各种资源随同作业单一起提交,但是现在需要具体化了。需要随同作业单一起提交的资源和信息有两类:一类是需要交到资源管理器RM手里,供RM在立项和调度时使用的;另一类则并非供RM直接使用,而是供具体进行计算的节点使用的。前者包括本节点即作业提交者的IP地址、节点名、用户名、作业ID号,以及有关MapReduce计算输入数据文件的信息,还有为提交作业而提供的“证章(Token)”等。这些信息将被打包提交给RM,这就是狭义的作业提交,是流程的主体。后者则有作业执行所需的j ar可执行文件、外来对象库等。如果计算的输入文件在本地,则后者还应包括输入文件。这些资源并不需要提交给RM,因为RM本身并不需要用到这些资源,但是必须要把这些资源复制或转移到全局性的HDFS文件系统中,让具体承担计算任务的节点能够取用。
为了上传相关的资源和信息,需要在HDFS文件系统中为本作业创建一个目录。HDFS文件系统中有一个目录是专门用于作业提交的,称为“舞台目录(staging directory)”。所以这里要通过JobSubmissionFiles.getStagingDir()从集群获取这个目录的路径。然后就以本作业的ID,即JobId为目录名在这个舞台目录中创建一个临时的子目录,这就是代码中的submitJobDir。以后凡是与本作业有关的资源和信息,就都上传到这个子目录中。
首先是通过copyAndConfigureFiles()上传作业的可执行映像:
[Job.submit()>JobSubmitter.submitJobInternal()>copyAndConfigureFiles()] copyAndConfigureFiles(Job j ob, Path j obSubmitDir) > conf=job.getConfiguration() > replication=(short)conf.getInt(Job.SUBMIT_REPLICATION,10) >copyAndConfigureFiles(Job job, Path jobSubmitDir, short replication)//多了一个参数 >> String files=conf.get("tmpfiles")//来自命令行中的-files选项,计算中需要用到的文件 >> String libjars=conf.get("tmpjars")//来自命令行中的-libjars选项,计算中需要用到的jar >> String archives=conf.get("tmparchives")//来自命令行中的-archives选项 >> String jobJar=job.getJar()//本作业的java程序经编译生成的jar文件,这是一定有的 >> FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms)//在HDFS文件系统中创建目录 >> Path filesDir=JobSubmissionFiles.getJobDistCacheFiles(submitJobDir) >> Path archivesDir=JobSubmissionFiles.getJobDistCacheArchives(submitJobDir) >> Path libjarsDir=JobSubmissionFiles.getJobDistCacheLibjars(submitJobDir) >> if (files! =null){ //如果命令行中有-files选项 >>+ FileSystem.mkdirs(jtFs, filesDir, mapredSysPerms) //在HDFS文件系统中创建目录 >>+ String[]fileArr=files.split(", ") >>+ for (String tmpFile:fileArr){ >>++ tmpURI=new URI(tmpFile) >>++ tmp=new Path(tmpURI) >>++ newPath=copyRemoteFiles(filesDir, tmp, conf, replication)//把文件复制到HDFS中 >>++> FileSystem remoteFs=originalPath.getFileSystem(conf)//这是本地的一个目录 >>++> if(compareFs(remoteFs, jtFs))return originalPath //如果源和目的相同就无须复制 >>++> newPath=new Path(parentDir, originalPath.getName())//准备用在HDFS中的目录名 >>++> FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, conf) //复制文件,remoteFs和originalPath为源,j tFs和newPath为目标,不删除源文件 >>++> jtFs.setReplication(newPath, replication) //设置复份数量 >>++> return newPath //copyRemoteFiles()结束 >>++ pathURI=getPathURI(newPath, tmpURI.getFragment()) >>++ DistributedCache.addCacheFile(pathURI, conf) >>+ } >> }//end if (files! =null) >> if (libjars! =null){ //如果命令行中有-libjars选项 >>+ FileSystem.mkdirs(jtFs, libjarsDir, mapredSysPerms)//在HDFS文件系统中创建目录 >>+ … >>+ newPath=copyRemoteFiles(libjarsDir, tmp, conf, replication)//复制文件至HDFS中 >>+ … >> } >> if (archives! =null){ //如果命令行中有-archives选项
>>+ FileSystem.mkdirs(jtFs, archivesDir, mapredSysPerms) >>+ … >>+ newPath=copyRemoteFiles(archivesDir, tmp, conf, replication)//复制文件至HDFS中 >>+ … >> } > if (jobJar! =null){//copy jar to JobTracker's fs,这是无条件的,每个作业都有jar文件 >>+ if ("".equals(job.getJobName()))job.setJobName(new Path(jobJar).getName()) >>+ j obJarPath=new Path(j obJar) >>+ j obJarURI=j obJarPath.toUri() >>+ if (jobJarURI.getScheme()==null||jobJarURI.getScheme().equals("file")){ >>++ copyJar(jobJarPath, JobSubmissionFiles.getJobJar(submitJobDir), replication) >>++>jtFs.copyFromLocalFile(originalJarPath, submitJarFile) >>++> jtFs.setReplication(submitJarFile, replication) >>++> jtFs.setPermission(submitJarFile, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION)); >>++job.setJar(JobSubmissionFiles.getJobJar(submitJobDir).toString()) >>+ } >> }
这个函数要做的事情,首先是copy,就是把作业提交者所在本地(宿主)文件系统中的一些文件复制到HDFS中,因为HDFS是全局的、谁都能访问到的,而且有多个复份。那么要复制一些什么文件呢?首先就是作业本身的可执行程序,就是编译产生的j ar文件。除此之外,在启动这个作业的命令行中也可能以可选项的方式提供一些文件,那也要复制到HDFS中。HDFS文件不存在“在哪一个节点上”的问题,同一个文件中不同的“块”就可以在不同的节点上。注意,submitJobInternal()所直接调用的是两个参数的copyAndConfigureFiles(),这个函数补上一个参数replication即复份个数,再调用三个参数的copyAndConfigureFiles(),以完成操作。复制到HDFS的文件会被保存多份(一般是三份),但是可以因具体文件而不同。那么这些文件要保存几份呢?可以在配置块中设定,这里默认的是10份。10个复份,会被存储在10个不同的节点上,看起来似乎很多,但是考虑到运行时Mapper的数量可能会有数百上千,那也就不稀奇了。不过,要是Mapper的数量只有三五个,那这个开销所占的比例就太高了。由此可见,把很小的题目放在Hadoop上计算是不划算的。
代码摘要中加了注释,这里就不多作讲解了。注意,注释原文中提到的JobTracker是老版本中才有的,现在已为ResourceManager和ApplicationMaster所替代。
回到前面submitJobInternal()的摘要中,下一步是生成并上传关于输入数据分片的信息,即split文件,这是由writeSplits()完成的:
[Job.submit()>JobSubmitter.submitJobInternal()> writeSplits()]
JobSubmitter.writeSplits(JobContext j ob, Path j obSubmitDir)
> jConf=(JobConf)job.getConfiguration()
> if (jConf.getUseNewMapper()){
>+ int maps=writeNewSplits(j ob, j obSubmitDir) //按新API的要求写 Spl it文件,返回 Spl it的数量 >+> InputFormat<? , ? > input=ReflectionUtils.newInstance(job.getInputFormatClass(), conf) >+> List<InputSplit> splits=input.getSplits(job)==FileInputFormat.getSplits(job) //为输入(数据)文件生成一个InputSplit的List >+>> minSize=Math.max(getFormatMinSplitSize(), getMinSplitSize(job))//最小Split尺寸 >+>> maxSize=getMaxSplitSize(job) //最大 Split尺寸 >+>> List<InputSplit> splits=new ArrayList<InputSplit>()//创建一个空白的Split List >+>> List<FileStatus> files=listStatus(job) //获取每个输入文件的FileStatus >+>> for (FileStatus file:files) //对于每个输入文件(可以不止一个) >+>>+ path=file.getPath() //获取其路径 >+>>+ length=file.getLen() //获取其文件长度 >+>>+ if (length! =0){ //如果文件长度不为0 >+>>++ if(file instanceof LocatedFileStatus){//如果是个含有数据块位置信息的文件 >+>>+++ blkLocations=((LocatedFileStatus)file).getBlockLocations() >+>>++ }else { //一般的文件 >+>>+++ FileSystem fs=path.getFileSystem(job.getConfiguration()) >+>>+++ blkLocations=fs.getFileBlockLocations(file,0, length) >+>>++ } >+>>++ if (isSplitable(job, path)){ >+>>+++ blockSize=file.getBlockSize() >+>>+++ splitSize=computeSplitSize(blockSize, minSize, maxSize) //Spl it的大小不一定就是数据块的大小,但通常都是 >+>>+++ bytesRemaining=length >+>>+++ while (((double)bytesRemaining)/splitSize > SPLIT_SLOP){ >+>>++++ blkIndex=getBlockIndex(blkLocations, length-bytesRemaining) >+>>++++ splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())) >+>>++++ bytesRemaining-=splitSize >+>>+++ } >+>>+++ if (bytesRemaining! =0){ //剩下的尾巴(剩余部分)作为一个分片 >+>>++++ blkIndex=getBlockIndex(blkLocations, length-bytesRemaining) //分片起点所在数据块的 index >+>>++++ splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())) >+>>+++ } >+>>++ }else { //not splitable >+>>+++ splits.add(makeSplit(path,0, length, blkLocations[0].getHosts(),
blkLocations[0].getCachedHosts())) >+>>++ } >+>>+ }else { //文件长度为0 >+>>++ splits.add(makeSplit(path,0, length, new String[0])) >+>>+ } >+>> } //end for each file >+>> job.getConfiguration().setLong(NUM_INPUT_FILES, files.size()) >+>> return splits >+> array=(T[])splits.toArray(new InputSplit[splits.size()]) >+> Arrays.sort(array, new Spl itComparator()) >+> JobSplitWriter.createSplitFiles(jobSubmitDir, conf, jobSubmitDir.getFileSystem(conf), array) //创建Split文件 >+> return array.length > }else { >+ maps=writeOldSplits(jConf, jobSubmitDir) > } > return maps
如前所述,把输入数据文件分成多少个切片(Split),将来就会安排多少个Mapper分头进行计算。那么这个输入数据文件是根据什么条件来分片的呢?根据大小。配置文件mapred-default.xml中应该有两项配置:“mapreduce.input.fileinputformat.split.minsize”,即SPLIT_MINSIZE;“mapreduce.input.fileinputformat.split.maxsize”,即SPLIT_MAXSIZE。如果没有就用默认值。但是切片的大小还有别的限制,那就是Hadoop文件系统中“块(block)”的大小,一个块的大小一般是128MB。
这里摘要中的splits就是对分片信息的描述。这是个InputSplit对象的List,不过InputSplit是抽象类,因为这里要考虑输入数据可能有多种不同的来源,比方说来自数据文件,也可以来自数据库查询的输出,也可能实时来自网络。来源不同,分片的方法自然也就不同。就数据文件而言,扩充了InputSplit的具体类为FileSplit:
class FileSplit extends InputSplit implements Writable {} ]Path file //输入文件路径 ]long start //分片在文件中的位置(起点) ]long length //分片长度 ]String[]hosts //这个分片所在数据块的多个复份所在节点 ]SplitLocationInfo[]hostInfos //每个数据块复份所在节点,以及是否缓存 ]]boolean inMemory //是否缓存在内存中 ]]String location //所在节点
有了这么一个List之后,就将其转化成一个数组,并加以排序,然后就将其串行化后写入文件,以供作业提交之用。注意,我们在这里所谓的一个Split、一个FileSplit,只是对一个Split的描述,而不是这个Split本身。至于FileSplit所描述的某个HDFS文件的那部分内容,则已经重复存储在某几个节点上。
[Job.submit()>JobSubmitter.submitJobInternal()> writeSplits()>writeNewSplits() >JobSplitWriter.createSplitFiles()] createSplitFiles(j obSubmitDir, conf, fs, array) > path1=JobSubmissionFiles.getJobSplitFile(jobSubmitDir) //路径名为“~/job.split” > FSDataOutputStream out=createFile(fs, path1, conf) //创建输入片(Spl it)文件,并为其创建一个输出流 > SplitMetaInfo[]info=writeNewSplits(conf, splits, out) //将分片信息写入Split文件: >> info=new SplitMetaInfo[array.length] >> if (array.length! =0){ >>+ factory=new SerializationFactory(conf) >>+ maxBlockLocations=conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT) >>+ offset=out.getPos() >>+ for(T spl it:array){ //对于数组中的每一个 Spl it: >>++ prevCount=out.getPos() >>++ Text.writeString(out, split.getClass().getName()) >>++ Serializer<T> serializer=factory.getSerializer((Class<T>)split.getClass()) >>++ serializer.open(out) //串行化后的内容会写入输出流out >>++ serializer.serialize(split) >>++ currCount=out.getPos() >>++ String[]locations=split.getLocations() >>++ if (locations.length > maxBlockLocations){ >>+++ locations=Arrays.copyOf(locations, maxBlockLocations) >>++ } >>++ info[i++]=new JobSplit.SplitMetaInfo(locations, offset, split.getLength()) >>++ offset+=currCount-prevCount >>+ } //end for >> } >> return info > out.close() > path2=JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir)//“~/job.splitmetainfo” > writeJobSplitMetaInfo(fs, path2, new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion, info) >> FSDataOutputStream out=FileSystem.create(fs, filename, p) >> out.write(JobSplit.META_SPLIT_FILE_HEADER) >>WritableUtils.writeVInt(out, splitMetaInfoVersion) >>WritableUtils.writeVInt(out, allSplitMetaInfo.length)//allSplitMetaInfo就是上面的info[] >> for (JobSplit.SplitMetaInfo splitMetaInfo :allSplitMetaInfo){ >>+ splitMetaInfo.write(out)
>> } >> out.close()
这里生成了两个文件,都在为具体作业创建的目录下,第一个是“job.split”,第二个是“job.splitmetainfo”。前者的内容是对于每个Split描述的记录,相当于一个数组,但是每个元素的长度可能有所不同(因为String)。注意,在大型的计算中可能会有数百、上千个Split。后者为元数据文件,相当于为前者所做的索引,其信息来自写第一个文件时的积累。这里的allSplitMetaInfo就是上面的SplitMetaInfo数组info[], SplitMetaInfo类定义为:
class SplitMetaInfo implements Writable {} ]long startOffset //这是在 Split文件即“job.split”中的位移 ]long inputDataLength ]String[]locations
再回到前面submitJobInternal()的摘要中,最后,还要通过writeConf()把配置块conf的内容加以“串行化”并写入一个.xml文件,当然这也是在RM节点上本作业的子目录中:
writeConf(conf, submitJobFile) > FSDataOutputStream out=FileSystem.create(jtFs, jobFile, newFsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION)) > conf.writeXml(out) > out.close()
提交作业时还有个提交给RM的哪一个队列的问题,这跟作业的调度运行有关。RM有三种不同的调度策略,即“先进先出”、“基于功能与容量”、“基于公平”,从而就有三个不同的队列。那么怎么确定呢?目前是通过.xml配置文件静态设置的,需要以QUEUE_NAME,即“mapreduce.j ob.queuename”为键值去配置块中查询(实际来自文件mapred-default.xml)。此外,那些队列是有访问控制名单(ACL)的,并非任何用户都可以将作业提交给任何一个队列,所以还要提供与ACL相关的身份信息,即Credentials。
至此,已是万事俱备,只欠提交了,而提交是submitClient.submitJob()的事。
这个submitClient是JobSubmitter内部的一个成分,是实现了ClientProtocol界面的YARNRunner或LocalJobRunner对象。至于具体是哪一种,则要看配置文件中对于“mapreduce.framework.name”这一项的设定,或者在启动作业运行时命令行中的相应参数设定。对于Hadoop集群,这应该是YARNRunner(如果是在单机上,则是LocalJobRunner。至于2.0版之前的Hadoop那是另一回事了)。这是在前面Cluster.Initialize()中确定并创建的。
于是,这个作业的JobId、相关文件所在的目录路径submitJobDir和身份信息Credentials,就将由YARNRunner提交给RM节点。