3.1.2 集群的启动与初始化
Flink的集群模式主要分为Per-Job和Session两种,其中Per-Job集群模式为每一个提交的Job单独创建一套完整的运行时集群环境,该Job独享运行时集群使用的计算资源以及组件服务。与Per-Job集群相比,Session集群能够运行多个Flink作业,且这些作业可以共享运行时中的Dispatcher、ResourceManager等组件服务。两种集群运行模式各有特点和使用范围,从资源利用的角度看,Session集群资源的使用率相对高一些,Per-Job集群任务之间的资源隔离性会好一些。
不管是哪种类型的集群,集群运行时环境中涉及的核心组件都是一样的,主要的区别集中在作业的提交和运行过程中。这里我们以Session类型集群为例,继续介绍集群运行时,对于Per-Job类型集群单独的实现部分,我们会在介绍过程中进行特殊说明。
1.ClusterEntrypoint详解
当用户指定Session Cli命令启动集群时,首先会在Flink集群启动脚本中调用ClusterEntrypoint抽象实现类中提供的main()方法,以启动和运行相应类型的集群环境。ClusterEntrypoint是整个集群运行时的启动入口类,且内部带有main()方法。运行时管理节点中,所有服务都通过ClusterEntrypoint进行触发和启动,进而完成核心组件的创建和初始化。
如图3-2所示,ClusterEntrypoint会根据集群运行模式,将ClusterEntrypoint分为SessionClusterEntrypoint和JobClusterEntrypoint两种基本实现类。顾名思义,SessionClusterEntrypoint是Session类型集群的入口启动类,JobClusterEntrypoint是Per-Job类型集群的入口启动类。在集群运行模式基本类的基础上,衍生出了集群资源管理器对应的ClusterEntrypoint实现类,例如YarnJobClusterEntrypoint、StandaloneJobClusterEntrypoint等。
从图3-2中可以看出,SessionClusterEntrypoint的实现类有YarnSessionClusterEntrypoint、StandaloneSessionClusterEntrypoint以及KubernetesSessionClusterEntrypoint等。JobClusterEntrypoint的实现类主要有YarnJobClusterEntrypoint、StandaloneJobClusterEntrypoint和MesosJobClusterEntrypoint。用户创建和启动的集群类型不同,最终通过不同的ClusterEntrypoint实现类启动对应类型的集群运行环境。
2.通过ClusterEntrypoint启动集群服务
如图3-3所示,我们以StandaloneSessionClusterEntrypoint为例说明StandaloneSession集群的启动过程,并介绍其主要核心组件的创建和初始化方法。关于其他类型的集群创建,我们将在第4章重点讲解。
从图3-3中我们可以看出,集群初始化过程主要包含如下步骤。
·用户运行start-cluster.sh命令启动StandaloneSession集群,此时在启动脚本中会启动StandaloneSessionClusterEntrypoint入口类。
·在StandaloneSessionClusterEntrypoint.main方法中创建StandaloneSessionClusterEntrypoint实例,然后将该实例传递至抽象实现类ClusterEntrypoint.runClusterEntrypoint(entrypoint)方法继续后续流程。
·在ClusterEntrypoint中调用clusterEntrypoint.startCluster()方法启动指定的ClusterEntrypoint实现类。
·调用基本实现类ClusterEntrypoint.runCluster()私有方法启动集群服务和组件。
·调用ClusterEntrypoint.initializeServices(configuration)内部方法,初始化运行时集群需要创建的基础组件服务,如HAServices、CommonRPCService等。
·调用ClusterEntrypoint.createDispatcherResourceManagerComponentFactory()子类实现方法,创建DispatcherResourceManagerComponentFactory对象,在本实例中会调用StandaloneSessionClusterEntrypoint实现的方法,其他类型的集群环境会根据不同实现,创建不同类型的DispatcherResourceManager工厂类。
·在StandaloneSessionClusterEntrypoint.createDispatcherResourceManagerComponentFacto-ry()方法中最终调用DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory()方法,创建基于Session模式的DefaultDispatcherResourceMana-gerComponentFactory。
图3-2 ClusterEntrypoint UML关系图
图3-3 StandaloneSession集群启动流程
·基于前面创建的基础服务,调用DispatcherResourceManagerComponentFactory.create()方法创建集群核心组件封装类DispatcherResourceManagerComponent,可以看出核心组件实际上包括了Dispatcher和ResourceManager。
·在DispatcherResourceManagerComponentFactory.create()方法中,首先创建和启动WebMonitorEndpoint对象,作为Dispatcher对应的Rest endpoint,通过Rest API将JobGraph提交到Dispatcher上,同时,WebMonitorEndpoint也会提供Web UI需要的Rest API接口实现。
·调用ResourceManagerFactory.createResourceManager()方法创建ResourceManager组件并启动。
·调用DispatcherRunnerFactory.createDispatcherRunner()方法创建DispatcherRunner组件后,启动DispatcherRunner服务。
·将创建好的WebMonitorEndpoint、ResourceManager和DispatcherRunner封装到DispatcherResourceManagerComponent中,其中还包括DispatcherRunner和ResourceManager对应的高可用管理服务dispatcherLeaderRetrievalService和resourceManagerRetrievalService。
以下我们从源码层面介绍ClusterEntrypoint涉及的重点步骤。
如代码清单3-1所示,ClusterEntrypoint提供了实现子类的runClusterEntrypoint()静态方法。例如在StandaloneSessionClusterEntrypoint中,main()方法会调用runClusterEntrypoint()方法,触发集群启动的进程。
代码清单3-1 ClusterEntrypoint.runClusterEntrypoint()方法
public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) { final String clusterEntrypointName = clusterEntrypoint.getClass(). getSimpleName(); try { // 调用clusterEntrypoint.startCluster()方法 clusterEntrypoint.startCluster(); } catch (ClusterEntrypointException e) { LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e); System.exit(STARTUP_FAILURE_RETURN_CODE); } // 此处省略部分代码 }
接着在clusterEntrypoint.startCluster()方法中调用ClusterEntrypoint.runCluster()的内部方法创建集群组件。如代码清单3-2所示,ClusterEntrypoint.runCluster()方法主要包含如下步骤。
·调用ClusterEntrypoint.initializeServices()方法,完成集群需要的基础服务初始化操作。
·将创建和初始化RPC服务地址和端口配置写入configuration,以便在接下来创建的组件中使用。
·调用createDispatcherResourceManagerComponentFactory()抽象方法,创建对应集群的DispatcherResourceManagerComponentFactory。
·通过dispatcherResourceManagerComponentFactory的实现类创建clusterComponent,也就是运行时中使用的组件服务。
·向clusterComponent的ShutDownFuture对象中添加需要在集群停止后执行的异步操作。
代码清单3-2 ClusterEntrypoint.runCluster()方法
private void runCluster(Configuration configuration) throws Exception { synchronized (lock) { // 通过configuration初始化集群服务 initializeServices(configuration); // 将RPC服务中的端口和地址写入configuration configuration.setString(JobManagerOptions.ADDRESS, commonRpcService. getAddress()); configuration.setInteger(JobManagerOptions.PORT, commonRpcService. getPort()); // 创建DispatcherResourceManagerComponentFactory,创建方法由子类实现 final DispatcherResourceManagerComponentFactory dispatcherResourceMana gerComponentFactory = createDispatcherResourceManagerComponentFactory (configuration); // 通过dispatcherResourceManagerComponentFactory创建clusterComponent clusterComponent = dispatcherResourceManagerComponentFactory.create( configuration, ioExecutor, commonRpcService, haServices, blobServer, heartbeatServices, metricRegistry, archivedExecutionGraphStore, new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryService RpcService()), this); // 向clusterComponent的ShutDownFuture中添加操作 clusterComponent.getShutDownFuture().whenComplete( (ApplicationStatus applicationStatus, Throwable throwable) -> { if (throwable != null) { shutDownAsync( ApplicationStatus.UNKNOWN, ExceptionUtils.stringifyException(throwable), false); } else { shutDownAsync( applicationStatus, null, true); } }); } }
如代码清单3-3所示,在ClusterEntrypoint.initializeServices()方法中初始化了集群组件需要的多种服务,方法包括如下步骤。
·从configuration中获取配置的RPC地址和portRange参数,根据配置地址和端口信息创建集群所需的公用commonRpcService服务。更新configuration中的address和port配置,用于支持集群组件高可用服务。
·创建ioExecutor线程池,用于集群组件的I/O操作,如本地文件数据读取和输出等。
·创建并启动haService,向集群组件提供高可用支持,集群中的组件都会通过haService创建高可用服务。
·创建并启动blobServer,存储集群需要的Blob对象数据,blobServer中存储的数据能够被JobManager以及TaskManager访问,例如JobGraph中的JAR包等数据。
·创建heartbeatServices,主要用于创建集群组件之间的心跳检测,例如ResourceManager与JobManager之间的心跳服务。
·创建metricRegistry服务,用于注册集群监控指标收集。
·创建archivedExecutionGraphStore服务,用于压缩并存储集群中的ExecutionGraph,主要有FileArchivedExecutionGraphStore和MemoryArchivedExecutionGraphStore两种实现类型。
代码清单3-3 ClusterEntrypoint.initializeServices()方法
protected void initializeServices(Configuration configuration) throws Exception { // 初始化集群服务 LOG.info("Initializing cluster services."); // 加锁处理 synchronized (lock) { //获取RPC地址和端口 final String bindAddress = configuration.getString(JobManagerOptions. ADDRESS); final String portRange = getRPCPortRange(configuration); // 根据配置的地址和端口创建公用的RpcService commonRpcService = createRpcService(configuration, bindAddress, portRange); // 根据创建好的commonRpcService,更新configuration中的address和port配置, 用于创建高可用服务 configuration.setString(JobManagerOptions.ADDRESS, commonRpcService. getAddress()); configuration.setInteger(JobManagerOptions.PORT, commonRpcService. getPort()); // 创建ioExecutor线程池,提供集群中的I/O操作 ioExecutor = Executors.newFixedThreadPool( Hardware.getNumberCPUCores(), new ExecutorThreadFactory("cluster-io")); // 创建高可用服务,提供集群高可用支持 haServices = createHaServices(configuration, ioExecutor); // 创建并启动BlobServer,用于存储对象数据,例如JobGraph中的JAR包等 blobServer = new BlobServer(configuration, haServices. createBlobStore()); blobServer.start(); // 创建heartbeatServices,用于在组件和组件之间进行心跳检测 heartbeatServices = createHeartbeatServices(configuration); // 创建metricRegistry metricRegistry = createMetricRegistry(configuration); final RpcService metricQueryServiceRpcService = MetricUtils.startMetrics RpcService(configuration, bindAddress); metricRegistry.startQueryService(metricQueryServiceRpcService, null); final String hostname = RpcUtils.getHostname(commonRpcService); // 创建processMetricGroup,用于监控集群系统指标 processMetricGroup = MetricUtils.instantiateProcessMetricGroup( metricRegistry, hostname, ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration)); // 创建archivedExecutionGraphStore,不同的集群类型创建不同的ExecutionGraphStore archivedExecutionGraphStore = createSerializableExecutionGraphStore (configuration, commonRpcService.getScheduledExecutor()); } }