Flink设计与实现:核心原理与源码解析
上QQ阅读APP看书,第一时间看更新

3.2.4 ResourceManager的创建与初始化

ResourceManager作为集群资源管理组件,不同的Cluster集群资源管理涉及的初始化过程也会有所不同。我们以StandaloneSessionCluster为例,介绍ResourceManager的创建和初始化过程,对于其他集群的部署和启动,如基于Kubunetes、Yarn等的资源管理器,我们将放在第5章进行详细讲解。

图3-10 ResourceManager UML关系图

如图3-10所示,根据资源管理器的不同,ResourceManager抽象类有不同的资源管理器实现类。我们可以将ResourceManager实现类分两类,一类支持动态资源管理,例如KubernetesResourceManager、YarnResourceManager及MesosResourceManager,另一类不支持动态资源管理,例如StandaloneResourceManager。支持动态资源管理的集群类型,可以按需启动TaskManager资源,根据Job所需的资源请求动态启动TaskManager节点,这种资源管理方式不用担心资源浪费和资源动态伸缩的问题。实现动态资源管理的ResourceManager需要继承ActiveResourceManager基本实现类。

从图3-10中我们也可以看出,不管是哪种类型的ResourceManager实现,都需要在内部创建ResourceManagerRuntimeServices。ResourceManagerRuntimeServices中包含SlotManager和JobLeaderIdService两个主要服务。其中SlotManager服务管理整个集群的Slot计算资源,并对Slot计算资源进行统一的分配和管理;JobLeaderIdService通过实现jobLeaderIdListeners实时监听JobManager的运行状态,以获取集群启动的作业对应的JobLeaderId信息,防止出现JobManager无法连接的情况。当然对于ResourceManager需要的服务,不会局限于ResourceManagerRuntimeServices提供的这两个服务,在创建和启动ResourceManager组件中,同样也需要RpcService、HighAvailabilityServices、HeartbeatServices等基础服务,并通过这些基础服务创建ResourceManager组件。

1.创建ResourceManager组件

如代码清单3-22所示,在StandaloneResourceManagerFactory中通过调用createResourceManager()方法创建Standalone类型集群的ResourceManager组件。

·在创建StandaloneResourceManager之前,需要先创建ResourceManagerRuntimeServices,主要包含了SlotManager和JobLeaderIdService两个内部服务。

·创建StandaloneResourceManager需要RpcService、HeartbeatServices、HighAvailabilityServices等基础服务,这些基础服务已经在创建组件之前初始化完毕,集群组件会通过这些基础服务创建各自的内部服务。

·返回创建好的StandaloneResourceManager实例,等待启动ResourceManager组件。

代码清单3-22 StandaloneResourceManagerFactory.createResourceManager()方法定义


