大数据处理系统:Hadoop源代码情景分析
上QQ阅读APP看书,第一时间看更新

5.6 YARNRunner和ResourceMgrDelegate

YARN,是“Yet Another Resource Negotiator”的缩写,意为“又一个资源协调者”,知道C编译和YACC的人马上就会联想到“Yet Another C Compiler”。之所以是“又一个”,是因为Hadoop原先在0.×和1.×版中就有个类似于“资源协调者”这样的角色,只是不叫这个名称,所用的机制也不一样,然后又开发出了一种结构更好、概念更清晰、功能也更强的(集群)资源协调/管理机制,所以才叫YARN。不过也有人主张YARN是“YARN Application Resource Negotiator”的缩写,这又使人联想到GNU是“GNU is Not Unix”的缩写,这是一样的套路。

而YarnRunner,顾名思义就是YARN的Runner。我们先看一下这个类的摘要:

      class YARNRunner implements ClientProtocol{}
      ]ResourceMgrDelegate resMgrDelegate //这是RM派驻在地方上的特派员
      ]Cl ientCache cl ientCache
      ]FileContext defaultFileContext
      ]YARNRunner(Configuration conf) //构造函数
        > this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf)))
                                      //需要创建特派员,然后调用下一个构造函数
      ]YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate)//构造函数
        > this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate))
                                      //需要创建Cl ientCache,然后调用下一个构造函数
      ]YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
                                        Cl ientCache cl ientCache)//这是最终的构造函数
        > this.resMgrDelegate=resMgrDelegate
        > this.clientCache=clientCache
        > this.defaultFileContext=FileContext.getFileContext(this.conf)
      ]submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
      ]createApplicationResource(FileContext fs, Path p, LocalResourceType type)
      ]createApplicationSubmissionContext(Configuration jobConf, String jobSubmitDir, …)
      ]killJob(JobID arg0)
      ]killTask(TaskAttemptID arg0, boolean arg1)

YARNRunner内部有个起着重要作用的ResourceMgrDelegate类对象resMgrDelegate,这是在创建YARNRunner时在其构造方法中创建的,或者是作为构造参数之一传下来的。与此相似,还有个ClientCache类对象clientCache,那也是在创建YARNRunner的时候创建的,但是重要性就没有那么大了。

我们先来看YARNRunner.submitJob()的代码,这已是“临门一脚”了。

      [Job.submit()> JobSubmitter.submitJobInternal()> YARNRunner.submitJob()]


        public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)throws …{
          addHistoryToken(ts); //用于为历史记录服务,与作业历史(JobHistory)”有关
          //Construct necessary information to start theMR AM(AMApplicationMaster)
          ApplicationSubmissionContext appContext=
                                createApplicationSubmissionContext(conf, j obSubmitDir, ts);
                  //创建一个ApplicationSubmissionContext,并将conf中的相关信息转移过去
          //Submit to ResourceManager /* 将作业提交给资源管理者(ResourceManager)*/
          try{
            ApplicationId applicationId=resMgrDelegate.submitApplication(appContext);
                                  //RM派驻的特派员转交ApplicationSubmissionContext
            ApplicationReport appMaster=resMgrDelegate.getApplicationReport(applicationId);
            String diagnostics=
                (appMaster==null? "application report is null":appMaster.getDiagnostics());
            if (appMaster==null
                ||appMaster.getYarnApplicationState()==YarnApplicationState.FAILED
                ||appMaster.getYarnApplicationState()==YarnApplicationState.KILLED){
              throw new IOException("Failed to run job:"+diagnostics);
            }
            return cl ientCache.getCl ient(j obId).getJobStatus(j obId);
          }catch(YarnException e){
            throw new IOException(e);
          }//end try-catch
        }

所谓作业提交,是要把作业交到ResourceManager即RM的手里。RM是整个集群的资源管理者。更确切地说,这是一个ResourceManager类的对象,这个对象在主节点上。整个集群中只有一个实际有效的ResourceManager对象,其所在的节点可以通过.xml配置文件加以指定,也可以通过在某一机器节点上启动ResourceManage而使其成为事实上的主节点。除这个节点之外,其余每个节点上都有个NodeManager,更确切地说是一个NodeManager类对象,这个对象起着类似于地方政府的作用。不过,正如我们在前面的代码中所见,向“中央”提交作业并不需要通过“地方政府”;但是,当“中央”为了完成作业而要向“地方”指派任务的时候,就要跟NodeManager打交道了。所以,NodeManager本质上也是一种Server。至于ResourceManager,虽然统管着整个集群的资源和所有作业的调度运行,对于NodeManager而言却是Client;但是,相对于用户,相对于具体的应用,例如WordCount和QuasiMonteCarlo这些应用,却又是Server。所以Server和Client的概念有层次的不同,要看是在什么层次上说。

