
kafka 延时消息处理

    作为一款优秀的消息处理服务,kafka 具有完善的事务管理,状态管理和灾难恢复功能。只要我们稍加变通一下,kafka 也能作为延迟消息处理的解决方案,而且实现上比用数据库简单得多。

    以下代码均在 spring-boot 2.0.5 和 spring-kafka 2.1.10 中测试通过。建议事先阅读文档 https://docs.spring.io/spring-kafka/docs/2.5.4.RELEASE/reference/html/#receiving-messages 以便能很好地理解以下内容。


    设计 2 个队列(topic),一个收到消息马上执行,另一个用来接收需延迟处理的消息。话句话说,接收延迟消息的队列直到消息可执行之前一直在 block 状态,所以有局限性,定时不能非常精确,并且任务执行次序与加进来的次序是一致的。

spring-boot 的配置

  1. application.yml
  2. ————————————————————
  3. spring:
  4.   ## kafka
  5.   kafka:
  6.     bootstrap-servers: 127.0 .0 .1 :9092
  7.     consumer:
  8.       group-id: myGroup
  9.       auto-offset-reset: earliest
  10.       enable-auto-commit: false
  11.       properties:
  12.         max:
  13.           poll:
  14.             interval:
  15.               # 设置时间必须比延迟处理的时间大,不然会报错
  16.               ms: 1200000
  17.     listener:
  18.       # 把提交模式改为手动
  19.       ack-mode: MANUAL
kafka 默认的消费模式是自动提交,意思是,当 MessageListener 收到消息,执行处理方法后自动提交已完成状态,该消息就从队列里移除了。配置 ack-mode: MANUAL 改为手动提交后,我们就可以根据需要保留数据在消息队列,以便以后再处理。
max.poll.interval.ms 设小了可能会收到下面的错误:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.


  1. @Autowired
  2. private KafkaTemplate kafkaTemplate;
  3. public void myAction(){
  4.     // 定义 data
  5.      // 任务推送到 Kafka
  6.     kafkaTemplate.send(“myJob ", data.toString());
  7. }



定义两个 topic:myJob 和 myJob-delay

  1. @SpringBootApplication
  2. @ServletComponentScan
  3. public class Application {
  4. @KafkaListener(topics = “myJob”)
  5. @SendTo(“myJob-delay")
  6. public String onMessage(ConsumerRecord<?, ?> cr, Acknowledgment ack) {
  7. String json = (String) cr.value();
  8. JSONObject data = JSON.parseObject(json);
  9. if ( /* 需要延迟处理 */){
  10. // 提交
  11. ack.acknowledge();
  12. // 发送到 @SendTo
  13. data.put( "until", System.currentTimeMillis() + msToDelay);
  14. return data.toString();
  15. }
  16. // 正常处理
  17. // do real work
  18. // 提交
  19. ack.acknowledge();
  20. return null;
  21. }
  22. @KafkaListener(topics = “myJob-delay")
  23. @SendTo(“myJob")
  24. public String delayMessage(ConsumerRecord<?, ?> cr, Acknowledgment ack){
  25. String json = (String) cr.value();
  26. JSONObject data = JSON.parseObject(json);
  27. Long until = data.getLong( "until");
  28. // 阻塞直到 until
  29. while (System.currentTimeMillis() < until){
  30. Thread.sleep( Math.max( 0, until - System.currentTimeMillis()) );
  31. }
  32. // 提交
  33. ack.acknowledge();
  34. // 转移到 @SendTo
  35. return json;
  36. }
  37. }


@KafkaListener 的方法参数里有 Acknowledgment ack,这是AckMode.MANUAL 模式下必须要添加的参数。

ack.acknowledge() 用来标记一条消息已经消费完成,即将从消息队列里移除。执行之前消息会一直保留在队列中,即时宕机重启后也能恢复。

@SendTo 用来在队列(topic)间转移消息,只要 return 非 null 的数据。以上代码中,当需要延迟处理时,消息从 myJob 转移到 myJob-delay;而当条件满足时,消息又从 myJob-delay 转移到了 myJob。

自从 spring-kafka 2.2.4 版本之后,可以在方法上定义 max.poll.interval.ms ,更加灵活了。例如

  1. @KafkaListener(topics = "myTopic", groupId = "group", properties = {
  2. "max.poll.interval.ms:60000”,
  3. ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100”}
  4. )


