大数据处理系统:Hadoop源代码情景分析
上QQ阅读APP看书,第一时间看更新

3.4 状态机

如前所述,YARN主节点RM受理用户提交的作业之后要为其创建一个AppManager,即AM,还要为其分配资源,然后要密切注视作业的进展,最后还要处理善后。这显然需要在主节点上维持关于这个作业的上下文,因为作业有它自己的生命周期,从其产生到推进,再到消亡,是一个过程,在此过程中作业的状态在改变,主节点上需要有些与此相适应的操作。通常,像这样得到受理并且正在推进中的作业不是只有一个而是有很多个。显然这些作业的生命周期有时间上的重叠,因而存在着并发。也就是说,RM管理着诸多并发的作业。对于并发的过程,如果过程复杂而粒度细小,就只能用线程或进程的方法加以实现。但是,对于相对简单并且粒度较粗的过程,则也可以采用一种简单一些的实现方法,那就是状态机。而RM对于作业的管理,就比较适合采用状态机的方法。

这样的情况在YARN中也不只是RM一处。以AM为例,它就管理着属于同一个作业的诸多“任务(task)”,这些任务分布在许多节点上,有着大致相同的生命周期,所以每个AM都管理着诸多并行或并发的任务。注意不要混淆,这些任务本来就是作为独立的JVM进程在运行的,这里说的是AM对这些任务的管理,例如发现某个任务停滞不前了就要安排后备替换,任务运行结束了就要安排善后事宜,等等。再例如,即使就某个具体的Mapper任务而言,也需要先进行资源本地化,可能需要从若干不同的节点复制程序映像和数据,这又是一些并发的过程。

所以YARN的代码中广泛使用着状态机(这也是2.0版前后的一个显著区别),我们有必要先分析一下这些状态机的实现,以期对这些状态机的作用有比较深入的理解。

这里所讲的“状态机(state machine)”,当然是对“有限状态机”的简称。状态机其实也是一种并发机制,试想如果一个过程可以从开头一直走到结束,而不是“走走停停”,那就没有必要用状态机了。反过来,如果每一个过程都是通过一个独立的进程或线程加以执行,那么这个进程或线程本身就相当于一个(状态数量极多且粒度极小的)状态机。所以,在某种意义上,可以把进程和线程看成由操作系统运行的状态机。但是,如果要由应用层软件去管理多个这样的过程,而且过程又不太复杂,粒度也比较粗,那么为每个过程建立一个状态机也是不坏的选择。

在状态机模型中,一个宏观的过程被抽象成一台机器,其结构包括一组“状态”、一组触发规则和一组操作。只要不是正在处理某个事件的过程中(这是微观的过程),这台机器就总是停留在其中的某个状态上。然后,如果有什么事件发生,这台机器就受到“触发”,这时候它就要查一下规则,看看在当前的状态下遇到这样的事件是否需要处理一下,做点什么,以及是否需要跳变到另一个状态,如果需要就加以执行。状态机对于事件的处理,包括状态的跳变,是“原子”的,即不可分割的。这就是说,如果状态机在“做点什么”的过程中又来了一个新的事件,那么暂时就不加理会,要到做完了对本次事件的处理并完成状态跳变(如果需要的话)以后才又可受到触发。这就像操作系统不允许中断服务嵌套一样。

通常在讲述状态机的书上都会把状态机画成图,图上的“节点”代表状态,而带箭头的“弧”则代表跳转规则,即在什么事件的触发下会做些什么并跳转到什么节点(状态)。这对于简单的状态机倒挺合适,可是对于比较复杂的状态机就不太合适了,因为那样的状态图看上去可能会像是一团乱麻,还不如看着跳转表更觉清晰。

从程序设计的角度看,定义一组状态是简单的,那就是一个枚举(enum)类型,定义一组事件也一样。而跳转规则就麻烦一些,但也并不复杂,因为那就是一个表,或者说一个结构数组。表的每一行代表一条规则,如果定义了M种状态和N种事件,那么理论上这个表就应该有(M×N)行,但是在具体实现时有些行或许可以合并。一般而言,在一行中应该有4个字段,或者说这个表应该有4列,那就是当前状态、(到来的)事件类型、下一状态和所需的操作处理。至于里面具体的内容,那就是具体流程的问题,而不是状态机这种模型和机制的问题了。

最后,怎样使用这张跳转表,怎样完成所规定的操作和跳变,这就都需要由程序来实现了,其中之一称为Dispatcher。Dispatch本是“派遣”和去向什么地方的意思,发生了一个什么事件时,是Dispatcher根据这事件的类型和参数确定应该用来驱动哪一个状态机,并将其交给这状态机的“引擎”。

在Hadoop的代码中,具体的状态机,特别是其跳转表,都是由程序动态生成的,而不是静态预定的。用来生成状态机及其跳转表的Java类是StateMachineFactory,即“状态机工厂”。之所以是这样,以作业为例,Hadoop系统中可能有很多个作业,每个作业都得有自己的状态机,这些状态机有相同的跳转表(因而有相同的状态集合、相同的触发事件集合和操作集合),但是每个状态机所处的状态可能不同。显然,两个作业不能共享同一个状态机,但是却可以共享同一个状态机工厂,因为它们的跳转表是一样的。再说,我们无法事先估计作业的数量,而只能来一个作业就为其动态“生产”一个状态机。下面是StateMachineFactory这个类的摘要。

      class StateMachineFactory< OPERAND, STATE extends Enum<STATE>,
                    EVENTTYPE extends Enum<EVENTTYPE>, EVENT >{}
              //这是模版(Template)式类型定义,参数类型不同就有不同的 StateMachineFactory
      ]TransitionsListNode transitionsListNode
      ]Map<STATE,
            Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>
            stateMachineTable       //状态机的跳转总表,表中对于每个状态都有一个子表
      ]STATE defaultInitialState    //状态机的默认初始状态


      ]class TransitionsListNode{}
      ]class ApplicableSingleOrMultipleTransition<…>{}
      ]class SingleInternalArc implements Transition<…> {}
      ]class MultipleInternalArc implements Transition<…> {} // SingleInternalArc类似
      ]class ApplicableSingleOrMultipleTransition<…> implements ApplicableTransition<…>{}
      ]class InternalStateMachine{}
      ]]OPERAND operand
      ]]STATE currentState
      ]]doTransition(EVENTTYPE eventType, EVENT event)
        > currentState=StateMachineFactory.this.doTransition(operand, currentState,
                                                                  eventType, event)

对StateMachineFactory类的定义是个模版(Template)定义。任何状态机工厂都是针对着特定四个要素的,那就是<OPERAND, STATE, EVENTTYPE, EVENT>。这里OPERAND是状态机拥有者的类型,说明具体的状态机是为谁所用。STATE说明这台状态机有些什么状态,这通常是一个枚举类型。EVENTTYPE是用于这台状态机的事件类型,通常也是一个枚举类型。而EVENT,则是具体事件所属的class。只要有其中任何一个要素的类型不同,那就是一种不同的状态机工厂,从而生产出来的状态机也就不同。不过这种不同只是对象类型上的不同,就好比只是所加工的材料不同,工厂及其产品(在这里就是状态机)的结构和形态都是一样的。那么这些要素的类型到什么时候才确定呢?这要到创建具体的StateMachineFactory对象的时候。我们不妨看两个例子:

      class JobImpl implements …{}
      ]StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
                                                                stateMachineFactory


      class LocalizedResource implements …{}
      ]StateMachineFactory<LocalizedResource, ResourceState,
                          ResourceEventType, ResourceEvent> stateMachineFactory
          //对于LocalizedResource, EVENTTYPEResourceEventType,EVENTResourceEvent

