飞道的博客

8. 好客租房-WebSocket与即时通讯系统[项目必需]

481人阅读  评论(0)

本章节主要是学习WebSocket, 目标快速入门, 能够回答以下问题:

  1. WebSocket和HTTP的区别.

  1. WebSocket使用的是全双工通讯协议, 与其他类似协议有啥区别?

  1. WebSocket中的常用注解有哪些?

通过本章节的学习, 应该可以回答上来这几个问题.

8.1 WebSocket概念快速理解

WebSocket 是HTML5一种新的协议。 主要应用范围为实时通讯相关领域, 比如聊天软件, 实况更新和社交订阅等.

WebSocket 实现了浏览器与服务器全双工通信.

全双工通信就是我们现在的电话, 双方都可以讲话.
半双工通信就是指一个时间段内只有一个动作, 就是以前的对讲机, 同时只能有一个人说话, 说完需要加一个"over", 以便让别人说话.
单工通信更常见, 就是遥控 你的电视遥控只能发,不能收.

8.1.1 为什么HTML5提出WebSocket

思考这样一类问题, 现在有这样一类简单的需求需要你实现:

  • 在网页不刷新的情况下, 搭建一个网页版聊天室;

  • 网页不刷新, 并刷新购物车内的商品个数

  • 网页不刷新, 实时更新朋友的位置.

为什么一定要强调网页不刷新呢? 因为如果网页刷新,你就可以使用HTTP请求那套, 你发起一个请求, 服务器给你一个响应了.但是网页刷新在实时性要求较高的业务中根本没办法满足需求.

所以在HTML5之前. 通常的解决方案如下:

采用轮询的方式。即:通过js不断的请求服务器,查看是否有新数据,如果有,就获取到新数据

这种方案有很明显的缺点: js发出的大部分请求都没有获取到数据更新, 从而对双方机器造成了严重的资源浪费.

所以, 为了提出一个更好的方案, HTML5提出了WebSocket技术.

8.1.2 http与websocket的区别

http协议是短连接,每次请求之后,都会关闭连接,下次重新请求数据,需要再次打开链接.

WebSocket协议是一种长链接,只需要通过一次请求来初始化链接,然后所有的请求和响应都是通过这个TCP链接

进行通讯。

不理解的话,我举个例子.

HTTP就是发微信语音, 每次都需要先找到联系人, 然后按一下那个录音和发送.

WebSocket则与语音通话, 之后所有的对话都通过语音直接进行通讯. (双方只需要控制好发送的内容, 对流量也不会造成太大浪费)

8.2 WebSocket 的Java版demo快速实现

多说无益, 我们开始实战吧.

8.2.1 新建itcast-websocket工程(Maven工程)

首先先引入依赖


   
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation= "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0 </modelVersion>
  6. <groupId>org.example </groupId>
  7. <artifactId>itcast-websocket2 </artifactId>
  8. <version>1.0-SNAPSHOT </version>
  9. <!-- 我们用的maven的tomcat插件运行, 所以在maven中一定要配置打包方式为为war包 -->
  10. <packaging>war </packaging>
  11. <dependencies>
  12. <!-- websocket所需依赖 -->
  13. <dependency>
  14. <groupId>javax </groupId>
  15. <artifactId>javaee-api </artifactId>
  16. <version>7.0 </version>
  17. <scope>provided </scope>
  18. </dependency>
  19. </dependencies>
  20. <build>
  21. <plugins>
  22. <!-- java编译插件 -->
  23. <plugin>
  24. <groupId>org.apache.maven.plugins </groupId>
  25. <artifactId>maven-compiler-plugin </artifactId>
  26. <version>3.2 </version>
  27. <configuration>
  28. <source>1.8 </source>
  29. <target>1.8 </target>
  30. <encoding>UTF-8 </encoding>
  31. </configuration>
  32. </plugin>
  33. <!-- 配置Tomcat插件 -->
  34. <plugin>
  35. <groupId>org.apache.tomcat.maven </groupId>
  36. <artifactId>tomcat7-maven-plugin </artifactId>
  37. <version>2.2 </version>
  38. <configuration>
  39. <port>8082 </port>
  40. <path>/ </path>
  41. </configuration>
  42. </plugin>
  43. </plugins>
  44. </build>
  45. </project>