那么,提交物又是什么呢?我们在前面看到,与作业有关的资源已经上传到全局的HDFS文件系统中,那里已经有了一个专用于本作业的临时目录,所缺的就是一个类似于“申请报告”、“项目说明书”这样的东西了。ASC,即ApplicationSubmissionContext就起着这样的作用,所谓的Context本来就有“来龙去脉”、“上下文”的意思。所以这里通过createApplicationSubmissionContext()创建了一份ApplicationSubmissionContext,并把配置块conf中当前的相关信息、已上传资料所在的目录路径以及有关身份和访问权限的信息都复制转移过去。所谓Application,就其本质而言,其实就是Job,只是具体的数据结构有所不同,信息的范围又有所扩大而已。而所谓Context,又接近于Configuration,但也有所扩大,增加的那些信息基本上也是来自配置块。例如,我们得告诉ResourceManager需要多大的内存,这就要通过字符串MR_AM_VMEM_MB,即“yarn.app.mapreduce.am.resource.mb”在配置块中查询。如果配置文件中未作规定,就采用默认值1536MB。此外,还要告诉ResourceManager需要几个CPU核来执行我们的作业,这就要通过字符串MR_AM_CPU_VCORES,即“yarn. app.mapreduce.am.resource.cpu-vcores”在配置块中查询,如果配置文件中未作规定,就采用默认值1。特别地,这个函数还提供了有关Application Master即“项目组长”该用哪一个Shell (例如bash)以及有关某些环境变量的信息。再如作业的名称,如此等等,不一而足。createApplicationSubmissionContext()这个函数的代码烦琐但并不复杂,就留给读者自己阅读理解了。重要的是,ApplicationSubmissionContext,即appContext,才是按ResourceManager所要求模板填写的“正式作业单”。按源文件ApplicationSubmissionContext.java中的注释,这个类的对象“代表着ResourceManager为发起该应用的ApplicationMaster所需的全部信息”。

不过,ApplicationSubmissionContext其实是个抽象类,具体加以落实的类是ApplicationSubmissionContextPBImpl,所以实际创建和提交的作业单是后一类的对象。这里的PB是Protocol Buffer的缩写,本书前面已有介绍。显然,ApplicationSubmissionContextPBImpl意为ApplicationClientProtocol采用PB技术的实现。这个类的数据部分摘要如下:

      class ApplicationSubmissionContextPBImpl extends ApplicationSubmissionContext {}
      ]ApplicationSubmissionContextProto proto  //这是通信协议部分
      ]ApplicationId applicationId
      ]Priority priority
      ]ContainerLaunchContext amContainer
      ]Resource resource
      ]Set<String> applicationTags
      ]ResourceRequest amResourceRequest
      ]LogAggregationContext logAggregationContext
      ]ReservationId reservationId          //用于资源预订

这里面如applicationId、priority、amResourceRequest等成分的作用不言自明。其实最重要的成分倒是amContainer,这是一个CLC,即ContainerLaunchContext对象,是RM为这个应用指派建立ApplicationMaster时所给定的Context。这个Context决定着本应用中各个任务的执行。显然,这些信息的源头都在提交作业的这一方。同样,这也是个抽象类,具体予以落实的类是ContainerLaunchContextPBImpl:

      class ContainerLaunchContextPBImpl extends ContainerLaunchContext {}
      ]ContainerLaunchContextProto proto  //这是通信协议部分
      ]Map<String, LocalResource> localResources
      ]ByteBuffer tokens
      ]Map<String, ByteBuffer> serviceData
      ]Map<String, String> environment
      ]List<String> commands //这是在指派用于本作业AM的节点上启动AM的命令行
      ]Map<ApplicationAccessType, String> applicationACLS

看似貌不惊人,其实所含的信息量是很大的。特别值得一提的是其中的commands,这是供RM在某个节点上发起一个ApplicationMaster进程时所用的命令行。我们在前面看到YARNRunner.submitJob()调用了一个函数createApplicationSubmissionContext(),这个函数的代码中就有这么一个片段:

      //Setup the command to run the AM
      List<String> vargs=new ArrayList<String>(8);
      vargs.add(MRApps.crossPlatformifyMREnv(jobConf, Environment.JAVA_HOME)
                                                                    +"/bin/java");
      …  //这中间是许多命令行选项和参数
      vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS);
                                //这是“org.apache.hadoop.mapreduce.v2.app.MRAppMaster”
      vargs.add("1>"+ApplicationConstants.LOG_DIR_EXPANSION_VAR+
                Path.SEPARATOR+ApplicationConstants.STDOUT); //这是“<LOG_DIR>/stdout”
      vargs.add("2>"+ApplicationConstants.LOG_DIR_EXPANSION_VAR+
                Path.SEPARATOR+ApplicationConstants.STDERR); //这是“<LOG_DIR>/stderr”
      这段程序在运行时生成的是下面这样的一个Shell命令行:
       JAVA_HOME/bin/java …org.apache.hadoop.mapreduce.v2.app.MRAppMaster\
                                1> <LOG_DIR>/stdout  2> <LOG_DIR>/stderr

这里的<LOG_DIR>为日志文件所在目录,应以实际的目录路径代入。

RM受理了所提交的作业以后,会把这个ContainerLaunchContext转发到某个NM节点上,在那里执行这个shell命令行,另起一个Java虚拟机,让它执行MRAppMaster.class。

由此可见,这个ApplicationSubmissionContext对象appContext,真的是“代表着ResourceManager为发起该应用的ApplicationMaster所需的全部信息”。

