小言_互联网的博客

【开发经验】springboot整合websocket实现群聊

279人阅读  评论(0)

实现思路

发送者向服务器发送大家早上好。其它客户端可以收到对应消息。

项目展示

通过springboot引入websocket,实现群聊,通过在线websocket测试进行展示。

核心代码

pom引入jar

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>

使用springboot 2.3.10.RELEASE版本;java8

WebSocketConfig配置类

@Configuration
public class WebSocketConfig
{
   
   @Bean
   public ServerEndpointExporter serverEndpointExporter() 	{
   
      return new ServerEndpointExporter();
   }
}

websocket有4个注解,解释如下

  • @OnOpen 有新连接
  • @OnMessage 有新消息时
  • @OnClose 关闭连接
  • @OnError 连接异常

对于这个4个注解,配置在方法上即可。

WebsocketServerEndpoint接收类

@Slf4j
@Controller
@ServerEndpoint ("/")
public class WebsocketServerEndpoint
{
   
   @OnOpen
   public void onOpen(Session session, EndpointConfig config) {
   
      log.info("[onOpen][session({}) 接入]", session);
      WebSocketUtil.broadcast (session.getId ()+"上线了");
      WebSocketUtil.addSession (session);
   }
   @OnMessage
   public void onMessage(Session session, String message) {
   
      WebSocketUtil.broadcast (message);
      log.info("[onOpen][session({}) 接收到一条消息({})]", session, message); // 生产环境下,请设置成 debug 级别
   }
   @OnClose
   public void onClose(Session session, CloseReason closeReason) {
   
      WebSocketUtil.removeSession (session);
      WebSocketUtil.broadcast (session.getId ()+"下线了");
      log.info("[onClose][session({}) 连接关闭。关闭原因是({})}]", session, closeReason);
   }
   @OnError
   public void onError(Session session, Throwable throwable) {
   
      log.info("[onClose][session({}) 发生异常]", session, throwable);
   }
}

WebSocketUtil工具类。

@Slf4j
public class WebSocketUtil
{
   


    // ========== 会话相关 ==========

    /**
     * Session 与用户的映射
     */
    private static final Map<String, Session> SESSION_USER_MAP = new ConcurrentHashMap<> ();

    /**
     * 添加 Session 。在这个方法中,会添加用户和 Session 之间的映射
     *
     * @param session Session
     */
    public static void addSession(Session session) {
   
        // 更新 SESSION_USER_MAP
        SESSION_USER_MAP.put(session.getId (), session);
    }

    /**
     * 移除 Session 。
     *
     * @param session Session
     */
    public static void removeSession(Session session) {
   
        // 从 SESSION_USER_MAP 中移除
          SESSION_USER_MAP.remove(session.getId ());

    }

    // ========== 消息相关 ==========

    /**
     * 广播发送消息给所有在线用户
     *
     * @param message 消息体
     * @param <T> 消息类型
     */
    public static <T extends Message> void broadcast(String message) {
   
        // 创建消息
//        String messageText = buildTextMessage(type, message);
        // 遍历 SESSION_USER_MAP ,进行逐个发送
        for (String sessionId : SESSION_USER_MAP.keySet()) {
   
            sendTextMessage(SESSION_USER_MAP.get (sessionId), message);
        }
    }

    /**
     * 发送消息给单个用户的 Session
     *
     * @param session Session
     * @param type 消息类型
     * @param message 消息体
     * @param <T> 消息类型
     */
    public static <T extends Message> void send(Session session, String type, T message) {
   
        // 创建消息
        String messageText = buildTextMessage(type, message);
        // 遍历给单个 Session ,进行逐个发送
        sendTextMessage(session, messageText);
    }


    /**
     * 构建完整的消息
     *
     * @param type 消息类型
     * @param message 消息体
     * @param <T> 消息类型
     * @return 消息
     */
    private static <T extends Message> String buildTextMessage(String type, T message) {
   
        JSONObject messageObject = new JSONObject();
        messageObject.put("type", type);
        messageObject.put("body", message);
        return messageObject.toString();
    }

    /**
     * 真正发送消息
     *
     * @param session Session
     * @param messageText 消息
     */
    private static void sendTextMessage(Session session, String messageText) {
   
        if (session == null) {
   
            log.error("[sendTextMessage][session 为 null]");
            return;
        }
        RemoteEndpoint.Basic basic = session.getBasicRemote();
        if (basic == null) {
   
            log.error("[sendTextMessage][session 的  为 null]");
            return;
        }
        try {
   
            basic.sendText(messageText);
        } catch (IOException e) {
   
            log.error("[sendTextMessage][session({}) 发送消息{}) 发生异常",
                    session, messageText, e);
        }
    }

}

在线websocket调试输入ws://localhost:8080/即可。


转载:https://blog.csdn.net/qq_30285985/article/details/117357241
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场