然后新建一个demo类, 话都在注释里:


   
  1. package cn.itcast.websocket;
  2. import javax.websocket.*;
  3. import javax.websocket.server.PathParam;
  4. import javax.websocket.server.ServerEndpoint;
  5. import java.io.IOException;
  6. /**
  7. * Websocket快速入门demo, 阅读代码前最好理解'WebSocket生命周期概念'
  8. * 连接->正常收发信息/异常消息->正常收发信息/异常消息->.....->正常收发信息/异常消息->断开连接
  9. *
  10. * @author 过道
  11. */
  12. // @ServerEndpoint 申明这是一个websocket服务, 可以简单类比为Controller注解
  13. @ServerEndpoint("/websocket/{uid}")
  14. public class MyWebsocket {
  15. // @OnOpen 该注解标识的方法将在建立连接后执行
  16. // @OnOpen 标识的方法可以接收到session对象,就是客户端与服务端建立的长连接通道
  17. // @PathParam, 从路径中的{}中读取内容, 与SpringMVC一致.
  18. @OnOpen
  19. public void onOpen (Session session, @PathParam("uid") String uid) throws
  20. IOException {
  21. // 连接成功, 使用 session 的 api 发送文字
  22. session.getBasicRemote().sendText(uid + ",你好,欢迎连接WebSocket!");
  23. }
  24. // 关闭连接时, 会触发对这个注解@OnClose标识的方法的调用
  25. @OnClose
  26. public void onClose () {
  27. System.out.println( this + "关闭连接");
  28. }
  29. /**
  30. * 该方法用于接收客户端发来的消息
  31. *
  32. * @param message 发来的消息数据
  33. * @param session 会话对象(也是通道)
  34. */
  35. @OnMessage
  36. public void onMessage (String message, Session session) throws IOException {
  37. System.out.println( "接收到消息:" + message);
  38. session.getBasicRemote().sendText( "消息已收到.");
  39. }
  40. }

写完之后, 我们使用maven的tomcat插件运行下项目:

8.2.2 进行测试

打开网页: http://www.easyswoole.com/wstool.html

然后url改为

ws://localhost:8082/websocket/12

如图所示, 点击开始连接, 连接成功后发送就可以了.

8.3 SpringBoot整合WebSocket

为了避免麻烦, 我们直接在8.2的demo项目上改进

pom.xml修改为下:


   
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation= "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0 </modelVersion>
  6. <!--spring boot的支持-->
  7. <parent>
  8. <groupId>org.springframework.boot </groupId>
  9. <artifactId>spring-boot-starter-parent </artifactId>
  10. <version>2.1.0.RELEASE </version>
  11. </parent>
  12. <groupId>cn.itcast.websocket </groupId>
  13. <artifactId>itcast-websocket </artifactId>
  14. <version>1.0-SNAPSHOT </version>
  15. <!-- 这里不用管, 我们之后会用boot的main方式启动 -->
  16. <packaging>war </packaging>
  17. <dependencies>
  18. <!--<dependency>-->
  19. <!--<groupId>javax</groupId>-->
  20. <!--<artifactId>javaee-api</artifactId>-->
  21. <!--<version>7.0</version>-->
  22. <!--<scope>provided</scope>-->
  23. <!--</dependency>-->
  24. <!-- 修改为SpringBoot的webstocket -->
  25. <dependency>
  26. <groupId>org.springframework.boot </groupId>
  27. <artifactId>spring-boot-starter-websocket </artifactId>
  28. </dependency>
  29. </dependencies>
  30. <build>
  31. <plugins>
  32. <!-- java编译插件 -->
  33. <plugin>
  34. <groupId>org.apache.maven.plugins </groupId>
  35. <artifactId>maven-compiler-plugin </artifactId>
  36. <version>3.2 </version>
  37. <configuration>
  38. <source>1.8 </source>
  39. <target>1.8 </target>
  40. <encoding>UTF-8 </encoding>
  41. </configuration>
  42. </plugin>
  43. <!-- 配置Tomcat插件 -->
  44. <plugin>
  45. <groupId>org.apache.tomcat.maven </groupId>
  46. <artifactId>tomcat7-maven-plugin </artifactId>
  47. <version>2.2 </version>
  48. <configuration>
  49. <port>8082 </port>
  50. <path>/ </path>
  51. </configuration>
  52. </plugin>
  53. </plugins>
  54. </build>
  55. </project>