而appContext的提交,则是通过ResourceMgrDelegate.submitApplication()完成的。ResourceMgrDelegate,顾名思义是ResourceManager的Delegate,即“派驻代表”、“特派员”。从概念上说,既然是“代表”, YARNRunner通过ResourceMgrDelegate.submitApplication()把appContext交到了它的手里,就算是提交给了ResourceManager,以后就是“代表”怎么把appContext送达RM的事了。我们先看一下ResourceMgrDelegate类的摘要,这个类是对抽象类YarnClient的继承和扩展:

      class ResourceMgrDelegate extends YarnClient{}
      ]YarnConfiguration conf
      ]ApplicationSubmissionContext application
      ]YarnClient client //实际上是YarnClientImpl类的对象,那也是对YarnClient的继承和扩展
      ]ResourceMgrDelegate(YarnConfiguration conf) //这是ResourceMgrDelegate的构造方法
        > super(ResourceMgrDelegate.class.getName())
        > this.conf=conf
        > this.client=YarnClient.createYarnClient() //创建YarnClient对象client
        >> client=new YarnClientImpl()//YarnClient.createYarnClient()创建的是YarnClientImpl
        > init(conf) //这是由AbstractService类提供的,YarnClient是对AbstractService的扩展
        >> serviceInit(config)
        >> notifyListeners()
        > start()   //这也是由AbstractService类提供的
        >> stateModel.enterState(STATE.STARTED)
        >> startTime=System.currentTimeMillis()
        >> serviceStart()
        >> notifyListeners()
      ]serviceInit(Configuration conf)
        > cl ient.init(conf)==YarnCl ientImpl.init(conf)
        >> AbstractService.init()
        >>> serviceInit()==YarnClientImpl.serviceInit()
        >>>> …
        > super.serviceInit(conf)
      ]serviceStart()
        > cl ient.start()==YarnCl ientImpl.start()
        >> AbstractService.start()
        >>> serviceStart()==YarnClientImpl.serviceStart()
        >>>> rmClient=ClientRMProxy.createRMProxy(…, ApplicationClientProtocol.class)
        >>>> historyClient.start()
        >>>> timelineClient.start()
        >>>> super.serviceStart()
        >>> notifyListeners()
        > super.serviceStart()
      ]submitApplication(ApplicationSubmissionContext appContext)
        > client.submitApplication(appContext)

我们在前面看到,ResourceMgrDelegate对象是在YARNRunner的构造函数中创建的。而YARNRunner,则是在前面的Cluster.Initialize()中创建的。再往上追溯,则Cluster类对象是在首次调用connect()时创建的。所以,任何一个节点,只要曾经调用过connect(),即曾经与“集群”有过连接,节点上就会有个Cluster类对象,从而就会有个YARNRunner对象,也就会有个ResourceMgrDelegate对象,而且如下所述就会有个YarnClientImpl对象。

从ResourceMgrDelegate类的摘要可以看出,它的submitApplication()方法其实十分简单,只是转而调用client.submitApplication()而已。而ResourceMgrDelegate类中的client,则同样可以从ResourceMgrDelegate类的摘要中看出,那是在其构造函数ResourceMgrDelegate()中通过YarnClient.createYarnClient()创建的,类型为YarnClientImpl。

像ResourceMgrDelegate一样,YarnClientImpl也是对YarnClient的继承和扩充,当然是不同的扩充。YarnClientImpl,顾名思义就是对YarnClient的具体实现(implimentation)。而YARNRunner.submitJob()对ResourceMgrDelegate.submitApplication()的调用,则实际上落实为对于YarnClientImpl.submitApplication()的调用。

      [Job.submit()> JobSubmitter.submitJobInternal()> YARNRunner.submitJob()
        > ResourceMgrDelegate.submitApplication()> YarnClientImpl.submitApplication()]
        public ApplicationId submitApplication(ApplicationSubmissionContext appContext)
                                                  throws YarnException, IOException {
          ApplicationId applicationId=appContext.getApplicationId();
          if (applicationId==null){
            throw new ApplicationIdNotProvidedException(
                "ApplicationId is not provided in ApplicationSubmissionContext");
          }
          //创建一个 SubmitApplicationRequestPBImpl 类的记录块
          SubmitApplicationRequest request=
                                  Records.newRecord(SubmitApplicationRequest.class);
          request.setApplicationSubmissionContext(appContext); //设置好记录块中的Context


          //Automatically add the timeline DT into the CLC
          //Only when the security and the timeline service are both enabled
          if (isSecurityEnabled()&&timelineServiceEnabled){
            addTimelineDelegationToken(appContext.getAMContainerSpec());
          }


          //TODO:YARN-1763∶Handle RMfailovers during the submitApplication call.
          rmClient.submitApplication(request); //实际的跨节点提交


          int pollCount=0;
          long startTime=System.currentTimeMillis();


          while (true){
            try{
              YarnApplicationState state=
                  getApplicationReport(applicationId).getYarnApplicationState();
                          //获取来自RM节点的应用状态报告,从中获取本应用的当前状态
              if (! state.equals(YarnApplicationState.NEW)&&
                    !state.equals(YarnApplicationState.NEW_SAVING)){
                LOG.info("Submitted application"+applicationId);
                break; //作业已进入运行阶段,结束while循环
              }


              long elapsedMillis=System.currentTimeMillis()-startTime;
              if (enforceAsyncAPITimeout()&&elapsedMillis >=asyncApiPollTimeoutMillis){
                throw new YarnException("Timed out while waiting for application"+
                                    applicationId+"to be submitted successfully");
              }
                //Notify the client through the log every 10 poll, in case the client
                //is blocked here too long.
                 if (++pollCount % 10==0){
                  LOG.info("Application submission is not finished, "+
                      "submitted application"+applicationId+"is still in"+state);
                }
                 try{
                  Thread.sleep(submitPollIntervalMillis); //睡眠一段时间
                }catch(InterruptedException ie){
                  LOG.error("Interrupted while waiting for application"+applicationId
                                                    +"to be successfully submitted.");
                }
              }catch(ApplicationNotFoundException ex){
                //FailOver or RMrestart happens before RMStateStore saves ApplicationState
                LOG.info("Re-submit application"+applicationId+"with the"+
                                                  "same ApplicationSubmissionContext");
                 rmClient.submitApplication(request); //失败后的再次提交
            } //end try-catch
          } //end while,进入下一轮循环
          return applicationId;
        }