在JobImpl和LocalizedResource内部都有StateMachineFactory,这两个状态机工厂的结构相同,但是因为具体的用途不同,两个状态机工厂的类型是不一样的。模版定义中的OPERAND,在JobImpl.stateMachineFactory中是个JobImpl类的对象,而在LocalizedResource.stateMachineFactory中则是个LocalizedResource类对象。模版定义中的STATE,对于前者是为JobImpl的状态机定义的一组状态(通常是个枚举类型),对于后者则是为LocalizedResource的状态机定义的一组状态(另一个枚举类型)。余可类推。

但是这些要素类型的不同并不妨碍二者同为StateMachineFactory,因为它们内部的结构成分相同,提供的操作方法也相同。

StateMachineFactory类最重要的内部数据成分是stateMachineTable,这是一个MAP,就是映射表,或者说是便查表,是若干二元组的集合。这个MAP的类型定义也是模板型定义,而且更复杂,这里要加一点解释。

首先,stateMachineTable是一个映射表,表中的每个元素都是一个二元组,让你给定一个STATE就可得到它的映射。但是,如上所述STATE的具体类型可以不同,它所映射的目标的类型也可以不同,所以MAP的类型定义也是模板型定义。我们可以想象,资源管理者RM所用状态机中的状态类型,与资源本地化过程所用状态机中的状态类型,理所当然是不一样的,可是因此就要定义许多不同的MAP,那就太麻烦了,这就是模版的意义所在。那么这里STATE的映射是什么呢?是另一种MAP,那也是一个映射表,是从EVENTTYPE到Transition的映射表,让你可以根据事件类型查到相应的跳变说明。而Transition则是关于一次具体跳变的说明,其内容包括伴随着跳变需要执行的操作,也即对于事件的反应,以及状态机在跳变后所处的状态。

这样,如果我们把第一层的MAP即stateMachineTable展开,并把MAP和Transition的内容用方括号框起来,那么就是这个样子:

      MAP stateMachineTable [
        STATE, MAP transitionMap [
                EVENTTYPE, Transition
                EVENTTYPE, Transition
                …  //别的事件类型,别的跳变
                ]
        STATE, MAP transitionMap [
                …
                ]
        … //别的当前状态,别的transitionMap
      ]

可以这样理解,stateMachineTable是个二列表,表中的每一行都是个STATE和MAP的二元组,状态机中定义了多少种状态,这个表中就有几行。这个STATE代表着状态机的当前状态,每个不同的状态都决定了一个第二层的MAP,这里称之为transitionMap,代表着一组跳变规则。然后,这个第二层的MAP又是一个二列表,这个表中的每行都是EVENTTYPE和Transition的二元组,使每个事件类型都对应着一种跳变。这种对应是有条件的,只有当状态机处于当前这种状态时,这样的事件才对应着这样的跳变。如果是在另一种当前状态,同样的事件就可能对应着另一种跳变了。

于是,如果发生了针对某状态机的事件,就可以在该状态机的stateMachineTable中找到其当前状态所对应的transitionMap;再根据事件类型在该transitionMap中找到相应的跳变说明Transition,该Transition给出了需要执行的操作以及跳变后的状态。

一个Transition描述了一种跳变,其构成要素有二:一是伴随着跳变的操作;二是跳变后的状态。但是,针对不同的状态机,虽然同是关于跳变的描述,并且结构相同,Transition的类型定义却涉及四个因素的类型差异,所以Transition的类型也是模版形式的“Transition<OPERAND, STATE, EVENTTYPE, EVENT>”。要理解为什么涉及四种因素的类型差异,我们不妨通过几个实例看看Transition的定义和实现。首先,Transition不是一个类,而是一个界面,所以凡是程序代码中用到Transition的其实都是指实现了此种界面的某类对象。

      interface TransitionOPERAND, STATE extends Enum<STATE>,
              EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
        STATE doTransition(OPERAND operand, STATE oldState,
                                      EVENT event, EVENTTYPE eventType);
      }

这个界面只定义了一个方法函数doTransition(),这个函数有四个参数,这些参数的类型可以因不同的状态机而不同,因而这四者就都出现在Transition的模版式定义中,并且也出现在前面StateMachineFactory的类型定义中。

那么有哪些类是实现了这个界面的呢?有两个,都定义在StateMachineFactory内部:

      class StateMachineFactory<…> {}
      ]class SingleInternalArc implements Transition<OPERAND, STATE,
                                                      EVENTTYPE, EVENT>{}
      ]]STATE postState
      ]]SingleArcTransition<OPERAND, EVENT> hook; //transition hook,操作挂钩
      ]]doTransition(OPERAND operand, STATE oldState, EVENT event, EVENTTYPE eventType)
        > if (hook! =null)hook.transition(operand, event)
                                          //通过操作挂钩执行该跳变的transition()函数
        > return postState
      ]class MultipleInternalArc implements Transition<OPERAND, STATE,
                                                        EVENTTYPE, EVENT>{}
      ]]Set<STATE> val idPostStates
      ]]MultipleArcTransition<OPERAND, EVENT, STATE> hook; //transition hook,操作挂钩
      ]]doTransition(OPERAND operand, STATE oldState, EVENT event, EVENTTYPE eventType)
        > postState=hook.transition(operand, event)//通过操作挂钩执行跳变的transition()函数
        > if (! val idPostStates.contains(postState)){
        >+ InvalidStateTransitonException(oldState, eventType)
        > }
        > return postState

StateMachineFactory内部定义了SingleInternalArc和MultipleInternalArc这两种“弧(Arc)”的类型。弧在状态图中代表着跳变,所定义的这两种弧都实现了Transition界面,都提供一个doTransition()函数。二者的区别是,SingleInternalArc是从单个oldState到单个postState的跳变,而MultipleInternalArc可以是从同一个oldState到多个postState之一的跳变,具体取决于执行hook.transition()的结果。

以SingleInternalArc为例,可想而知其内部成分postState和hook都是在构造函数中得到设置的。其中postState是跳变后的状态;而hook意为操作挂钩,其实是一个SingleArcTransition对象,doTransition()调用hook.transition(),就是调用由此对象提供的transition()函数。

从上面这段代码摘要中可见,作为类型模版,Transition对于OPERAND等四个因素的依赖跟doTransition()的参数类型一致,也跟StateMachineFactory所依赖的那四个类型一致。我们在前面已经通过实例看了在实际的(而不是模版的)StateMachineFactory类型定义中这些抽象类型究竟可以是什么。

