前言
Kafka 最初由 Linkedin 公司开发,是一个分布式、支持分区的、多副本的,基于 Zookeeper 协调的分布式消息系统,其最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于 Hadoop 的批处理系统、低延迟的实时系统、Storm/Spark 流式处理引擎、Web/Nginx 日志、访问日志,消息服务等等,用 Scala 和 Java 语言编写,Linkedin 于 2010 年将其贡献给了 Apache 基金会并成为顶级开源项目。
高吞吐量
Kafka 是大数据领域无处不在的消息中间件,目前广泛使用在企业内部的实时数据管道,并帮助企业构建自己的流计算应用程序。Kafka 虽然是基于磁盘做的数据存储,但却具有高性能、高吞吐、低延时的特点,其吞吐量动辄几万、几十上百万。
但是很多使用过 Kafka 的人,经常会被问到这样一个问题,Kafka 为什么速度快,吞吐量大?大部分被问的人都是一下子就懵了,或者是只知道一些简单的点,在这里我们就详细了解一下 Kafka 速度快、吞吐量大的原因。
顺序读写
众所周知,Kafka 是将消息记录持久化到本地磁盘中的,一般人会认为磁盘读写性能差,可能会对 Kafka 性能如何保证提出质疑。实际上不管是内存还是磁盘,快或慢关键在于寻址的方式,磁盘分为顺序读写与随机读写,内存也一样分为顺序读写与随机读写。基于磁盘的随机读写确实很慢,但磁盘的顺序读写性能却很高,一般而言要高出磁盘随机读写三个数量级,一些情况下磁盘顺序读写性能甚至要高于内存随机读写。
磁盘的顺序读写是磁盘使用模式中最有规律的,并且操作系统也对这种模式做了大量优化,Kafka 就是使用了磁盘顺序读写来提升性能的。Kafka 的消息是不断追加到本地磁盘文件末尾,而不是随机的写入,这使得 Kafka 写入吞吐量得到了显著提升 。在 Kafka 中, 每一个partition
其实都是一个文件 ,收到消息后 Kafka 会把数据插入到文件末尾。但这种方法有一个缺陷,那就是没有办法删除数据 ,所以 Kafka 是不会删除数据的,它会把所有的数据都保留下来,每个消费者对每个topic
都有一个offset
用来表示读取数据的偏移量 。
鉴于此,如果一直不删除数据,那么硬盘肯定会被撑爆,所以 Kakfa 提供了两种策略来删除数据。一是基于时间,二是基于partition
文件大小。具体配置可以参考 Kafka 的配置文档。
Page Cache
为了优化读写性能,Kafka 利用了操作系统本身的 Page Cache,就是利用操作系统自身的内存而不是 JVM 空间内存。这样做的好处有:
- 避免
Object
消耗:如果是使用 Java 堆,Java 对象的内存消耗比较大,通常是所存储数据的两倍甚至更多。 - 避免 GC 问题:随着 JVM 中数据不断增多,垃圾回收将会变得复杂与缓慢,使用系统缓存就不会存在 GC 问题
相比于使用 JVM 或in-memory cache
等数据结构,利用操作系统的 Page Cache 更加简单可靠。首先,操作系统层面的缓存利用率会更高,因为存储的都是紧凑的字节结构而不是独立的对象。其次,操作系统本身也对于 Page Cache 做了大量优化,提供了write-behind
、read-ahead
以及flush
等多种机制。再者,即使服务进程重启,系统缓存依然不会消失,避免了in-process cache
重建缓存的过程。通过操作系统的 Page Cache,Kafka 的读写操作基本上是基于内存的,读写速度得到了极大的提升。
零拷贝
Linux 操作系统 “零拷贝” 机制使用了sendfile
方法, 允许操作系统将数据从 Page Cache 直接发送到网络,只需要最后一步的copy
操作将数据复制到 NIC 缓冲区, 这样避免重新复制数据 。示意图如下:
通过这种 “零拷贝” 的机制,Page Cache 结合sendfile
方法,Kafka 消费端的性能也大幅提升。这也是为什么有时候消费端在不断消费数据时,我们并没有看到磁盘 IO 比较高,此刻正是操作系统缓存在提供数据。
当 Kafka 客户端从服务器读取数据时,如果不使用零拷贝技术,那么大致需要经历这样的一个过程:
- 操作系统将数据从磁盘上读入到内核空间的读缓冲区中;
- 应用程序(也就是 Kafka)从内核空间的读缓冲区将数据拷贝到用户空间的缓冲区中;
- 应用程序将数据从用户空间的缓冲区再写回到内核空间的
socket
缓冲区中; - 操作系统将
socket
缓冲区中的数据拷贝到 NIC 缓冲区中,然后通过网络发送给客户端。
从图中可以看到,数据在内核空间和用户空间之间穿梭了两次,那么能否避免这个多余的过程呢?当然可以,Kafka 使用了零拷贝技术,也就是直接将数据从内核空间的读缓冲区直接拷贝到内核空间的socket
缓冲区,然后再写入到 NIC 缓冲区,避免了在内核空间和用户空间之间穿梭。
可见,这里的零拷贝并非指一次拷贝都没有,而是避免了在内核空间和用户空间之间的拷贝。如果真是一次拷贝都没有,那么数据发给客户端就没了不是?不过,光是省下了这一步就可以带来性能上的极大提升。
分区分段+索引
Kafka 的消息是按topic
分类存储的,topic
中的数据又是按照一个一个的partition
即分区存储到不同broker
节点。每个partition
对应了操作系统上的一个文件夹,partition
实际上又是按照segment
分段存储的。这也非常符合分布式系统分区分桶的设计思想。
通过这种分区分段的设计,Kafka 的消息实际上是分布式存储在一个一个小的segment
中的,每次文件操作也是直接操作的segment
。为了进一步的查询优化,Kafka 又默认为分段后的数据文件建立了索引文件,就是文件系统上的.index
文件。这种分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度。
批量读写
Kafka 数据读写也是批量的而不是单条的。除了利用底层的技术外,Kafka 还在应用程序层面提供了一些手段来提升性能。最明显的就是使用批次。在向 Kafka 写入数据时,可以启用批次写入,这样可以避免在网络上频繁传输单个消息带来的延迟和带宽开销。假设网络带宽为 10MB/S,一次性传输 10MB 的消息比传输 1KB 的消息 10000 万次显然要快得多。
批量压缩
在很多情况下,系统的瓶颈不是 CPU 或磁盘,而是网络 IO,对于需要在广域网上的数据中心之间发送消息的数据流水线尤其如此。进行数据压缩会消耗少量的 CPU 资源,不过对于 Kafka 而言,网络 IO 更应该需要考虑。
如果每个消息都压缩,但是压缩率相对很低,所以 Kafka 使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩。Kafka 允许使用递归的消息集合,批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩。Kafka 支持多种压缩协议,包括 Gzip 和 Snappy 压缩协议。
Kafka 速度的秘诀在于,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络 IO 损耗,通过mmap
提高 I/O 速度,写入数据的时候由于单个partion
是末尾添加所以速度最优;读取数据的时候配合sendfile
直接暴力输出。
消息精确一次语义
Kafka 的消息精确一次语义是在 Apache Kafka 0.11 Release 版本和 Confluent Platform 3.3 中引入的。
消息系统语义概述
在一个分布式发布订阅消息系统中,组成系统的计算机总会由于各自的故障而不能工作。在 Kafka 中,一个单独的broker
,可能会在生产者发送消息到一个topic
的时候宕机,或者出现网络故障,从而导致生产者发送消息失败。根据生产者如何处理这样的失败,产生了不同的语义:
- 至少一次语义:如果生产者收到了 Kafka broker 的确认,并且生产者的
acks
配置项设置为all
(或-1
),这就意味着消息已经被精确一次写入 Kafka topic 了。然而,如果生产者接收ack
超时或者收到了错误,它就会认为消息没有写入 Kafka topic 而尝试重新发送消息。如果broker
恰好在消息已经成功写入 Kafka topic 后,发送ack
前,出了故障,生产者的重试机制就会导致这条消息被写入 Kafka 两次,从而导致同样的消息会被消费者消费不止一次。 - 至多一次语义:如果生产者在
ack
超时或者返回错误的时候不重试发送消息,那么消息有可能最终并没有写入 Kafka topic 中,因此也就不会被消费者消费到。但是为了避免重复处理的可能性,我们需要接受有些消息可能被遗漏的情况。 - 精确一次语义:即使生产者重试发送消息,也只会让消息被发送给消费者一次。精确一次语义是最令人满意的保证,但也是最难实现的。因为它需要消息系统本身和生产消息的应用程序还有消费消息的应用程序一起合作。比如,在成功消费一条消息后,你又把消费的
offset
重置到之前的某个offset
位置,那么你将收到从那个offset
到最新的offset
之间的所有消息。这解释了为什么消息系统和客户端程序必须合作来保证精确一次语义。
必须被处理的故障
为了描述支持精确一次消息投递语义而引入的挑战,让我们从一个简单的例子开始。
假设有一个单进程生产者程序,发送了消息“Hello Kafka”给一个叫做“EoS“的单分区 Kafka topic,然后有一个单实例的消费者程序在另一端从topic
中拉取消息,然后打印。在没有故障的理想情况下,这能很好的工作,“Hello Kafka”只被写入到 EoS topic 一次。消费者拉取消息,处理消息,提交偏移量来说明它完成了处理。然后,即使消费者程序出故障重启也不会再收到“Hello Kafka”这条消息了。
然而,我们知道,我们不能总认为一切都是顺利的。在大规模的集群中,即使最不可能发生的故障场景都可能最终发生。比如:
broker
可能发生故障:Kafka 是一个高可用、持久化的系统,每一条写入一个分区的消息都会被持久化并且多副本备份(假设有n
个副本)。所以,Kafka 可以容忍n-1
个broker
故障,意味着一个分区只要至少有一个broker
可用,分区就可用。Kafka 的副本协议保证了只要消息被成功写入了主副本,它就会被复制到其他所有的可用副本。- 生产者到
broker
的 RPC 调用可能失败:Kafka 的持久性依赖于生产者接收broker
的ack
。没有接收成功ack
不代表生产请求本身失败了。broker
可能在写入消息后,发送ack
给生产者的时候挂了。甚至broker
也可能在写入消息前就挂了。由于生产者没有办法知道错误是什么造成的,所以它就只能认为消息没写入成功,并且会重试发送。在一些情况下,这会造成同样的消息在 Kafka 分区日志中重复,进而造成消费端多次收到这条消息。 - 客户端可能发生故障:精确一次传递也必须考虑客户端故障。但是我们如何知道一个客户端已经故障而不是暂时和
broker
断开,或者经历一个程序短暂的暂停?区分永久性故障和临时故障是很重要的,为了正确性,broker
应该丢弃僵住的生产者发送来的消息,同样,也应该不向已经僵住的消费者发送消息。一旦一个新的客户端实例启动,它应该能够从失败的实例留下的任何状态中恢复,从一个安全点开始处理。这意味着,消费的偏移量必须始终与生产的输出保持同步。
Kafka 中的精确一次语义
在 0.11 版本之前,Apache Kafka 支持至少一次交付传递,和分区内有序传递。
幂等性:每个分区中精确一次且有序
一个幂等性的操作就是一种被执行多次造成的影响和只执行一次造成的影响一样的操作。现在生产者发送的操作是幂等的。如果出现导致生产者重试的错误,同样的消息,仍由同样的生产者发送多次,将只被写到 Kafka broker 的日志中一次。对于单个分区,幂等生产者不会因为生产者或broker
故障而发送多条重复消息。想要开启这个特性,获得每个分区内的精确一次语义,也就是说没有重复,没有丢失,并且有序的语义,只需要设置生产者配置中的enable.idempotence=true
。
这个特性是怎么实现的呢?在底层,它和 TCP 的工作原理有点像,每一批发送到 Kafka 的消息都将包含一个序列号,broker
将使用这个序列号来删除重复的发送。和只能在瞬态内存中的连接中保证不重复的 TCP 不同,这个序列号被持久化到副本日志,所以,即使分区的leader
挂了,其他的broker
接管了leader
,新leader
仍可以判断重新发送的是否重复了。这种机制的开销非常低:每批消息只有几个额外的字段。
事务:跨分区原子写入
Kafka 现在通过新的事务 API 支持跨分区原子写入。这将允许一个生产者发送一批到不同分区的消息,这些消息要么全部对任何一个消费者可见,要么对任何一个消费者都不可见。这个特性也允许你在一个事务中处理消费数据和提交消费偏移量,从而实现端到端的精确一次语义。下面是的代码片段演示了事务 API 的使用:
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch(ProducerFencedException e) {
producer.close();
} catch(KafkaException e) {
producer.abortTransaction();
}
如上述代码所示,演示了我们可以如何使用新生产者 API 来原子性地发送消息到topic
的多个partition
。值得注意的是,一个 Kafka topic 的分区中的消息,可以有些是在事务中,有些不在事务中。因此在消费者方面,你有两种选择来读取事务性消息,通过隔离等级isolation.level
消费者配置表示:
read_commited
:除了读取不属于事务的消息之外,还可以读取事务提交后的消息。read_uncommited
:按照偏移位置读取所有消息,而不用等事务提交。这个选项类似 Kafka 消费者的当前语义。
为了使用事务,需要配置消费者使用正确的隔离等级,使用新版生产者,并且将生产者的transactional.id
配置项设置为某个唯一 ID,需要此唯一 ID 来提供跨越应用程序重新启动的事务状态的连续性。
保证消息顺序
在将消息发送到 Kafka 的时候,我们需要指定topic
,也就是明确的将消息发送到 Kafka 中的某个主题;而在 Kafka 中,每个topic
下的数据则是又存储在partition
之中。一般来说,为了使用 Kafka 的高吞吐量特性,我们需要为每个topic
设置多个partition
;同时,为了保证 Kafka 的高可用性,每个topic
下的多个partition
,又分被分散到不同的broker
下。因此,其大致结构如下图所示:
由于消息都是存储在partition
之中的,而且同一个partition
用一个write ahead log
组织,因此可以保证顺序,但不同partition
之间是不能保证顺序的。不过在向 Kafka 发送消息的时候,我们可以指定(topic, partition, key)
这 3 个参数,其中partiton
和key
是可选的:
- 如果我们指定了
partition
,那么就可以保证把所有消息发送到同一个partition
,也就是有序的。并且在消费端,Kafka 保证,一个partition
只能被一个消费者消费。 - 或者,我们可以指定
key
,如使用userId
,可以把具有相同key
的所有消息,会发送到同一个partition
,也是有序的。
除此之外,在我们还可以设置num.partitions=1
和max.in.flight.requests.per.connection=1
参数,即设置分区数量为 1,并且生产者在收到服务器响应之前只能发生 1 个消息。这样的配置,严格保证了 Kafka 收到消息以及消费者消费消息的顺序,但是却也严重降低了 Kafka 的吞吐量,因此在使用前,请慎重评估。
参考资料:
- Kafka是如何实现高吞吐率的
- Kafka为什么吞吐量大、速度快?
- Apache kafka是如何实现消息的精确一次(Exactly-once-semantics)语义的?
- kafka分布式的情况下,如何保证消息的顺序?
转载:https://blog.csdn.net/qq_35246620/article/details/106745689