public ResourceManager<ResourceID> createResourceManager(
      Configuration configuration,
      ResourceID resourceId,
      RpcService rpcService,
      HighAvailabilityServices highAvailabilityServices,
      HeartbeatServices heartbeatServices,
      FatalErrorHandler fatalErrorHandler,
      ClusterInformation clusterInformation,
      @Nullable String webInterfaceUrl,
      ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception {
   // 创建ResourceManagerRuntimeServices
   final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntime
      ServicesConfiguration = ResourceManagerRuntimeServicesConfiguration.
      fromConfig uration(configuration);
   final ResourceManagerRuntimeServices resourceManagerRuntimeServices = 
      ResourceManagerRuntimeServices.fromConfiguration(
      resourceManagerRuntimeServicesConfiguration,
      highAvailabilityServices,
      rpcService.getScheduledExecutor());
   final Time standaloneClusterStartupPeriodTime = ConfigurationUtils.get
      StandaloneClusterStartupPeriodTime(configuration);
   // 返回创建的StandaloneResourceManager实例
   return new StandaloneResourceManager(
      rpcService,
      getEndpointId(),
      resourceId,
      highAvailabilityServices,
      heartbeatServices,
      resourceManagerRuntimeServices.getSlotManager(),
      resourceManagerRuntimeServices.getJobLeaderIdService(),
      clusterInformation,
      fatalErrorHandler,
      resourceManagerMetricGroup,
      standaloneClusterStartupPeriodTime);
}

如代码清单3-23所示,在ResourceManagerRuntimeServices.fromConfiguration方法定义中可以看出,方法包含了对SlotManager和JobLeaderIdService服务的创建,创建完成之后,将其应用在StandaloneResourceManager组件中。

代码清单3-23 ResourceManagerRuntimeServices.fromConfiguration方法


public static ResourceManagerRuntimeServices fromConfiguration(
      ResourceManagerRuntimeServicesConfiguration configuration,
      HighAvailabilityServices highAvailabilityServices,
      ScheduledExecutor scheduledExecutor) throws Exception {
     // 创建SlotManager服务
   final SlotManager slotManager = createSlotManager(configuration,
       scheduledExecutor);
   // 创建JobLeaderIdService服务
   final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
      highAvailabilityServices,
      scheduledExecutor,
      configuration.getJobTimeout());
   // 返回ResourceManagerRuntimeServices
   return new ResourceManagerRuntimeServices(slotManager, jobLeaderIdService);
}

2.SlotManager的创建和初始化

SlotManager是ResourceManager组件最重要的内部组件,主要用于管理和协调整个集群的Slot计算资源,同时实现了对TaskManager信息的注册和管理。下面我们重点了解SlotManager内部服务的创建过程。如代码清单3-24所示,调用ResourceManagerRuntimeServices.createSlotManager()方法创建SlotManager服务,方法的主要逻辑如下。

·创建SlotManagerConfiguration配置,用于创建SlotManager。SlotManagerConfiguration主要包含TaskManagerRequestTimeout、SlotRequestTimeout等配置信息。

·创建和初始化SlotMatchingStrategy。SlotMatchingStrategy根据作业中给定的ResourceProfile匹配Slot计算资源。SlotMatchingStrategy主要分为两种类型,一种是LeastUtilizationSlotMatchingStrategy,即按照利用率最低原则匹配Slot资源,尽可能保证TaskExecutor上资源的使用率处于比较低的水平,这种策略能够有效降低机器的负载。另一种是AnyMatchingSlotMatchingStrategy,即直接返回第一个匹配的Slot资源策略。

·创建SlotManagerImpl实现类并返回,然后在ResourceManager组件启动过程中进行初始化。

代码清单3-24 ResourceManagerRuntimeServices.createSlotManager()方法


private static SlotManager createSlotManager(ResourceManagerRuntimeServices
   Configuration configuration, ScheduledExecutor scheduledExecutor) {
   // 创建SlotManagerConfiguration
   final SlotManagerConfiguration slotManagerConfiguration = configuration.
      getSlotManagerConfiguration();
      //初始化SlotMatchingStrategy
   final SlotMatchingStrategy slotMatchingStrategy;
   if (slotManagerConfiguration.evenlySpreadOutSlots()) {
      slotMatchingStrategy = LeastUtilizationSlotMatchingStrategy.INSTANCE;
   } else {
      slotMatchingStrategy = AnyMatchingSlotMatchingStrategy.INSTANCE;
   }
   // 返回SlotManager实现类
   return new SlotManagerImpl(
      slotMatchingStrategy,
      scheduledExecutor,
      slotManagerConfiguration.getTaskManagerRequestTimeout(),
      slotManagerConfiguration.getSlotRequestTimeout(),
      slotManagerConfiguration.getTaskManagerTimeout(),
      slotManagerConfiguration.isWaitResultConsumedBeforeRelease());
}

3.ResourceManager的初始化和启动

