Flink设计与实现:核心原理与源码解析
上QQ阅读APP看书,第一时间看更新

1.3.2 源码调试环境搭建

除了对Flink源码进行编译外,我们还需要在本地开发环境中搭建Flink集群调试环境,这样有利于通过断点的方式深入了解Flink内部程序的运行过程。和官方文档建议的一样,这里推荐读者使用IDEA作为源码阅读工具,因为IDEA提供了比较方便的代码调试工具。

下面介绍在IDEA中构建Flink源码调试环境的方法。

1.MiniCluster单机调试环境

搭建单机版的本地环境相对简单,实际上就是直接在IDEA中调试Flink应用,此时Flink会启动MiniCluster模拟分布式集群环境,在单个JVM进程中构建最精简的运行时环境。这种方法在构建上花费的成本相对较低,这是因为和真正的分布式集群环境相比,MiniCluster省略了集群的一些主要组件,如Flink WebUI等。对于想要掌握Flink源码的读者,建议不要直接使用MiniCluster作为源码的调试环境,而采用分布式集群本地调试环境或远程调试环境。

2.分布式集群本地调试环境

分布式集群本地调试环境实际上就是在IDEA中运行StandaloneCluster,此时在IDEA中会同时启动JobManager和TaskManager进程,这个过程会消耗较多的本地CPU和内存资源,因此我们的电脑需要有足够多的计算资源来构建环境。

下面介绍分布式集群本地调试环境的搭建过程。这里假设读者已经将Flink源码导入本地IDEA并完成了源码编译工作。

(1)启动JobManager

在不同类型的集群资源管理器上构建Flink集群,对应的集群入口类会有所不同。这里的集群入口类实际上就是集群服务启动main()方法所在的类。例如StandaloneSessionCluster对应的集群入口类为org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint。

我们需要先找到StandaloneSessionClusterEntrypoint入口类,其内部含有启动Standalone集群管理节点的main()方法,然后进入IDEA的Run/Debug Configurations配置界面。如图1-12所示,在界面中增加新的应用调试配置信息,然后在Main class配置栏中指定StandaloneSessionClusterEntrypoint。

图1-12 配置在IDEA中启动StandaloneSessionClusterEntrypoint

接下来配置VM options和Program arguments两个参数。VM options用于构建StandaloneSessionClusterEntrypoint Class启动过程依赖的环境信息,包括log4j.configuration配置及classpath环境信息。

如代码清单1-1所示,logback.configurationFile指定了JobManager启动过程依赖的Log4j配置文件地址,这里是我们使用源码编译后,在build-target/conf/路径中生成的log4j-console.properties。classpath则指定了JobManager启动中依赖的JAR包信息。我们需要将源码编译后build-target/lib/路径中的全部JAR包添加到classpath参数中,其中source_code_path是Flink源码的绝对路径。

代码清单1-1 VM options配置信息


-Dlog4j.configuration=file:/home/workspace/flink/build-target/conf/log4j-
    console.properties 
-classpath :/{source_code_path}/flink/build-target/lib/flink-dist_2.11-
   1.10-SNAPSHOT.jar:/{source_code_path}/flink/build-target/lib/flink-table-
   blink_2.11-1.10-SNAPSHOT.jar:/{source_code_path}/flink/build-target/lib/
   flink-table_2.11-1.10-SNAPSHOT.jar:/{source_code_path}/flink/build-
   target/lib/log4j-1.2.17.jar:/{source_code_path}/flink/build-target/lib/
    slf4j-log4j12-1.7.15.jar

除了VM options外,我们还需要配置Program arguments参数,用于指定flink-conf.yaml配置文件。同样在源码编译生成的build-target/conf路径中找到配置文件,当然也可以指定文件路径获取配置文件。

在flink-conf.yaml文件中,原则上不需要对参数做任何调整,如果JobManager和TaskManager不在同一台机器上,则需要修改flink-conf.yaml中的参数信息。这里我们使用build-target/conf路径的默认配置,代码如下。


-c /path_to_flink_source_code/flink/build-target/conf

接下来选择Use classpath of module配置栏中的module信息,这里选择flink-runtime_2.11模块。选择完毕后配置合适版本的JRE,只要保证JDK在1.8以上版本即可。

配置完毕后,点击Apply按钮,就可以在IDEA启动栏中运行StandaloneSessionClusterEntrypoint类,此时本地Standalone的管理节点可以启动并运行了。

JobManager节点启动后,可以在浏览器中输入http://localhost:8081检查Flink集群的管理节点是否正常运行。

(2)启动TaskManager

TaskManager和JobManager的启动方式基本一致,唯一的区别是启动类不同,TaskManager实例的启动类为org.apache.flink.runtime.taskexecutor.TaskManagerRunner。只需要按照JobManager的步骤配置TaskManager启动信息,然后指定TaskManagerRunner作为TaskManager的Main Class即可。需要注意的是,TaskManager启动后会主动向JobManager注册,如果没有可以连接的JobManager实例,会返回注册异常,因此TaskManager需要在JobManager启动之后启动。

以上就是在IDEA中搭建分布式集群调试环境的过程,通过客户端命令行可以向Flink集群提交作业并运行,在各个模块中设定代码调试断点。

3.分布式集群远程调试环境

除了使用本地计算资源搭建Flink分布式集群之外,也可以通过JVM远程调试功能调试Flink集群。这种方法其实就是在服务器上启动Flink集群,然后通过设定JVM监听端口,在IDEA中启动Remote远程调试工具,然后连接监听端口,间接实现对源码的调试。这种调试方式不需要占用太多的本地计算资源,因此对本地电脑配置的要求较低。但是远端的集群代码要和本地代码保持一致,否则可能出现断点无法追踪的情况。

配置远程调试环境时,需要在远端集群bin/flink-daemon.sh启动文件,添加以下JVM配置信息。注意,如果JobManager和TaskManager运行在同一节点上,需要区分端口,否则可能出现冲突。


JVM_ARGS="$JVM_ARGS
   -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"

如图1-13所示,启动远端服务器的Flink集群后,会自动开启监听端口5005,接下来就可以在IDEA中配置远程监听,启动本地JobManager debug进程。此时可以连接到服务器上的Flink集群,实现远程调试。

图1-13 配置IDEA远程调试环境

需要注意的是,在远程调试启动的过程中,JobManager和TaskManager需要先在远端集群上运行,之后才可以在本地IDEA中进行连接和调试。如果需要调试JobManager和TaskManager启动过程,就不能采用远程调试这种模式了,只能借助本地搭建分布式集群环境来实现。