大数据处理系统:Hadoop源代码情景分析
上QQ阅读APP看书,第一时间看更新

4.2 ProtoBuf

在像Hadoop这样大量使用RPC的系统中,Protocol的数量应该不少,具体目标函数的数量就更多了。对于每个具体的Protocol,有了前述的RPC类和可以用来生成Server和Proxy的ProtocolEngine后,Client端的编程工作量已可大大降低,但是Server端虽然也降低了却总少不了要有一层远程调用的入口函数,这些入口函数的位置上本来应该就是目标函数本身,但一般在进出目标函数的前后总还是少不了有些哪怕是很简单的处理,所以还是需要有一层类似于“跳板”那样的中转。然而这些中转函数的编程却是份机械而枯燥的工作,还容易出错,维护也麻烦。

不仅如此,ProtocolEngine中还应包括具体protocol所需的报文(message)生成和解析,在采用面向对象语言的系统中这又包括对象的串行化(serialize,也称序列化)和去串行化,因为通过网络传输的信息必须按位串行。对象的串行化和去串行化原理上倒不复杂,但却是相当麻烦的操作。在2.0以前的版本中,Hadoop自己实现了各种protocol的入口函数和对各种protocol对象的串行化和去串行化,整个引擎相当于现在的WritableRpcEngine。但是,从2.0版开始,Hadoop改用了Google的ProtoBuf软件和工具来做这些事,由此而形成的ProtocolEngine就是ProtobufRpcEngine。

ProtoBuf是Protocol Buffer的缩写,原是Google为内部使用而开发的一个软件项目,后来成了开源项目。这个项目旨在为RPC开发一个灵活方便的中间层,其底层是IPC,就像由前述Server和Client构成的通信机制,其上层则是具体的应用。这个中间层的作用,就是在机器节点之间建立支持具体protocol的RPC机制,在本地提供一个跟远地相同的API,并提供实现了这个API的服务器server和代理proxy,使得客户端应用层对定义于这个API的函数(方法)调用转化成相应的请求报文,交由下面的IPC层发送到远地的服务端,在远地又转化成对于那里相同API上对应函数(方法)的调用,再把调用结果返回到客户端,作为对应用层的返回结果。

为此,ProtoBuf项目创造了一种proto语言并提供相应的编译器protoc,让开发者可以方便地用这种语言描述所要实现的protocol,然后通过编译自动生成相应的代码。以YARN用户与RM节点交互所用的ApplicationClientProtocol为例,我们看一下Hadoop代码中的一个.proto文件applicationclient_protocol.proto,这就是为此protocol编写的描述文件:

      option java_package="org.apache.hadoop.yarn.proto";
      option java_outer_classname="ApplicationClientProtocol";
      …
      package hadoop.yarn


      import"Security.proto";
      import"yarn_service_protos.proto";


      service ApplicationClientProtocolService {
        rpc getNewApplication (GetNewApplicationRequestProto)
                                  returns (GetNewApplicationResponseProto);
        rpc getApplicationReport (GetApplicationReportRequestProto)
                                  returns (GetApplicationReportResponseProto);
        rpc submitApplication (SubmitApplicationRequestProto)
                                  returns (SubmitApplicationResponseProto);
        …   //下面还有很多
      }

这个文件定义和描述了一种称为ApplicationClientProtocolService的service,这种服务具体支持包括getNewApplication、getApplicationReport、submitApplication在内的多种RPC调用。以其中的submitApplication为例,客户端在发起RPC调用、请求远方调用其submitApplication()方法时,应向远方发送一个SubmitApplicationRequestProto报文(message);对方在完成操作之后则返回一个SubmitApplicationResponseProto报文。这里并没有涉及这两种报文的格式,那是在另一个文件yarn_service_protos.proto中定义的,所以前面要import"yarn_service_protos.proto"。另外,每个报文中其实都有与信息安全有关的字段,所以也要import"Security.proto"。

还要解释一下这些报文的命名。以SubmitApplicationRequestProto为例,这里的关键字部分是SubmitApplicationRequest,后缀Proto并不表示SubmitApplicationRequest就是一个Protocol,而只是说这是一个“Protocol数据单元”。事实上这只是ApplicationClientProtocol所定义的诸多报文格式之一。

这个文件还通过“java_outer_classname=”这个语句指示编译器:经过编译之后,为这个Service生成的代码应该都放在一个名为ApplicationClientProtocol的外层class之内。另一方面,由于这个文件只定义了一个service,所以在ApplicationClientProtocol类内部的第一层中将只直接定义一个类,那就是“class ApplicationClientProtocolService”。在ApplicationClientProtocolService内部,则将提供客户端和服务端两边的种种操作方法。

