前提
了解如何实现点对点聊天(客户端与客户端通信)请参看上一篇博客:SpringBoot+Netty整合websocket(二)——实现点对点聊天(客户端与客户端通信)
在上一篇博客中实现了点对点聊天(客户端与客户端通信),但仍存在一些问题:
- 客户端的聊天消息还未实现暂存redis或者存储mysql(浏览器刷新后,消息就会丢失)。
- 客户端如果离线则不能接受消息,即缺少对离线消息的处理。
接下来将以一个实例来说明。
注:此篇博客是在上一篇博客基础上所写,请同时参看上一篇博客。
需求
- 主要存在两种类型的用户,一个是提问方,可以提出问题;另一个是解答方,可以解答问题。
- 解答方和提问方在解答问题时,需要建立聊天室,进行聊天解答。
核心数据库设计
关于用户表就不展示了,主要用其user.id
问题表
问题表解读
ask_id
和answer_id
都是用户id,代表的是提问方id和解答方id
context
是解决问题时的聊天内容,类型是TEXT(重点)
其余字段和聊天存储关系不大,略。
后端实现
关于客户端与客户端通信,请参看上一篇文章。客户端聊天消息存储到redis和MySQL,并实现离线消息的处理,主要修改的是WebSocketHandler类。
1.引入依赖
<!-- redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--测试redis连接-->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.3</version>
</dependency>
<!--Mysql依赖包-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- druid数据源驱动 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid.version}</version>
</dependency>
2.配置application.yml
spring:
datasource:
druid:
db-type: com.alibaba.druid.pool.DruidDataSource
driverClassName: net.sf.log4jdbc.sql.jdbcapi.DriverSpy
url: jdbc:log4jdbc:mysql://ip:3306/数据库?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false
username:
password:
redis:
#数据库索引
database: 0
host: IP地址
port: 6379
password: redis
#连接超时时间
timeout: 10000
jedis:
pool:
# 连接池最大连接数(使用负值表示没有限制)
max-active: -1
# 连接池最大阻塞等待时间(使用负值表示没有限制)
max-wait: -1
# netty运行端口
netty:
port: 10101
3.question的实体类略
4.整合redis
配置RedisConfig
@Slf4j
@Configuration
@EnableCaching
@ConditionalOnClass(RedisOperations.class)
@EnableConfigurationProperties(RedisProperties.class)
public class RedisConfig extends CachingConfigurerSupport {
/**
* 设置 redis 数据默认过期时间,默认2小时
* 设置@cacheable 序列化方式
*/
@Bean
public RedisCacheConfiguration redisCacheConfiguration(){
FastJsonRedisSerializer<Object> fastJsonRedisSerializer = new FastJsonRedisSerializer<>(Object.class);
RedisCacheConfiguration configuration = RedisCacheConfiguration.defaultCacheConfig();
configuration = configuration.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(fastJsonRedisSerializer)).entryTtl(Duration.ofHours(2));
return configuration;
}
@SuppressWarnings("all")
@Bean(name = "redisTemplate")
@ConditionalOnMissingBean(name = "redisTemplate")
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<Object, Object> template = new RedisTemplate<>();
//序列化
FastJsonRedisSerializer<Object> fastJsonRedisSerializer = new FastJsonRedisSerializer<>(Object.class);
// value值的序列化采用fastJsonRedisSerializer
template.setValueSerializer(fastJsonRedisSerializer);
template.setHashValueSerializer(fastJsonRedisSerializer);
// 全局开启AutoType,这里方便开发,使用全局的方式
ParserConfig.getGlobalInstance().setAutoTypeSupport(true);
// 建议使用这种方式,小范围指定白名单
// ParserConfig.getGlobalInstance().addAccept("me.zhengjie.domain");
// key的序列化采用StringRedisSerializer
template.setKeySerializer(new StringRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());
template.setConnectionFactory(redisConnectionFactory);
return template;
}
/**
* 自定义缓存key生成策略,默认将使用该策略
*/
@Bean
@Override
public KeyGenerator keyGenerator() {
return (target, method, params) -> {
Map<String,Object> container = new HashMap<>(3);
Class<?> targetClassClass = target.getClass();
// 类地址
container.put("class",targetClassClass.toGenericString());
// 方法名称
container.put("methodName",method.getName());
// 包名称
container.put("package",targetClassClass.getPackage());
// 参数列表
for (int i = 0; i < params.length; i++) {
container.put(String.valueOf(i),params[i]);
}
// 转为JSON字符串
String jsonString = JSON.toJSONString(container);
// 做SHA256 Hash计算,得到一个SHA256摘要作为Key
return DigestUtils.sha256Hex(jsonString);
};
}
@Bean
@Override
public CacheErrorHandler errorHandler() {
// 异常处理,当Redis发生异常时,打印日志,但是程序正常走
log.info("初始化 -> [{}]", "Redis CacheErrorHandler");
return new CacheErrorHandler() {
@Override
public void handleCacheGetError( RuntimeException e, Cache cache, Object key) {
log.error("Redis occur handleCacheGetError:key -> [{}]", key, e);
}
@Override
public void handleCachePutError(RuntimeException e, Cache cache, Object key, Object value) {
log.error("Redis occur handleCachePutError:key -> [{}];value -> [{}]", key, value, e);
}
@Override
public void handleCacheEvictError(RuntimeException e, Cache cache, Object key) {
log.error("Redis occur handleCacheEvictError:key -> [{}]", key, e);
}
@Override
public void handleCacheClearError(RuntimeException e, Cache cache) {
log.error("Redis occur handleCacheClearError:", e);
}
};
}
/**
* 重写序列化器
*/
static class StringRedisSerializer implements RedisSerializer<Object> {
private final Charset charset;
StringRedisSerializer() {
this(StandardCharsets.UTF_8);
}
private StringRedisSerializer(Charset charset) {
Assert.notNull(charset, "Charset must not be null!");
this.charset = charset;
}
@Override
public String deserialize(byte[] bytes) {
return (bytes == null ? null : new String(bytes, charset));
}
@Override
public byte[] serialize(Object object) {
String string = JSON.toJSONString(object);
if (StringUtils.isBlank(string)) {
return null;
}
string = string.replace("\"", "");
return string.getBytes(charset);
}
}
}
/**
* Value 序列化
* @param <T>
*/
class FastJsonRedisSerializer<T> implements RedisSerializer<T> {
private Class<T> clazz;
FastJsonRedisSerializer(Class<T> clazz) {
super();
this.clazz = clazz;
}
@Override
public byte[] serialize(T t) {
if (t == null) {
return new byte[0];
}
return JSON.toJSONString(t, SerializerFeature.WriteClassName).getBytes(StandardCharsets.UTF_8);
}
@Override
public T deserialize(byte[] bytes) {
if (bytes == null || bytes.length <= 0) {
return null;
}
String str = new String(bytes, StandardCharsets.UTF_8);
return JSON.parseObject(str, clazz);
}
}
配置RedisUtil(redis常用的相关方法)
因为内容比较多,放在文章末尾“附”
与聊天相关的redis工具类
@Component
@Slf4j
public class ChatRedisUtil {
@Autowired
private RedisUtil redisUtil;
/**
* 功能描述:将JavaBean对象的信息缓存进Redis
*
* @param chatVO 聊天信息JavaBean
* @return 是否保存成功
*/
public boolean saveCacheChatMessage ( String key, ChatVO chatVO ) {
//判断key是否存在
if (redisUtils.hasKey(key)) {
//将javabean对象添加到缓存的list中
long redisSize = redisUtils.lGetListSize(key);
System.out.println("redis当前数据条数" + redisSize);
Long index = redisUtils.rightPushValue(key, chatVO);
System.out.println("redis执行rightPushList返回值:" + index);
return redisSize < index;
} else {
//不存在key时,将chatVO存进缓存,并设置过期时间
boolean isCache = redisUtils.lSet(key, chatVO);
//保存成功,设置过期时间
if (isCache) {
redisUtils.expire(key, 3L, TimeUnit.DAYS);
}
return isCache;
}
}
/**
* 功能描述:从缓存中读取聊天信息
*
* @param key 缓存聊天信息的键
* @return 缓存中聊天信息list
*/
public List<Object> getCacheChatMessage ( String key ) {
List<Object> chatList = null;
//判断key是否存在
if (redisUtils.hasKey(key)) {
//读取缓存中的聊天内容
chatList = redisUtils.lGet(key, 0, redisUtils.lGetListSize(key));
} else {
System.out.println("此次解答无聊天信息");
log.info("redis缓存中无此键值:" + key);
}
return chatList;
}
/**
* 功能描述: 在缓存中删除聊天信息
*
* @param key 缓存聊天信息的键
*/
public void deleteCacheChatMessage ( String key ) {
//判断key是否存在
if (redisUtils.hasKey(key)) {
redisUtils.del(key);
}
}
/**
* 功能描述: 创建已发送消息房间号
* 根据ChatVO中的fromUserId和toUserId生成聊天房间号:问题id-小号用户id-大号用户id
* 例如“1-2”: 小号在前,大号在后;保证房间号唯一
*
* @param fromUserId 发送方id
* @param toUserId 接收方id
*/
public String createChatNumber (Integer questionId, Integer fromUserId, Integer toUserId) {
StringBuilder key = new StringBuilder();
key.append(questionId).append("-");
if (fromUserId < toUserId) {
key.append(fromUserId).append("-").append(toUserId);
} else {
key.append(toUserId).append("-").append(fromUserId);
}
return key.toString();
}
/**
* 功能描述:创建离线聊天记录的房间号(redis的键)
* 拼接方式:发送方用户id-签证标识
* @param toUserId 发送方用户id
* @return 用户离线消息房间号
*/
public String createOffLineNumber(Integer toUserId){
return toUserId + "-" + MsgSignFlagEnum.unsign.type;
}
/**
* 功能描述:从redis读取缓存信息集合(List<Object>),并且存储到新的键中 oldKey——>newKey
*/
public void signQuestionMessageList(String oldKey,String newKey){
redisUtils.rightPushList(newKey,getCacheChatMessage(oldKey));
}
/**
* 功能描述:从redis读取每一条缓存信息,并且存储到新的键中 oldKey——>newKey
*/
public void signQuestionMessage(String oldKey,String newKey){
redisUtils.rightPushValue(newKey,getCacheChatMessage(oldKey));
}
}
5.自定义SpringContextHolder工具类
用于在Spring的bean容器中获取实例
/**
* 通过SpringContextHolder获取bean对象的工具类
*
* 需要将SpringContextHolder注入到bean容器中,所以加@Configuration注解
*/
@Slf4j
@Configuration
public class SpringContextHolder implements ApplicationContextAware, DisposableBean {
private static ApplicationContext applicationContext = null;
/**
* 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型.
*/
@SuppressWarnings("unchecked")
public static <T> T getBean(String name) {
assertContextInjected();
return (T) applicationContext.getBean(name);
}
/**
* 从静态变量applicationContext中取得Bean, 自动转型为所赋值对象的类型.
*/
public static <T> T getBean(Class<T> requiredType) {
assertContextInjected();
return applicationContext.getBean(requiredType);
}
/**
* 检查ApplicationContext不为空.
*/
private static void assertContextInjected() {
if (applicationContext == null) {
throw new IllegalStateException("applicaitonContext属性未注入, 请在applicationContext" +
".xml中定义SpringContextHolder或在SpringBoot启动类中注册SpringContextHolder.");
}
}
/**
* 清除SpringContextHolder中的ApplicationContext为Null.
*/
private static void clearHolder() {
log.debug("清除SpringContextHolder中的ApplicationContext:"
+ applicationContext);
applicationContext = null;
}
@Override
public void destroy(){
SpringContextHolder.clearHolder();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if (SpringContextHolder.applicationContext != null) {
log.warn("SpringContextHolder中的ApplicationContext被覆盖, 原有ApplicationContext为:" + SpringContextHolder.applicationContext);
}
SpringContextHolder.applicationContext = applicationContext;
}
}
6.修改WebSocketHandler
如果在WebSocketHandler
直接使用@Autowired
注入ChatRedisUtil
会报空指针错误,所以在WebSocketHandler中使用ChatRedisUtil
需要用SpringContextHolder
在bean容器中获取实例。使用如下:
//实例化redis对象,通过自定义的SpringContextHolder在bean容器中获取chatRedisUtil对象
ChatRedisUtil chatRedisUtil = SpringContextHolder.getBean(ChatRedisUtil.class);
@Slf4j
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {
/**
* 客户端组
* 用于记录和管理所有客户端的channel
*/
public static ChannelGroup channelGroup;
static {
channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
/**
* 接收客户端传来的消息
*
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0 ( ChannelHandlerContext ctx, Object msg ) throws Exception {
Channel currentChannel = ctx.channel();
//实例化redis对象,通过自定义的SpringContextHolder在bean容器中获取chatRedisUtil对象
ChatRedisUtil chatRedisUtil = SpringContextHolder.getBean(ChatRedisUtil.class);
//文本消息
if (msg instanceof TextWebSocketFrame) {
String message = ((TextWebSocketFrame) msg).text();
System.out.println("收到客户端消息:" + message);
//json消息转换为Javabean对象
ChatMsgVO chatMsgVO = null;
try {
chatMsgVO = JSONUtil.toBean(message, ChatMsgVO.class, true);
} catch (JSONException e) {
e.printStackTrace();
System.out.println("json解析异常,发送的消息应该为json格式");
return;
}
Integer action = chatMsgVO.getAction();
if (action.equals(MsgActionEnum.CONNECT.type)) {
//当websocket第一次open的时候,初始化channel,把用的channel和userId关联起来
Integer fromUserId = chatMsgVO.getFromUserId();
UserChannelRel.put(fromUserId, currentChannel);
//测试
channelGroup.forEach(channel -> log.info(channel.id().asLongText()));
UserChannelRel.output();
/* 第一次或者断线重连,查询redis此用户的离线消息,并处理 */
//查询此用户的离线消息
String unsignKey=chatRedisUtil.createOffLineNumber(fromUserId);
List<Object> offLineMessageList = chatRedisUtil.getCacheChatMessage(unsignKey);
//若有离线消息
if (offLineMessageList!=null){
//遍历当前用户的所有离线消息
for (int i=0;i<offLineMessageList.size();i++){
//离线消息json转javabean
ChatVO chatVO= (ChatVO) offLineMessageList.get(i);
//将离线消息发送给当前用户
sendMessage(fromUserId, JSONUtil.toJsonStr(chatVO));
//每条消息对应的已读消息的房间号
String signKey = chatRedisUtil.createChatNumber(chatVO.getQuestionId(), chatVO.getFromUserId(), chatVO.getToUserId());
//每条消息的签证
chatRedisUtil.saveCacheChatMessage(signKey,chatVO);
}
//签证完成后,在redis中删除离线消息
chatRedisUtil.deleteCacheChatMessage(unsignKey);
}
} else if (action.equals(MsgActionEnum.CHAT.type)) {
//聊天类型的消息,把聊天记录保存到redis,同时标记消息的签收状态[是否签收]
Integer toUserId = chatMsgVO.getToUserId();
Channel receiverChannel = UserChannelRel.get(toUserId);
//将消息转化为需要保存在redis中的消息
ChatVO chatVO = JSONUtil.toBean(message, ChatVO.class, true);
//消息保存至redis的键
String key = "";
//设置发送消息的时间
chatVO.setDateTime(new DateTime());
if (receiverChannel == null) {
//接收方离线状态,将消息保存到redis,并设置[未签收]状态
//设置redis键为未接收房间号。拼接发送方和接收方,成为房间号;MsgSignFlagEnum.signed.type为0,代表未签收
key = chatRedisUtil.createOffLineNumber(chatVO.getToUserId());
} else {
//接受方在线,服务端直接转发聊天消息给用户,并将消息存储到redis,并设置[签收]状态
//设置redis键为已接收房间号。拼接发送方和接收方,成为房间号;MsgSignFlagEnum.signed.type为1,代表已经签收
key = chatRedisUtil.createChatNumber(chatVO.getQuestionId(), chatVO.getFromUserId(), chatVO.getToUserId());
/* 发送消息给指定用户 */
//判断消息是否符合定义的类型
if (ChatTypeVerificationUtil.verifyChatType(chatVO.getChatMessageType())) {
//发送消息给指定用户
if (toUserId > 0 && UserChannelRel.isContainsKey(toUserId)) {
sendMessage(toUserId, JSONUtil.toJsonStr(chatVO));
}
} else {
//消息不符合定义的类型的处理
}
}
/* 消息保存到redis中 */
boolean isCache = chatRedisUtil.saveCacheChatMessage(key, chatVO);
//缓存失败,抛出异常
if (!isCache) {
throw new BadRequestException("聊天内容缓存失败,聊天信息内容:" + message);
}
} else if (action.equals(MsgActionEnum.KEEPALIVE.type)) {
//心跳类型的消息
log.info("收到来自channel为[" + currentChannel + "]的心跳包");
}
}
//二进制消息
if (msg instanceof BinaryWebSocketFrame) {
System.out.println("收到二进制消息:" + ((BinaryWebSocketFrame) msg).content().readableBytes());
BinaryWebSocketFrame binaryWebSocketFrame = new BinaryWebSocketFrame(Unpooled.buffer().writeBytes("hello".getBytes()));
//给客户端发送的消息
ctx.channel().writeAndFlush(binaryWebSocketFrame);
}
//ping消息
if (msg instanceof PongWebSocketFrame) {
System.out.println("客户端ping成功");
}
//关闭消息
if (msg instanceof CloseWebSocketFrame) {
System.out.println("客户端关闭,通道关闭");
Channel channel = ctx.channel();
channel.close();
}
}
/**
* Handler活跃状态,表示连接成功
* 当客户端连接服务端之后(打开连接)
* 获取客户端的channel,并且放到ChannelGroup中去进行管理
*
* @param ctx
* @throws Exception
*/
@Override
public void handlerAdded ( ChannelHandlerContext ctx ) throws Exception {
System.out.println("与客户端连接成功");
channelGroup.add(ctx.channel());
}
/**
* @param ctx
* @throws Exception
*/
@Override
public void handlerRemoved ( ChannelHandlerContext ctx ) throws Exception {
//当触发handlerRemoved,ChannelGroup会自动移除对应的客户端的channel
//所以下面这条语句可不写
// clients.remove(ctx.channel());
log.info("客户端断开,channel对应的长id为:" + ctx.channel().id().asLongText());
log.info("客户端断开,channel对应的短id为:" + ctx.channel().id().asShortText());
}
/**
* 异常处理
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught ( ChannelHandlerContext ctx, Throwable cause ) throws Exception {
System.out.println("连接异常:" + cause.getMessage());
cause.printStackTrace();
ctx.channel().close();
channelGroup.remove(ctx.channel());
}
@Override
public void userEventTriggered ( ChannelHandlerContext ctx, Object evt ) throws Exception {
//IdleStateEvent是一个用户事件,包含读空闲/写空闲/读写空闲
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
log.info("进入读空闲");
} else if (event.state() == IdleState.WRITER_IDLE) {
log.info("进入写空闲");
} else if (event.state() == IdleState.ALL_IDLE) {
log.info("channel关闭前,用户数量为:" + channelGroup.size());
//关闭无用的channel,以防资源浪费
ctx.channel().close();
log.info("channel关闭后,用户数量为:" + channelGroup.size());
}
}
}
/**
* 给指定用户发内容
* 后续可以掉这个方法推送消息给客户端
*/
public void sendMessage ( Integer toUserId, String message ) {
Channel channel = UserChannelRel.get(toUserId);
channel.writeAndFlush(new TextWebSocketFrame(message));
}
/**
* 群发消息
*/
public void sendMessageAll ( String message ) {
channelGroup.writeAndFlush(new TextWebSocketFrame(message));
}
}
聊天消息整合进mysql
聊天业务使用比较频繁,为了降低mysql的压力,所以一般不采用,每发送一条消息,就直接存进mysql。
现在所有的消息,不管是离线还是在线都暂存在redis。
我们可以提供接口,手动将redis的消息存储进mysql,也可以利用定时任务等,将redis的消息读取出来然后存进mysql,这些都是可以的。
也可利用接口形式,查询redis暂存的聊天信息。
这里采用提供接口的方式。
Controller
@RestController
@RequestMapping("/api/question")
public class QuestionController {
/**
* 修改问题状态:聊天信息从redis保存到数据库(当结束此次解答时调用),status改为2
*/
@PutMapping("/updateChatContent")
public ResponseEntity<Object> updateChatContent(Integer questionId,Integer askId,Integer answerId){
questionService.updateChatContent(questionId,askId,answerId);
return new ResponseEntity<>(HttpStatus.NO_CONTENT);
}
/**
* 查询解决问题时的聊天记录(从redis缓存中查询)
*/
@GetMapping("/getChatContent")
public ResponseEntity<Object> getChatContent(Integer questionId,Integer askId,Integer answerId){
return new ResponseEntity<>(questionService.getChatContent(questionId,askId,answerId),HttpStatus.OK);
}
}
Service
public interface QuestionService {
/**
* 功能描述:解答完毕时,将聊天内容存进mysql,清空redis缓存
* @param questionId 问题id
* @param askId 提问方id
* @param answerId 解答方id
*/
void updateChatContent ( Integer questionId, Integer askId, Integer answerId );
/**
* 功能描述:查询历史聊天记录(从redis缓存中查询)
* @param questionId 问题id
* @param askId 提问方id
* @param answerId 解答方id
* @return 聊天信息的json字符串
*/
String getChatContent ( Integer questionId, Integer askId, Integer answerId);
}
ServiceImpl
@Service
public class QuestionServiceImpl implements QuestionService {
@Override
@Transactional(rollbackFor = Exception.class)
public void updateChatContent ( Integer questionId, Integer askId, Integer answerId ) {
//实例化对象,从bean中取出 (还不理解)
ChatRedisUtil chatRedisUtil = SpringContextHolder.getBean(ChatRedisUtil.class);
//将用户未收到的消息,进行签证
//将askId和answerId拼接成key,即房间号
String key = chatRedisUtil.createChatNumber(questionId, askId, answerId);
//得到缓存中的聊天信息的json字符串
String chatContent = getChatContent(questionId, askId, answerId);
//将数据存进数据库
Question oldQuestion = questionRepository.findById(questionId).orElseGet(Question::new);
ValidationUtil.isNull(oldQuestion.getId(), "Question", "id", questionId);
oldQuestion.setContext(chatContent);
//设置问题状态为已解答
oldQuestion.setStatus(2);
oldQuestion.setSolveTime(new Timestamp(System.currentTimeMillis()));
Question newQuestion = questionRepository.save(oldQuestion);
if (newQuestion.getId() > 0) {
//清除redis中缓存的数据
chatRedisUtil.deleteCacheChatMessage(key);
}
}
@Override
public String getChatContent ( Integer questionId, Integer askId, Integer answerId) {
//实例化对象,从bean中取出 (还不理解)
ChatRedisUtil chatRedisUtil = SpringContextHolder.getBean(ChatRedisUtil.class);
//将askId和answerId拼接成key,即房间号
String key = chatRedisUtil.createChatNumber(questionId, askId, answerId);
List<Object> chatList = chatRedisUtil.getCacheChatMessage(key);
//将List数据转为json字符串,返回
return JSONUtil.toJsonStr(chatList);
}
}
question的mapper省略
原理解析
聊天消息存储到redis的原理
1.引入房间号的概念,作为redis的键值
创建房间号的方法详情见ChatRedisUtil
创建已发送消息房间号(客户端websocket在线,可直接发送消息)
根据ChatVO中的questionId、fromUserId和toUserId生成聊天房间号:问题id-小号用户id-大号用户id
例如房间号“11-1-2”——问题id11,用户1,用户2(用户id小号在前,大号在后)。即用户1、用户2关于问题1所产生的聊天记录。如果不需要和问题等实例关联,则可以忽略问题id,直接拼接用户id作为房间号。
创建离线聊天房间号(客户端websocket不在线,不能直接发送消息)
根据发送方用户id和签证标识拼接房间号:发送方用户id-签证标识(签证标识为0,说明是离线消息,可自行定义)
根据已发送消息房间号和离线聊天房间号就可以实现聊天记录存储在redis。
2.聊天记录在redis的存储方式
因为我们定义的有ChatMsgVO和ChatVO,说明一条消息是一个实体类,一个房间号里面可以存很多条消息,可以使用List也可以使用Json存储,这里我们采用在redis中以List方式存取聊天信息。
关于redis对List的使用详情请参看附录中的RedisUtil
3.如何区分接收方是在线状态还是离线状态?
通过UserChannelRel
的get
方法,如果得到的Channel
为空说明是离线状态,反之则是在线状态。
/** 根据用户id查询 */
public static Channel get(Integer senderId) {
return manager.get(senderId);
}
具体使用:
Channel receiverChannel = UserChannelRel.get(toUserId);
if (receiverChannel == null) {
//接收方离线,直接缓存到离线房间号的redis
} else {
//在线,直接发送消息,并缓存redis
}
4.处理聊天消息(redis)
if (action.equals(MsgActionEnum.CHAT.type)) {
//聊天类型的消息,把聊天记录保存到redis,同时标记消息的签收状态[是否签收]
Integer toUserId = chatMsgVO.getToUserId();
Channel receiverChannel = UserChannelRel.get(toUserId);
//将消息转化为需要保存在redis中的消息
ChatVO chatVO = JSONUtil.toBean(message, ChatVO.class, true);
//消息保存至redis的键
String key = "";
//设置发送消息的时间
chatVO.setDateTime(new DateTime());
if (receiverChannel == null) {
//设置redis键为未接收房间号。拼接发送方和接收方,成为房间号;MsgSignFlagEnum.signed.type为0,代表未签收
key = chatRedisUtil.createOffLineNumber(chatVO.getToUserId());
} else {
//设置redis键为已接收房间号。拼接发送方和接收方,成为房间号;MsgSignFlagEnum.signed.type为1,代表已经签收
key = chatRedisUtil.createChatNumber(chatVO.getQuestionId(), chatVO.getFromUserId(), chatVO.getToUserId());
/* 发送消息给指定用户 */
//判断消息是否符合定义的类型
if (ChatTypeVerificationUtil.verifyChatType(chatVO.getChatMessageType())) {
//发送消息给指定用户
if (toUserId > 0 && UserChannelRel.isContainsKey(toUserId)) {
sendMessage(toUserId, JSONUtil.toJsonStr(chatVO));
}
} else {
//消息不符合定义的类型的处理
}
/* 消息保存到redis中 */
boolean isCache = chatRedisUtil.saveCacheChatMessage(key, chatVO);
//缓存失败,抛出异常
if (!isCache) {
hrow new BaseException("聊天内容缓存失败,聊天信息内容:" + message);
}
}
5.用户第一次登陆或者重连时,处理离线消息
用户第一次登陆或者重连时,根据用户id,查看redis是否有离线消息(根据离线房间号是否存在判断)。如果有,遍历房间号内所有的离线消息,将其每条消息存储进对应的在线消息房间号的redis中,然后将消息重新发送给用户,执行完后删除离线房间号的所有记录。
/* 第一次或者断线重连,查询redis此用户的离线消息,并处理 */
//查询此用户的离线消息
String unsignKey=chatRedisUtil.createOffLineNumber(fromUserId);
List<Object> offLineMessageList = chatRedisUtil.getCacheChatMessage(unsignKey);
//若有离线消息
if (offLineMessageList!=null){
//遍历当前用户的所有离线消息
for (int i=0;i<offLineMessageList.size();i++){
//离线消息json转javabean
ChatVO chatVO= (ChatVO) offLineMessageList.get(i);
//将离线消息发送给当前用户
sendMessage(fromUserId, JSONUtil.toJsonStr(chatVO));
//每条消息对应的已读消息的房间号
String signKey = chatRedisUtil.createChatNumber(chatVO.getQuestionId(), chatVO.getFromUserId(), chatVO.getToUserId());
//每条消息的签证
chatRedisUtil.saveCacheChatMessage(signKey,chatVO);
}
//签证完成后,在redis中删除离线消息
chatRedisUtil.deleteCacheChatMessage(unsignKey);
}
5.消息存储进mysql
我的思路是每当问题解决后,调用接口,手动将redis的消息存储进mysql。这块可以根据业务逻辑具体实现。
附
RedisUtil
@Component
@SuppressWarnings({"unchecked", "all"})
public class RedisUtil {
@Autowired
private RedisTemplate<Object, Object> redisTemplate;
// =============================common============================
/**
* 指定缓存失效时间
*
* @param key 键
* @param time 时间(秒)
*/
public boolean expire ( String key, long time ) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* 根据 key 获取过期时间
*
* @param key 键 不能为null
* @return 时间(秒) 返回0代表为永久有效
*/
public long getExpire ( Object key ) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
/**
* 查找匹配key
*
* @param pattern key
* @return /
*/
public List<String> scan ( String pattern ) {
ScanOptions options = ScanOptions.scanOptions().match(pattern).build();
RedisConnectionFactory factory = redisTemplate.getConnectionFactory();
RedisConnection rc = Objects.requireNonNull(factory).getConnection();
Cursor<byte[]> cursor = rc.scan(options);
List<String> result = new ArrayList<>();
while (cursor.hasNext()) {
result.add(new String(cursor.next()));
}
try {
RedisConnectionUtils.releaseConnection(rc, factory);
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
/**
* 分页查询 key
*
* @param patternKey key
* @param page 页码
* @param size 每页数目
* @return /
*/
public List<String> findKeysForPage ( String patternKey, int page, int size ) {
ScanOptions options = ScanOptions.scanOptions().match(patternKey).build();
RedisConnectionFactory factory = redisTemplate.getConnectionFactory();
RedisConnection rc = Objects.requireNonNull(factory).getConnection();
Cursor<byte[]> cursor = rc.scan(options);
List<String> result = new ArrayList<>(size);
int tmpIndex = 0;
int fromIndex = page * size;
int toIndex = page * size + size;
while (cursor.hasNext()) {
if (tmpIndex >= fromIndex && tmpIndex < toIndex) {
result.add(new String(cursor.next()));
tmpIndex++;
continue;
}
// 获取到满足条件的数据后,就可以退出了
if (tmpIndex >= toIndex) {
break;
}
tmpIndex++;
cursor.next();
}
try {
RedisConnectionUtils.releaseConnection(rc, factory);
} catch (Exception e) {
e.printStackTrace();
}
return result;
}
/**
* 判断key是否存在
*
* @param key 键
* @return true 存在 false不存在
*/
public boolean hasKey ( String key ) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除缓存
*
* @param key 可以传一个值 或多个
*/
public void del ( String... key ) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete(CollectionUtils.arrayToList(key));
}
}
}
// ============================String=============================
/**
* 普通缓存获取
*
* @param key 键
* @return 值
*/
public Object get ( String key ) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}
/**
* 批量获取
*
* @param keys
* @return
*/
public List<Object> multiGet ( List<String> keys ) {
Object obj = redisTemplate.opsForValue().multiGet(Collections.singleton(keys));
return null;
}
/**
* 普通缓存放入
*
* @param key 键
* @param value 值
* @return true成功 false失败
*/
public boolean set ( String key, Object value ) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 普通缓存放入并设置时间
*
* @param key 键
* @param value 值
* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
* @return true成功 false 失败
*/
public boolean set ( String key, Object value, long time ) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 普通缓存放入并设置时间
*
* @param key 键
* @param value 值
* @param time 时间
* @param timeUnit 类型
* @return true成功 false 失败
*/
public boolean set ( String key, Object value, long time, TimeUnit timeUnit ) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, timeUnit);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
// ================================Map=================================
/**
* HashGet
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return 值
*/
public Object hget ( String key, String item ) {
return redisTemplate.opsForHash().get(key, item);
}
/**
* 获取hashKey对应的所有键值
*
* @param key 键
* @return 对应的多个键值
*/
public Map<Object, Object> hmget ( String key ) {
return redisTemplate.opsForHash().entries(key);
}
/**
* HashSet
*
* @param key 键
* @param map 对应多个键值
* @return true 成功 false 失败
*/
public boolean hmset ( String key, Map<String, Object> map ) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* HashSet 并设置时间
*
* @param key 键
* @param map 对应多个键值
* @param time 时间(秒)
* @return true成功 false失败
*/
public boolean hmset ( String key, Map<String, Object> map, long time ) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @return true 成功 false失败
*/
public boolean hset ( String key, String item, Object value ) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
* @return true 成功 false失败
*/
public boolean hset ( String key, String item, Object value, long time ) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 删除hash表中的值
*
* @param key 键 不能为null
* @param item 项 可以使多个 不能为null
*/
public void hdel ( String key, Object... item ) {
redisTemplate.opsForHash().delete(key, item);
}
/**
* 判断hash表中是否有该项的值
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return true 存在 false不存在
*/
public boolean hHasKey ( String key, String item ) {
return redisTemplate.opsForHash().hasKey(key, item);
}
/**
* hash递增 如果不存在,就会创建一个 并把新增后的值返回
*
* @param key 键
* @param item 项
* @param by 要增加几(大于0)
* @return
*/
public double hincr ( String key, String item, double by ) {
return redisTemplate.opsForHash().increment(key, item, by);
}
/**
* hash递减
*
* @param key 键
* @param item 项
* @param by 要减少记(小于0)
* @return
*/
public double hdecr ( String key, String item, double by ) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
// ============================set=============================
/**
* 根据key获取Set中的所有值
*
* @param key 键
* @return
*/
public Set<Object> sGet ( String key ) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 根据value从一个set中查询,是否存在
*
* @param key 键
* @param value 值
* @return true 存在 false不存在
*/
public boolean sHasKey ( String key, Object value ) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将数据放入set缓存
*
* @param key 键
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSet ( String key, Object... values ) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 将set数据放入缓存
*
* @param key 键
* @param time 时间(秒)
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSetAndTime ( String key, long time, Object... values ) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0) {
expire(key, time);
}
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 获取set缓存的长度
*
* @param key 键
* @return
*/
public long sGetSetSize ( String key ) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 移除值为value的
*
* @param key 键
* @param values 值 可以是多个
* @return 移除的个数
*/
public long setRemove ( String key, Object... values ) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
// ===============================list=================================
/**
* 获取list缓存的内容
*
* @param key 键
* @param start 开始
* @param end 结束 0 到 -1代表所有值
* @return
*/
public List<Object> lGet ( String key, long start, long end ) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 获取list缓存的长度
*
* @param key 键
* @return
*/
public long lGetListSize ( String key ) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
/**
* 通过索引 获取list中的值
*
* @param key 键
* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
* @return
*/
public Object lGetIndex ( String key, long index ) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @return
*/
public boolean lSet ( String key, Object value ) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet ( String key, Object value, long time ) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @return
*/
public boolean lSet ( String key, List<Object> value ) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet ( String key, List<Object> value, long time ) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 根据索引修改list中的某条数据
*
* @param key 键
* @param index 索引
* @param value 值
* @return /
*/
public boolean lUpdateIndex ( String key, long index, Object value ) {
try {
redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}
/**
* 移除N个值为value
*
* @param key 键
* @param count 移除多少个
* @param value 值
* @return 移除的个数
*/
public long lRemove ( String key, long count, Object value ) {
try {
return redisTemplate.opsForList().remove(key, count, value);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
//-----------------------自定义工具扩展----------------------
/**
* 功能描述:在list的右边添加元素
* 如果键不存在,则在执行推送操作之前将其创建为空列表
*
* @param key 键
* @return value 值
* @author RenShiWei
* Date: 2020/2/6 23:22
*/
public Long rightPushValue ( String key, Object value ) {
return redisTemplate.opsForList().rightPush(key, value);
}
/**
* 功能描述:在list的右边添加集合元素
* 如果键不存在,则在执行推送操作之前将其创建为空列表
*
* @param key 键
* @return value 值
* @author RenShiWei
* Date: 2020/2/6 23:22
*/
public Long rightPushList ( String key, List<Object> values ) {
return redisTemplate.opsForList().rightPushAll(key, values);
}
/**
* 指定缓存失效时间,携带失效时间的类型
*
* @param key 键
* @param time 时间(秒)
* @param unit 时间的类型 TimeUnit枚举
*/
public boolean expire ( String key, long time ,TimeUnit unit) {
try {
if (time > 0) {
redisTemplate.expire(key, time, unit);
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
}
转载:https://blog.csdn.net/qq_42937522/article/details/106349990