小言_互联网的博客

圣诞也要撸代码 |【消息中间件MQ系列】Spring整合kafka并设置多套kafka配置

325人阅读  评论(0)

 1、前言

        圣诞节的到来,程序员不会收到圣诞老人的🎁,但可以自己满足一下自己,所以,趁着有时间,就记录一下这会儿撸了些什么代码吧!!!

        因为业务原因,需要在系统内新增其他的kakfa配置使用,所以今天研究的是怎么在系统内整合多套kafka配置使用。


2、整合kafka实践

首先,引入pom依赖,我的版本是 2.3.1 


  
  1. <dependency>
  2.     <groupId>org.springframework.kafka </groupId>
  3.     <artifactId>spring-kafka </artifactId>
  4. </dependency>

然后,设置properties配置:


  
  1. spring.kafka.bootstrap-servers=127.0.0.1:9090,127.0.0.1:9091,127.0.0.1:9092
  2. spring.kafka2.bootstrap-servers=127.0.0.2:9090,127.0.0.2:9091,127.0.0.2:9092

 另外还可以设置很多其他配置,例如生产者、消费者各自的消息序列化、消费模式等等,

具体的配置样例,请参考如下:


  
  1. #################consumer的配置参数(开始)#################
  2. #如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
  3. spring.kafka.consumer.auto-commit-interval;
  4. #当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,默认值为latest,表示自动将偏移重置为最新的偏移量
  5. # latest:不存在offset时,消费最新的消息
  6. # earliest:不存在offset时,从最早消息开始消费
  7. # none :不存在offset时,直接报错
  8. spring.kafka.consumer.auto-offset-reset=latest;
  9. #以逗号分隔的主机:端口对列表,用于建立与Kafka群集的初始连接。
  10. spring.kafka.consumer.bootstrap-servers;
  11. #ID在发出请求时传递给服务器;用于服务器端日志记录。
  12. spring.kafka.consumer.client-id;
  13. #如果为true,则消费者的偏移量将在后台定期提交,默认值为true
  14. spring.kafka.consumer.enable-auto-commit= true;
  15. #如果没有足够的数据立即满足“fetch.min.bytes”给出的要求,服务器在回答获取请求之前将阻塞的最长时间(以毫秒为单位)
  16. #默认值为500
  17. spring.kafka.consumer.fetch-max-wait;
  18. #服务器应以字节为单位返回获取请求的最小数据量,默认值为1,对应的kafka的参数为fetch.min.bytes。
  19. spring.kafka.consumer.fetch-min-size;
  20. #用于标识此使用者所属的使用者组的唯一字符串。
  21. spring.kafka.consumer.group-id;
  22. #心跳与消费者协调员之间的预期时间(以毫秒为单位),默认值为3000
  23. spring.kafka.consumer.heartbeat-interval;
  24. #密钥的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer
  25. spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  26. #值的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer
  27. spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  28. #一次调用poll()操作时返回的最大记录数,默认值为500
  29. spring.kafka.consumer.max-poll-records;
  30. #################consumer的配置参数(结束)#################
  31. #################producer的配置参数(开始)#################
  32. #procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
  33. #acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
  34. #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
  35. #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
  36. #可以设置的值为:all, -1, 0, 1
  37. spring.kafka.producer.acks=1
  38. #每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,
  39. #这有助于提升客户端和服务器上的性能,此配置控制默认批量大小(以字节为单位),默认值为16384
  40. spring.kafka.producer.batch-size=16384
  41. #以逗号分隔的主机:端口对列表,用于建立与Kafka群集的初始连接
  42. spring.kafka.producer.bootstrap-servers
  43. #生产者可用于缓冲等待发送到服务器的记录的内存总字节数,默认值为33554432
  44. spring.kafka.producer.buffer-memory=33554432
  45. #ID在发出请求时传递给服务器,用于服务器端日志记录
  46. spring.kafka.producer.client-id
  47. #生产者生成的所有数据的压缩类型,此配置接受标准压缩编解码器('gzip','snappy','lz4'),
  48. #它还接受'uncompressed'以及'producer',分别表示没有压缩以及保留生产者设置的原始压缩编解码器,
  49. #默认值为producer
  50. spring.kafka.producer.compression-type=producer
  51. #key的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer
  52. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
  53. #值的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer
  54. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  55. #如果该值大于零时,表示启用重试失败的发送次数
  56. spring.kafka.producer.retries
  57. #################producer的配置参数(结束)#################
  58. #################listener的配置参数(结束)#################
  59. #侦听器的AckMode,参见https://docs.spring.io/spring-kafka/reference/htmlsingle/#committing-offsets
  60. #当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效
  61. spring.kafka.listener.ack-mode;
  62. #在侦听器容器中运行的线程数
  63. spring.kafka.listener.concurrency;
  64. #轮询消费者时使用的超时(以毫秒为单位)
  65. spring.kafka.listener.poll-timeout;
  66. #当ackMode为“COUNT”或“COUNT_TIME”时,偏移提交之间的记录数
  67. spring.kafka.listener.ack-count;
  68. #当ackMode为“TIME”或“COUNT_TIME”时,偏移提交之间的时间(以毫秒为单位)
  69. spring.kafka.listener.ack-time;
  70. #################listener的配置参数(结束)#################