不过,具体与报文格式有关的代码不在ApplicationClientProtocol类的内部,因为如上所述那是在另一个.proto文件中定义的。

我们也粗粗看一下yarn_service_protos.proto:

      option java_package="org.apache.hadoop.yarn.proto";
      option java_outer_classname="YarnServiceProtos";
      …
      package hadoop.yarn;
      import"Security.proto";
      import"yarn_protos.proto";


      …
      message SubmitApplicationRequestProto {
        optional ApplicationSubmissionContextProto application_submission_context=1;
      }
      …

这个文件中定义了许多属于YARN子系统的报文格式,这里只摘列了其中的一种,就是用来请求提交作业的SubmitApplicationRequestProto。这种报文中只有一个成分,那是一个ApplicationSubmissionContextProto。因为一共只有这么一个成分,其序号当然就是1。经protoc编译之后,这个文件所定义的内容都在一个名为YarnServiceProtos的class中。

而ApplicationSubmissionContextProto,则又是在另一个.proto文件yarn_protos.proto (见前面的import语句)中定义的:

      option java_package="org.apache.hadoop.yarn.proto";
      option java_outer_classname="YarnProtos";
      …
      package hadoop.yarn;


      import"Security.proto";


      …
      message ApplicationSubmissionContextProto {
        optional ApplicationIdProto application_id=1;
        optional string application_name=2 [default="N/A"];
        optional string queue=3 [default="default"];
        optional PriorityProto priority=4;
        optional ContainerLaunchContextProto am_container_spec=5;
        optional bool cancel_tokens_when_complete=6 [default=true];
        optional bool unmanaged_am=7 [default=false];
        optional int32 maxAppAttempts=8 [default=0];
        optional ResourceProto resource=9;
        optional string applicationType=10 [default="YARN"];
        optional bool keep_containers_across_application_attempts=11 [default=false];
        repeated string applicationTags=12;
        optional int64 attempt_failures_validity_interval=13 [default=-1];
        optional LogAggregationContextProto log_aggregation_context=14;
        optional ReservationIdProto reservation_id=15;
        optional string node_label_expression=16;
        optional ResourceRequestProto am_container_resource_request=17;
      }
      …

除Security.proto以外这个文件没有再导入别的.proto文件,这意味着各种字段的类型除有关安全的之外均可在本文件中找到定义,直到全都被解析成如string、bool、int32那样的基本类型为止,实施串行化/去串行化的代码最终就是针对基本类型的。

例如这里的字段application_id,其类型ApplicationIdProto就定义在同一文件中:

      message ApplicationIdProto {
        optional int32 id=1;             //一个32位整数
        optional int64 cluster_timestamp=2; //一位64位整数
      }

这个文件中定义的内容经编译以后全都在一个名为YarnProtos的class中。

由此可见,service语句相当于Java的interface定义,message语句则相当于纯数据结构的class定义。而编译器protoc,则根据这些语句生成必要的代码,将service语句转变成实现了相应interface的class定义;将原本只相当于数据结构定义的message语句转变成包括操作方法在内的class定义。至于Protobuf所提供的串行化/去串行化功能,以及作为中间层的承上启下,则全都体现在所生成的这些代码中。编译器protoc可以生成Java、C++、Python、Ruby等多种语言的代码,我们在这里只关心Java。

这样,经过protoc的编译,上述的三个.proto文件就被编译成三个Java文件,产生了三个class的代码,即ApplicationClientProtocol、YarnServiceProtos和YarnProtos。其中YarnServiceProtos和YarnProtos是公共的,覆盖了YARN这一层所定义的所有报文格式;而ApplicationClientProtocol则只是YARN的诸多RPC服务所用protocol之一。由于message的种类很多,YarnServiceProtos.java和YarnProtos.java这两个源文件竟各有4万多行!显然,当报文的种类较多时,用来实现串行化/去串行化和合成/解析的代码量是相当大的,若要人工编写确实是个负担,ProtoBuf的作用由此可见。不过我们在这里所关心的并非如何实现串行化/去串行化和合成/解析的细节,而是RPC机制的构建和流程。我们关心的是:服务端和客户端两边的RPC“协议栈(Protocol Stack)”是怎样搭建起来的;进一步,以作业提交即SubmitApplication()为例,一次完整的RPC操作是怎样完成的,要走过怎样一个流程。从这个角度看,ApplicationClientProtocol这个class对我们远远更为重要。当然,YARN子系统中定义的RPC服务很多,更不说还有HDFS子系统中定义的RPC服务,这里只是用ApplicationClientProtocol作为一个实例来说明问题。