到现在为止,我们还只是讲了StateMachineFactory的数据部分和类型定义部分,还没有讲到它的操作部分,所以还要看一下StateMachineFactory操作方法部分的摘要:

      class StateMachineFactory<…> {}
      ]StateMachineFactory(STATE defaultInitialState)  //构造函数之一
        > this.transitionsListNode=null
        > this.defaultInitialState=defaultInitialState
        > this.optimized=false
        > this.stateMachineTable=null
      ]StateMachineFactory(                      //构造函数之二
              StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that,
              ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> t)
        > this.defaultInitialState=that.defaultInitialState
        > this.transitionsListNode=new TransitionsListNode(t, that.transitionsListNode)
        > this.optimized=false
        > this.stateMachineTable=null
      ]StateMachineFactory(…)                    //构造函数之三
      ]makeStateMachineTable() //生成跳转表
      ]addTransition(…) //在当前 StateMachineFactory的基础上添加单弧跳转规则
        > s=new SingleInternalArc(postState, hook)//创建一个指明目标状态和伴随操作的单弧
        > a=new ApplicableSingleOrMultipleTransition<…>(preState, eventType, s)
                                              //创建包含此单弧的跳变规则
        > new StateMachineFactory<…>(this, a)    //创建加上该规则的新StateMachineFactory
      ]addTransition(…)        //在当前 StateMachineFactory的基础上添加多弧跳转规则
      ]installTopology()
        > return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this, true)
        >> this.defaultInitialState=that.defaultInitialState
        >> this.transitionsListNode=that.transitionsListNode
        >> this.optimized=optimized
        >> if (optimized)makeStateMachineTable()
        >> else stateMachineTable=null
      ]doTransition(OPERAND operand, STATE oldState, EVENTTYPE eventType, EVENT event)
        >Map<…> transitionMap=stateMachineTable.get(oldState)
                                              //根据当前状态获取transitionMap
        > if (transitionMap! =null){
        >> Transition<…> transition=transitionMap.get(eventType)
                  //根据事件类型获取跳转规则,通常是一个 SingleInternalArc对象
        >> if(transition! =null)return transition.doTransition(operand, oldState, event, eventType)
                                              //调用该跳转规则的doTransition()方法
        > }
      ]make(OPERAND operand, STATE initialState) //生成一台针对具体应用的状态机
        > new InternalStateMachine(operand, initialState)
        >> this.operand=operand
        >> this.currentState=initialState
        >> if (! optimized)maybeMakeStateMachineTable()
        >>> if (stateMachineTable==null)makeStateMachineTable()
        ]generateStateGraph(String name)       //生成代表着状态机的状态图

凡是要使用状态机的模块(class),首先必须要调用StateMachineFactory的构造函数之一,创建一个基本空白的StateMachineFactory对象,然后通过addTransition()一条条添加跳变规则。每次调用addTransition()时,都会通过上面的构造函数之二在原有的StateMachineFactory对象的基础上创建并返回一个新的StateMachineFactory对象。所以StateMachineFactory对象是immutable。同样,StateMachineFactory内部的TransitionsListNode也是immutable,每次调用addTransition()就在原有基础上创建一个新的TransitionsListNode,而所添加的跳变规则就积累在TransitionsListNode中。最后,完成了规则的添加之后,就调用installTopology(),这个函数又调用StateMachineFactory的构造函数之三,这次就会调用makeStateMachineTable(),根据TransitionsListNode的内容创建跳转表。注意,这样形成的是状态机工厂StateMachineFactory,而不是状态机InternalStateMachine,所以最后还得调用这里的make()方法创建状态机。创建状态机时在InternalStateMachine的构造函数中也可以构建跳转表。当然,跳转表只需构建一次,要么在创建状态机工厂时构建,要么在创建状态机时构建,这取决于StateMachineFactory内部的一个变量optimized,如果未经调用installTopology(),这个变量就是false,那就在通过make()创建状态机的时候再创建。

源代码中涉及很多模板式即“泛型”的定义和引用,做摘要的时候不得不用省略号取代,否则就太长了,读者不应只满足于阅读摘要,应该结合着摘要去看源代码。

光看StateMachineFactory的摘要或代码也许还是不甚了了,最好还要结合实际的状态机看一下。为此我们以Hadoop代码中最简单的状态机,即LocalizedResource的状态机作为实例来做进一步的讲解分析。

一个LocalizedResource对象代表着一份需要本地化的具体资源,这里所谓的“资源”是指数据文件或可执行程序。每一份这样的资源都只有四个状态,即INIT、DOWNLOADING、LOCALIZED、FAILED。这些状态的意义不言自明,因为所谓“本地化”就是去别的节点下载,而下载可以成功也可能失败。定义于资源本地化的事件则有五种:REQUEST、LOCALIZED、RELEASE、LOCALIZATION_FAILED和RECOVERED。

我们先看LocalizedResource对象的创建,当然这个类有它的构造函数LocalizedResource()。但是,如果一个类的内部定义了某些结构成分并且有赋值,那么在具体创建一个对象时首先执行的是这些结构成分的赋值,然后才是执行其构造函数。特别地,如果一个类的内部有静态成分,那么更是在创建该类的第一个对象之初就先创建这些静态成分。这些静态成分供该类的所有对象共享,以后再创建该类对象时就可直接加以引用。

下面是LocalizedResource类定义的摘要:

      class LocalizedResource implements EventHandler<ResourceEvent> {}
      ]StateMachine<ResourceState, ResourceEventType, ResourceEvent> stateMachine
      ]static StateMachineFactory<LocalizedResource, ResourceState,
                                ResourceEventType, ResourceEvent> stateMachineFactory=
                new StateMachineFactory<LocalizedResource, ResourceState,
                                  ResourceEventType, ResourceEvent>(ResourceState.INIT)
                .addTransition(ResourceState.INIT, ResourceState.DOWNLOADING,
                  ResourceEventType.REQUEST, new FetchResourceTransition())
                …installTopology()     //添加了很多跳变规则之后调用 installTopology()
      ]LocalizedResource(LocalResourceRequest rsrc, Dispatcher dispatcher)
        > this.rsrc=rsrc
        > this.dispatcher=dispatcher
        > this.ref=new LinkedList<ContainerId>()
        > this.stateMachine=stateMachineFactory.make(this)
      ]…

LocalizedResource内部有两个与状态机有关的结构成分,其中之一是stateMachine,这是实现了界面StateMachine的某类对象,至于究竟是什么类型我们后面会看到;另一个就是StateMachineFactory类对象stateMachineFactory。这二者的类型定义都是模版式的,因为是用于资源本地化,所以OPERAND、STATE等四个抽象类型(泛型)在这里落实为LocalizedResource、ResourceState、ResourceEventType和ResourceEvent。

先看stateMachine,它既非静态变量,也并未赋值,所以在创建LocalizedResource对象之时这个成分的值为null。

再看stateMachineFactory,这是个静态成分,而且又有赋值,我们把前面摘要中省略的语句全文列出于下:

      private static final StateMachineFactory
                <LocalizedResource, ResourceState, ResourceEventType, ResourceEvent>
        stateMachineFactory=
          new StateMachineFactory<LocalizedResource, ResourceState,
                        ResourceEventType, ResourceEvent>(ResourceState.INIT)
                                            //初始状态为 INIT,下面一条条添加跳变规则
      .addTransition(ResourceState.INIT, ResourceState.DOWNLOADING,
                        ResourceEventType.REQUEST, new FetchResourceTransition())
      .addTransition(ResourceState.INIT, ResourceState.LOCALIZED,
                        ResourceEventType.RECOVERED, new RecoveredTransition())


      //From DOWNLOADING (ref > 0, may be localizing)
      .addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING,
                        ResourceEventType.REQUEST, new FetchResourceTransition())
                                    //TODO:Duplicate addition! !
  .addTransition(ResourceState.DOWNLOADING, ResourceState.LOCALIZED,
                      ResourceEventType.LOCALIZED,
                                    new FetchSuccessTransition())
  .addTransition(ResourceState.DOWNLOADING, ResourceState.DOWNLOADING,
                      ResourceEventType.RELEASE, new ReleaseTransition())
  .addTransition(ResourceState.DOWNLOADING, ResourceState.FAILED,
                  ResourceEventType.LOCALIZATION_FAILED, new FetchFailedTransition())


  //From LOCALIZED (ref >=0, on disk)
  .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
                  ResourceEventType.REQUEST, new LocalizedResourceTransition())
  .addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
                  ResourceEventType.RELEASE, new ReleaseTransition())
  .installTopology();

注意,这里从头到尾只是一个语句,一个很长、看起来似乎有点怪异的语句,Hadoop代码中的状态机都是这样创建的。这个语句先调用StateMachineFactory类的构造方法,创建一个空白的StateMachineFactory对象,然后调用StateMachineFactory.addTransition(),在其跳转表中添加跳转规则。如前所述,addTransition()所返回的仍是StateMachineFactory,于是就可以一条一条往下加,直到最后调用installTopology(),那仍旧还是返回StateMachineFactory,因而最后可以赋值给变量stateMachineFactory。

