(图片来源于网络,侵删)
这一篇博客对于Producer的概念做一些归纳整理!
废话不多说,那就开始吧!!!
【1】Producer写入方式:
Producer采用推(push)
模式将消息发布到broker,每条消息都被追加(append)
到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐率)
Producer写入数据大致流程:
1)Producer先从Kafka集群中获取该Partition的Leader
2)Producer将消息发送给该Leader
3)Leader将消息写入本地log
4)Followers从Leader pull消息,写入本地log后向Leader发送ACK
5)Leader收到所有ISR中的Replication的ACK后,增加HW(high watermark,最后commit 的offset)并向Producer发送ACK
同步发送:发送一批数据给kafka后,等待kafka返回结果
1、生产者等待10s,如果broker没有给出ack相应,就认为失败。
2、生产者重试3次,如果还没有相应,就报错
异步发送:发送一批数据给kafka,只是提供一个回调函数。
1、先将数据保存在生产者端的buffer中。buffer大小是2万条
2、满足数据阈值或者数量阈值其中的一个条件就可以发送数据。
3、发送一批数据的大小是500条
说明:如果broker迟迟不给ack,而buffer又满了,开发者可以设置是否直接清空buffer中的数据
【2】Producer发送消息流程:
拦截器 —> 序列化器 —> 分区器
Kafka 的 Producer 发送消息采用的是异步发送的方式,在消息发送的过程中,涉及到了
两个线程——main
线程和 Sender
线程,以及一个线程共享变量——RecordAccumulator
,
main
线程将消息发送给 RecordAccumulator
,Sender
线程不断从 RecordAccumulator
中拉取
消息发送到Kafka broker
相关参数:
batch.size:只有数据积累到 batch.size 之后,sender 才会发送数据
linger.ms:如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据
【3】分区:
如果Topic有多个分区,那么Producer生产的数据在进行发送的时候,会把数据根据分区规则放在对应的分区内,分区规则共有4种,如下:
1.当指定了Partition分区号时,分区号就是用户指定的分区号
2.当没有指定Partition分区号,但是设置了Key,则分区号为Key的hash值与分区数量取模的值
3.当既没有指定Parition分区号,也没有设置Key,则会随机产生一个整数,将这个整数与分区数量取模,得到的值即为分区号,并且每调用一次 send
方法 ,该整数都会加一(此规则即将数据进行轮询),这方法也就是 round-robin
算法
4.当没有指定Partition分区号,但是设置了自定义的分区器,则会按照用户的自定义分区器进行分区
【4】Ack:
首先,Ack出现是为了Producer发送消息之后,确认Leader是否收到消息
针对不同的情况,Ack可选值有3个,分别是 0 、1 、 -1(all)
ack = 0
:表示Producer发送消息到Leader之后,不管Leader是否收到消息,继续发送其他消息,这样会出现的问题是容易丢失数据,如果消息在发送途中,Leader宕机了,此时Leader没有收到消息,当其他Follower升为Leader之后,Producer并不会把发送失败的消息重发;这种方法能保证高效性,但保证不了数据的可靠性
ack = 1
:表示Producer发送消息到Leader之后,需要等待Leader接收完毕,才会继续发送其他消息,这样相较于前一个,数据的可靠性有所提高,但是如果Follower在拉取消息的过程中,Leader宕机,这时候也会造成数据丢失,原因同上
ack = -1(all)
:表示Producer发送消息到Leader之后,需要等待所有 ISR
中所有的副本同步完成之后才会继续发送其他消息,这样的数据可靠性最高,但也会有同样的问题,其一,那就是如果ISR中只有Leader一个,这样就退化
到了ack = 1
的情况;其二,如果Follower在同步完成之后向Leader发送同步完成消息的时候,Leader宕机了,这时候,会重新选一个Follower作为Leader,但因为此时还没有返回ack消息到Producer,Producer会重新发送一次消息给新的Leader,此时就会出现数据重复的问题
说明:如果broker端一直不给ack状态,producer永远不知道是否成功;producer可以设置一个超时时间10s
,超过时间认为失败
【5】Kafka生产者发送消息的三种方式
Kafka是一种分布式的基于发布/订阅的消息系统,它的高吞吐量、灵活的offset是其它消息系统所没有的
Kafka发送消息主要有三种方式:
- 1.发送并忘记
- 2.同步发送
- 3.异步发送+回调函数
1)方式一:发送并忘记(不关心消息是否正常到达,对返回结果不做任何判断处理)
发送并忘记的方式本质上也是一种异步的方式,只是它不会获取消息发送的返回结果,这种方式的吞吐量是最高的,但是无法保证消息的可靠性
2)方式二:同步发送(通过get方法等待Kafka的响应,判断消息是否发送成功)
以同步的方式发送消息时,一条一条的发送,对每条消息返回的结果判断, 可以明确地知道每条消息的发送情况,但是由于同步的方式会阻塞,只有当消息通过get返回future对象时,才会继续下一条消息的发送
3)方式三:异步发送+回调函数(消息以异步的方式发送,通过回调函数返回消息发送成功/失败)
在调用send方法发送消息的同时,指定一个回调函数,服务器在返回响应时会调用该回调函数,通过回调函数能够对异常情况进行处理,当调用了回调函数时,只有回调函数执行完毕生产者才会结束,否则一直会阻塞
应用场景
三种方式虽然在时间上有所差别,但并不是说时间越快的越好,具体要看业务的应用场景:
1)场景1:如果业务要求消息必须是按顺序发送的,那么可以使用同步的方式,并且只能在一个partation上,结合参数设置retries
的值让发送失败时重试,设置max_in_flight_requests_per_connection=1
,可以控制生产者在收到服务器晌应之前只能发送1个消息,从而控制消息顺序发送
2)场景2:如果业务只关心消息的吞吐量,容许少量消息发送失败,也不关注消息的发送顺序,那么可以使用发送并忘记的方式,并配合参数acks=0,这样生产者不需要等待服务器的响应,以网络能支持的最大速度发送消息
3)场景3:如果业务需要知道消息发送是否成功,并且对消息的顺序不关心,那么可以用异步+回调的方式来发送消息,配合参数retries=0,并将发送失败的消息记录到日志文件中
都看到这里了,点赞评论一下吧!!!
点击查看👇
【Kafka】Kafka入门解析(六)
转载:https://blog.csdn.net/qq_43733123/article/details/105692466