实际跨节点提交之前先把appContext转化成一个SubmitApplicationRequest记录块,然后通过rmClient.submitApplication()提交请求。由于是跨节点的通信,在发出请求之后就进入一个while循环,不时地通过getApplicationReport()询问对方并接收对岸发回的报告。这有两种可能的结果:一种是收不到报告,发生了ApplicationNotFoundException异常,那就再发一遍,然后再getApplicationReport();另一种是收到了报告,那就要从报告中查看所提交的应用处于什么状态。如果已经不在NEW或NEW_SAVING状态,就说明已经开始运行,那就可以跳出循环并返回了;否则就睡眠一会儿,等一下再来getApplicationReport()。

对我们来说,这个流程还没有走到头,还要进一步追问rmClient.submitApplication()是怎么实现的。

这个rmClient是内嵌在YarnClientImpl中实现了ApplicationClientProtocol界面的对象,具体就是ApplicationClientProtocolPBClientImpl类对象。ApplicationClientProtocol是个界面,同时也是一种通信规程(Protocol),或称“协议”,这个界面就是为这种规程而设计和定义的。而ApplicationClientProtocolPBClientImpl类则采用Protocol Buffer技术实现了这个界面,也就是实现了这个规程。

本书第4章介绍过“Protocol Buffer”,即protobuf,以及用其语言编写的.proto文件applicationclient_protocol.proto的内容,和经过编译工具protoc加以编译之后生成的代码。

这个.proto文件定义了一个名为ApplicationClientProtocolService的Service,即RPC服务,编译工具protoc就生成实现这个服务的代码,包括服务端和客户端两方的代码。这样,我们在客户端调用由这种服务所提供的submitApplication()时,它就向对方发出一个SubmitApplicationRequestProto报文。而服务端就在那里调用某个类的submitApplication()方法,并在该方法结束和返回后向请求方发回一个SubmitApplicationResponseProto报文。这样,表面上是本地的一次submitApplication()调用,实际上调用的却是远地的同名操作方法。

但是,这里马上就有两个问题:在本地调用的submitApplication()方法是由什么类提供的?在远地调用的又是什么类的submitApplication()方法?答案就在于,这个service定义经protoc编译就会生成双方这两个类的代码。

首先,.proto文件中定义的Service如ApplicationClientProtocolService其实就是一个界面,而所生成的客户端Proxy类,即“代理类”,则实现了这个界面,并且又是对ApplicationClientProtocol的扩充。这个类就提供了submitApplication()以及所定义的其他方法。向对方发送SubmitApplicationRequestProto报文就是由这个方法完成的。

另一方面,protoc也会生成一个运行于对方(服务端)的Service类,这个类在接收到SubmitApplicationRequestProto报文时会调用那里某个扩充了ApplicationClientProtocol的“真实的、实际的”类所提供的submitApplication()。

于是,由protoc生成的代码对于客户端即操作的请求方就变得“透明”了,我们可以在概念上认为在客户端调用的就是服务端的那个类所提供的submitApplication(),这两个类也正好都是对ApplicationClientProtocol的扩充。至于过程中的串行化/去串行化等,则都已包含在其中了。

在前面YarnClientImpl.submitApplication()的代码中,rmClient是YarnClientImpl内部一个实现了ApplicationClientProtocol界面的对象。YarnClientImpl在其创建过程中执行其serviceStart()方法,通过ClientRMProxy.createRMProxy()创建了这个对象,属于ApplicationClientProtocolPBClientImpl类。不过ApplicationClientProtocolPBClientImpl类倒不是直接由protoc生成的,而只是使用了由protoc生成的类作为其底层。

      class YarnClientImpl extends YarnClient {}
      ]ApplicationClientProtocol rmClient
      ]AHSClient historyClient
      ]YarnCl ientImpl()
        > super()==YarnCl ient.YarnCl ient() //执行其父类YarnCl ient的构建方法
        >> super()==AbstractService.AbstractService()
      ]serviceInit(Configuration conf)
      ]serviceStart()
        > rmClient=ClientRMProxy.createRMProxy(getConfig(), ApplicationClientProtocol.class)
        > historyClient.start()
        > timelineClient.start()
        > super.serviceStart()
      ]createApplication()
      ]submitApplication(ApplicationSubmissionContext appContext) //见前文所引代码
        > rmClient.submitApplication(request)
          ==ApplicationClientProtocolPBClientImpl.submitApplication()

