你一定遇到过这种情况,接收到消息时并不符合马上处理的条件(例如频率限制),但是又不能丢掉,于是先存起来,过一阵子再来处理。系统应该怎么设计呢?可能你会想到数据库,用一个字段来标记执行的状态,或者设置一个等待的时间戳,不管是哪种都需要反复地从数据库存取,还要考虑出异常情况状态的维护。
作为一款优秀的消息处理服务,kafka 具有完善的事务管理,状态管理和灾难恢复功能。只要我们稍加变通一下,kafka 也能作为延迟消息处理的解决方案,而且实现上比用数据库简单得多。
以下代码均在 spring-boot 2.0.5 和 spring-kafka 2.1.10 中测试通过。建议事先阅读文档 https://docs.spring.io/spring-kafka/docs/2.5.4.RELEASE/reference/html/#receiving-messages 以便能很好地理解以下内容。
设计思路
spring-boot 的配置
-
application.yml
-
————————————————————
-
-
spring:
-
## kafka
-
kafka:
-
bootstrap-servers:
127.0
.0
.1
:9092
-
consumer:
-
group-id:
myGroup
-
auto-offset-reset:
earliest
-
enable-auto-commit:
false
-
properties:
-
max:
-
poll:
-
interval:
-
# 设置时间必须比延迟处理的时间大,不然会报错
-
ms:
1200000
-
listener:
-
# 把提交模式改为手动
-
ack-mode:
MANUAL
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
发送消息
-
@Autowired
-
private KafkaTemplate kafkaTemplate;
-
-
public void myAction(){
-
// 定义 data
-
// 任务推送到 Kafka
-
kafkaTemplate.send(“myJob
", data.toString());
-
}
该部分没有特别的地方,跟普通的消息消息发送一样。
-
@SpringBootApplication
-
@ServletComponentScan
-
public
class Application {
-
-
@KafkaListener(topics = “myJob”)
-
@SendTo(“myJob-delay")
-
public String onMessage(ConsumerRecord<?, ?> cr, Acknowledgment ack) {
-
String json = (String) cr.value();
-
JSONObject data = JSON.parseObject(json);
-
-
if (
/* 需要延迟处理 */){
-
// 提交
-
ack.acknowledge();
-
// 发送到 @SendTo
-
data.put(
"until", System.currentTimeMillis() + msToDelay);
-
return data.toString();
-
}
-
-
// 正常处理
-
// do real work
-
-
// 提交
-
ack.acknowledge();
-
return
null;
-
}
-
-
@KafkaListener(topics = “myJob-delay")
-
@SendTo(“myJob")
-
public String delayMessage(ConsumerRecord<?, ?> cr, Acknowledgment ack){
-
-
String json = (String) cr.value();
-
JSONObject data = JSON.parseObject(json);
-
Long until = data.getLong(
"until");
-
// 阻塞直到 until
-
while (System.currentTimeMillis() < until){
-
Thread.sleep( Math.max(
0, until - System.currentTimeMillis()) );
-
}
-
-
// 提交
-
ack.acknowledge();
-
// 转移到 @SendTo
-
return json;
-
-
}
-
}
代码很简单,不用解释也能看明白。稍微提一下几个重要的地方。
@KafkaListener 的方法参数里有 Acknowledgment ack,这是AckMode.MANUAL 模式下必须要添加的参数。
ack.acknowledge() 用来标记一条消息已经消费完成,即将从消息队列里移除。执行之前消息会一直保留在队列中,即时宕机重启后也能恢复。
@SendTo 用来在队列(topic)间转移消息,只要 return 非 null 的数据。以上代码中,当需要延迟处理时,消息从 myJob 转移到 myJob-delay;而当条件满足时,消息又从 myJob-delay 转移到了 myJob。
自从 spring-kafka 2.2.4 版本之后,可以在方法上定义 max.poll.interval.ms ,更加灵活了。例如
-
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
-
"max.poll.interval.ms:60000”,
-
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100”}
-
)
以上是延迟消息处理的简单实现,适合延时要求不那么高的场合。朋友们想一下,假如延时比较复杂,执行的次序也不一定跟消息到达的次序一致,系统又该怎样设计呢?
假如这篇文章对你有所帮助, 请关注我公众号, 发现更多有用的文章
转载:https://blog.csdn.net/afeiqiang/article/details/107903620