如果把跳转表的内容画成状态图,那么每增加一条跳转规则就相当于在图上增添了一条表示跳转的弧。除起点和终点之外,与一次跳转相联系的还有触发事件和作为响应的操作。这是addTransition()的调用界面:

      [StateMachineFactory.addTransition()]


      StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>
      addTransition (STATE preState, STATE postState, EVENTTYPE eventType,
                                  SingleArcTransition<OPERAND, EVENT> hook)

首先这个函数的返回值仍是一个StateMachineFactory。这很好理解:给一个状态机工厂增添一条跳变规则,所得到的仍是一个状态机工厂。这看似是在为状态机工厂增添规则,实际上这些规则都会出现在由此工厂生产的状态机中,所以这实质上是在为将来要生产创建的状态机增添规则。函数的调用参数是四个:preState和postState是跳变前后的状态;eventType是引起跳变的事件类型,更确切地说,是当状态机处于preState状态的条件下可以引发去往postState状态的事件类型;最后一个参数hook给定了伴随着跳变的操作提供者。

参数hook的类型是SingleArcTransition,但这只是个interface,所以应该是个实现了这个界面的某种类型。其实SingleArcTransition这个界面只定义了一种操作transition(),这就是伴随着跳变的操作:

      public interface SingleArcTransition<OPERAND, EVENT> {
        public void transition(OPERAND operand, EVENT event)
      }

所以参数hook应该是实现了这个界面,即提供transition()操作方法的某类对象。注意, transition()这两个参数的类型都是模板中使用的抽象类型,在这里落实为LocalizedResource和ResourceEvent。这里要说明,函数transition()只是伴随着跳变的操作,而不是跳变本身。状态机并非因为执行了这个函数才发生跳变,而是因为状态机对于跳变规则的执行。

那么实现了SingleArcTransition界面的类究竟是什么呢?这有很多,具体到资源本地化的状态机,我们以前面代码中的最后一次addTransition()调用为例:

      addTransition(ResourceState.LOCALIZED, ResourceState.LOCALIZED,
                      ResourceEventType.RELEASE, new ReleaseTransition())

这次调用所增添的规则是这样的:如果状态机的当前状态preState是LOCALIZED,发生了类型为ResourceEventType.RELEASE的事件,则发生以LOCALIZED为目标的跳变,所以目标状态postState仍保持LOCALIZED不变,跳变时执行由一个ReleaseTransition对象所提供的操作,那就是ReleaseTransition.transition()。

那么ReleaseTransition是个实现了SingleArcTransition界面的类吗?是的:

      class LocalizedResource implements EventHandler<ResourceEvent> {}
      ]class ReleaseTransition extends ResourceTransition {}
      ]]void transition(LocalizedResource rsrc, ResourceEvent event)

此刻我们还不关心LocalizedResource.ReleaseTransition.transition()究竟做些什么,但是我们看到ReleaseTransition是定义于LocalizedResource内部的一个类,这个类是对ResourceTransition的扩充,而ResourceTransition则实现了SingleArcTransition界面。实际上ResourceTransition是个抽象类,真正实现了SingleArcTransition界面并提供transition()函数的,在这里就是ReleaseTransition。所以上述addTransition()调用的最后一个参数hook就是临时通过new操作创建的ReleaseTransition对象。

同样的道理,前面的第一次addTransition()调用所添加的规则是:如果当前状态为INIT,而且发生了事件ResourceEventType.REQUEST,就执行FetchResourceTransition.transition(),并跳转到状态DOWNLOADING,因而状态机的当前状态就从INIT变成DOWNLOADING。

这样,在创建一个LocalizedResource对象的时候,在调用其构造方法之前就已经把它的StateMachineFactory,包括其跳转表准备好了。当然,凡是像LocalizedResource这样需要创建状态机的类,都得通过程序代码往跳转表中放上自己的跳变规则,以形成自己的StateMachineFactory。需要本地化的资源可能会有很多,每项资源都要有个状态机,这些状态机都将有相同的跳转表。

还要说明,这里的stateMachineFactory是LocalizedResource类的静态成分,那是由整个类所共享的,但stateMachine却不是,每个具体的LocalizedResource对象都需要创建自己的状态机stateMachine。这当然很好理解,资源的本地化本来就是每项具体资源的事,是互不相干的事。由此可知,虽然StateMachineFactory为数不多,但是状态机的数量可以很大。

注意,至此为止只是状态机工厂的创建,这是在调用LocalizedResource类的构造函数之前发生的,这里还没有涉及状态机的创建。

完成了内部数据成分的初始化之后,就要调用LocalizedResource的构造函数了:

      public LocalizedResource(LocalResourceRequest rsrc, Dispatcher dispatcher){
        this.rsrc=rsrc;           //这是具体的资源请求
        this.dispatcher=dispatcher; //准备用于这个状态机的Dispatcher
        this.ref=new LinkedList<ContainerId>();
        ReadWriteLock readWriteLock=new ReentrantReadWriteLock();
        this.readLock=readWriteLock.readLock();  //用来保护并发读操作的锁
        this.writeLock=readWriteLock.writeLock(); //用来保护并发写操作的锁

        this.stateMachine=stateMachineFactory.make(this); //生成状态机
      }

这里我们关心的是最后一个语句,即状态机的生成。注意调用参数this,这代表着正在构造的LocalizedResource对象,即一项具体的需要加以本地化的资源。凡是这个对象内部的结构成分,例如this.rsrc、this.dispatcher,包括this.readLock和this.writeLock,都会随同用作参数的this被传递给stateMachineFactory.make()。换言之,在stateMachineFactory.make()内部是可以访问到这些结构成分的。

我们接着看stateMachineFactory.make()的代码。

      [LocalizedResource.LocalizedResource()> StateMachineFactory.make()]
        publ ic StateMachine<STATE, EVENTTYPE, EVENT> make(OPERAND operand){
          return new InternalStateMachine(operand, defaultInitialState);
        }

可见实际生成的是个InternalStateMachine类的对象。InternalStateMachine是个定义于StateMachineFactory内部的类,这个类实现了StateMachine界面,所以我们可以把返回的InternalStateMachine对象赋值给LocalizedResource.stateMachine。

下面是InternalStateMachine类构造函数的摘要:

      [LocalizedResource.LocalizedResource()> StateMachineFactory.make()
      > InternalStateMachine.InternalStateMachine()]
      InternalStateMachine(OPERAND operand, STATE initialState)

      > this.operand=operand
      > this.currentState=initialState
      > if (! optimized){
      >+ maybeMakeStateMachineTable()
      >+> if (stateMachineTable==null)makeStateMachineTable()
      > }

注意调用StateMachineFactory.make()时的参数this,就是前面所创建的具体LocalizedResource的对象,在这里变成了this.operand,即InternalStateMachine.operand。也就是说,作为一个具体对象的状态机,其内部成分operand就指向这个对象。这样,为一具体LocalizedResource对象创建的状态机,里面就有着通向这个LocalizedResource对象的渠道,借此可以访问这个对象的相关成分。