我们先分层看一下ApplicationClientProtocol这个class的结构:

      class ApplicationClientProtocol {
      ]abstract class ApplicationClientProtocolService implements com.google.protobuf.Service{}
      ]static com.google.protobuf.Descriptors.FileDescriptor descriptor;

ApplicationClientProtocol是由protoc根据文件ApplicationClientProtocol.proto编译生成的外层class。这个.proto文件中只有一个service语句,所以这个class的内部就只有一个成分,就是ApplicationClientProtocolService,这是个抽象类。之所以是抽象类,是因为其内部对应于Service语句中各RPC子句的操作方法都还有待落实。这个抽象类必须实现ProtoBuf中定义的Service界面,这个界面上的操作方法主要是callMethod(),后面我们将会看到其实现和作用。除此之外,每个这样的外层class内部都有个FileDescriptor,即文件描述块;同时还有个用来读取这个描述块的方法getDescriptor(),不过这里并未列出,我们也不关心。

所以ApplicationClientProtocolService是ApplicationClientProtocol的主体,实质性的内容都在这里面,下面是这个抽象类的摘要:

      abstract class ApplicationClientProtocol.ApplicationClientProtocolService
                                              implements com.google.protobuf.Service{}
      ----------第一部分:对于两个界面的定义:----------
      ]interface Interface {}      //异步操作界面
                //定义于这个 interface中的22个抽象方法同样来自rpc子句,都是三个参数:
      ]]abstract submitApplication(RpcController controller,
                  GetApplicationReportRequestProto request, protobuf.RpcCallback<> done)
      ]]…
      ]interface BlockingInterface { //同步操作界面
                //与上面这个 Interface相对应,同样也是22个抽象方法,但都是两个参数
      ]]abstract submitApplication(RpcController controller,
                      GetApplicationReportRequestProto request)
      ]]…
      -------第二部分:对应于 service语句中各rpc子句的抽象方法--------
                //共有22个,这里只列出其一
                //22个抽象方法都有同样的参数表,都是三个参数,都是面向异步操作
      ]abstract submitApplication(RpcController controller,
                  GetApplicationReportRequestProto request, protobuf.RpcCallback<…> done)
      ]…
      -----第三部分:对com.google.protobuf.Service界面上三个方法的实现------
      ]callMethod(MethodDescriptor method, RpcController controller,
                  Message request, RpcCallback done)//四个参数,用于异步操作
        > switch(method.getIndex()){
        > case 2:
        >+ return this.submitApplication(controller, request, specializeCallback(done))
        > }
      ]getRequestPrototype(MethodDescriptor method)
        > switch(method.getIndex()){
        > case 2:
        >+ return SubmitApplicationRequestProto.getDefaultInstance()
        > }
      ]getResponsePrototype(MethodDescriptor method)
        > switch(method.getIndex()){
        > case 2:
        >+ return SubmitApplicationResponseProto.getDefaultInstance()
        > }
      ------------第四部分:服务端 Server的实现-------------
      ]newReflectiveService(Interface impl) //用来创建ApplicationClientProtocolService对象
                //注意,参数 impl 应该是个实现了上述 Interface界面的某类对象
        > return new ApplicationClientProtocolService() //在此动态落实22个抽象方法的定义
          ]submitApplication(controller, request, com.google.protobuf.RpcCallback<…> done)
            > impl.submitApplication(controller, request, done)
                                  //调用参数 impl,是对上述异步 Interface的某种实现
          ]…  //其他抽象方法的动态定义从略
      ]newReflectiveBlockingService (BlockingInterface impl)
                                  //用来创建实现同步操作界面protobuf.BlockingService的对象
              //注意,参数 impl 应该是个实现了上述BlockingInterface界面的某类对象
        > return new com.google.protobuf.BlockingService(){}
              //BlockingService是个 interface,需要提供所定义函数的实现才能创建对象
              //所定义函数之一是个统一的callBlockingMethod(),用作protocol 的总入口
          ]callBlockingMethod(MethodDescriptor method, RpcController controller,
                            Message request) //与前面callMethod()相似,但只有3个参数
            > switch(method.getIndex()){
            > case 2:
            >+ return impl.submitApplication(controller, request)//这里只有两个参数了
            > }
          ]getRequestPrototype(MethodDescriptor method)
          ]getResponsePrototype(MethodDescriptor method)
      ----------第五部分:客户端 StubProxy的创建-----------
      ]newStub(protobuf.RpcChannel channel) //创建异步的 Stub对象
          > return new Stub(channel)
      ]class Stub extends ApplicationClientProtocolService implements Interface {}
                  //注意,Stub所实现的正是上述的(异步)Interface界面
      ]]Stub(com.google.protobuf.RpcChannel channel) //Stub对象的构造函数
      ]]submitApplication()
      > channel.callMethod(getDescriptor().getMethods().get(2), controller, request, …)
      ]]… //前面 Interface中定义了更多抽象函数,分别对应于.proto文件中的rpc语句
      ]newBlockingStub(protobuf.BlockingRpcChannel channel)//创建同步的BlockingStub对象
        > return new BlockingStub(channel)
      ]class BlockingStub implements BlockingInterface {}
                      //注意BlockingStub所实现的正是上述的(同步)BlockingInterface界面
      ]]BlockingStub(com.google.protobuf.BlockingRpcChannel channel)//构造函数
      ]]submitApplication()
        > return channel.callBlockingMethod(getDescriptor().getMethods().get(2), …)
      ]]…