通过这个摘要,我们知道了rmClient的来历,也知道了由YarnClientImpl提供的操作submitApplication()其实是通过YarnClientImpl.ApplicationClientProtocolPBClientImpl()实现的,换言之是通过ApplicationClientProtocolPBClientImpl.submitApplication()实现的。我们继续往下追看:

      [Job.submit()> JobSubmitter.submitJobInternal()> YARNRunner.submitJob()>
      ResourceMgrDelegate.submitApplication()> YarnClientImpl.submitApplication()>
      ApplicationClientProtocolPBClientImpl.submitApplication()]


      submitApplication(SubmitApplicationRequest request)
        > requestProto=((SubmitApplicationRequestPBImpl)request).getProto()
                                        //从请求request中取出其协议报文(message)部分
        > resp=proxy.submitApplication(null, requestProto)
                                          //交由proxy将报文发送出去,并等候服务端回应
        > return new SubmitApplicationResponsePBImpl(resp)
                              //将服务端回应包装成 SubmitApplicationResponsePBImpl 对象

这里我们也需要知道两个信息:一是这里proxy的来历;二是proxy.submitApplication()做了些什么。所以我们先重温并看一下ApplicationClientProtocolPBClientImpl类的进一步的摘要:

      class ApplicationClientProtocolPBClientImpl implements ApplicationClientProtocol, …{}
      ]ApplicationClientProtocolPB proxy
      ]ApplicationClientProtocolPBClientImpl(long clientVersion,
                                            InetSocketAddress addr, Configuration conf)
        > RPC.setProtocolEngine(conf, ApplicationClientProtocolPB.class,
                                                            ProtobufRpcEngine.class)
        >> conf.setClass(ENGINE_PROP+"."+protocol.getName(), engine, RpcEngine.class)
              //将配置项“rpc.engine.ApplicationClientProtocolPB”设置成ProtobufRpcEngine
        > proxy=RPC.getProxy(ApplicationClientProtocolPB.class, clientVersion, addr, conf)
        >> ProtocolProxy<T> p=RPC.getProtocolProxy(protocol, clientVersion, addr, conf)
        >>> if(UserGroupInformation.isSecurityEnabled())SaslRpcServer.init(conf)//如果需要加密
        >>> return getProtocolEngine(protocol, conf).getProxy(protocol, clientVersion, …)
                      ==ProtobufRpcEngine.getProxy(protocol, clientVersion, …)
                      //参数protocol ApplicationClientProtocolPB
        >>>> Invoker invoker=new Invoker(protocol, addr, ticket, conf, factory, rpcTimeout,
                      connectionRetryPolicy, fallbackToSimpleAuth)      //创建 Invoker
        >>>> (T)_proxy=Proxy.newProxyInstance(protocol.getClassLoader(),
                                                        new Class[]{protocol}, invoker)
        >>>> return new ProtocolProxy<T>(protocol, _proxy, false)
                              //参数_proxy被赋给ProtocolProxy.proxy
        >> p.getProxy()==ProtocolProxy<T>.getProxy()
        >>> return proxy//返回值被赋给上面的ApplicationClientProtocolPBClientImpl.proxy
                          //这就是由Proxy.newProxyInstance()创建的那个对象
      ]submitApplication(SubmitApplicationRequest request) //见上
        > requestProto=((SubmitApplicationRequestPBImpl)request).getProto()
        > resp=proxy.submitApplication(null, requestProto)
        > return new SubmitApplicationResponsePBImpl(resp)

显然,这个proxy是在ApplicationClientProtocolPBClientImpl类的构造函数中创建的,其类型为ApplicationClientProtocolPB。这创建的过程还真有点复杂。

先是通过RPC.setProtocolEngine()把ApplicationClientProtocolPB这个Protocal的引擎设置成ProtobufRpcEngine。这一步并不复杂,只是把配置块conf中的配置项“rpc.engine. ApplicationClientProtocolPB”动态设置成ProtobufRpcEngine而已。

然后就通过RPC.getProxy()创建ApplicationClientProtocolPB的proxy,这我们在前面已经看见过了。

总之,前面的proxy.submitApplication(),实际上就是由protoc编译生成的ApplicationClientProtocolService.BlockingInterface.submitApplication()。

还要强调,这个proxy存在于用户为提交运行具体应用而起的那个JVM上,它既不属于ResourceManager,也不属于NodeManager,而是一个独立的Java虚拟机,可以是在集群内的任何一台机器上。以示例WordCount为例,使用者在某台机器上键入命令行“yarn WordCount”或直接就键入“java WordCount”,就在这台机器上启动了一个JVM来执行WordCount.class。

通过proxy发出的SubmitApplicationRequest,是以RM节点为目标的,最终经由操作系统提供的网络传输层以TCP报文的方式送达RM所在节点机上的对等层,那上面是ProtoBuf,它会从TCP报文中还原出对端所发送的对象。再往上,那就是同样也实现了ApplicationClientProtocolPB界面的ApplicationClientProtocolPBServiceImpl, ProtoBuf这一层会根据对方请求直接就调用其submitApplication()。

