3.2.4 ResourceManager的创建与初始化
ResourceManager作为集群资源管理组件,不同的Cluster集群资源管理涉及的初始化过程也会有所不同。我们以StandaloneSessionCluster为例,介绍ResourceManager的创建和初始化过程,对于其他集群的部署和启动,如基于Kubunetes、Yarn等的资源管理器,我们将放在第5章进行详细讲解。
图3-10 ResourceManager UML关系图
如图3-10所示,根据资源管理器的不同,ResourceManager抽象类有不同的资源管理器实现类。我们可以将ResourceManager实现类分两类,一类支持动态资源管理,例如KubernetesResourceManager、YarnResourceManager及MesosResourceManager,另一类不支持动态资源管理,例如StandaloneResourceManager。支持动态资源管理的集群类型,可以按需启动TaskManager资源,根据Job所需的资源请求动态启动TaskManager节点,这种资源管理方式不用担心资源浪费和资源动态伸缩的问题。实现动态资源管理的ResourceManager需要继承ActiveResourceManager基本实现类。
从图3-10中我们也可以看出,不管是哪种类型的ResourceManager实现,都需要在内部创建ResourceManagerRuntimeServices。ResourceManagerRuntimeServices中包含SlotManager和JobLeaderIdService两个主要服务。其中SlotManager服务管理整个集群的Slot计算资源,并对Slot计算资源进行统一的分配和管理;JobLeaderIdService通过实现jobLeaderIdListeners实时监听JobManager的运行状态,以获取集群启动的作业对应的JobLeaderId信息,防止出现JobManager无法连接的情况。当然对于ResourceManager需要的服务,不会局限于ResourceManagerRuntimeServices提供的这两个服务,在创建和启动ResourceManager组件中,同样也需要RpcService、HighAvailabilityServices、HeartbeatServices等基础服务,并通过这些基础服务创建ResourceManager组件。
1.创建ResourceManager组件
如代码清单3-22所示,在StandaloneResourceManagerFactory中通过调用createResourceManager()方法创建Standalone类型集群的ResourceManager组件。
·在创建StandaloneResourceManager之前,需要先创建ResourceManagerRuntimeServices,主要包含了SlotManager和JobLeaderIdService两个内部服务。
·创建StandaloneResourceManager需要RpcService、HeartbeatServices、HighAvailabilityServices等基础服务,这些基础服务已经在创建组件之前初始化完毕,集群组件会通过这些基础服务创建各自的内部服务。
·返回创建好的StandaloneResourceManager实例,等待启动ResourceManager组件。
代码清单3-22 StandaloneResourceManagerFactory.createResourceManager()方法定义
public ResourceManager<ResourceID> createResourceManager( Configuration configuration, ResourceID resourceId, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, FatalErrorHandler fatalErrorHandler, ClusterInformation clusterInformation, @Nullable String webInterfaceUrl, ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception { // 创建ResourceManagerRuntimeServices final ResourceManagerRuntimeServicesConfiguration resourceManagerRuntime ServicesConfiguration = ResourceManagerRuntimeServicesConfiguration. fromConfig uration(configuration); final ResourceManagerRuntimeServices resourceManagerRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( resourceManagerRuntimeServicesConfiguration, highAvailabilityServices, rpcService.getScheduledExecutor()); final Time standaloneClusterStartupPeriodTime = ConfigurationUtils.get StandaloneClusterStartupPeriodTime(configuration); // 返回创建的StandaloneResourceManager实例 return new StandaloneResourceManager( rpcService, getEndpointId(), resourceId, highAvailabilityServices, heartbeatServices, resourceManagerRuntimeServices.getSlotManager(), resourceManagerRuntimeServices.getJobLeaderIdService(), clusterInformation, fatalErrorHandler, resourceManagerMetricGroup, standaloneClusterStartupPeriodTime); }
如代码清单3-23所示,在ResourceManagerRuntimeServices.fromConfiguration方法定义中可以看出,方法包含了对SlotManager和JobLeaderIdService服务的创建,创建完成之后,将其应用在StandaloneResourceManager组件中。
代码清单3-23 ResourceManagerRuntimeServices.fromConfiguration方法
public static ResourceManagerRuntimeServices fromConfiguration( ResourceManagerRuntimeServicesConfiguration configuration, HighAvailabilityServices highAvailabilityServices, ScheduledExecutor scheduledExecutor) throws Exception { // 创建SlotManager服务 final SlotManager slotManager = createSlotManager(configuration, scheduledExecutor); // 创建JobLeaderIdService服务 final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService( highAvailabilityServices, scheduledExecutor, configuration.getJobTimeout()); // 返回ResourceManagerRuntimeServices return new ResourceManagerRuntimeServices(slotManager, jobLeaderIdService); }
2.SlotManager的创建和初始化
SlotManager是ResourceManager组件最重要的内部组件,主要用于管理和协调整个集群的Slot计算资源,同时实现了对TaskManager信息的注册和管理。下面我们重点了解SlotManager内部服务的创建过程。如代码清单3-24所示,调用ResourceManagerRuntimeServices.createSlotManager()方法创建SlotManager服务,方法的主要逻辑如下。
·创建SlotManagerConfiguration配置,用于创建SlotManager。SlotManagerConfiguration主要包含TaskManagerRequestTimeout、SlotRequestTimeout等配置信息。
·创建和初始化SlotMatchingStrategy。SlotMatchingStrategy根据作业中给定的ResourceProfile匹配Slot计算资源。SlotMatchingStrategy主要分为两种类型,一种是LeastUtilizationSlotMatchingStrategy,即按照利用率最低原则匹配Slot资源,尽可能保证TaskExecutor上资源的使用率处于比较低的水平,这种策略能够有效降低机器的负载。另一种是AnyMatchingSlotMatchingStrategy,即直接返回第一个匹配的Slot资源策略。
·创建SlotManagerImpl实现类并返回,然后在ResourceManager组件启动过程中进行初始化。
代码清单3-24 ResourceManagerRuntimeServices.createSlotManager()方法
private static SlotManager createSlotManager(ResourceManagerRuntimeServices Configuration configuration, ScheduledExecutor scheduledExecutor) { // 创建SlotManagerConfiguration final SlotManagerConfiguration slotManagerConfiguration = configuration. getSlotManagerConfiguration(); //初始化SlotMatchingStrategy final SlotMatchingStrategy slotMatchingStrategy; if (slotManagerConfiguration.evenlySpreadOutSlots()) { slotMatchingStrategy = LeastUtilizationSlotMatchingStrategy.INSTANCE; } else { slotMatchingStrategy = AnyMatchingSlotMatchingStrategy.INSTANCE; } // 返回SlotManager实现类 return new SlotManagerImpl( slotMatchingStrategy, scheduledExecutor, slotManagerConfiguration.getTaskManagerRequestTimeout(), slotManagerConfiguration.getSlotRequestTimeout(), slotManagerConfiguration.getTaskManagerTimeout(), slotManagerConfiguration.isWaitResultConsumedBeforeRelease()); }
3.ResourceManager的初始化和启动
ResourceManager组件创建完毕后,会在DefaultDispatcherResourceManagerComponentFactory中调用ResourceManager.start()方法启动ResourceManager组件。因为ResourceManager继承自RpcEndpoint,所以ResourceManager本质上是一个RPC组件服务,启动ResourceManager组件实际上就是在启动ResourceManager组件对应RpcEndpoint中的RpcServer。当ResourceManager对应的RPC服务启动后,就会通过RpcEndpoint调用ResourceManager.onStart()方法启动ResourceManager内部的其他核心服务,最终完成ResourceManager的启动流程。
如代码清单3-25所示,在ResourceManager.onStart()方法内调用了ResourceManager.startResourceManagerServices()方法启动ResourceManager组件使用的内部服务。onStart()方法主要包含如下流程。
·从highAvailabilityServices基础服务中获取ResourceManager对应的leaderElectionService,其中leaderElectionService用于在高可用集群模式下,提供选择ResourceManager Leader的能力,以保证集群ResourceManager组件的高可用。
·调用ResourceManager.initialize()初始化方法,这里主要由ResourceManager的子类实现,例如在StrandaloneResourceManager中可以定义需要进行初始化的操作。
·通过LeaderElectionService服务启动ResourceManager,并将启动的ResourceManager RpcEndpoint设定为Leader角色。
·启动jobLeaderIdService服务,用于管理注册的JobManager节点,包括对JobManager的注册和注销等操作。
·注册Slot和TaskExecutor的Metrics监控信息。
代码清单3-25 ResourceManager.startResourceManagerServices()方法
private void startResourceManagerServices() throws Exception { try { // 从高可用服务中获取ResourceManager的leaderElectionService leaderElectionService = highAvailabilityServices.getResourceManagerLeader ElectionService(); // ResourceManager初始化方法,主要由子类实现服务启动过程中需要执行的操作 initialize(); // 通过LeaderElectionService服务启动当前ResourceManager,并设定为Leader角色 leaderElectionService.start(this); // 启动JobLeaderIdService jobLeaderIdService.start(new JobLeaderIdActionsImpl()); // 注册Slot和TaskExecutor的Metrics监控指标 registerSlotAndTaskExecutorMetrics(); } catch (Exception e) { handleStartResourceManagerServicesException(e); } }
在ResourceManager.startResourceManagerServices()方法中执行leaderElectionService.start(this)代码时,实际上就是让leaderElectionService选择当前的LeaderContender为Leader节点。ResourceManager实现了LeaderContender接口,因此可以作为竞争者参与到Leader的竞选中。通过leaderElectionService选举当前的ResourceManager组件为Leader节点,此时ResourceManager就可以对外提供服务了。
一旦当前的LeaderContender被选为Leader节点,就会调用LeaderContender.grantLeadership()方法为该LeaderContender授予Leadership角色。如代码清单3-26所示,ResourceManager作为LeaderContender,接收LeaderElectionService赋予的Leadership角色,调用leaderElectionService.confirmLeadership()方法接收并确认LeaderShip。最终当前的ResourceManager作为LeaderContender就成功接受了Leadership角色。
代码清单3-26 ResourceManager.grantLeadership()方法
public void grantLeadership(final UUID newLeaderSessionID) { // 在clearStateFuture中增加异步操作,调用tryAcceptLeadership()方法 final CompletableFuture<Boolean> acceptLeadershipFuture = clearStateFuture .thenComposeAsync((ignored) -> tryAcceptLeadership(newLeaderSessionID), getUnfencedMainThreadExecutor()); // 如果LeaderContender接受了Leadership,通知leaderElectionService进行confirm操作 final CompletableFuture<Void> confirmationFuture = acceptLeadershipFuture. thenAcceptAsync( (acceptLeadership) -> { if (acceptLeadership) { // 调用leaderElectionService.confirmLeadership()进行Leadership确认 leaderElectionService.confirmLeadership(newLeaderSessionID, getAddress()); } }, getRpcService().getExecutor()); // 如果在此过程中出现错误,则执行onFatalError()方法 confirmationFuture.whenComplete( (Void ignored, Throwable throwable) -> { if (throwable != null) { onFatalError(ExceptionUtils.stripCompletionException(throwable)); } }); }
接下来在tryAcceptLeadership()方法中调用startServicesOnLeadership()方法对接收到的LeaderShip执行后续操作。如代码清单3-27所示,方法中主要启动心跳服务和SlotManager服务等ResourceManager内部服务。HeartbeatServices和SlotManager服务成功启动,标志着ResourceManager服务初始化和启动完毕,接下来就可以对外提供服务了。
代码清单3-27 ResourceManager.startServicesOnLeadership()方法
protected void startServicesOnLeadership() { // 启动心跳服务 startHeartbeatServices(); // 启动SlotManager服务 slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl()); }
如代码清单3-28所示,在ResourceManager中HeartbeatService的启动方法中,主要包括了对taskManagerHeartbeatManager和jobManagerHeartbeatManager两个心跳管理服务的启动操作。而心跳管理服务主要通过TaskManagerHeartbeatListener和JobManagerHeartbeatListener两个监听器收集来自TaskManager和JobManager的心跳信息,以保证整个运行时中各个组件之间能够正常通信。
代码清单3-28 ResourceManager.startServicesOnLeadership()方法
private void startHeartbeatServices() { // 启动TaskManager对应的HeartbeatManager taskManagerHeartbeatManager = heartbeatServices. createHeartbeatManagerSender( resourceId, new TaskManagerHeartbeatListener(), getMainThreadExecutor(), log); jobManagerHeartbeatManager = heartbeatServices. createHeartbeatManagerSender( resourceId, new JobManagerHeartbeatListener(), getMainThreadExecutor(), log); }
4.启动SlotManager服务
我们知道,SlotManager服务会在ResourceManager收到LeaderShip后启动,这里主要会调用SlotManager.start()方法启动SlotManager服务。如代码清单3-29所示,SlotManager.start()方法主要有如下逻辑。
·对传入的参数进行校验,确保参数不为空。
·将SlotManager启动标志设为True,然后通过scheduledExecutor线程池启动TaskManager周期性超时检查线程服务,实际上是通过checkTaskManagerTimeouts()方法实现该检查,防止TaskManager长时间掉线等问题。
·启动单独的线程对提交的SlotRequest进行周期性超时检查,防止Slot请求超时。
·最后成功启动SlotManager服务。
代码清单3-29 SlotManagerImpl.start()方法
public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceActions newResourceActions) { LOG.info("Starting the SlotManager."); // 对参数进行非空检验 this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId); mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor); resourceActions = Preconditions.checkNotNull(newResourceActions); // 设置启动标志为true started = true; // 启动TaskManager周期性超时检查 taskManagerTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay( () -> mainThreadExecutor.execute( () -> checkTaskManagerTimeouts()), 0L, taskManagerTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); // 启动SlotRequest周期性超时检查 slotRequestTimeoutCheck = scheduledExecutor.scheduleWithFixedDelay( () -> mainThreadExecutor.execute( () -> checkSlotRequestTimeouts()), 0L, slotRequestTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); }
到这里,ResourceManager启动和初始化工作就完成了,此时ResourceManager可以接收来自TaskManager的注册信息,并向JobManager提供Slot计算资源,为提交的作业提供计算资源。