至此,这个LocalizedResource对象已经建立了它自己的状态机。但是怎么才能让这个状态机动起来呢?这里还有好多别的因素。为了深入理解它的状态机,我们从另一个角度看一下LocalizedResource这个类的摘要。

      class LocalizedResource implements EventHandler<ResourceEvent> {}
      ]LocalResourceRequest rsrc //具体本地资源请求,LocalizedResource对象就是为此而建的
      ]Dispatcher dispatcher
      ]StateMachine<…> stateMachine
      ]Queue<ContainerId> ref; //Queue of containers using this localized resource
      ]static final StateMachineFactory<…> stateMachineFactory
      ]LocalizedResource(LocalResourceRequest rsrc, Dispatcher dispatcher)
      ]handle(ResourceEvent event)
        > …
        > oldState=this.stateMachine.getCurrentState()
        > newState=this.stateMachine.doTransition(event.getType(), event)
      ]abstract class ResourceTransition implements SingleArcTransition<…> {}
      ]static class FetchResourceTransition extends ResourceTransition {}
      ]]transition(LocalizedResource rsrc, ResourceEvent event)
        > ResourceRequestEvent req=(ResourceRequestEvent)event
                                          //参数 event实际上是个ResourceRequestEvent
        > LocalizerContext ctxt=req.getContext()    //从中抽取LocalizerContext
        > ContainerId container=ctxt.getContainerId()
                                          //再从LocalizerContext中抽取ContainerId
        > rsrc.ref.add(container)      //ContainerId加入LocalizedResource对象rsrc
        > e=new LocalizerResourceRequestEvent(rsrc, …,
                                          req.getLocalResourceRequest().getPattern())
                          //创建(其实是重新构造)一个LocalizerResourceRequestEvent事件
        > rsrc.dispatcher.getEventHandler().handle(e)  //并处理这个事件
      ]…
      ]static class ReleaseTransition extends ResourceTransition {}
      ]]transition(LocalizedResource rsrc, ResourceEvent event)

LocalizedResource类内部定义了一系列的内嵌类,这里只列出了其中的两个,就是FetchResourceTransition和ReleaseTransition。实际上还有FetchSuccessTransition、FetchFailedTransition、LocalizedResourceTransition和RecoveredTransition。之所以没有把它们都列出来,是因为这些类的结构都是一样的,除了构造方法之外就只有一个操作方法transition()。不仅如此,连transition()的格局也都相同,里面免不了要创建一个事件,然后以此为参数调用rsrc.dispatcher.getEventHandler().handle(),可谓千篇一律。其实,就此处的代码而言,与其说是创建一个事件,毋宁说是重构一个事件,因为这里用来创建LocalizerResourceRequestEvent事件的信息,是从参数event,一个ResourceRequestEvent对象中抽取的,所以这里有着某种连贯性。

这里要说明一下,所谓一个“事件”,例如LocalizerResourceRequestEvent,更贴切地说应该是“事件通知”或“事件报告”,而并非事件本身,但是大家都已习惯称之为“事件”。

细心的读者也许已经看出,前面作为参数hook出现在跳转规则中的那些用来提供伴随操作的,正是这里所定义的这些类的对象。比方说,如果当前状态是INIT,而发生了事件REQUEST,那么相应的操作就是调用由FetchResourceTransition提供的transition()方法。

除此之外,值得注意的是LocalizedResource还提供了一个方法handle(),其调用参数是类型为ResourceEvent的事件。我们知道LocalizedResource实现了EventHandler界面,所以LocalizedResource.handle()也就是EventHandler.handle()。不光是LocalizedResource如此,其他采用了状态机的类也是如此。事实上不仅是状态机,凡是实现了EventHandler界面的类,凡是代表着受事件驱动的过程的类,都有个handle()函数。再来看看上面FetchResourceTransition.transition()的代码中对rsrc.dispatcher.getEventHandler().handle(e)的调用,rsrc.dispatcher是具体资源rsrc所绑定的Dispatcher,而Dispatcher.getEventHandler()则确定应该把所产生的事件发送给哪一个EventHandler对象,这可以是另一个状态机,也可以是当前这个状态机本身,也可以不是状态机,但总之是代表着某个受事件驱动的过程。确定了之后,就调用那个对象的handle()函数,驱动那个过程前行。而那个过程的前行,则又可能反过来直接或间接发出驱动LocalizedResource的状态机进一步前行的事件。

事实上,LocalizedResource的状态机就是靠这些因素的配合才能动起来的。下面我们就以这么一条跳变规则为例,来看看这个状态机是怎么运转的:

      (ResourceState.INIT, ResourceState.DOWNLOADING,
                ResourceEventType.REQUEST, new FetchResourceTransition())

假定现在这台状态机处于初始状态,即ResourceState.INIT状态。这时候发生了一个事件,有个对象发出了对于资源本地化的请求,于是就有了一个反映着这个事实的、类型为ResourceEventType.REQUEST的“事件”,成为状态机的一个触发条件。

那么这个事件究竟是怎么来的呢?这里介绍一下背景。它的来历是这样:在集群内的每个从节点上,NodeManeger会创建一个ContainerManagerImpl对象,顾名思义这是专门管理“容器(Container)”的,就好像是地方政府里的一个部门。这个对象又会创建另一个类型为ResourceLocalizationService的对象,这就好像是个资源本地化的专管员,它就专门负责一个具体容器的资源本地化。前面讲过,所谓容器其实就是把对于许多资源与任务的描述打包在一起。我们就从ResourceLocalizationService的这一项操作handleInitContainerResources()开始,之所以会启动这个操作是因为得到了一个作为ContainerLocalizationRequestEvent的请求:

      [ResourceLocalizationService.handleInitContainerResources()]
        /**
          * For each of the requested resources for a container, determines the
          * appropriate LocalResourcesTracker and forwards a LocalResourceRequest to that tracker.
          */
      private void handleInitContainerResources (ContainerLocalizationRequestEvent rsrcReqs){
        Container c=rsrcReqs.getContainer(); //从容器本地化请求事件中取得具体的容器
        //create a loading cache for the file statuses
        LoadingCache<Path, Future<FileStatus>> statCache=
          CacheBuilder.newBuilder().build(FSDownload.createStatusCacheLoader(getConfig()));
        LocalizerContext ctxt=
          new LocalizerContext(c.getUser(), c.getContainerId(), …, statCache); //创建一个上下文
        Map<LocalResourceVisibility, Collection<LocalResourceRequest>>
                      rsrcs=rsrcReqs.getRequestedResources(); //获取所请求的资源清单
         for (Map.Entry<LocalResourceVisibility, Collection<… >> e:rsrcs.entrySet()){
              //对于要求本地化的每种不同可见度的资源(资源集合中的每个二元组)
          LocalResourcesTracker tracker=getLocalResourcesTracker(e.getKey(), c.getUser(),
                  c.getContainerId().getApplicationAttemptId().getApplicationId());
                  //tracker是个EventHandler
          for (LocalResourceRequest req:e.getValue()){  //对于其中的每项资源
            tracker.handle(new ResourceRequestEvent(req, e.getKey(), ctxt));
                  //调用这个EventHandlerhandle()函数
          }
        }
      }

如代码前面的注释所言,这个函数的作用是:就容器中所要求的每一项资源,确定其相应的LocalResourcesTracker,并向其发送一个LocalResourceRequest事件。

从代码中看,容器中描述的资源是成组的,每个组是一个entrySet,里面可以包含多个本地资源请求,所以有两层for循环嵌套。每一组资源都有个LocalResourcesTracker,即本地资源的“追踪者”。具体的LocalResourcesTracker是由三个因素决定的:第一个是“可见度”LocalResourceVisibility,具体有PUBLIC、PRIVATE和APPLICATION三种;第二个是任务所属的用户;第三个是ApplicationId。对于同一个“追踪者”可以发送多个具体的资源请求。

针对每一项具体的资源请求,都要发送一个事件,但是这个事件并非作为参数传下来的ContainerLocalizationRequestEvent,而要根据从中抽取的每项资源信息另行创建一个ResourceRequestEvent类的事件,这个类的构造函数如下:

      class ResourceRequestEvent extends ResourceEvent {
        ResourceRequestEvent(LocalResourceRequest resource,
                            LocalResourceVisibility vis, LocalizerContext context){
          super(resource, ResourceEventType.REQUEST); //这就是对于状态机的一个触发条件
          this.vis=vis;
          this.context=context;
        }
        …
      }

