流计算框架Flink与Storm的性能对比
1.背景
Apache Flink和Apache Storm是当前业界广泛使用的两个分布式实时计算框架。其中Apache Storm(以下简称“Storm”)在美团点评实时计算业务中已有较为成熟的运用(可参考Storm的可靠性保证测试),有管理平台、常用API和相应的文档,大量实时作业基于Storm构建。而Apache Flink(以下简称“Flink”)在近期倍受关注,具有高吞吐、低延迟、高可靠和精确计算等特性,对事件窗口有很好的支持,目前在美团点评实时计算业务中也已有一定应用。
为深入熟悉了解Flink框架,验证其稳定性和可靠性,评估其实时处理性能,识别该体系中的缺点,找到其性能瓶颈并进行优化,给用户提供最适合的实时计算引擎,我们以实践经验丰富的Storm框架作为对照,进行了一系列实验测试Flink框架的性能,计算Flink作为确保“至少一次”和“恰好一次”语义的实时计算框架时对资源的消耗,为实时计算平台资源规划、框架选择、性能调优等决策及Flink平台的建设提出建议并提供数据支持,为后续的SLA建设提供一定参考。
Flink与Storm两个框架对比:
2.测试目标
评估不同场景、不同数据压力下Flink和Storm两个实时计算框架目前的性能表现,获取其详细性能数据并找到处理性能的极限;了解不同配置对Flink性能影响的程度,分析各种配置的适用场景,从而得出调优建议。
2.1 测试场景
“输入-输出”简单处理场景
通过对“输入-输出”这样简单处理逻辑场景的测试,尽可能减少其它因素的干扰,反映两个框架本身的性能。
同时测算框架处理能力的极限,处理更加复杂的逻辑的性能不会比纯粹“输入-输出”更高。
用户作业耗时较长的场景
如果用户的处理逻辑较为复杂,或是访问了数据库等外部组件,其执行时间会增大,作业的性能会受到影响。因此,我们测试了用户作业耗时较长的场景下两个框架的调度性能。
窗口统计场景
实时计算中常有对时间窗口或计数窗口进行统计的需求,例如一天中每五分钟的访问量,每100个订单中有多少个使用了优惠等。Flink在窗口支持上的功能比Storm更加强大,API更加完善,但是我们同时也想了解在窗口统计这个常用场景下两个框架的性能。
精确计算场景(即消息投递语义为“恰好一次”)
Storm仅能保证“至多一次”(At Most Once)和“至少一次”(At Least Once)的消息投递语义,即可能存在重复发送的情况。有很多业务场景对数据的精确性要求较高,希望消息投递不重不漏。Flink支持“恰好一次”(Exactly Once)的语义,但是在限定的资源条件下,更加严格的精确度要求可能带来更高的代价,从而影响性能。因此,我们测试了在不同消息投递语义下两个框架的性能,希望为精确计算场景的资源规划提供数据参考。
2.2 性能指标
吞吐量(Throughput)
· 单位时间内由计算框架成功地传送数据的数量,本次测试吞吐量的单位为:条/秒。
· 反映了系统的负载能力,在相应的资源条件下,单位时间内系统能处理多少数据。
· 吞吐量常用于资源规划,同时也用于协助分析系统性能瓶颈,从而进行相应的资源调整以保证系统能达到用户所要求的处理能力。假设商家每小时能做二十份午餐(吞吐量20份/小时),一个外卖小哥每小时只能送两份(吞吐量2份/小时),这个系统的瓶颈就在小哥配送这个环节,可以给该商家安排十个外卖小哥配送。
延迟(Latency)
· 数据从进入系统到流出系统所用的时间,本次测试延迟的单位为:毫秒。
· 反映了系统处理的实时性。
· 金融交易分析等大量实时计算业务对延迟有较高要求,延迟越低,数据实时性越强。
· 假设商家做一份午餐需要5分钟,小哥配送需要25分钟,这个流程中用户感受到了30分钟的延迟。如果更换配送方案后延迟变成了60分钟,等送到了饭菜都凉了,这个新的方案就是无法接受的。
3.测试环境
为Storm和Flink分别搭建由1台主节点和2台从节点构成的Standalone集群进行本次测试。其中为了观察Flink在实际生产环境中的性能,对于部分测内容也进行了on Yarn环境的测试。
3.1 集群参数
3.2 框架参数
4.测试方法
4.1 测试流程
数据生产
Data Generator按特定速率生成数据,带上自增的id和eventTime时间戳写入Kafka的一个Topic(Topic Data)。
数据处理
Storm Task和Flink Task(每个测试用例不同)从Kafka Topic Data相同的Offset开始消费,并将结果及相应inTime、outTime时间戳分别写入两个Topic(Topic Storm和Topic Flink)中。
指标统计
Metrics Collector按outTime的时间窗口从这两个Topic中统计测试指标,每五分钟将相应的指标写入MySQL表中。
Metrics Collector按outTime取五分钟的滚动时间窗口,计算五分钟的平均吞吐(输出数据的条数)、五分钟内的延迟(outTime - eventTime或outTime - inTime)的中位数及99线等指标,写入MySQL相应的数据表中。最后对MySQL表中的吞吐计算均值,延迟中位数及延迟99线选取中位数,绘制图像并分析。
4.2 默认参数
· Storm和Flink默认均为At Least Once语义。
· Storm开启ACK, ACKer数量为1。
· Flink的Checkpoint时间间隔为30秒,默认StateBackend为Memory。
· 保证Kafka不是性能瓶颈,尽可能排除Kafka对测试结果的影响。
· 测试延迟时数据生产速率小于数据处理能力,假设数据被写入Kafka后立刻被读取,即eventTime等于数据进入系统的时间。
· 测试吞吐量时从Kafka Topic的最旧开始读取,假设该Topic中的测试数据量充足。
4.3 测试用例
Identity
· Identity用例主要模拟“输入-输出”简单处理场景,反映两个框架本身的性能。
· 输入数据为“msgId, eventTime”,其中eventTime视为数据生成时间。单条输入数据约20 B。
· 进入作业处理流程时记录inTime,作业处理完成后(准备输出时)记录outTime。
· 作业从Kafka Topic Data中读取数据后,在字符串末尾追加时间戳,然后直接输出到Kafka。
· 输出数据为“msgId, eventTime, inTime, outTime”。单条输出数据约50 B。
Sleep
· Sleep用例主要模拟用户作业耗时较长的场景,反映复杂用户逻辑对框架差异的削弱,比较两个框架的调度性能。
· 输入数据和输出数据均与Identity相同。
· 读入数据后,等待一定时长(1 ms)后在字符串末尾追加时间戳后输出
Windowed Word Count
· Windowed Word Count用例主要模拟窗口统计场景,反映两个框架在进行窗口统计时性能的差异。
· 此外,还用其进行了精确计算场景的测试,反映Flink恰好一次投递的性能。
· 输入为JSON格式,包含msgId、eventTime和一个由若干单词组成的句子,单词之间由空格分隔。单条输入数据约150 B。
· 读入数据后解析JSON,然后将句子分割为相应单词,带eventTime和inTime时间戳发给CountWindow进行单词计数,同时记录一个窗口中最大最小的eventTime和inTime,最后带outTime时间戳输出到Kafka相应的Topic。
· Spout/Source及OutputBolt/Output/Sink并发度恒为1,增大并发度时仅增大JSONParser、CountWindow的并发度。
· 由于Storm对window的支持较弱,CountWindow使用一个HashMap手动实现,Flink用了原生的CountWindow和相应的Reduce函数。
5.测试结果
5.1 Identity单线程吞吐量
· 上图中蓝色柱形为单线程Storm作业的吞吐,橙色柱形为单线程Flink作业的吞吐。
· Identity逻辑下,Storm单线程吞吐为8.7万条/秒,Flink单线程吞吐可达35万条/秒。
· 当Kafka Data的Partition数为1时,Flink的吞吐约为Storm的3.2倍;当其Partition数为8时,Flink的吞吐约为Storm的4.6倍。
· 由此可以看出,Flink吞吐约为Storm的3-5倍。
5.2 Identity单线程作业延迟
· 采用outTime - eventTime作为延迟,图中蓝色折线为Storm,橙色折线为Flink。虚线为99线,实线为中位数。
· 从图中可以看出随着数据量逐渐增大,Identity的延迟逐渐增大。其中99线的增大速度比中位数快,Storm的增大速度比Flink快。
· 其中QPS在80000以上的测试数据超过了Storm单线程的吞吐能力,无法对Storm进行测试,只有Flink的曲线。
· 对比折线最右端的数据可以看出,Storm QPS接近吞吐时延迟中位数约100毫秒,99线约700毫秒,Flink中位数约50毫秒,99线约300毫秒。Flink在满吞吐时的延迟约为Storm的一半。
5.3 Sleep吞吐量
· 从图中可以看出,Sleep 1毫秒时,Storm和Flink单线程的吞吐均在900条/秒左右,且随着并发增大基本呈线性增大。
· 对比蓝色和橙色的柱形可以发现,此时两个框架的吞吐能力基本一致。
5.4 Sleep单线程作业延迟(中位数)
· 依然采用outTime - eventTime作为延迟,从图中可以看出,Sleep 1毫秒时,Flink的延迟仍低于Storm。
5.5 Windowed Word Count单线程吞吐量
· 单线程执行大小为10的计数窗口,吞吐量统计如图。
· 从图中可以看出,Storm吞吐约为1.2万条/秒,Flink Standalone约为4.3万条/秒。Flink吞吐依然为Storm的3倍以上。
5.6 Windowed Word Count Flink At Least Once与Exactly Once吞吐量对比
· 由于同一算子的多个并行任务处理速度可能不同,在上游算子中不同快照里的内容,经过中间并行算子的处理,到达下游算子时可能被计入同一个快照中。这样一来,这部分数据会被重复处理。因此,Flink在Exactly Once语义下需要进行对齐,即当前最早的快照中所有数据处理完之前,属于下一个快照的数据不进行处理,而是在缓存区等待。当前测试用例中,在JSON Parser和CountWindow、CountWindow和Output之间均需要进行对齐,有一定消耗。为体现出对齐场景,Source/Output/Sink并发度的并发度仍为1,提高了JSONParser/CountWindow的并发度。具体流程细节参见前文Windowed Word Count流程图。
· 上图中橙色柱形为At Least Once的吞吐量,黄色柱形为Exactly Once的吞吐量。对比两者可以看出,在当前并发条件下,Exactly Once的吞吐较At Least Once而言下降了6.3%
5.7 Windowed Word Count Storm At Least Once与At Most Once吞吐量对比
· Storm将ACKer数量设置为零后,每条消息在发送时就自动ACK,不再等待Bolt的ACK,也不再重发消息,为At Most Once语义。
· 上图中蓝色柱形为At Least Once的吞吐量,浅蓝色柱形为At Most Once的吞吐量。对比两者可以看出,在当前并发条件下,At Most Once语义下的吞吐较At Least Once而言提高了16.8%
5.8 Windowed Word Count单线程作业延迟
· Identity和Sleep观测的都是outTime - eventTime,因为作业处理时间较短或Thread.sleep()精度不高,outTime - inTime为零或没有比较意义;Windowed Word Count中可以有效测得outTime - inTime的数值,将其与outTime - eventTime画在同一张图上,其中outTime -eventTime为虚线,outTime - InTime为实线。
· 观察橙色的两条折线可以发现,Flink用两种方式统计的延迟都维持在较低水平;观察两条蓝色的曲线可以发现,Storm的outTime - inTime较低,outTime - eventTime一直较高,即inTime和eventTime之间的差值一直较大,可能与Storm和Flink的数据读入方式有关。
· 蓝色折线表明Storm的延迟随数据量的增大而增大,而橙色折线表明Flink的延迟随着数据量的增大而减小(此处未测至Flink吞吐量,接近吞吐时Flink延迟依然会上升)。
· 即使仅关注outTime - inTime(即图中实线部分),依然可以发现,当QPS逐渐增大的时候,Flink在延迟上的优势开始体现出来。
5.9 Windowed Word Count Flink At Least Once与Exactly Once延迟对比
· 图中黄色为99线,橙色为中位数,虚线为At Least Once,实线为Exactly Once。图中相应颜色的虚实曲线都基本重合,可以看出Flink Exactly Once的延迟中位数曲线与At Least Once基本贴合,在延迟上性能没有太大差异。
5.10 Windowed Word Count Storm At Least Once与At Most Once延迟对比
· 图中蓝色为99线,浅蓝色为中位数,虚线为At Least Once,实线为At Most Once。QPS在4000及以前的时候,虚线实线基本重合;QPS在6000时两者已有差异,虚线略高;QPS接近8000时,已超过At Least Once语义下Storm的吞吐,因此只有实线上的点。
· 可以看出,QPS较低时Storm At Most Once与At Least Once的延迟观察不到差异,随着QPS增大差异开始增大,At Most Once的延迟较低。
5.11 Windowed Word Count Flink不同StateBackends吞吐量对比
· Flink支持Standalone和on Yarn的集群部署模式,同时支持Memory、FileSystem、RocksDB三种状态存储后端(StateBackends)。由于线上作业需要,测试了这三种StateBackends在两种集群部署模式上的性能差异。其中,Standalone时的存储路径为JobManager上的一个文件目录,on Yarn时存储路径为HDFS上一个文件目录。
· 对比三组柱形可以发现,使用FileSystem和Memory的吞吐差异不大,使用RocksDB的吞吐仅其余两者的十分之一左右。
· 对比两种颜色可以发现,Standalone和on Yarn的总体差异不大,使用FileSystem和Memory时on Yarn模式下吞吐稍高,使用RocksDB时Standalone模式下的吞吐稍高。
5.12 Windowed Word Count Flink不同StateBackends延迟对比
· 使用FileSystem和Memory作为Backends时,延迟基本一致且较低。
· 使用RocksDB作为Backends时,延迟稍高,且由于吞吐较低,在达到吞吐瓶颈前的延迟陡增。其中on Yarn模式下吞吐更低,接近吞吐时的延迟更高。
6.结论及建议
6.1 框架本身性能
· 由5.1、5.5的测试结果可以看出,Storm单线程吞吐约为8.7万条/秒,Flink单线程吞吐可达35万条/秒。Flink吞吐约为Storm的3-5倍。
· 由5.2、5.8的测试结果可以看出,Storm QPS接近吞吐时延迟(含Kafka读写时间)中位数约100毫秒,99线约700毫秒,Flink中位数约50毫秒,99线约300毫秒。Flink在满吞吐时的延迟约为Storm的一半,且随着QPS逐渐增大,Flink在延迟上的优势开始体现出来。
· 综上可得,Flink框架本身性能优于Storm。
6.2 复杂用户逻辑对框架差异的削弱
· 对比5.1和5.3、5.2和5.4的测试结果可以发现,单个Bolt Sleep时长达到1毫秒时,Flink的延迟仍低于Storm,但吞吐优势已基本无法体现。
· 因此,用户逻辑越复杂,本身耗时越长,针对该逻辑的测试体现出来的框架的差异越小。
6.3 不同消息投递语义的差异
· 由5.6、5.7、5.9、5.10的测试结果可以看出,Flink Exactly Once的吞吐较At Least Once而言下降6.3%,延迟差异不大;Storm At Most Once语义下的吞吐较At Least Once提升16.8%,延迟稍有下降。
· 由于Storm会对每条消息进行ACK, Flink是基于一批消息做的检查点,不同的实现原理导致两者在At Least Once语义的花费差异较大,从而影响了性能。而Flink实现Exactly Once语义仅增加了对齐操作,因此在算子并发量不大、没有出现慢节点的情况下对Flink性能的影响不大。Storm At Most Once语义下的性能仍然低于Flink。
6.4 Flink状态存储后端选择
· Flink提供了内存、文件系统、RocksDB三种StateBackends,结合5.11、5.12的测试结果,三者的对比如下:
6.5 推荐使用Flink的场景
综合上述测试结果,以下实时计算场景建议考虑使用Flink框架进行计算:
· 要求消息投递语义为Exactly Once的场景;
· 数据量较大,要求高吞吐低延迟的场景;
· 需要进行状态管理或窗口统计的场景。
7.展望
· 本次测试中尚有一些内容没有进行更加深入的测试,有待后续测试补充。例如:
· Exactly Once在并发量增大的时候是否吞吐会明显下降?
· 用户耗时到1ms时框架的差异已经不再明显(Thread.sleep()的精度只能到毫秒),用户耗时在什么范围内Flink的优势依然能体现出来?
· 本次测试仅观察了吞吐量和延迟两项指标,对于系统的可靠性、可扩展性等重要的性能指标没有在统计数据层面进行关注,有待后续补充。
· Flink使用RocksDBStateBackend时的吞吐较低,有待进一步探索和优化。
· 关于Flink的更高级API,如Table API & SQL及CEP等,需要进一步了解和完善。
8.参考内容
· 分布式流处理框架——功能对比和性能评估.
· intel-hadoop/HiBench: HiBench is a big data benchmark suite.
· Yahoo的流计算引擎基准测试.