Uber的Kafka实践:踩坑5年,随时像替换汽车引擎一样替换Kafka
Uber从2013年开始使用Kafka。Uber的主流应用,如打车、外卖等服务,都需要实时处理数据,所有核心的Business都是通过Kafka进行消息的传递。这决定了Kafka在Uber的技术平台中占据非常核心的定位。经过7年的发展,Uber的Kafka集群已经发展成为了全球数一数二的规模,每天处理PB级别的数据、Trillion级别的消息。
从2013年到2018年,Uber主要是踩坑,修复各种Bug。到现阶段,整个消息平台已经相当复杂,三分之二的代码是自研,开源Kafka仅作为平台核心部分。但这个消息平台也不会被Kafka所绑定,当整个系统自动化、标准化工作完成后,又可以用潜在的其他更好的开源软件,像替换汽车引擎一样将这个核心替换掉。
近日InfoQ记者采访了Uber Staff Engineer富羽鹏,了解Uber消息平台实践过程,他将在QCon全球软件开发大会(北京站)2020分享主题为《Uber大规模实时数据平台架构演进与实践》的演讲。
InfoQ:您认为应用开源Kafka时,它本身有哪些问题是普通使用者必须要解决的?Uber的Kafka有何特殊性?
富羽鹏:Uber部署的Kafka集群规模,是全球数一数二的,每天处理PB级别的数据、Trillion级别的消息。对于一般的小企业,一两个集群就够了。在Uber, Kafka的Topic数量非常多,生产者和消费者的量级也非常大,还有很多精细化的不同场景的Use Case,我们会针对不同的场景部署集群,并对容灾性有比较多的考虑。
如果说只从代码量来看,开源Kafka只是我们在整个系统里面的最核心(Core)的一部分。从生态系统周边,我们层层把它(Core)包围起来,几乎有2/3是Uber自己开发的,来解决各种具体的问题。
所以对于中小企业来说,如果没有Uber这么大规模和复杂性的话,建议直接使用付费的解决方案。Kafka开源几年了,已经是一个比较成熟的系统,但是到真正生产环境里,要很好的正确的使用和运维这套系统的话,需要针对不同场景,解决很多的设置和参数调优问题,出了故障还要知道如何快速排查与修复。需要使用者把这套系统吃透,这需要很多年的积累和沉淀。
也就是说对于一个新的企业来说,如果想很快的能把这个开源Kafka使用进去的话,有很多坑要踩,有很多路要走。
但对Uber来说,解决的问题已经不是使用开源软件踩坑的问题了。Uber这种级别规模的部署,导致我们会遇到很多其他公司可能遇到不了的一些问题。在高吞吐量、高并发性出现的时候,会触发很多在普通的流量情况下遇不到的问题。我们Kafka的架构演进,都是因为目前的规模量带来的,需要我们不断的对系统进行迭代。
InfoQ:在Uber里,是否有实时备份机制ISR机制吗?一份Topic整体算起来会有多少份备份?
富羽鹏:Uber在备份的问题上考虑的非常多。
备份并不是越多越好。备份越多,管理起来就越麻烦。另外一方面,备份的成本也非常昂贵。Uber作为一个上市公司,成本是我们非常关注的一点。我们需要保证每一个副本,都是从容灾意义上去考虑的。在Uber内部,比如其他团队或组织里Kafka的Service用户,都需要了解他们到底使用了多少副本,重要的集群用四副本,不重要的、允许数据丢失的使用三副本。
与此同时,跨数据中心的拷贝,会略微复杂一些。每个Service需要在哪些Region或者Data Center进行拷贝,需要跟我们的用户进行协商。我们可以提供SLA,把数据链发到某个Cluster上,我们需要保证在一定时间内,将它复制到指定的数据中心。如果是跨数据中心读取的话,其实这并不是一个非常经济的选择,因为这包括对网络带宽的使用,更长的延时。
对于实时和离线,处理会有很大的区别。对于离线数据会永久性的来保存。但实时数据本身是个流数据,它更像是一个Buffer,是数据的一个缓存。我们对数据有一定的保存的时间的,比如说大部分的集群的话,我们只把数据保留三天,三天之后我们会把数据删除。大部分下游的消费者,他们更关注的是实时数据的消费。也就是说绝大多数的Service来讲,他们会在一天之内读走数据。我们留“三天”,更多的是为我们下游的消费者进行容灾。如果需要有更久的时间,比如说超过三天,我们会从Data Lake里面,再把这个数据再给读出来,再放到Kafka Topic里面,再次消费。所以从副本使用的角度来讲,更关键的看Data Lake那一边,他们会将数据保留多久。
在一个Topic四个副本的机制下,最少也得有两个Cluster,所以最少也会有八副本存在。再加多数据中心,副本数量就会加倍。Uber每天产生PB级的数据,因此副本也至少有PB级。这一点,我们自己也在跟开源社区合作,做Kafka的Remote Storage功能。传统的Kafka都是在使用Host本身的硬盘和内存来存储数据。从Cost角度来讲,SSD的价格是非常高的。同时我们发现,绝大多数数据的消费者只需要去读过去六个小时之内的数据。所以我们决定去修改本身的Broker代码,它会把数据在设定的时间里直接拷贝到指定的Storage。可以让Kafka本身理解本地数据和Remote数据之间的关系,通过使用这样一套机制,把本地的副本量给降下来。
InfoQ:Uber有没有必须不能丢数据的场景?以及不能容忍乱序的场景?如果有,能具体描述场景和问题是怎么解决的吗?
富羽鹏:第一类场景是我们服务的一些最核心的业务。这种场景下,数据的可靠性、系统可靠性非常重要,数据不能有任何丢失。为了满足数据的Lossless,我们做了很多相关的定制化开发。
从宏观的Pipeline角度来看,相当于说是一个Topic,有上游的生产者Producer发数据,同时下游有Consumer来消费数据。但是在系统角度上来看,其实这个还有很多的Stage。比如说它从Client Service,从Producer来讲,它是在一个client这边,是在客户端这边开始发数据,它发的第一点,并不是直接发到我们cluster broker上面,而是发到我们叫REST Proxy这样一个REST Server上面。然后从REST Proxy的话,它会再发给这样一个Broker。然后这个Broker Cluster,用一个数据拷贝的Pipeline,拷到其他的一个Cluster。因为这样一个多区域的、多集群的架构。每个地域的生产者它都往本地集群发数据,但是对于消费者来讲,他们有时候可能比较关注全局的数据量。所以,我们会有一个数据拷贝的Pipeline来拷到另外一个Cluster。
针对数据流的不同的Stage,我们要真正做到Lossless,就需要对每个Stage去考虑数据容灾。比如说机器坏了、数据量丢失,是不是应该重置、Retry,或者找其他的地方做一个Buffer。基于每一点,我们都做了一些相应的定制化开发。另一个是要做数据的审查。就是怎么知道我们任何的数据丢失。针对Pipeline的每一个Stage,我们都有拿一些数据,或者拿一些Matrix来进行比较。保证整个Pipeline下没有任何的一个数据丢失。
第二类场景是日志收集。这一类特点是数据量特别大。Uber有成千上万个Service,每天产生很多的日志,我们需要把日志通过Kafka给聚合起来。对于我们的Cluster就有非常高的时效性和高吞吐量支持的要求。对于这一场景我们做了提高吞吐量的优化。
第三类场景是将Kafka作为数据库的更改数据捕获(change data capture, CDC)。特点是数据库对的transaction顺序性要求比较高。我们针对数据的有序性、吞吐量的要求也做了一些特别的开发工作。
第四类场景是将Kafka作为流处理平台数据的来源。在Uber我们有一个开源的流处理平台运行Flink的job,它们从Kafka读取数据进行实时计算,计算完成后也可以将数据发回给Kafka,再传回到下游;我们专门设置了一个集群,跟Flink做了深度的一些整合,做了一定的优化。
第五类场景也是一个非常有意思的定制化开发的,Kafka的协议之上做了Dead Letter Queue(DLQ),可以允许将个别不能暂时处理的信息放到另外一个Queue里面,之后再重新处理这些已经失败的Message。对于Kafka的Consumer API来讲,如果有不能处理的Message通常就两个选择,要么直接丢弃消息,不处理,继续往前走;要么就不断的重试。
但是Uber有一些非常重要的Topic, Kafka本身的处理方法是不符合我们的要求的。比如说在处理一些跟Money相关的事情的时候,每个Message都是非常重要的。如果说Message丢失,客户的账就对不上了,这是不行的。但是系统又不能卡在这里,阻止后面的流水进来。
这种情况下我们需要有一个额外缓存的功能,如果当前处理不了,那就先把它分到另外的地方。然后过了一段时间再重新进行处理。我们在Kafka这一层之上,做另外的一套Message Platform来封装这些额外的功能。
InfoQ:您认为Kafka的演进,在Uber大概分为几个阶段,分别解决的关键问题是什么**?**
富羽鹏:Uber里Kafka的演进,我认为主要分为三个阶段。
第一阶段,最早期的时候,是2013年到2015年之间,从架构角度来讲,我们主要是在使用Kafka开源版本。主要是提高稳定性,因为那个时候Kafka本身也比较早期,主要做很多Bugfix、调优、多语言支持,让Kafka更好的匹配到Uber的各个不同的Use Case,让整个架构可以更加稳定的运行起来。那时Uber的流量增长也很快,我们要保障在流量起来后,也不至于击垮集群。
另外还有一个有意思的工作是“数据拷贝”。我们对跨数据中心的拷贝做了非常多的优化。在开源的项目里面,Kafka原生提出了一个MirrorMaker的项目,用于集群之间的拷贝。但Uber遇到了很多问题,于是开源了自己的一个拷贝项目uReplicator。主要原因是在消息规模非常大的时候,MirrorMaker有一些的性能、可靠性的问题,于是我们重新对整个架构进行了大改,最终开发了我们自己这样的一个开源项目。
第二阶段,从2015年到2018年之间。我们观察到之前的第一阶段,遇到了很多的一些可靠性、扩展性方面的问题,我们更需要去打造一个更加成熟的消息平台。这个时候对Uber的业务在爆炸性的增长,几乎就是每六个月Uber的Business会翻一番,也就意味着我们的数据量每六个月也会翻一番。但是我们的团队的人员并不能翻倍,于是需要考虑自动化方面的开发,来应对数据量和集群数量增长带来的挑战。另外这个时候也做了不少多租户管理方面的定制化工作,比如用户配额、用户黑名单以及重要Topic的物理隔离。此外,对于稳定性和容灾机制也做了很多,逐渐形成多地多活、备份集群的架构,另外前面所讲的DLQ也是那个时候开发出来的。
第三阶段,是从2018年到现在。这个阶段我们希望建立一个标准化、智能化与自动化的消息处理平台。因为我们的应用场景越来越多,数据查询、数据拷贝的需求越来越多,我们的系统越做越复杂。
我们整个生态系统,过去几年上下游有一些用户在开发自己的Client或者开源的Client和我们的Kafka平台进行交互。发展到一定阶段后,我们发现在整个Uber内部,林林总总有相当数量的不同语言、不同版本的客户端在跟我们的平台进行交互,让我们管理起来非常的困难。同时这些用户也会让我们的系统开发演进带来额外的挑战。比如说当我们要加一个新的功能或者弃用某些功能时,我们要考虑是不是有某些用户的客户端不能升级,会不会给他们的Service带来影响。当这个公司规模发展到一定阶段的时候,标准化就成为了必须要做得一件事情。
其中一个很重要的标准化工作是做Consumer端的Proxy。这项工作目前只有Uber在做,其他的公司和开源还没有开始。在开源与Uber的Kafka架构中,生产者这边都有一个REST Proxy。但是在消费者这端,因为逻辑会复杂很多,还没有Proxy这样的概念,我们在做的工作就是填补这个空白。
这个Consumer Proxy不但可以简化很多客户端的API,同时也可以打破很多Kafka对于Consumer的一些限制。比如说Kafka有个限制是Consumer数量不能比Partition数量多。但是当我们做了Consumer Proxy以后,我们就可以把这个限制给去掉。于是一个八个Partition的Topic的message可以把数据发到成百上千个Client上面去。这样能大大降低了一个Cluster中Partition的的总数,降低Partition数量带来的物理资源开销。此外DLQ这个功能我们也做到了Consumer Proxy的API里面去,作为一个原生的功能。
还有一个方面标准化是,在做跨集群跨地域的元数据的整合与管理Cluster Topology。也就是说把一整套的Ecosystem、Service都给整合起来。让所有的Service之间更有序,更加多连接起来。这样可以解决我们之前的这样一个有很多很多小Service但无序的问题,也同时可以快速的帮我们找到Topology中错误与缺失的问题,比如某个Topic的数据没有被备份到某个Cluster上。
我们的想法是:整套Ecosystem最核心的中间部分是一个Kafka的Topic,这个Topic跟开源Kafka不一样,它可以在多个Cluster之间存在,而这套系统理解它的Topology。这样当用户创建一个Topic的时候,可以根据他的需求自动在多个Cluster上面把这个Topic创建出来,同时在Cluster之间进行数据拷贝等。
有了Cluster Topology后,我们的系统还会做一些非常智能的事情,比如说当它发现这个Cluster有问题的时候,会自动的把Producer导到这个其他对应的Cluster等等。它可以大大降低管理人员的灾难发现与恢复工作,带来非常好的容灾效果,我们将这个项目叫做Cluster Federation,是我们之前备份Cluster工作的延伸。这个项目的核心思想是认为一个Topic可以在多个Cluster之间存在。当一个Cluster出问题,数据可以自动迁移到另外一个Cluster。这个系统本身的可以很智能的把存在多个Cluster上的Topic发送给下游的消费者,也就是说Cluster之间互为备份。
最后,我们还需要追求自动化和可伸缩性,来更加的有效的使用机器资源。在这段时间里,我们将所有的集群做“容器化”,这也是做自动化的一个先决条件。这给我们的运维带来了极大的便利,完成机器的添加置换等。我们的运维人数并没有增加,但是我们的机器的数量确实在成倍的增长,这都得益于“自动化”。
总体来说,我们最关注的是高可靠性。就当你数据量增长,或者说是数据规模增长很大的时候,怎么能保证这个系统的可靠性和鲁棒性。其次是可用性,当你这个数据量特别大的时候,如何有效、快速的来对整个平台进行管理,包括对数据本身的管理,包括上游,上下游客户的管理。
我们也在很关注一些新兴的开源软件,比如说像Pulsar,这个开源软件解决了一些Kafka最开始架构上的一些缺陷,比如说存储计算分离。但它还需要被时间检验。但从这个角度来看,对Uber来说,通过我们在做的标准化工作,把两端的Proxy都给建立好了之后,我们的流处理平台的交互的衔接点,已经不是Kafka的协议了。在这个时候,我们可以像换汽车引擎一样,换成任何各种各样其他潜在的开源的解决方案,而不是说被某一个开源的项目绑定。
嘉宾介绍:
富羽鹏,Uber Staff Engineer。在Uber负责实时数据与分析平台的架构与运营,包括Kafka及其周边生态系统。在加入Uber之前,是大数据存储平台Alluxio的创始成员与PMC,再之前在Palantir从事大数据平台的研发与管理。本科与硕士毕业于清华大学,并在University of California San Diego进行了数据库方向的博士研究。