这个类是对ResourceEvent的扩充,具体的资源和事件的类型都需要保存在ResourceEvent中,所以这里通过super()语句调用父类ResourceEvent的构造函数。我们此刻所关心的是,这个事件的类型是ResourceEventType.REQUEST,这正是前面那条跳变规则的触发条件。

创建一个ResourceEvent类(更确切地说是ResourceEvent的某个子类)的事件,并通过某个LocalResourcesTracker的handle()函数加以处理,可以看作是资源本地化状态机的一个活动周期的起源。

这个事件,即请求,首先交给tracker的handle(),由其加以处理。不管具体的tracker是谁,都是实现了LocalResourcesTracker界面的某类对象,实际上是LocalResourcesTrackerImpl。界面LocalResourcesTracker是对EventHandler的扩充,所以LocalResourcesTrackerImpl同时也是个EventHandler。LocalResourcesTrackerImpl.handle()的代码中有一些我们此刻并不关心的内容,所以只看一下它与REQUEST事件相关的摘要:

      [ResourceLocalizationService.handleInitContainerResources()
      > LocalResourcesTrackerImpl.handle()]


      LocalResourcesTrackerImpl.handle(ResourceEvent event)
      > LocalResourceRequest req=event.getLocalResourceRequest()
      > LocalizedResource rsrc=localrsrc.get(req) //rsrc是个LocalizedResource对象
                                    //localrsrc是个MAP,记载着已知的LocalizedResource
      > switch(event.getType()){
      > case REQUEST:
      >+ if (null==rsrc){        //如果MAP中还没有记载此项资源
      >++ rsrc=new LocalizedResource(req, dispatcher);
      >++ localrsrc.put(req, rsrc); //就创建一项记载
      >+ }
      > }
      > rsrc.handle(event)          //所以实际上是LocalizedResource.handle()

可见,LocalResourcesTrackerImpl对象(对于事件)的处理,其实是由LocalizedResource对象进行的,而LocalizedResource正是我们现在所关心的那个状态机的拥有者。这样,我们就明白触发这个状态机的事件是怎么来的了。我们在前面已经有了LocalizedResource的摘要,现在再着重看它的handle()方法究竟干了些什么,这次是源代码:

      [ResourceLocalizationService.handleInitContainerResources()
      > LocalResourcesTrackerImpl.handle()> LocalizedResource.handle()]
        LocalizedResource.handle(ResourceEvent event){
          try{
            this.writeLock.lock(); //加锁
            Path resourcePath=event.getLocalResourceRequest().getPath(); //文件路径
            LOG.debug("Processing"+resourcePath+"of type"+event.getType());


            ResourceState oldState=this.stateMachine.getCurrentState(); //获取状态机的当前状态
            ResourceState newState=null;
            try{
              newState=this.stateMachine.doTransition(event.getType(), event);
            }catch(InvalidStateTransitonException e){
              LOG.warn("Can't handle this event at current state", e);
            }
            if (oldState! =newState){ //如果发生状态变化就记入日志
              LOG.info("Resource"+resourcePath+(localPath! =null ?
                "(->"+localPath+")":"")+"transitioned from"+oldState+"to"+newState);
            }
          }finally{
            this.writeLock.unlock(); //解锁
          }
        }

这段程序,一言以蔽之,就是在加锁防干扰的条件下调用状态机的doTransition()方法。别的语句都只是为LOG即运行日志服务。不过从这些语句中可以看出,当程序从doTransition()返回的时候,状态机的跳变已经完成了,但是也可能前后状态相同。

那么状态机的doTransition()方法究竟干些什么呢?LocalizedResource类的代码中的stateMachine,其类型名为StateMachine,但是StateMachine只是界面,实际的状态机是定义于StateMachineFactory内部的InternalStateMachine。所以stateMachine.doTransition()实际上是StateMachineFactory.InternalStateMachine.doTransition()。

      [ResourceLocalizationService.handleInitContainerResources()
      > LocalResourcesTrackerImpl.handle()> LocalizedResource.handle()
      > StateMachineFactory.InternalStateMachine.doTransition()]


      public synchronized STATE doTransition(EVENTTYPE eventType, EVENT event){
        currentState=StateMachineFactory.this.doTransition(operand,
                                                  currentState, eventType, event);
        return currentState;
      }

只是转了一下手,把事情交给了StateMachineFactory.this.doTransition(),实际上就是StateMachineFactory.doTransition()。如前所述,这里两个调用参数的类型都是抽象类型,但是具体到LocalizedResource,都已落实为ResourceEventType和ResourceEvent。

但是要注意,状态机的当前状态,即currentState,却正是在这里改变的。这里作为参数传递下去的currentState是老的当前状态,而返回的却是新的当前状态。所以, StateMachineFactory.doTransition()的作用一方面是执行伴随着本次状态跳变的操作,另一方面是返回新的状态,完成本次跳变。我们看一下这个函数的摘要:

      [ResourceLocalizationService.handleInitContainerResources()
      > LocalResourcesTrackerImpl.handle()> LocalizedResource.handle()
      > StateMachineFactory.doTransition()]


      doTransition(OPERAND operand, STATE oldState, EVENTTYPE eventType, EVENT event)
      > transitionMap=stateMachineTable.get(oldState) //找到与当前状态相应的transitionMap
      > if (transitionMap! =null){
      >+ Transition<…> transition=transitionMap.get(eventType)
                                //根据事件类型在此transitionMap中找到相应的跳变说明
      >+ if (transition! =null)return transition.doTransition(operand, oldState, event, eventType)
        ==StateMachineFactory.SingleInternalArc.doTransition() //执行规定的跳变
      >+> if (hook! =null){
      >+>+ hook.transition(operand, event) //执行伴随操作并完成状态变化
      >+> }
      >+> return postState; //返回跳变后的状态,即本次跳变的目标
      > }

这里4个调用参数的类型都是来自模版的抽象类型,对于LocalizedResource来说operand的类型是LocalizedResource; oldState和eventType的类型分别是ResourceState和ResourceEventType; event的类型则是ResourceEvent。

StateMachineFactory.doTransition()的程序也很简单,就是先根据当前状态oldState从状态跳转表中获取与此对应的transitionMap,再从中找到以eventType为触发条件的那条规则,就是程序中的transition。找到了适用规则,就调用其doTransition()。我们在前面已经看到,这个transition的类型Transition是个界面,实际上是定义于StateMachineFactory内部,实现了这个界面的SingleInternalArc,所以Transition.doTransition()实际上是SingleInternalArc.doTransition(),而后者只是通过其hook调用hook.transition()。当然,这是某个具体的SingleInternalArc。

那么现在这个hook究竟是什么呢?我们在前面创建状态机工厂的过程中已经看到,那就是调用addTransition()添加相应跳变规则时的参数hook,在我们现在这个情景中是FetchResourceTransition,因为当初那次调用是这样的:

      addTransition(ResourceState.INIT, ResourceState.DOWNLOADING,
                  ResourceEventType.REQUEST, new FetchResourceTransition())

如前所述,FetchResourceTransition是对抽象类ResourceTransition的扩充。后者虽然在形式上“实现”了界面SingleArcTransition,实际上却要靠扩充落实了这个抽象类的具体类来提供这个界面所定义的方法transition(),所以,如FetchResourceTransition等这些具体的类都提供了自己的transition()。

