4.4 如何选择合适的分区数
如何选择合适的分区数?这是很多Kafka的使用者经常面临的问题,不过对这个问题而言,似乎并没有非常权威的答案。而且这个问题显然也没有固定的答案,只能从某些角度来做具体的分析,最终还是要根据实际的业务场景、软件条件、硬件条件、负载情况等来做具体的考量。本节主要介绍与本问题相关的一些重要决策因素,使读者在遇到类似问题时能够有参考依据。
4.4.1 性能测试工具
在Kafka中,性能与分区数有着必然的关系,在设定分区数时一般也需要考虑性能的因素。对不同的硬件而言,其对应的性能也会不太一样。在实际生产环境中,我们需要了解一套硬件所对应的性能指标之后才能分配其合适的应用和负荷,所以性能测试工具必不可少。
本节要讨论的性能测试工具是 Kafka 本身提供的用于生产者性能测试的 kafka-producer-perf-test.sh和用于消费者性能测试的kafka-consumer-perf-test.sh。
首先我们通过一个示例来了解一下kafka-producer-perf-test.sh脚本的使用。我们向一个只有1个分区和1个副本的主题topic-1中发送100万条消息,并且每条消息大小为1024B,生产者对应的acks参数为1。详细内容参考如下:
示例中在使用kafka-producer-perf-test.sh脚本时用了多一个参数,其中topic用来指定生产者发送消息的目标主题;num-records 用来指定发送消息的总条数;record-size 用来设置每条消息的字节数;producer-props 参数用来指定生产者的配置,可同时指定多组配置,各组配置之间以空格分隔,与producer-props参数对应的还有一个producer.config参数,它用来指定生产者的配置文件;throughput用来进行限流控制,当设定的值小于0时不限流,当设定的值大于0时,当发送的吞吐量大于该值时就会被阻塞一段时间。下面的示例中设置了throughout的值为100字节,我们来看一下实际的效果:
kafka-producer-perf-test.sh脚本中还有一个有意思的参数print-metrics,指定了这个参数时会在测试完成之后打印很多指标信息,对很多测试任务而言具有一定的参考价值。示例参考如下:
kafka-producer-perf-test.sh 脚本中还有一些其他的参数,比如 payload-delimiter、transactional-id等,读者可以自行探索一下此脚本的更多细节。
我们再来关注kafka-producer-perf-test.sh脚本的输出信息,以下面的一行内容为例:
records sent表示测试时发送的消息总数;records/sec表示以每秒发送的消息数来统计吞吐量,括号中的MB/sec表示以每秒发送的消息大小来统计吞吐量,注意这两者的维度;avg latency表示消息处理的平均耗时;max latency表示消息处理的最大耗时;50th、95th、99th和99.9th分别表示50%、95%、99%和99.9%的消息处理耗时。
kafka-consumer-perf-test.sh 脚本的使用也比较简单,下面的示例简单地演示了其使用方式:
示例中只是简单地消费主题topic-1中的100万条消息。脚本中还包含了许多其他的参数,比如from-latest、group、print-metrics、threads等,篇幅限制,读者可以自行了解这些参数的使用细节。
输出结果中包含了多项信息,分别对应起始运行时间(start.time)、结束运行时间(end.time)、消费的消息总量(data.consumed.in.MB,单位为MB)、按字节大小计算的消费吞吐量(MB.sec,单位为MB/s)、消费的消息总数(data.consumed.in.nMsg)、按消息个数计算的吞吐量(nMsg.sec)、再平衡的时间(rebalance.time.ms,单位为ms)、拉取消息的持续时间(fetch.time.ms,单位为ms)、每秒拉取消息的字节大小(fetch.MB.sec,单位为 MB/s)、每秒拉取消息的个数(fetch.nMsg.sec)。其中fetch.time.ms=end.time-start.time-rebalance.time.ms。
这里只是简单地了解两个脚本的基本用法,读者还可以通过设置不同的参数来调节测试场景以获得针对当前硬件资源的一份相对比较完善的测试报告。
4.4.2 分区数越多吞吐量就越高吗
分区是Kafka 中最小的并行操作单元,对生产者而言,每一个分区的数据写入是完全可以并行化的;对消费者而言,Kafka 只允许单个分区中的消息被一个消费者线程消费,一个消费组的消费并行度完全依赖于所消费的分区数。如此看来,如果一个主题中的分区数越多,理论上所能达到的吞吐量就越大,那么事实真的如预想的一样吗?
我们使用4.4.1节中介绍的性能测试工具来实际测试一下。首先分别创建分区数为1、20、50、100、200、500、1000的主题,对应的主题名称分别为topic-1、topic-20、topic-50、topic-100、topic-200、topic-500、topic-1000,所有主题的副本因子都设置为1。
消息中间件的性能一般是指吞吐量(广义来说还包括延迟)。抛开硬件资源的影响,消息写入的吞吐量还会受到消息大小、消息压缩方式、消息发送方式(同步/异步)、消息确认类型(acks)、副本因子等参数的影响,消息消费的吞吐量还会受到应用逻辑处理速度的影响。本案例中暂不考虑这些因素的影响,所有的测试除了主题的分区数不同,其余的因素都保持相同。
本次案例中使用的测试环境为一个由3台普通云主机组成的3节点的Kafka集群,每台云主机的内存大小为8GB、磁盘大小为40GB、4核CPU的主频为2600MHz。JVM版本为1.8.0_112,Linux系统版本为2.6.32-504.23.4.el6.x86_64。
使用kafka-producer-perf-test.sh脚本分别向这些主题中发送100万条消息体大小为1KB的消息,对应的测试命令如下:
对应的生产者性能测试结果如图 4-2 所示。不同的硬件环境,甚至不同批次的测试得到的测试结果也不会完全相同,但总体趋势还是会保持和图4-2中的一样。
图4-2 生产者性能测试结果
在图4-2中,我们可以看到分区数为1时吞吐量最低,随着分区数的增长,相应的吞吐量也跟着上涨。一旦分区数超过了某个阈值之后,整体的吞吐量是不升反降的。也就是说,并不是分区数越多吞吐量也越大。这里的分区数临界阈值针对不同的测试环境也会表现出不同的结果,实际应用中可以通过类似的测试案例(比如复制生产流量以便进行测试回放)来找到一个合理的临界值区间。
上面针对的是消息生产者的测试,对消息消费者而言同样有吞吐量方面的考量。使用kafka-consumer-perf-test.sh脚本分别消费这些主题中的100万条消息,对应的测试命令如下:
消费者性能测试的结果如图 4-3 所示。与生产者性能测试相同的是,不同的测试环境或不同的测试批次所得到的测试结果也不尽相同,但总体趋势还是会保持和图4-3中的一样。
在图 4-3 中,随着分区数的增加,相应的吞吐量也会有所增长。一旦分区数超过了某个阈值之后,整体的吞吐量也是不升反降的,同样说明了分区数越多并不会使吞吐量一直增长。
图4-3 消费者性能测试
在同一套环境下,我们还可以测试一下同时往两个分区数为200的主题中发送消息的性能,假设测试结果中两个主题所对应的吞吐量分别为A和B,再测试一下只往一个分区数为200的主题中发送消息的性能,假设此次测试结果中得到的吞吐量为C,会发现A<C、B<C且A+B>C。可以发现由于共享系统资源的因素,A和B之间会彼此影响。通过A+B>C的结果,可知图4-2中topic-200的那个点位也并没有触及系统资源的瓶颈,发生吞吐量有所下降的结果也并非是系统资源瓶颈造成的。
本节针对分区数越多吞吐量越高这个命题进行反证,其实要证明一个观点是错误的,只需要举个反例即可,本节的内容亦是如此。不过本节并没有指明分区数越多吞吐量就越低这个观点,并且具体吞吐量的数值和走势还会和磁盘、文件系统、I/O调度策略相关。分区数越多吞吐量也就越高?网络上很多资料都认可这一观点,但实际上很多事情都会有一个临界值,当超过这个临界值之后,很多原本符合既定逻辑的走向又会变得不同。读者需要对此有清晰的认知,懂得去伪求真,实地测试验证不失为一座通向真知的桥梁。
4.4.3 分区数的上限
一味地增加分区数并不能使吞吐量一直得到提升,并且分区数也并不能一直增加,如果超过默认的配置值,还会引起Kafka进程的崩溃。读者可以试着在一台普通的Linux机器上创建包含10000个分区的主题,比如在下面示例中创建一个主题topic-bomb:
执行完成后可以检查 Kafka 的进程是否还存在(比如通过 jps 命令或 ps-aux|grep kafka命令)。一般情况下,会发现原本运行完好的Kafka服务已经崩溃。此时或许会想到,创建这么多分区,是不是因为内存不够而引起的进程崩溃?我们在启动 Kafka 进程的时候将JVM堆设置得大一点是不是就可以解决问题了。其实不然,创建这些分区而引起的内存增长完全不足以让Kafka“畏惧”。
为了分析真实的原因,我们可以打开 Kafka 的服务日志文件($KAFKA_HOME/logs/server.log)来一探究竟,会发现服务日志中出现大量的异常:
异常中最关键的信息是“Too many open flies”,这是一种常见的Linux系统错误,通常意味着文件描述符不足,它一般发生在创建线程、创建 Socket、打开文件这些场景下。在 Linux系统的默认设置下,这个文件描述符的个数不是很多,通过ulimit命令可以查看:
ulimit是在系统允许的情况下,提供对特定shell可利用的资源的控制。-H和-S选项指定资源的硬限制和软限制。硬限制设定之后不能再添加,而软限制则可以增加到硬限制规定的值。如果-H和-S选项都没有指定,则软限制和硬限制同时设定。限制值可以是指定资源的数值或hard、soft、unlimited这些特殊值,其中hard代表当前硬限制,soft代表当前软件限制,unlimited代表不限制。如果不指定限制值,则打印指定资源的软限制值,除非指定了-H选项。硬限制可以在任何时候、任何进程中设置,但硬限制只能由超级用户设置。软限制是内核实际执行的限制,任何进程都可以将软限制设置为任意小于等于硬限制的值。
我们可以通过测试来验证本案例中的 Kafka 的崩溃是否是由于文件描述符的限制而引起的。下面我们在一个包含3个节点的Kafka集群中挑选一个节点进行具体的分析。首先通过jps命令查看Kafka进程pid的值:
查看当前Kafka进程所占用的文件描述符的个数(注意这个值并不是Kafka第一次启动时就需要占用的文件描述符的个数,示例中的Kafka环境下已经存在了若干主题):
我们再新建一个只有一个分区的主题,并查看Kafka进程所占用的文件描述符的个数:
可以看到增加了一个分区,对应的也只增加了一个文件描述符。之前我们通过ulimit命令可以看到软限制是1024,我们创建一个具有829(1024-195=829)个分区的主题:
可以看到Kafka进程此时占用了1024个文件描述符,并且运行完好。这时我们还可以联想到硬限制4096这个关键数字,我们再创建一个包含3071(4096-1024=3072,这里特地少创建1个分区)个分区的主题,示例如下:
Kafka进程依旧完好,文件描述符占用为4095,逼近最高值4096。最后我们再次创建一个只有一个分区的主题:
此时Kafka已经崩溃,查看进程号时已没有相关信息。查看Kafka中的日志,还会发现报出前面提及的异常“java.io.IOException:Too many open files”,表明已达到上限。
如何避免这种异常情况?对于一个高并发、高性能的应用来说,1024 或 4096 的文件描述符限制未免太少,可以适当调大这个参数。比如使用 ulimit-n 65535 命令将上限提高到65535,这样足以应对大多数的应用情况,再高也完全没有必要了。
也可以在/etc/security/limits.conf文件中设置,参考如下:
limits.conf文件修改之后需要重启才能生效。limits.conf文件与ulimit命令的区别在于前者是针对所有用户的,而且在任何shell中都是生效的,即与shell无关,而后者只是针对特定用户的当前shell的设定。在修改最大文件打开数时,最好使用limits.conf文件来修改,通过这个文件,可以定义用户、资源类型、软硬限制等。也可以通过在/etc/profile文件中添加ulimit的设置语句来使全局生效。
设置之后可以再次尝试创建10000个分区的主题,检查一下Kafka是否还会再次崩溃。
4.4.4 考量因素
如何选择合适的分区数?一个“恰如其分”的答案就是视具体情况而定。
从吞吐量方面考虑,增加合适的分区数可以在一定程度上提升整体吞吐量,但超过对应的阈值之后吞吐量不升反降。如果应用对吞吐量有一定程度上的要求,则建议在投入生产环境之前对同款硬件资源做一个完备的吞吐量相关的测试,以找到合适的分区数阈值区间。
在创建主题之后,虽然我们还能够增加分区的个数,但基于key计算的主题需要严谨对待。当生产者向Kafka中写入基于key的消息时,Kafka通过消息的key来计算出消息将要写入哪个具体的分区,这样具有相同 key 的数据可以写入同一个分区。Kafka 的这一功能对于一部分应用是极为重要的,比如日志压缩(Log Compaction),详细可以参考5.4节;再比如对于同一个key 的所有消息,消费者需要按消息的顺序进行有序的消费,如果分区的数量发生变化,那么有序性就得不到保证。在创建主题时,最好能确定好分区数,这样也可以省去后期增加分区所带来的多余操作。尤其对于与key高关联的应用,在创建主题时可以适当地多创建一些分区,以满足未来的需求。通常情况下,可以根据未来2年内的目标吞吐量来设定分区数。当然如果应用与key弱关联,并且具备便捷的增加分区数的操作接口,那么也可以不用考虑那么长远的目标。
有些应用场景会要求主题中的消息都能保证顺序性,这种情况下在创建主题时可以设定分区数为1,通过分区有序性的这一特性来达到主题有序性的目的。
当然分区数也不能一味地增加,参考4.4.3节的内容,分区数会占用文件描述符,而一个进程所能支配的文件描述符是有限的,这也是通常所说的文件句柄的开销。虽然我们可以通过修改配置来增加可用文件描述符的个数,但凡事总有一个上限,在选择合适的分区数之前,最好再考量一下当前Kafka进程中已经使用的文件描述符的个数。
分区数的多少还会影响系统的可用性。在前面章节中,我们了解到 Kafka通过多副本机制来实现集群的高可用和高可靠,每个分区都会有一至多个副本,每个副本分别存在于不同的broker节点上,并且只有leader副本对外提供服务。在Kafka集群的内部,所有的副本都采用自动化的方式进行管理,并确保所有副本中的数据都能保持一定程度上的同步。当broker发生故障时,leader副本所属宿主的broker节点上的所有分区将暂时处于不可用的状态,此时Kafka会自动在其他的 follower 副本中选举出新的 leader 用于接收外部客户端的请求,整个过程由Kafka控制器负责完成(有关控制器的内容可以参考6.4节)。分区在进行leader角色切换的过程中会变得不可用,不过对于单个分区来说这个过程非常短暂,对用户而言可以忽略不计。如果集群中的某个broker节点宕机,那么就会有大量的分区需要同时进行leader角色切换,这个切换的过程会耗费一笔可观的时间,并且在这个时间窗口内这些分区也会变得不可用。
分区数越多也会让Kafka的正常启动和关闭的耗时变得越长,与此同时,主题的分区数越多不仅会增加日志清理的耗时,而且在被删除时也会耗费更多的时间。对旧版的生产者和消费者客户端而言,分区数越多,也会增加它们的开销,不过这一点在新版的生产者和消费者客户端中有效地得到了抑制。
如何选择合适的分区数?从某种意思来说,考验的是决策者的实战经验,更透彻地说,是对Kafka本身、业务应用、硬件资源、环境配置等多方面的考量而做出的选择。在设定完分区数,或者更确切地说是创建主题之后,还要对其追踪、监控、调优以求更好地利用它。读者看到本节的内容之前或许没有对分区数有太大的困扰,而看完本节的内容之后反而困惑了起来,其实大可不必太过惊慌,一般情况下,根据预估的吞吐量及是否与key相关的规则来设定分区数即可,后期可以通过增加分区数、增加broker或分区重分配等手段来进行改进。如果一定要给一个准则,则建议将分区数设定为集群中broker的倍数,即假定集群中有3个broker节点,可以设定分区数为3、6、9等,至于倍数的选定可以参考预估的吞吐量。不过,如果集群中的broker 节点数有很多,比如大几十或上百、上千,那么这种准则也不太适用,在选定分区数时进一步可以引入基架等参考因素。