最后,编写kafka的config配置类:


  
  1. package ***.***.***.kafka;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.apache.kafka.common.serialization.StringSerializer;
  4. import org.springframework.beans.factory.annotation.Value;
  5. import org.springframework.context.annotation.Bean;
  6. import org.springframework.context.annotation.Configuration;
  7. import org.springframework.kafka.annotation.EnableKafka;
  8. import org.springframework.kafka.core.DefaultKafkaProducerFactory;
  9. import org.springframework.kafka.core.KafkaTemplate;
  10. import org.springframework.kafka.core.ProducerFactory;
  11. import java.util.HashMap;
  12. import java.util.Map;
  13. @Configuration
  14. @EnableKafka
  15. public class KafkaConfiuration {
  16. @Value("${spring.kafka.bootstrap-servers}")
  17. private String bootstrapServer;
  18. @Value("${spring.kafka2.bootstrap-servers}")
  19. private String bootstrapServer2;
  20. @Bean("kafkaTemplateOne")
  21. public KafkaTemplate<String, String> oneReqKafkaTemplate () {
  22. return new KafkaTemplate<>(oneEnquiryReqFactory());
  23. }
  24. @Bean
  25. public ProducerFactory<String, String> oneEnquiryReqFactory () {
  26. return new DefaultKafkaProducerFactory<>(oneProducerConfigs());
  27. }
  28. @Bean
  29. public Map<String, Object> oneProducerConfigs () {
  30. Map<String, Object> props = new HashMap<>();
  31. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
  32. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  33. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  34. return props;
  35. }
  36. @Bean("kafkaTemplateTwo")
  37. public KafkaTemplate<String, String> twoKafkaTemplate () {
  38. return new KafkaTemplate<>(twoFactory());
  39. }
  40. @Bean
  41. public ProducerFactory<String, String> twoFactory () {
  42. return new DefaultKafkaProducerFactory<>(twoProducerConfigs());
  43. }
  44. @Bean
  45. public Map<String, Object> twoProducerConfigs () {
  46. Map<String, Object> props = new HashMap<>();
  47. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer2);
  48. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  49. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  50. return props;
  51. }
  52. }

配置完成之后,就可以直接使用啦:


  
  1. package ***.***.***.controller;
  2. import org.springframework.kafka.annotation.KafkaListener;
  3. import org.springframework.kafka.core.KafkaTemplate;
  4. import org.springframework.web.bind.annotation.PostMapping;
  5. import org.springframework.web.bind.annotation.RestController;
  6. import javax.annotation.Resource;
  7. @RestController
  8. public class KafkaTestController {
  9. @Resource(name = "kafkaTemplateOne")
  10. private KafkaTemplate<String, String> kafkaTemplateOne;
  11. @Resource(name = "kafkaTemplateTwo")
  12. private KafkaTemplate<String, String> kafkaTemplateTwo;
  13. private final static String TOPIC1 = "testTopic1";
  14. private final static String TOPIC2 = "testTopic2";
  15. @PostMapping("/testKafka")
  16. public void testKafka (){
  17. kafkaTemplateOne.send(TOPIC1, "test message 1");
  18. kafkaTemplateTwo.send(TOPIC2, "test message 2");
  19. }
  20. @KafkaListener(groupId = "TOPIC_DATA_ACCOUNT_GROUPID",topics = TOPIC1)
  21. public void receiveMsg (String msg){
  22. System.out.println(TOPIC1+ " 接收到的消息是:"+msg);
  23. }
  24. @KafkaListener(groupId = "TOPIC_DATA_ACCOUNT_GROUPID",topics = TOPIC2)
  25. public void receiveMsg1 (String msg){
  26. System.out.println(TOPIC2+ " 接收到的消息是:"+msg);
  27. }
  28. }

效果如下:

2022-11-15 18:35:28,973 [kafka-producer-network-thread | producer-2] [] INFO (Metadata.java:261)- [Producer clientId=producer-2] Cluster ID: SrufjpSTQpiu0QseCKbwYg
testTopic2 接收到的消息是:test message 2
testTopic1 接收到的消息是:test message 1


3、总结

        本次的记录内容,只是简单的demo实践,具体的使用情况,可以根据自身系统设置详细配置处理。
        若有疑问,欢迎留言讨论~~。

        最后祝大家圣诞节快乐,新年快到了,继续加油吧!!!

 


转载:https://blog.csdn.net/yy339452689/article/details/128457739
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场