这样,对hook.transition()的调用就转化为对FetchResourceTransition.transition()的调用。

      [ResourceLocalizationService.handleInitContainerResources()>
      LocalResourcesTrackerImpl.handle()> LocalizedResource.handle()>
      StateMachineFactory.doTransition()> StateMachineFactory.SingleInternalArc.doTransition()>
      FetchResourceTransition.transition()]


        private static class FetchResourceTransition extends ResourceTransition {
          @Override
          public void transition(LocalizedResource rsrc, ResourceEvent event){
            ResourceRequestEvent req=(ResourceRequestEvent)event; //还原成扩展后的类型
            LocalizerContext ctxt=req.getContext();     //从资源请求事件中获取Context
            ContainerId container=ctxt.getContainerId();  //Context中获取ContainerId
            rsrc.ref.add(container); //将此ContainerId加入该项资源的引用链表
            rsrc.dispatcher.getEventHandler().handle(
                new LocalizerResourceRequestEvent(rsrc, req.getVisibility(), ctxt,
                                      req.getLocalResourceRequest().getPattern()));
        }
      }

这里的参数rsrc是个LocalizedResource对象,这就是前面调用doTransition()时的参数的operand,就是具体状态机的拥有者,那就是某项具体的资源。

程序中先将作为参数传下来的event还原成其实际的、扩充后的类型。之所以需要有这样的还原,是因为:以LocalizedResource的状态机为例,出于通用的考虑,前面的代码中都不以像ResourceRequestEvent这样经过扩充后的类型作为参数的类型,因为如果那样就得还有以ResourceLocalizedEvent、ResourceFailedLocalizationEvent等为参数类型的相同的代码。可是既然这些类型都是对ResourceEvent的扩充,就不妨以ResourceEvent为参数类型,这样一路下来的代码就可以通用了。不过,虽然参数event是作为ResourceEvent类对象传下来的,这并不意味着它真的就变成了ResourceEvent,也不能只把它当成ResourceEvent,那样就会丢失一部分信息。它的头部固然是ResourceEvent,但是尾巴也并没有被切掉。但是一旦到达了知道其底细的地方,就要把它投射(cast)恢复成原来的类型,因为它们的尾部还携带着信息。

然后,作为ResourceRequestEvent,就可以从中恢复出相关的信息了,在这里是容器的ContainerId;并进行相关的处理,在这里只是把ContainerId加入到一个链表中,把它保存下来。

最后一步的操作几乎是标准的,那就是先另外创建一个事件,在这里是LocalizerResourceRequestEvent,然后通过Dispatcher来触发别的受事件驱动的过程,就是这里的rsrc.dispatcher.getEventHandler().handle(),这在前面已经讲到过了。

就像机器总是有固定和活动两种部件一样,我们也可以从概念上将状态机分成静态和动态两部分。静态部分就是状态和跳变规则,那一部分可以画成一个状态图;动态部分则是怎么使状态机转动起来的那套机制。现在,状态机已经动了一下,发生了一次状态跳变,但是还有没有下一次转动呢?状态机的转动是否可持续呢?Dispatcher就起着十分重要的作用。

注意,Dispatcher和Eventhandler都是界面,实现了Dispatcher界面的主要是AsyncDispatcher,摘要如下:

      class AsyncDispatcher extends AbstractService implements Dispatcher {}
      ]BlockingQueue<Event> eventQueue      //事件队列
      ]EventHandler handlerInstance          //事件处理器
      ]Thread eventHandl ingThread            //事件处理线程
      ]Map<Class<? extends Enum>, EventHandler> eventDispatchers
      ]AsyncDispatcher(BlockingQueue<Event> eventQueue)
        > super("Dispatcher")
        > this.eventQueue=eventQueue
        > this.eventDispatchers=new HashMap<Class<? extends Enum>, EventHandler>()
      ]serviceStart()
        > super.serviceStart()
        > eventHandl ingThread=new Thread(createThread()) //创建事件处理线程
        > eventHandl ingThread.start()
      ]createThread()           //事件处理线程的代码
        > return new Runnable()
                    ]run()
                      > while (! stopped&&! Thread.currentThread().isInterrupted()){
                      >+ Event event=eventQueue.take()  //从事件队列中取下一个事件
                      >+ if (event! =null)dispatch(event)  //发起处理
                      > }
      ]dispatch(Event event) //发起处理一个事件
        > Class<? extends Enum> type=event.getType().getDeclaringClass()
        > EventHandler handler=eventDispatchers.get(type)
        > if(handler! =null)handler.handle(event) //调用事件处理器的handle()函数
      ]register(Class<? extends Enum> eventType, EventHandler handler)
      ]getEventHandler()  //获取本Dispatcher的事件处理器
        > if (handlerInstance==null){
        >+ handlerInstance=new GenericEventHandler()
        > }
        > return handlerInstance
      ]class GenericEventHandler implements EventHandler<Event> {}
      ]]handle(Event event)

AsyncDispatcher之所以称为“异步”,就是因为它有一个事件队列和一个事件处理线程。这样,当有事件发生时,如果状态机尚未完成上一事件的处理,就不必睡眠等待,只要把事件挂入队列就完成并返回,事件处理线程执行完对先前事件的处理后自会来此队列中逐个加以处理。

注意这里的getEventHandler()返回的是一个GenercEventHandler对象,由此可见AsyncDispatcher.getEventHandler().handle()显然就是GenericEventHandler.handle()。这是定义于AsyncDispatcher内部的一个类。所以,前面的rsrc.dispatcher.getEventHandler().handle()实际上就是AsyncDispatcher.GenericEventHandler.handle()。调用这个函数本来的目的是要对事件进行处理,但实际上只是把待处理的事件挂入了AsyncDispatcher的事件队列,以等待处理。

在深入了解GenericEventHandler.handle()之前,我们还要先看一下上面所创建的事件LocalizerResourceRequestEvent()及其构造函数。

      public class LocalizerResourceRequestEvent extends LocalizerEvent {
        private final LocalizerContext context;       //上下文
        private final LocalizedResource resource;      //所属的那项资源
        private final LocalResourceVisibility vis;
        private final String pattern;


        public LocalizerResourceRequestEvent(LocalizedResource resource,
                      LocalResourceVisibility vis, LocalizerContext context, String pattern){
          super(LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION,
                                ConverterUtils.toString(context.getContainerId()));
          this.vis=vis;
          this.context=context;
          this.resource=resource;
          this.pattern=pattern;
        }
        …
      }

这个类是对LocalizerEvent的扩充,这里通过super()调用其父类LocalizerEvent的构造函数,将其事件类型设置成REQUEST_RESOURCE_LOCALIZATION,并设置其ContainerId。

创建了这个事件之后,就以此为参数调用GenericEventHandler.handle()。

      [ResourceLocalizationService.handleInitContainerResources()>
      LocalResourcesTrackerImpl.handle()> LocalizedResource.handle()>
      StateMachineFactory.doTransition()> StateMachineFactory.SingleInternalArc.doTransition()>
      FetchResourceTransition.transition()> AsyncDispatcher.GenericEventHandler.handle()]


          public void handle(Event event){
            if (blockNewEvents)return;
            drained=false;
            /* all this method does is enqueue all the events onto the queue */
            int qSize=eventQueue.size();
            if (qSize! =0&&qSize %1000==0){
              LOG.info("Size of event-queue is"+qSize); //每隔1000个事件做一次LOG
            }
            int remCapacity=eventQueue.remainingCapacity();
            if (remCapacity < 1000){//队列的剩余容量已小于1000, LOG告警
              LOG.warn("Very low remaining capacity in the event-queue:"+remCapacity);
            }
            try{
              eventQueue.put(event);     //将事件挂入队列
            }catch(InterruptedException e){
              if (! stopped){
                LOG.warn("AsyncDispatcher thread interrupted", e);
              }
              throw new YarnRuntimeException(e);
            }
          }