这个抽象类有五个方面的内容。

第一部分是两个界面(interface)的定义,即Interface和BlockingInterface,其中Interface是异步操作界面,BlockingInterface是同步操作界面。这两个界面各自定义了包括submitApplication()在内的那22个操作方法,但是调用参数有所不同。异步界面Interface所定义的都是异步操作,一调用就立即返回,实际的工作交给一个独立的线程去完成,所以是不阻塞(Non-Blocking)的,但是调用者必须提供一个供回调的操作方法done()(实质上是个函数指针),让那个线程在完成操作之后就调用这个方法,把结果返回给调用者。所以,异步界面上的那些操作方法都有三个参数。而同步界面BlockingInterface所定义的则都是同步操作,其所在线程在调用的过程中可能因某些操作被“阻塞”而进入睡眠等待,一直到操作完成时才被唤醒,所以是Blocking。同步界面上的这些操作方法都只有两个参数,因为无须提供回调方法。

这两个界面是为客户端定义的,客户端的RPC操作都要跨节点进行,整个过程可能需要延续较长的时间,并非立即可以完成,所以才有阻塞/不阻塞,即同步/异步的问题。事实上,后面第五部分的Stub和BlockingStub这两个类分别实现了Interface和BlockingInterface这两个界面。

第二部分是包括submitApplication()在内的22个抽象操作方法,分别对应着ApplicationClientProtocol.proto中service语句里面的22个RPC子句,那就是服务端,即RM节点上为客户端提供的与Application直接相关的22种RPC,这是尚未落实的。

第三部分是对于com.google.protobuf.Service这个界面所定义的三个方法的实现,包括callMethod()、getRequestPrototype()、getResponsePrototype()。其中callMethod()相当于一个统一的门户,将一个方法描述块MethodDescriptor作为(第一个)参数交给callMethod(),它就根据描述块中的序号和该界面上所定义方法的列表调用相应的方法,其余的参数则被用作调用相应方法时的参数。由于调用相应方法的过程同样可能很长,所以也有同步/异步的问题;而com.google.protobuf.Service是一个异步界面,因而也要提供回调函数。至于getRequestPrototype()和getResponsePrototype()就比较简单了,那只是根据具体的方法,即具体的RPC操作获取用作请求报文和响应报文的样板,这样的操作不需要长时间等待,是立即可以完成的,所以即使属于异步界面也无须提供回调函数。其实ProtoBuf还提供一个与此相应的同步界面protobuf.BlockingService,下面我们就会看到。

第四部分是对服务端Server即RPC提供者的实现。这里提供了创建两种Server的方法,一种是异步的,另一种是同步的。通过newReflectiveService()可以在服务端创建一个异步的Server,通过newReflectiveBlockingService()则可以创建一个同步的Server。两种手段都提供,任由开发者选用。

前面讲到客户端即RPC调用发起端的同步和异步。异步RPC调用意味着调用者线程一经调用不等结束就可返回而无须睡眠等待,此时可以安排这个线程先干点别的,RPC结束返回时自会调用其回调函数,让调用者线程在回调函数中恢复同步。