这样,Client一侧对于ApplicationClientProtocolPBClientImpl所提供函数的调用就转化成Server一侧对于ApplicationClientProtocolPBServiceImpl所提供的对应函数的调用。当然,Server一侧函数调用的返回值也会转化成Client一侧的返回值,这就实现了远程过程调用RPC。不言而喻,Client/Server双方的这两个对象必须提供对同一个界面的实现,在这里就是ApplicationClientProtocolPB。

这里还要说明一个问题。从软件结构的角度看,我们常把Client这一边实际提供服务的这一层看成最高层,它的下面是ProtoBuf,再下面才是网络传输层,而且函数调用的层次也遵循着同样的顺序,即:

      YARNRunner.submitJob()              //这是处于顶层的应用层
      ResourceMgrDelegate.submitApplication()//这是RM的代理
      YarnClientImpl.submitApplication()    //YARN框架的Client一侧
      ApplicationClientProtocolPBClientImpl.submitApplication()
                                          //ApplicationClientProtocol 界面
      proxy.submitApplication()           //ApplicationClientProtocolPB界面
      Protocol 内部实现的 submitApplication() //TCP/IP的基础上发送应用层的请求
      SocketTCP/IP                    //这是网络连接的最低层

可是Server这一边就不同了。在Server这一边,结构的层次和函数调用的层次是相反的,结构上处于最底层的Socket和TCP/IP反倒处于函数调用栈的最高层,愈往下调用实质上就愈往结构上的高层走。这是因为TCP/IP报文最初到达的是底层,然后逐层往上递交的过程一般都是通过函数调用实现的,所以层层往下调用的过程反倒变成了层层往上递交的过程。

不过我们倒也没有必要从Socket这一层开始,我们也可以跳过ProtoBuf这一层,因为前面已经讲过了,直接就从ApplicationClientProtocolPBServiceImpl.submitApplication()开始。这里先看一下ApplicationClientProtocolPBServiceImpl的摘要:

      class ApplicationClientProtocolPBServiceImpl implements ApplicationClientProtocolPB{}
      ]ApplicationClientProtocol real
      ]ApplicationClientProtocolPBServiceImpl(ApplicationClientProtocol impl)
        > this.real=impl
      ]getApplicationReport(RpcController arg0, GetApplicationReportRequestProto proto)
        > GetApplicationReportRequestPBImpl request=
                                          new GetApplicationReportRequestPBImpl(proto)
        > GetApplicationReportResponse response=real.getApplicationReport(request)
        > return ((GetApplicationReportResponsePBImpl)response).getProto()
      ]getClusterMetrics(RpcController arg0, GetClusterMetricsRequestProto proto)
        > GetClusterMetricsRequestPBImpl request=new GetClusterMetricsRequestPBImpl(proto)
        > GetClusterMetricsResponse response=real.getClusterMetrics(request)
        > return ((GetClusterMetricsResponsePBImpl)response).getProto()
      ]submitApplication(RpcController arg0, SubmitApplicationRequestProto proto)
        > SubmitApplicationRequestPBImpl request=new SubmitApplicationRequestPBImpl(proto)
        > SubmitApplicationResponse response=real.submitApplication(request)
        > return ((SubmitApplicationResponsePBImpl)response).getProto()

显然,这个对象内部有个成分real,这是个实现了ApplicationClientProtocol界面的某类对象,而ApplicationClientProtocolPBServiceImpl实现的则是ApplicationClientProtocolPB界面。与Client侧比较一下,可知那里实现着ApplicationClientProtocol界面的就是ApplicationClientProtocolPBClientImpl,实现ApplicationClientProtocolPB界面的则是proxy所属的无名类,那是在ProtoBuff层的代码中动态定义的,论结构层次前者在上而后者在下。而调用层次就反过来了,ApplicationClientProtocolPBServiceImpl在上而real所属的类在下。所以real所属的类所在的层次与ApplicationClientProtocolPBClientImpl对等。

这里所看到的所有方法都是通过由real所提供的同名方法完成的,而且都遵循着相同的三步操作模式:先准备一个为具体操作而定义的请求request;然后以此为参数调用real所提供的某个方法,获取所返回的响应response;最后从这response中提取信息,生成上一层规程(proto)所要求的响应报文。

那么这个real对象究竟是什么呢?这里我们只知道,它属于某个实现了ApplicationClientProtocol界面的类,因为ApplicationClientProtocol是个界面而不是类。并且从代码中可以看出,这个对象是在创建ApplicationClientProtocolPBServiceImpl对象时作为参数传下来的。现在的问题是,是谁创建了ApplicationClientProtocolPBServiceImpl?