ResourceManager组件创建完毕后,会在DefaultDispatcherResourceManagerComponentFactory中调用ResourceManager.start()方法启动ResourceManager组件。因为ResourceManager继承自RpcEndpoint,所以ResourceManager本质上是一个RPC组件服务,启动ResourceManager组件实际上就是在启动ResourceManager组件对应RpcEndpoint中的RpcServer。当ResourceManager对应的RPC服务启动后,就会通过RpcEndpoint调用ResourceManager.onStart()方法启动ResourceManager内部的其他核心服务,最终完成ResourceManager的启动流程。

如代码清单3-25所示,在ResourceManager.onStart()方法内调用了ResourceManager.startResourceManagerServices()方法启动ResourceManager组件使用的内部服务。onStart()方法主要包含如下流程。

·从highAvailabilityServices基础服务中获取ResourceManager对应的leaderElectionService,其中leaderElectionService用于在高可用集群模式下,提供选择ResourceManager Leader的能力,以保证集群ResourceManager组件的高可用。

·调用ResourceManager.initialize()初始化方法,这里主要由ResourceManager的子类实现,例如在StrandaloneResourceManager中可以定义需要进行初始化的操作。

·通过LeaderElectionService服务启动ResourceManager,并将启动的ResourceManager RpcEndpoint设定为Leader角色。

·启动jobLeaderIdService服务,用于管理注册的JobManager节点,包括对JobManager的注册和注销等操作。

·注册Slot和TaskExecutor的Metrics监控信息。

代码清单3-25 ResourceManager.startResourceManagerServices()方法


private void startResourceManagerServices() throws Exception {
   try {
      // 从高可用服务中获取ResourceManager的leaderElectionService
      leaderElectionService = highAvailabilityServices.getResourceManagerLeader
         ElectionService();
      // ResourceManager初始化方法,主要由子类实现服务启动过程中需要执行的操作
      initialize();
      // 通过LeaderElectionService服务启动当前ResourceManager,并设定为Leader角色
      leaderElectionService.start(this);
      // 启动JobLeaderIdService
      jobLeaderIdService.start(new JobLeaderIdActionsImpl());
            // 注册Slot和TaskExecutor的Metrics监控指标
      registerSlotAndTaskExecutorMetrics();
   } catch (Exception e) {
      handleStartResourceManagerServicesException(e);
   }
}

在ResourceManager.startResourceManagerServices()方法中执行leaderElectionService.start(this)代码时,实际上就是让leaderElectionService选择当前的LeaderContender为Leader节点。ResourceManager实现了LeaderContender接口,因此可以作为竞争者参与到Leader的竞选中。通过leaderElectionService选举当前的ResourceManager组件为Leader节点,此时ResourceManager就可以对外提供服务了。

一旦当前的LeaderContender被选为Leader节点,就会调用LeaderContender.grantLeadership()方法为该LeaderContender授予Leadership角色。如代码清单3-26所示,ResourceManager作为LeaderContender,接收LeaderElectionService赋予的Leadership角色,调用leaderElectionService.confirmLeadership()方法接收并确认LeaderShip。最终当前的ResourceManager作为LeaderContender就成功接受了Leadership角色。

代码清单3-26 ResourceManager.grantLeadership()方法


public void grantLeadership(final UUID newLeaderSessionID) {
   // 在clearStateFuture中增加异步操作,调用tryAcceptLeadership()方法
   final CompletableFuture<Boolean> acceptLeadershipFuture = clearStateFuture
      .thenComposeAsync((ignored) -> tryAcceptLeadership(newLeaderSessionID),
         getUnfencedMainThreadExecutor());
   // 如果LeaderContender接受了Leadership,通知leaderElectionService进行confirm操作
   final CompletableFuture<Void> confirmationFuture = acceptLeadershipFuture.
      thenAcceptAsync(
      (acceptLeadership) -> {
         if (acceptLeadership) {
            // 调用leaderElectionService.confirmLeadership()进行Leadership确认
            leaderElectionService.confirmLeadership(newLeaderSessionID, 
               getAddress());
         }
      },
      getRpcService().getExecutor());
    // 如果在此过程中出现错误,则执行onFatalError()方法
   confirmationFuture.whenComplete(
      (Void ignored, Throwable throwable) -> {
         if (throwable != null) {
            onFatalError(ExceptionUtils.stripCompletionException(throwable));
         }
      });
}