那么服务端的同步和异步又是什么意思呢?我们在前面IPC层的Server中看到,服务端有专门的Call接收线程和Call处理线程,其中接收比较简单,因为当一个请求到达服务端所在的节点时其操作系统底层会唤醒这个接收线程,接收线程把Call请求读进来就把它挂在一个队列callQueue中,而处理线程Handler则从callQueue摘下Call请求并加以处理,包括调用RPC目标函数,处理之后还要发回响应报文,返回RPC结果。然而对于RPC目标函数的执行可能会比较复杂,时间上也可能会比较长,那么是让这个Handler线程直接去调用相关的过程,在其自身的上下文中完成整个操作,还是让它交给别的线程去调用相关的过程?这就是区别,前者是同步的过程,后者是异步的过程。如果采用同步调用,那么这个Handler线程要等目标函数返回时才能去callQueue中摘取下一个Call请求,这样在负载比较重的时候就可能对“并发连接数”产生一些不利影响。

再看这两个方法函数的实现方式。

其中newReflectiveService()所创建的是一个ApplicationClientProtocolService类对象,但是ApplicationClientProtocolService是个抽象类,它的包括submitApplication()在内的22个操作方法都是抽象方法,所以这里就动态地加以定义落实。上面的摘要中只是列出了其中之一,即submitApplication(),实际上这22个操作方法的落实就好像是由同一个模板印出来似的,都是转手调用impl这个对象中的同名操作方法。那么这个impl是什么呢?这是调用newReflectiveService()创建对象时的参数,应该是个实现了上述(第一部分中所定义) Interface界面的某类对象,可以是通往RPC目标函数的跳板,也可以直接就是RPC目标函数,这要看具体的实现,这个类的代码当然不是ProtoBuf所能生成的。

调用newReflectiveBlockingService()时的参数impl则是个实现了上述BlockingInterface界面的某类对象。后面我们将会看到,在Hadoop的代码中,这实际上是一个ApplicationClientProtocolPBServiceImpl类的对象,不过这个类所直接实现的是ApplicationClientProtocolPB界面,那是对于上述BlockingInterface的扩充。注意,调用newReflectiveBlockingService()时所创建的是一个实现了protobuf.BlockingService界面的对象,它采用的是统一的入口函数callBlockingMethod(),但是在这个方法内部的switch语句中会按具体方法在该界面上的序号调用impl内部的相关操作方法。以submitApplication()为例,在ApplicationClientProtocolPB即BlockingInterface界面上这是2号操作方法(从0开始编号),所以在callBlockingMethod()内的case语句中会转而调用impl.submitApplication(),即ApplicationClientProtocolPBServiceImpl.submitApplication()。

我们可以看到,newReflectiveService()和newReflectiveBlockingService()这两个函数名中都有Reflective这个词。之所以如此,应该是如我们在前面所见,对目标函数的调用终究是利用Java的Reflection机制实现的。

最后,第五部分是为客户端准备、用来为客户端创建stub的。所谓stub,词典上是“树桩”、“蒂头”的意思。其实这个词常被水管工用来表示这样的意思:一个水管上加了个接头,准备接上另一根水管通向某处,但是那根水管暂时还没有准备好,就先用个塞子把这接口塞住。这个意思被引申到软件实践上,则有(用来模拟实际情况或者留下外接手段的)“转接点”、“端接点”的意思。ApplicationClientProtocol这个类的内部定义了两个类,即Stub和BlockingStub,可以通过newStub()和newBlockingStub()分别创建异步和同步两种stub。注意Stub和BlockingStub所实现的界面就是前面第一部分定义的那两个界面。

创建时的参数,也就是stub的下一层,分别是一个实现了protobuf.RpcChannel或protobuf.BlockingRpcChannel界面的某类对象。这两个界面都只定义了一个方法,就是callMethod()或callBlockingMethod()。

Protobuf所说的同步和异步是仅就ProtoBuf这个局部而言的,所以Protobuf所说的异步确实就是异步,而Protobuf所说的同步却仍可以在总体上是异步的。我们在前面看到服务端有三个线程,这在总体上已经是异步的了。

至于ProtoBuf下面的IPC层,本意是需要由实际应用的开发者自己提供的,前面看到的Server和Client就是IPC层的一种实现。但是后来Google又搞了一个底层的项目叫gRPC,用来与ProtoBuf配套,说是更能配合得天衣无缝。不过IPC这一层本来就不太复杂,所以Hadoop至少直到2.7.1版仍在使用自己的IPC层实现,那就是前面的Server和Client。

其实Hadoop并没有照单全收由ProtoBuf提供的种种选择,而是从中选取它认为合适的素材,结合Java语言Reflection机制所提供的Proxy技术,开发出了自己的RPC机制。总的来说,Hadoop采用了ProtoBuf所提供的报文生成/解析和串行化/去串行化功能,服务端大体上采用了ProtoBuf所提供的BlockingService界面,但是客户端采用的是Reflection机制所提供的Proxy技术。