一、简介
- 1、发布订阅
SUBSCRIBE, UNSUBSCRIBE 和 PUBLISH 实现了 发布/订阅消息范例,发送者 (publishers) 不用编程就可以向特定的接受者发送消息 (subscribers). Rather, 发布的消息进入通道,不需要知道有没有订阅者. 订阅者发表感兴趣的一个或多个通道,并且只接受他们感兴趣的消息,不管发布者是不是存在. 发布者和订阅者的解耦可以允许更大的伸缩性和更多动态的网络拓扑。 - 2、说明
本篇文章是继:
【SpringBoot】三十四、SpringBoot整合Redis实现序列化存储Java对象
以及
【SpringBoot】三十五、SpringBoot整合Redis监听Key过期事件
其中涉及到的知识及代码,本篇文章不再进行赘述
二、注入消息发布/订阅
- 1、添加消息监听器
/**
* 消息监听
* <p>
* 可以传入多个 MessageListenerAdapter
*/
@Bean
RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
// 监听所有库的key过期事件
container.setConnectionFactory(connectionFactory);
// 可以添加多个 messageListener,配置不同的通道
container.addMessageListener(messageListenerAdapter, new PatternTopic("user"));
return container;
}
所有的订阅消息,都需要在这里进行注册绑定,new PatternTopic(“user”),表示发布的主题信息
- 小插曲
前面我们学习了监听 key 过期事件,如果我们只需要监听当前库的 key 过期事件,可以这样写:
@Value("${spring.redis.database}")
public String redisDatabaseIndex;
先拿到我们项目中使用的 Redis 的库索引
// 监听当前库的key过期
container.addMessageListener(messageListenerAdapter, new PatternTopic("__keyevent@" + redisDatabaseIndex + "__:expired"));
然后使用发布/订阅模式,订阅主题为:keyevent@0:expired 的消息,则表示订阅数据库索引为 0 的 key 过期事件,监听所有的库则为:keyevent@*:expired
- 2、绑定消息处理器
/**
* 消息监听器适配器,绑定消息处理器
* <p>
* 可以配置多个 listenerAdapter,监听不同的通道
*/
@Bean
MessageListenerAdapter listenerAdapter(RedisMessageListener receiver) {
return new MessageListenerAdapter(receiver, "onMessage");
}
也就是说,当我们订阅的频道,当有消息进来时,指定它的处理类以及处理方法
三、注入消息处理器
上面我们已经注入了 RedisMessageListener 消息处理器,并指定了处理方法 onMessage(),代码如下:
package com.zyxx.common.redis;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
/**
* Redis 消息接收
*
* @Author Lizhou
**/
@Slf4j
@Component
public class RedisMessageListener implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
// 接收的topic
log.info("channel:" + new String(pattern));
// 消息的POJO
log.info("message:" + message.toString());
}
}
需要实现 MessageListener 接口,重写 onMessage() 方法,然后就可以获取到通道以及消息了,从而进行我们的一些业务逻辑处理
四、操作API
在 RedisUtils 中,我们增加一个操作方法
/**
* 向通道发布消息
*/
public boolean convertAndSend(String channel, Object message) {
if (StringUtils.isBlank(channel)) {
return false;
}
try {
template.convertAndSend(channel, message);
log.info("发送消息成功,channel:{},message:{}", channel, message);
return true;
} catch (Exception e) {
log.info("发送消息失败,channel:{},message:{}", channel, message);
e.printStackTrace();
}
return false;
}
这里的 channel 相当于 我们存入数据的时候的 key,如果该通道不存在,则会新建一个通道
五、测试
- 1、测试用例
package com.zyxx.redistest;
import com.zyxx.redistest.common.RedisUtils;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class RedisTestApplicationTests {
@Autowired
private RedisUtils redisUtil;
@Test
void contextLoads() {
String message = "Hello World!";
// 发送消息
redisUtil.convertAndSend("user", message);
}
}
我们向通道 user 发送了一条 “Hello World!” 的消息
- 2、测试结果
可以看出,我们的消息发送成功,再看控制台
我们接收到通道 user 发送了一条 “Hello World!” 的消息
如您在阅读中发现不足,欢迎留言!!!
转载:https://blog.csdn.net/qq_40065776/article/details/109238701
查看评论