8.3.1 快速接入SpringBoot

在Spring中,处理消息的具体业务逻辑需要实现WebSocketHandler接口。


   
  1. package cn.itcast.websocket.spring;
  2. import org.springframework.stereotype.Component;
  3. import org.springframework.web.socket.CloseStatus;
  4. import org.springframework.web.socket.TextMessage;
  5. import org.springframework.web.socket.WebSocketSession;
  6. import org.springframework.web.socket.handler.TextWebSocketHandler;
  7. import java.io.IOException;
  8. /**
  9. * 在Spring中, 我们需要实现WebSocketHandler接口, 并且需要注册为组件,交给IOC池子维护.
  10. */
  11. @Component
  12. public class MyHandler extends TextWebSocketHandler {
  13. /**
  14. * 这里就是我们demo版本的 @onMessage 哦,每次收到信息都会走这个方法.
  15. */
  16. @Override
  17. public void handleTextMessage (WebSocketSession session, TextMessage message)
  18. throws IOException {
  19. System.out.println( "获取到消息 >> " + message.getPayload());
  20. // 向客户端发送消息
  21. session.sendMessage( new TextMessage( "消息已收到"));
  22. // 如果收到了 '10', 那么给另一方发送0,1,2,3,... 9
  23. if(message.getPayload().equals( "10")){
  24. for ( int i = 0; i < 10; i++) {
  25. session.sendMessage( new TextMessage( "消息 -> " + i));
  26. try {
  27. Thread.sleep( 100);
  28. } catch (InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. }
  33. }
  34. /**
  35. * 这个就是我们demo版本中的 @OnOpen 了, 建立连接后会调用这个.
  36. */
  37. @Override
  38. public void afterConnectionEstablished (WebSocketSession session) throws
  39. Exception {
  40. session.sendMessage( new TextMessage( " 你好!欢迎连接到ws服务"));
  41. }
  42. /**
  43. * 这就是demo版本的 @OnClose 注解, 关闭连接后会触发这个.
  44. */
  45. @Override
  46. public void afterConnectionClosed (WebSocketSession session, CloseStatus status)
  47. throws Exception {
  48. System.out.println( "断开连接!");
  49. }
  50. }

编写配置类


   
  1. package cn.itcast.websocket.spring;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.web.socket.config.annotation.EnableWebSocket;
  5. import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
  6. import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
  7. /**
  8. * WebSocket相关配置类
  9. */
  10. @Configuration
  11. @EnableWebSocket
  12. public class WebSocketConfig implements WebSocketConfigurer {
  13. // 从IOC池子中取出我们自定义的socket处理器
  14. @Autowired
  15. private MyHandler myHandler;
  16. @Override
  17. public void registerWebSocketHandlers (WebSocketHandlerRegistry registry) {
  18. // 将webSocket收到的所有路径中带有 '/ws'的交给myHandler处理
  19. registry.addHandler(myHandler, "/ws").setAllowedOrigins( "*");
  20. }
  21. }

编写启动类


   
  1. package cn.itcast.websocket;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class MyApplication {
  6. public static void main (String[] args) {
  7. SpringApplication.run(MyApplication.class, args);
  8. }
  9. }

然后运行后打开测试网站http://www.easyswoole.com/wstool.html

8.3.2 websocket拦截器

在Spring中提供了websocket拦截器,可以在建立连接之前写些业务逻辑,比如校验登录等。

新建一个 MyHandshakeInterceptor 拦截器


   
  1. package cn.itcast.websocket.spring;
  2. import org.springframework.http.server.ServerHttpRequest;
  3. import org.springframework.http.server.ServerHttpResponse;
  4. import org.springframework.stereotype.Component;
  5. import org.springframework.web.socket.WebSocketHandler;
  6. import org.springframework.web.socket.server.HandshakeInterceptor;
  7. import java.util.Map;
  8. @Component
  9. public class MyHandshakeInterceptor implements HandshakeInterceptor {
  10. /**
  11. * 握手之前,若返回false,则不建立链接
  12. */
  13. @Override
  14. public boolean beforeHandshake (ServerHttpRequest request, ServerHttpResponse
  15. response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws
  16. Exception {
  17. //将用户id放入socket处理器的会话(WebSocketSession)中
  18. attributes.put( "uid", 1001);
  19. System.out.println( "开始握手。。。。。。。");
  20. return true;
  21. }
  22. @Override
  23. public void afterHandshake (ServerHttpRequest request, ServerHttpResponse
  24. response, WebSocketHandler wsHandler, Exception exception) {
  25. System.out.println( "握手成功啦。。。。。。");
  26. }
  27. }

修改WebSocketConfig类注册拦截器


   
  1. package cn.itcast.websocket.spring;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.web.socket.config.annotation.EnableWebSocket;
  5. import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
  6. import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
  7. /**
  8. * WebSocket相关配置类
  9. */
  10. @Configuration
  11. @EnableWebSocket
  12. public class WebSocketConfig implements WebSocketConfigurer {
  13. // 从IOC池子中取出我们自定义的socket处理器
  14. @Autowired
  15. private MyHandler myHandler;
  16. @Autowired
  17. private MyHandshakeInterceptor myHandshakeInterceptor;
  18. @Override
  19. public void registerWebSocketHandlers (WebSocketHandlerRegistry registry) {
  20. registry.addHandler( this.myHandler, "/ws")
  21. .setAllowedOrigins( "*")
  22. // 追加注册一个拦截器的代码
  23. .addInterceptors( this.myHandshakeInterceptor);
  24. }
  25. }

然后为了证明我们再在MyHandler的afterConnectionEstablished 修改下, 加上对方的uid


   
  1. /**
  2. * 这个就是我们demo版本中的 @OnOpen 了, 建立连接后会调用这个.
  3. */
  4. @Override
  5. public void afterConnectionEstablished (WebSocketSession session) throws
  6. Exception {
  7. // 获取uid
  8. Integer uid = (Integer) session.getAttributes().get( "uid");
  9. session.sendMessage( new TextMessage(uid + ", 你好!欢迎连接到ws服务"));
  10. }

再次启动并测试, 发现ok

8.4 使用WebSocket搭建即时通讯系统

在这里我们不连接数据库, 只做简单的缓存, 之后在其他一章节中连接数据库.

8.4.1 引入依赖

pom.xml文件如下.


   
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi= "http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation= "http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0 </modelVersion>
  6. <groupId>org.example </groupId>
  7. <artifactId>itcast-haoke-im-webstocket </artifactId>
  8. <version>1.0-SNAPSHOT </version>
  9. <parent>
  10. <groupId>org.springframework.boot </groupId>
  11. <artifactId>spring-boot-starter-parent </artifactId>
  12. <version>2.1.0.RELEASE </version>
  13. </parent>
  14. <dependencies>
  15. <dependency>
  16. <groupId>org.springframework.boot </groupId>
  17. <artifactId>spring-boot-starter-web </artifactId>
  18. </dependency>
  19. <dependency>
  20. <groupId>org.springframework.boot </groupId>
  21. <artifactId>spring-boot-starter-websocket </artifactId>
  22. </dependency>
  23. <dependency>
  24. <groupId>org.springframework.boot </groupId>
  25. <artifactId>spring-boot-starter-test </artifactId>
  26. <scope>test </scope>
  27. </dependency>
  28. <dependency>
  29. <groupId>junit </groupId>
  30. <artifactId>junit </artifactId>
  31. <version>4.12 </version>
  32. <scope>test </scope>
  33. </dependency>
  34. <dependency>
  35. <groupId>org.projectlombok </groupId>
  36. <artifactId>lombok </artifactId>
  37. <version>1.18.4 </version>
  38. </dependency>
  39. <dependency>
  40. <groupId>org.apache.commons </groupId>
  41. <artifactId>commons-lang3 </artifactId>
  42. </dependency>
  43. </dependencies>
  44. <build>
  45. <plugins>
  46. <!-- java编译插件 -->
  47. <plugin>
  48. <groupId>org.apache.maven.plugins </groupId>
  49. <artifactId>maven-compiler-plugin </artifactId>
  50. <version>3.2 </version>
  51. <configuration>
  52. <source>1.8 </source>
  53. <target>1.8 </target>
  54. <encoding>UTF-8 </encoding>
  55. </configuration>
  56. </plugin>
  57. </plugins>
  58. </build>
  59. </project>

项目结构图如下所示 :

我们先只实现WebSocket相关内容

8.4.2 搭建pojo和对应的DAO层


   
  1. package cn.itcast.haoke.im.pojo;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Builder;
  4. import lombok.Data;
  5. import lombok.NoArgsConstructor;
  6. import java.util.Date;
  7. @Data
  8. @AllArgsConstructor
  9. @NoArgsConstructor
  10. @Builder
  11. public class Message {
  12. private long id;
  13. // 消息体暂不支持复杂消息.
  14. private String msg;
  15. /**
  16. * 消息状态,1-未读,2-已读
  17. */
  18. private Integer status;
  19. // 发送的时间和已读的时间
  20. private Date sendDate;
  21. private Date readDate;
  22. // 发送方和接收方
  23. private User from;
  24. private User to;
  25. }

用到的User类代码如下:


   
  1. package cn.itcast.haoke.im.pojo;
  2. import lombok.AllArgsConstructor;
  3. import lombok.Builder;
  4. import lombok.Data;
  5. import lombok.NoArgsConstructor;
  6. @Data
  7. @AllArgsConstructor
  8. @NoArgsConstructor
  9. @Builder
  10. public class User {
  11. private Long id;
  12. private String username;
  13. }

MessageDao如下, 我这里先用HashMap模拟数据库


   
  1. package cn.itcast.haoke.im.dao;
  2. import cn.itcast.haoke.im.pojo.Message;
  3. import java.util.List;
  4. public interface MessageDAO {
  5. /**
  6. * 查询点对点聊天记录
  7. */
  8. List<Message> findListByFromAndTo (Long fromId, Long toId, Integer page, Integer
  9. rows);
  10. /**
  11. * 根据id查询数据
  12. */
  13. Message findMessageById (Long id);
  14. /**
  15. * 新增消息数据*
  16. * * @param message
  17. * * @return
  18. */
  19. Message saveMessage (Message message);
  20. void updateMessageState (long id, int i);
  21. }

具体实现如下:


   
  1. package cn.itcast.haoke.im.dao.impl;
  2. import cn.itcast.haoke.im.dao.MessageDAO;
  3. import cn.itcast.haoke.im.pojo.Message;
  4. import org.springframework.stereotype.Component;
  5. import java.util.*;
  6. import java.util.concurrent.atomic.AtomicLong;
  7. /**
  8. * 我们在内存中简单实现一个查询类, 之后使用数据库替换.
  9. */
  10. @Component
  11. public class MessageDAOImpl implements MessageDAO {
  12. private static AtomicLong ID_BUILDER = new AtomicLong( 1L);
  13. Map<Long, Message> db = new HashMap<>();
  14. @Override
  15. public List<Message> findListByFromAndTo (Long fromId, Long toId, Integer page, Integer rows) {
  16. // 遍历所有信息, 找到满足的信息
  17. Collection<Message> values = db.values();
  18. // 先查出所有内容
  19. List<Message> messageList = new ArrayList<>();
  20. for (Message value : values) {
  21. if (value.getFrom() != null && Objects.equals(value.getFrom().getId(), fromId)) {
  22. if (value.getTo() != null && Objects.equals(value.getTo().getId(), toId)) {
  23. messageList.add(value);
  24. }
  25. }
  26. }
  27. // 处理分页
  28. return messageList.subList(page * rows, rows);
  29. }
  30. @Override
  31. public Message findMessageById (Long id) {
  32. return db.get(id);
  33. }
  34. @Override
  35. public Message saveMessage (Message message) {
  36. // 按照调用顺序生成一个id
  37. message.setId(ID_BUILDER.getAndIncrement());
  38. db.put(message.getId(), message);
  39. return message;
  40. }
  41. @Override
  42. public void updateMessageState (long id, int i) {
  43. Message messageById = findMessageById(id);
  44. if (messageById != null) {
  45. // 因为是在map中操作, 所以直接修改状态就可以了.
  46. messageById.setStatus(i);
  47. }
  48. }
  49. }

因为我们用的是假数据, 所以再写一些默认的假数据


   
  1. package cn.itcast.haoke.im.pojo;
  2. import java.util.HashMap;
  3. import java.util.Map;
  4. public class UserData {
  5. public static final Map<Long, User> USER_MAP = new HashMap<>();
  6. static {
  7. USER_MAP.put( 1001L, User.builder().id( 1001L).username( "zhangsan").build());
  8. USER_MAP.put( 1002L, User.builder().id( 1002L).username( "lisi").build());
  9. USER_MAP.put( 1003L, User.builder().id( 1003L).username( "wangwu").build());
  10. USER_MAP.put( 1004L, User.builder().id( 1004L).username( "zhaoliu").build());
  11. USER_MAP.put( 1005L, User.builder().id( 1005L).username( "sunqi").build());
  12. }
  13. }

8.4.3 接入WebSocket

首先, 我们按照之前的demo版流程, 编写一个Handler来处理WebSocket的几个生命周期.


   
  1. package cn.itcast.haoke.im.websocket;
  2. import cn.itcast.haoke.im.dao.MessageDAO;
  3. import cn.itcast.haoke.im.pojo.Message;
  4. import cn.itcast.haoke.im.pojo.UserData;
  5. import com.fasterxml.jackson.databind.JsonNode;
  6. import com.fasterxml.jackson.databind.ObjectMapper;
  7. import org.springframework.beans.factory.annotation.Autowired;
  8. import org.springframework.stereotype.Component;
  9. import org.springframework.web.socket.TextMessage;
  10. import org.springframework.web.socket.WebSocketSession;
  11. import org.springframework.web.socket.handler.TextWebSocketHandler;
  12. import java.util.HashMap;
  13. import java.util.Map;
  14. /**
  15. * webSocket 消息处理器
  16. *
  17. * @author 过道
  18. */
  19. @Component
  20. public class MessageHandler extends TextWebSocketHandler {
  21. @Autowired
  22. private MessageDAO messageDAO;
  23. private static final ObjectMapper MAPPER = new ObjectMapper();
  24. // 记录所有在线的终端, 并配置唯一标识.
  25. private static final Map<Long, WebSocketSession> SESSIONS = new HashMap<>();
  26. @Override
  27. public void afterConnectionEstablished (WebSocketSession session) {
  28. // 将当前用户的session放置到map中,后面会使用相应的session通信
  29. Long uid = (Long) session.getAttributes().get( "uid");
  30. SESSIONS.put(uid, session);
  31. }
  32. @Override
  33. protected void handleTextMessage (WebSocketSession session, TextMessage
  34. textMessage) throws Exception {
  35. // 解析消息中的发送方, 接收方, 消息内容.
  36. Long uid = (Long) session.getAttributes().get( "uid");
  37. JsonNode jsonNode = MAPPER.readTree(textMessage.getPayload());
  38. Long toId = jsonNode.get( "toId").asLong();
  39. String msg = jsonNode.get( "msg").asText();
  40. Message message = Message.builder()
  41. // 假装发送用户和接受用户都是从数据库中查出来的
  42. .from(UserData.USER_MAP.get(uid))
  43. .to(UserData.USER_MAP.get(toId))
  44. .msg(msg)
  45. .build();
  46. // 存入数据库
  47. message = this.messageDAO.saveMessage(message);
  48. // 判断to用户是否在线
  49. WebSocketSession toSession = SESSIONS.get(toId);
  50. if (toSession != null && toSession.isOpen()) {
  51. // 在线且可收消息的话, 实时发送给接收方.
  52. //TODO 具体格式需要和前端对接
  53. toSession.sendMessage( new TextMessage(MAPPER.writeValueAsString(message)));
  54. // 更新消息状态为已读
  55. this.messageDAO.updateMessageState(message.getId(), 2);
  56. }
  57. }
  58. }

为了简单起见,我们直接使用url来传当前用户的id, 格式如下

ws://{服务器url}:{服务器端口}/ws/{当前用户id}
示例如下:
ws://localhost:8080/ws/1002
ws://localhost:8080/ws/1001

约定好id传输格式,我们需要在每个连接建立时, 将uid放入到attributes中, 所以需要用到拦截器


   
  1. package cn.itcast.haoke.im.websocket;
  2. import org.apache.commons.lang3.StringUtils;
  3. import org.springframework.http.server.ServerHttpRequest;
  4. import org.springframework.http.server.ServerHttpResponse;
  5. import org.springframework.stereotype.Component;
  6. import org.springframework.web.socket.WebSocketHandler;
  7. import org.springframework.web.socket.server.HandshakeInterceptor;
  8. import java.util.Map;
  9. @Component
  10. public class MessageHandshakeInterceptor implements HandshakeInterceptor {
  11. /**
  12. * 解析路径中的uid, 并放入 attributes 中, 以便之后使用
  13. */
  14. @Override
  15. public boolean beforeHandshake (ServerHttpRequest request, ServerHttpResponse
  16. response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws
  17. Exception {
  18. String path = request.getURI().getPath();
  19. String[] ss = StringUtils.split(path, '/');
  20. if (ss.length != 2) {
  21. return false;
  22. }
  23. if (!StringUtils.isNumeric(ss[ 1])) {
  24. return false;
  25. }
  26. attributes.put( "uid", Long.valueOf(ss[ 1]));
  27. return true;
  28. }
  29. @Override
  30. public void afterHandshake (ServerHttpRequest request, ServerHttpResponse
  31. response, WebSocketHandler wsHandler, Exception exception) {
  32. }
  33. }

现在, 只需要我们将我们自定义的拦截器和Handler配置到Spring中, 让Spring将接收到的请求转发给我们就ok了


   
  1. package cn.itcast.haoke.im.websocket;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.context.annotation.Configuration;
  4. import org.springframework.web.socket.config.annotation.EnableWebSocket;
  5. import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
  6. import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
  7. @Configuration
  8. @EnableWebSocket
  9. public class WebSocketConfig implements WebSocketConfigurer {
  10. @Autowired
  11. private MessageHandler messageHandler;
  12. @Autowired
  13. private MessageHandshakeInterceptor messageHandshakeInterceptor;
  14. @Override
  15. public void registerWebSocketHandlers (WebSocketHandlerRegistry registry) {
  16. registry.addHandler( this.messageHandler, "/ws/{uid}")
  17. .setAllowedOrigins( "*")
  18. .addInterceptors( this.messageHandshakeInterceptor);
  19. }
  20. }

8.4.4 编写启动类并测试


   
  1. package cn.itcast.haoke.im;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class ImApplication {
  6. public static void main (String[] args) {
  7. SpringApplication.run(ImApplication.class, args);
  8. }
  9. }

启动项目, 启动成功;

这次还是打开在线测试工具 http://www.easyswoole.com/wstool.html

注意要打开两份

一份的url : ws://localhost:8080/ws/1001
另一份的url: ws://localhost:8080/ws/1002

点击连接, 都连接成功.

我们打开url是 1001的页面, 发送内容

{
"toId":1002,
"msg" : "你好, 1002"
}

然后看一眼url为1002的页面, 确实收到了内容. 如此测试就可以了.


转载:https://blog.csdn.net/m0_37961948/article/details/128761995
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场