这个问题要从ClientRMService讲起。ClientRMService是RM这一边专门为Client(而不是NodeManager)服务的,其作用就像是YARN子系统为Client即平台使用者提供服务的窗口,接受用户提交的作业请求就是其最主要的职能。RM在其初始化过程serviceInit()中调用createClientRMService()创建了一个ClientRMService对象。下面是这个类的摘要:

      class ClientRMService extends AbstractService implements ApplicationClientProtocol {}
      ]YarnScheduler scheduler     //这是RM中的作业管理者
      ]RMContext rmContext        //记录着有关RM的种种基本情况
      ]RMAppManager rmAppManager //这是RM中的App管理者
      ]Server server             //作为其基础的RPC server
      ]ClientRMService(RMContext rmContext, YarnScheduler scheduler, …)
      ]serviceInit(Configuration conf)
        > clientBindAddress=getBindAddress(conf)
        > super.serviceInit(conf)
      ]serviceStart()
        > conf=getConfig()
        > YarnRPC rpc=YarnRPC.create(conf)  //创建一个YarnRPC对象
        > this.server=rpc.getServer(ApplicationClientProtocol.class, this, clientBindAddress, …)
          ==HadoopYarnProtoRPC.getServer(Class protocol, Object instance, …)
                                        //创建作为ClientRMService基础的RPC server
        >> RpcServerFactoryPBImpl factory=RpcFactoryProvider.getServerFactory(conf)
        >> return RpcServerFactoryPBImpl.getServer(protocol, instance, addr, conf, …)
        > this.server.start()
        > clientBindAddress=conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
                                                    YarnConfiguration.RM_ADDRESS, …)
        > super.serviceStart()
      ]checkAccess(UserGroupInformation callerUGI, String owner, …)
      ]submitApplication(SubmitApplicationRequest request)
      ]… //定义于ApplicationClientProtocol 界面的其他方法

可以看到,submitApplication()就是它提供的方法之一。更重要的是,从serviceStart()的摘要可见,此种服务是建立在RPC的基础之上的,所以前提就是得创建一个RPC层的Server。但是,同样属于RPC层,因其面向和处理的业务不同,所定义的规程(Protocol)也就不同,从而具体Server的许多细节也就不同。那么,这里要创建的是什么样的Server呢?我们看到这里以ApplicationClientProtocol.class为参数调用rpc.getServer(),意思是要创建实现着ApplicationClientProtocol界面的Server。值得一提的是,在Hadoop的代码中有两个ApplicationClientProtocol:一个是class,那是protoc在编译ProtocolBuf定义文件applicationclient_protocol.proto时自动生成的;另一个是interface,那是Hadoop本身的代码中就有的。虽然二者同名,但所属的package不同,而源文件ClientRMService.j ava中所import的是“… yarn.api.ApplicationClientProtocol”,所以是interface ApplicationClientProtocol。

另外,这里rpc的类型是YarnRPC,但那只是个抽象类,所以实际上是扩充、落实了YarnRPC的HadoopYarnProtoRPC。而HadoopYarnProtoRPC.getServer()则在前面的摘要中已予展开,是通过RpcServerFactoryPBImpl.getServer()获取或创建这个RPC层Server。

所以我们接着看RpcServerFactoryPBImpl.getServer()的摘要,这里有名堂了:

      [ResourceManager.serviceInit()>createClientRMService()>ClientRMService.serviceStart()>
      HadoopYarnProtoRPC.getServer()> RpcServerFactoryPBImpl.getServer()]
      RpcServerFactoryPBImpl.getServer(Class<? > protocol, Object instance,
                    InetSocketAddress addr, Configuration conf,
                    SecretManager<? extends TokenIdentifier> secretManager,
                    int numHandlers, String portRangeConfig)
      > Constructor<? > constructor=serviceCache.get(protocol)
                  //先在 serviceCache中寻找,看有没有实现这protocol 界面的类的构造函数
      > if (constructor==null){    //如果还没有
      >+ String name=getPbServiceImplClassName(protocol)
                                    //获取实现这个界面的类的名称,具体获取的方法见下:
      >+> srcPackagePart=getPackageName(clazz)
                                    //clazz就是protocol,ApplicationClientProtocol.class
                                    //srcPackagePart=="org.apache.hadoop.yarn.api"
      >+> srcClassName=getClassName(clazz) //"ApplicationClientProtocol"
      >+> destPackagePart=srcPackagePart+"."+PB_IMPL_PACKAGE_SUFFIX
                      //destPackagePart=="org.apache.hadoop.yarn.api.impl.pb.service"
      >+> destClassPart=srcClassName+PB_IMPL_CLASS_SUFFIX //"PBServiceImpl"
      >+> return destPackagePart+"."+destClassPart
        //"org.apache.hadoop.yarn.api.impl.pb.service.ApplicationClientProtocolPBServiceImpl"
                                                //现在有了实现这个界面的类的完整路径
      >+ Class<? > pbServiceImplClazz=localConf.getClassByName(name) //获取其Class对象
      >+ constructor=pbServiceImplClazz.getConstructor(protocol) //获取这个类的构造函数
                                //constructor=ApplicationClientProtocolPBServiceImpl()
      >+ constructor.setAccessible(true)
      >+ serviceCache.putIfAbsent(protocol, constructor)//缓存在serviceCache,以后就简单了
      > }
      > Obj ect service=constructor.newInstance(instance)
                                //创建ApplicationClientProtocolPBServiceImpl 对象
      > Class<? > pbProtocol=service.getClass().getInterfaces()[0]
                                //这个 Interface就是ApplicationClientProtocolPB
      >Method method=protoCache.get(protocol) //仍是ApplicationClientProtocol
          //protoCache中有没有Protoc为此界面生成的函数newReflectiveBlockingService()
      > if (method==null){ //没有
      >+ Class<? > protoClazz=localConf.getClassByName(getProtoClassName(protocol))
      >+ method=protoClazz.getMethod("newReflectiveBlockingService",
                                            pbProtocol.getInterfaces()[0])
      >+ method.setAccessible(true)
      >+ protoCache.putIfAbsent(protocol, method) //将其缓存在protoCache
      > }
      > return createServer(pbProtocol, addr, conf, secretManager, numHandlers,
                        (BlockingService)method.invoke(null, service),
                        portRangeConfig)
                        //创建ApplicationClientProtocolPBServiceImpl RPC Server