显然,这里所谓的handle(),所谓对于事件的处理,只是把它挂入了AsyncDispatcher的事件队列eventQueue。把事件挂入队列之后,当前的这个流程就逐层返回了,这里包含了从hook.transition()和StateMachineFactory.doTransition()的返回,一直要返回到前面的ResourceLocalizationService.handleInitContainerResources()中产生ResourceRequestEvent事件并着手处理的地方,从那里对tracker.handle()的调用返回。这里特别要说明,从hook.transition()返回则意味着跳变伴随操作的完成,从StateMachineFactory.doTransition()返回意味着跳变业已完成,状态机已从INIT状态跳变到DOWNLOADING状态。

这就是异步处理的方式,把事件挂入一个队列,交给别人去处理就返回了,而不是事必躬亲地一直把事情处理完才返回,那样就是“同步”的了。

每个具体的AsyncDispatcher对象,在其创建之初,在serviceStart()中会创建一个线程eventHandlingThread,这个线程所执行的程序是由createThread()创建的一个无名Runnable。这个Runnable的主体run()的代码是一个while循环。只要不被打断,这个while循环实际上就是无穷的,每次循环都试图从事件队列eventQueue中获取一个事件(如果没有就睡眠等待),拿到一个事件就dispatch()这个事件。而dispatch(),则要根据这个事件的类型从一个对照表eventDispatchers中寻找适合此种事件类型的事件处理者EventHandler,若能找到就调用其handle()函数做进一步的处理。这就是“分发”,即dispatch。

      AsyncDispatcher.eventHandlingThread.run()
      > while (! stopped&&! Thread.currentThread().isInterrupted()){
      >+ event=eventQueue.take()
      >+ dispatch(event)
      >+> type=event.getType().getDeclaringClass()
      >+> EventHandler handler=eventDispatchers.get(type)
      >+> handler.handle(event)
      > }

那么对照表即eventDispatchers中的内容是怎么来的呢?AsyncDispatcher类提供了一个函数register(),具体的EventHandler可以调用这个函数向某个AsyncDispatcher对象登记,这样就进入了它的对照表eventDispatchers。

LocalizerResourceRequestEvent是对LocalizerEvent的扩充,而LocalizerEvent的处理者是LocalizerTracker,这是在ResourceLocalizationService的serviceInit()中通过一个语句dispatcher.register(LocalizerEventType.class, localizerTracker)登记的。所以,这里的handler.handle()就是LocalizerTracker.handle():

      [AsyncDispatcher.eventHandlingThread.run()> AsyncDispatcher.dispatch()>
      LocalizerTracker.handle()]


          public void handle(LocalizerEvent event){
            String locId=event.getLocalizerId();
            switch(event.getType()){
            case REQUEST_RESOURCE_LOCALIZATION:
              //0)find running localizer or start new thread
              LocalizerResourceRequestEvent req=(LocalizerResourceRequestEvent)event;
                  //将被当成LocalizerEvent event还原成LocalizerResourceRequestEvent
              switch (req.getVisibility()){
              case PUBLIC:
                publicLocalizer.addResource(req);
                break;
              case PRIVATE:
              case APPLICATION:
                synchronized (privLocalizers){
                  LocalizerRunner localizer=privLocalizers.get(locId);
                  if (null==localizer){
                    LOG.info("Created localizer for"+locId);
                    localizer=new LocalizerRunner(req.getContext(), locId);
                            //这是一个线程,负责实施资源本地化
                    privLocalizers.put(locId, localizer);
                    localizer.start();
                  }
                  //1)propagate event
                  localizer.addResource(req);
                }
              break;
            }
            break;
          }
        }

这样,该项资源的本地化过程就又向前推进了,作为其一部分的LocalizedResource的这个状态机也向前走了一步,进入了DOWNLOADING状态。

资源本地化只是我们用来解释说明Hadoop代码中的状态机所用的例子,我们此刻关心的是状态机这种机制的实现,而不是资源本地化这个过程本身,所以到了这里我们就可以打住了。因为这下面肯定会有一些具体的操作,然后又会向这个LocalizedResource对象发出下一个事件,于是这LocalizedResource对象的状态机就又会再往前走一步。

在涉及状态机的整个“生态系统”中,通常有三个线程即三个主体在活动。一是状态机的所有者,也是状态机的操作者,是它因某种请求或事件的到来而调用handle()函数,引起了状态机的跳转以及伴随着跳转的响应,其结果是产生了另一个事件;二是分发者Dispatcher,它将状态机在响应操作中产生的事件分发给某个与状态机所代表的过程相关的对象,触发其某些方面的操作;三就是这个与状态机所代表过程相关的对象,例如上面的LocalizerRunner,在Dispatcher所转达的事件触发下进行某些处理,然后又向状态机的操作者发出新的请求或事件。如此周而复始,直至该状态机所实现的整个过程结束。

其中Dispatcher所起的作用类似于网络中的路由器,它只是起着路由的作用,所以既可以用于状态机,也可以用于别处,当然作为分发目标的对象必须实现EventHandler界面,即提供handle(T event)函数。

LocalizedResource对象的状态机是这样,其他对象的状态机(如果有的话)也是这样。

总结一下,假定A是一个类,a是这个类的一个对象,并且A有状态机,则有:

(1)StateMachineFactory是A的静态成分,所以一共只有一份,在A.classs中,而具体的A类对象中并不包含StateMachineFactory。

(2)每个A类对象都会有由此StateMachineFactory生成的状态机stateMachine,状态机的主体是一个跳转表,表中是一些跳转规则。可以把每条跳转规则看成一个四元组(当前状态,新状态,触发事件,hook),其中的hook是一个用来对状态跳变做出伴随操作的对象,每个hook对象都提供一个transition()函数。不过,规则中也可以没有hook,那就说明除状态跳变外无须任何反应。

(3)除状态机外,还需要有个实现了Dispatcher界面的某类对象,通常是个AsyncDispatcher对象。

(4)发生某种与此状态机有关的事件时,会产生一个定义于这个StateMachine的事件对象,并以此事件对象为参数调用A.handle()。

(5)A.handle()根据状态机A.stateMachine的当前状态和具体事件对象的类型在其跳转表中找到相应的跳转规则T。

(6)如果本条跳转规则中提供了一个用来对该次跳转做出伴随操作的对象hook,例如FetchResourceTransition,那就调用hook.transition()。注意跳转前后的状态也可以相同,即实际上没有状态变化。

(7)然后将状态机的当前状态改变成跳转规则所规定的新状态。

(8)在hook.transition()中很可能会创建新的事件对象(例如前面的LocalizerResourceRequestEvent),这个事件对象要通过预设的Dispatcher对象加以发送,最终调用某个目标对象的handle()函数。这个分发目标可以就是A本身,也可以是别的对象,需要预先向Dispatcher对象登记。

这样,一个拥有状态机的对象A,从它的handle()第一次被调用开始,每次受到事件触发时根据其当前状态和触发事件类型确定调用某个hook对象的transition()以做出行动上的反应,并且改变或不改变其状态。而hook对象在作为反应的行动中又可能产生另一个事件,并通过Dispatcher加以发送。最后,直接或间接地又会以新的事件对象为参数调用对象A的hande(),它的状态机就是这样动了起来,运转了起来。

Hadoop中所有的状态机都是这样,都按照这个标准的模式运转。以后,当涉及状态机时,我们就不再深入到这样的细节中去,而只需简单说一下谁的状态机在这个特定事件的触发下做出何种反应、跳变到何种状态;如果创建新的事件,则这个新的事件被分发到何处、触发何种行为,这样就行了。