接下来在tryAcceptLeadership()方法中调用startServicesOnLeadership()方法对接收到的LeaderShip执行后续操作。如代码清单3-27所示,方法中主要启动心跳服务和SlotManager服务等ResourceManager内部服务。HeartbeatServices和SlotManager服务成功启动,标志着ResourceManager服务初始化和启动完毕,接下来就可以对外提供服务了。

代码清单3-27 ResourceManager.startServicesOnLeadership()方法


protected void startServicesOnLeadership() {
   // 启动心跳服务
   startHeartbeatServices();
   // 启动SlotManager服务
   slotManager.start(getFencingToken(), getMainThreadExecutor(),
                     new ResourceActionsImpl());
}

如代码清单3-28所示,在ResourceManager中HeartbeatService的启动方法中,主要包括了对taskManagerHeartbeatManager和jobManagerHeartbeatManager两个心跳管理服务的启动操作。而心跳管理服务主要通过TaskManagerHeartbeatListener和JobManagerHeartbeatListener两个监听器收集来自TaskManager和JobManager的心跳信息,以保证整个运行时中各个组件之间能够正常通信。

代码清单3-28 ResourceManager.startServicesOnLeadership()方法


private void startHeartbeatServices() {
   // 启动TaskManager对应的HeartbeatManager
   taskManagerHeartbeatManager = heartbeatServices.
      createHeartbeatManagerSender(
      resourceId,
      new TaskManagerHeartbeatListener(),
      getMainThreadExecutor(),
      log);
   jobManagerHeartbeatManager = heartbeatServices.
      createHeartbeatManagerSender(
      resourceId,
      new JobManagerHeartbeatListener(),
      getMainThreadExecutor(),
      log);
}

4.启动SlotManager服务

我们知道,SlotManager服务会在ResourceManager收到LeaderShip后启动,这里主要会调用SlotManager.start()方法启动SlotManager服务。如代码清单3-29所示,SlotManager.start()方法主要有如下逻辑。

·对传入的参数进行校验,确保参数不为空。

·将SlotManager启动标志设为True,然后通过scheduledExecutor线程池启动TaskManager周期性超时检查线程服务,实际上是通过checkTaskManagerTimeouts()方法实现该检查,防止TaskManager长时间掉线等问题。

·启动单独的线程对提交的SlotRequest进行周期性超时检查,防止Slot请求超时。

·最后成功启动SlotManager服务。

代码清单3-29 SlotManagerImpl.start()方法


public void start(ResourceManagerId newResourceManagerId, 
                  Executor newMainThreadExecutor, ResourceActions newResourceActions) {
   LOG.info("Starting the SlotManager.");
   // 对参数进行非空检验
   this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId);
   mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor);
   resourceActions = Preconditions.checkNotNull(newResourceActions);
   // 设置启动标志为true
   started = true;
   // 启动TaskManager周期性超时检查
   taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
      () -> mainThreadExecutor.execute(
         () -> checkTaskManagerTimeouts()),
      0L,
      taskManagerTimeout.toMilliseconds(),
      TimeUnit.MILLISECONDS);
   // 启动SlotRequest周期性超时检查
   slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay(
      () -> mainThreadExecutor.execute(
         () -> checkSlotRequestTimeouts()),
      0L,
      slotRequestTimeout.toMilliseconds(),
      TimeUnit.MILLISECONDS);
}

到这里,ResourceManager启动和初始化工作就完成了,此时ResourceManager可以接收来自TaskManager的注册信息,并向JobManager提供Slot计算资源,为提交的作业提供计算资源。