先看调用参数。这里的参数protocol是ApplicationClientProtocol.class, instance是个ClientRMService对象,我们将会看到,正是这个对象将成为前面所说的real。

既然形式参数protocol实际上是ApplicationClientProtocol.class,从中就可以提取其类型名称等信息。在此基础上,这里生成一个中间层即ApplicationClientProtocolPBServiceImpl的类名,并获取其构造函数,以创建一个此类对象。但是这个对象还不是RPC层的Server, RPC层Server其实是由protoc自动生成的一个BlockingService。所以这里又进一步通过createServer()创建一个BlockingService对象作为RPC层Server。表面上这里并未把对于中间层ApplicationClientProtocolPBServiceImpl的引用保存在一个变量中,但是实际上这已经深埋在RPC层Server内部,因为这里的变量service是对newReflectiveBlockingService()做反射式调用(运行时,而不是编译时,按方法名找到这个函数,再加以调用)时的参数。

这样,以后每当RPC层的Server接收到一个定义于ApplicationClientProtocol界面的RPC请求时,就都会以此请求为参数调用ApplicationClientProtocolPBServiceImpl中的相应方法。对于作业提交,这就到了ApplicationClientProtocolPBServiceImpl.submitApplication()。

回看前面ApplicationClientProtocolPBServiceImpl中对于real.submitApplication()的调用,现在我们知道了,real就是ClientRMService,所以这就是ClientRMService.submitApplication()。我们继续往下看。

      [ApplicationClientProtocolPBServiceImpl.submitApplication()
      > ClientRMService.submitApplication()]


      public SubmitApplicationResponse submitApplication(SubmitApplicationRequest request)
                                                              throws YarnException {
          ApplicationSubmissionContext submissionContext=
                                            request.getApplicationSubmissionContext();
          ApplicationId applicationId=submissionContext.getApplicationId();


          //ApplicationSubmissionContext needs to be validated for safety- only those fields
          //that are independent of the RM's configuration will be checked here, those that are
          //dependent on RMconfiguration are validated in RMAppManager.
          String user=null;
          try{
            //Safety
            user=UserGroupInformation.getCurrentUser().getShortUserName();
          }catch(IOException ie){
            LOG.warn("Unable to get the current user.", ie); //不能获取用户名
            RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
            ie.getMessage(), "ClientRMService", "Exception in submitting application", applicationId);
            throw RPCUtil.getRemoteException(ie);
          }
          //Check whether app has already been put into rmContext
          //If it is, simply return the response
          if (rmContext.getRMApps().get(applicationId)! =null){//App已在队列中,是重复提交
            LOG.info("This is an earlier submitted application:"+applicationId);
            return SubmitApplicationResponse.newInstance();
          }


          if (submissionContext.getQueue()==null){ //如果未指定提交到哪个队列
            submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
          }
          if (submissionContext.getApplicationName()==null){ //如果未提供App名称
            submissionContext.setApplicationName(
                YarnConfiguration.DEFAULT_APPLICATION_NAME);
          }
          if (submissionContext.getApplicationType()==null){ //如果未提供App类型
            submissionContext.setApplicationType(
                                          YarnConfiguration.DEFAULT_APPLICATION_TYPE);
          }else { //须检查提供的App类型名称是否太长
            if (submissionContext.getApplicationType().length()>
                            YarnConfiguration.APPLICATION_TYPE_LENGTH){
              submissionContext.setApplicationType(submissionContext.getApplicationType()
                .substring(0, YarnConfiguration.APPLICATION_TYPE_LENGTH));
            }
          }
          try{
            //call RMAppManager to submit application directly
            rmAppManager.submitApplication(submissionContext,
                    System.currentTimeMillis(), user); //App交到了RMAppManager的手里
            LOG.info("Application with id"+applicationId.getId()+"submitted by user"+user);
            RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,
                                          "ClientRMService", applicationId);
        }catch(YarnException e){
          LOG.info("Exception in submitting application with id"+applicationId.getId(), e);
          RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,
          e.getMessage(), "ClientRMService", "Exception in submitting application", applicationId);
          throw e;
        }
        SubmitApplicationResponse response=recordFactory.newRecordInstance(
              SubmitApplicationResponse.class); //创建一个 SubmitApplicationResponse
        return response;
      }

这里的rmAppManager是个RMAppManager类的对象。RMAppManager类对象相当于“中央”的一个部门。ResourceManager要管的事多得很,对于App(作业)的管理只是其中之一,而RMAppManager就是专门管这个事的。如果不考虑容错所需的备份,那么整个Hadoop系统中只有一个ResourceManager,也只有一个RMAppManager,而且就是由ResourceManager所创建的。可见,当一个作业,也就是一个App,被提交到“中央”的时候,是被交到了RMAppManager对象的手里。从作业提交的角度看,一旦进入了RM节点上的RMAppManager.submitApplication(),作业的提交就已完成。至于这以后的处理,那是RM的事了。