## 1. 问题描述

公司原项目的单体架构无法承载突增的用户量,所以将架构改为分布式。随之遇到的问题是:WebSocket 患者与医生聊天功能受到影响——如果患者与医生连接的 socket 节点不是同一个,那么他们不能实时收到对方的消息。

最初的解决方案是把 WebSocket Session 存储到 Redis,但因为 "`WebSocket Session是不能够被序列化的,java.io.NotSerializableException`" 的原因,此方案无法实现。

后来纠结了好久,灵光一现:既然不能序列化,那么就把消息体发布到所有节点,由各节点去查询相关的 session——如果存在就 send 消息,不存在就直接 return。

## 2. 实现

#### 2.1 实体类

###### 2.1.1 聊天消息类 - WebSocketMessagePo.java

```java
package ai.pojo;

import java.io.Serializable;

/**
 * <p>
 * WebSocketMessagePo - chat message envelope published to every node.
 * </p>
 *
 * <p>
 * {@code key} is the key used to look up the recipient's sessions in the
 * handler's session map; {@code message} is the text payload to deliver.
 * </p>
 *
 * @author JiuyeXD
 * @version 1.0
 * @since 2022/3/2
 */
public class WebSocketMessagePo implements Serializable {

    private static final long serialVersionUID = 8291778884850726540L;

    private String key;
    private String message;

    public WebSocketMessagePo() {
    }

    public WebSocketMessagePo(String key, String message) {
        this.key = key;
        this.message = message;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}
```

###### 2.1.2 医生在线状态类 - OnlineControlPo

```java
package ai.pojo;

/**
 * <p>
 * OnlineControlPo - online-status event (login / logout) broadcast to every node.
 * </p>
 *
 * <p>
 * {@code action} selects the operation (compared against
 * {@code MagicString.DOCTOR_LOGIN} / {@code MagicString.DOCTOR_LOGOUT} by the
 * service); {@code message} carries the id that is added to or removed from
 * the online set.
 * </p>
 *
 * @author JiuyeXD
 * @version 1.0
 * @since 2022/3/3
 */
public class OnlineControlPo {

    // NOTE(review): these fields are public in the original; kept as-is so
    // existing direct field access keeps compiling. Prefer the accessors.
    public String action;
    public String message;

    public OnlineControlPo() {
    }

    public OnlineControlPo(String action, String message) {
        this.action = action;
        this.message = message;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    @Override
    public String toString() {
        return "OnlineControlPo{" +
                "action='" + action + '\'' +
                ", message='" + message + '\'' +
                '}';
    }
}
```

#### 2.2 Redis消息订阅配置类 - RedisCacheConfig.java

```java
package ai.config;

import ai.utils.RedisReceiver;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

/**
 * <p>
 * RedisCacheConfig - Redis message subscription configuration.
 * </p>
 *
 * <p>
 * Registers one listener container per channel ("channel:message" and
 * "channel:online") plus the {@link StringRedisTemplate} used for publishing.
 * </p>
 *
 * @author JiuyeXD
 * @version 1.0
 * @since 2022/3/2
 */
@Configuration
@EnableCaching
public class RedisCacheConfig {

    /**
     * Listener container for chat messages.
     *
     * <p>
     * Two {@link MessageListenerAdapter} beans exist, so the parameter name
     * must stay equal to the bean name for Spring to pick the right one.
     * </p>
     *
     * @param connectionFactory      Redis connection factory
     * @param messageListenerAdapter adapter that forwards to {@code RedisReceiver.sendMessage}
     * @return container subscribed to "channel:message"
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // Several listeners can be registered, each bound to a different channel.
        System.out.println("messageListenerAdapter: " + messageListenerAdapter);
        container.addMessageListener(messageListenerAdapter, new PatternTopic("channel:message"));
        return container;
    }

    /**
     * Listener container for online-status events.
     *
     * @param connectionFactory           Redis connection factory
     * @param onlineStatusListenerAdapter adapter that forwards to {@code RedisReceiver.onlineControl}
     * @return container subscribed to "channel:online"
     */
    @Bean
    RedisMessageListenerContainer container2(RedisConnectionFactory connectionFactory,
                                             MessageListenerAdapter onlineStatusListenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // Several listeners can be registered, each bound to a different channel.
        System.out.println("onlineStatusListenerAdapter: " + onlineStatusListenerAdapter);
        container.addMessageListener(onlineStatusListenerAdapter, new PatternTopic("channel:online"));
        return container;
    }

    /**
     * Message listener adapter: binds the message handler and invokes its
     * business method ("sendMessage") via reflection.
     *
     * @param receiver the handler object whose method is invoked
     * @return adapter delegating to {@code receiver.sendMessage}
     */
    @Bean
    MessageListenerAdapter messageListenerAdapter(RedisReceiver receiver) {
        System.out.println("ws消息适配器");
        return new MessageListenerAdapter(receiver, "sendMessage");
    }

    /**
     * Message listener adapter for online-status events; invokes
     * "onlineControl" on the handler via reflection.
     *
     * @param receiver the handler object whose method is invoked
     * @return adapter delegating to {@code receiver.onlineControl}
     */
    @Bean
    MessageListenerAdapter onlineStatusListenerAdapter(RedisReceiver receiver) {
        System.out.println("在线状态适配器");
        return new MessageListenerAdapter(receiver, "onlineControl");
    }

    /**
     * Template used by the service layer to publish to the Redis channels.
     *
     * @param connectionFactory Redis connection factory
     * @return string-serializing Redis template
     */
    @Bean
    StringRedisTemplate redisSub(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }
}
```

#### 2.3 Redis消息监听类 - RedisReceiver.java

```java
package ai.utils;

import ai.pojo.OnlineControlPo;
import ai.pojo.WebSocketMessagePo;
import ai.service.ciming.service.IWebSocketService;
import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketSession;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.List;

/**
 * <p>
 * RedisReceiver - Redis message listener.
 * </p>
 *
 * <p>
 * Its methods are invoked reflectively by the {@code MessageListenerAdapter}
 * beans with the raw JSON string received from the channel.
 * </p>
 *
 * @author JiuyeXD
 * @version 1.0
 * @since 2022/3/2
 */
@Component
public class RedisReceiver {

    @Autowired
    private IWebSocketService webSocketService;

    /**
     * Handles a chat message received from "channel:message".
     *
     * @param message JSON form of a {@link WebSocketMessagePo}
     */
    public void sendMessage(String message) {
        WebSocketMessagePo messagePo = JSONObject.parseObject(message, WebSocketMessagePo.class);
        if (messagePo == null) {
            // Empty or literal-null payload: nothing to deliver.
            return;
        }
        webSocketService.sendMessage(messagePo.getKey(), messagePo.getMessage());
    }

    /**
     * Handles an online-status event received from "channel:online".
     *
     * @param message JSON form of an {@link OnlineControlPo}
     */
    public void onlineControl(String message) {
        OnlineControlPo messagePo = JSONObject.parseObject(message, OnlineControlPo.class);
        if (messagePo == null) {
            // Empty or literal-null payload: nothing to apply.
            return;
        }
        webSocketService.onlineControl(messagePo.getAction(), messagePo.getMessage());
    }
}
```

#### 2.4 WebSocket消息订阅发布业务逻辑层处理

###### 2.4.1 接口

```java
package ai.service.ciming.service;

import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * <p>
 * IWebSocketService - WebSocket business-logic layer.
 * </p>
 *
 * <p>
 * The {@code *Pub} methods publish to Redis so that every node receives the
 * event; {@code sendMessage} and {@code onlineControl} are the node-local
 * handlers invoked when such an event arrives.
 * </p>
 *
 * @author JiuyeXD
 * @version 1.0
 * @since 2022/3/2
 */
public interface IWebSocketService {

    /**
     * Publishes a chat message to all nodes.
     *
     * @param key     session-map key of the recipient
     * @param message text payload
     */
    void messagePub(String key, String message);

    /**
     * Publishes an online-status event to all nodes.
     *
     * @param action  login / logout action
     * @param message id the action applies to
     */
    void onlinePub(String action, String message);

    /**
     * Delivers a message to the recipient's sessions held by this node, if any.
     *
     * @param key     session-map key of the recipient
     * @param message text payload
     */
    void sendMessage(String key, String message);

    /**
     * Applies an online-status event to this node's online set.
     *
     * @param action  login / logout action
     * @param message id the action applies to
     */
    void onlineControl(String action, String message);
}
```

###### 2.4.2 实现类

```java
package ai.service.ciming.service.impl;

import ai.common.constant.MagicString;
import ai.pojo.OnlineControlPo;
import ai.pojo.WebSocketMessagePo;
import ai.service.ciming.service.IWebSocketService;
import ai.util.CommUtil;
import ai.util.RedisCache;
import ai.websocket.SystemWebSocketHandler;
import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * <p>
 * WebSocketServiceImpl - WebSocket business-logic layer implementation.
 * </p>
 *
 * @author JiuyeXD
 * @version 1.0
 * @since 2022/3/2
 */
@Service
public class WebSocketServiceImpl implements IWebSocketService {

    @Autowired
    StringRedisTemplate redisSub;

    /** Serializes the message to JSON and publishes it on "channel:message". */
    @Override
    public void messagePub(String key, String message) {
        WebSocketMessagePo messagePo = new WebSocketMessagePo(key, message);
        redisSub.convertAndSend("channel:message", JSONObject.toJSONString(messagePo));
    }

    /** Serializes the event to JSON and publishes it on "channel:online". */
    @Override
    public void onlinePub(String action, String message) {
        OnlineControlPo messagePo = new OnlineControlPo(action, message);
        redisSub.convertAndSend("channel:online", JSONObject.toJSONString(messagePo));
    }

    /**
     * Sends the message to every open session registered under {@code key}
     * on this node. Does nothing when this node holds no such session.
     */
    @Override
    public void sendMessage(String key, String message) {
        if (key == null) {
            // The concurrent session map rejects null keys.
            return;
        }
        List<WebSocketSession> sessions = SystemWebSocketHandler.sessionMap.get(key);
        if (CommUtil.checkNull(sessions)) {
            // The recipient is not connected to this node.
            return;
        }
        for (WebSocketSession session : sessions) {
            if (!session.isOpen()) {
                // Sending on a closed session throws an unchecked exception,
                // which would abort delivery to the remaining sessions.
                continue;
            }
            try {
                session.sendMessage(new TextMessage(message));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Adds the id to, or removes it from, this node's online set. */
    @Override
    public void onlineControl(String action, String message) {
        if (message == null) {
            // The concurrent online set rejects null elements.
            return;
        }
        if (MagicString.DOCTOR_LOGIN.equals(action)) {
            SystemWebSocketHandler.onlineInfo.add(message);
        } else if (MagicString.DOCTOR_LOGOUT.equals(action)) {
            SystemWebSocketHandler.onlineInfo.remove(message);
        }
    }
}
```

#### 2.5 应用例子

###### 2.5.1 声明sessionMap和OnlineInfo

```java
// Both collections are written by WebSocket threads and read by the Redis
// listener threads, so they must be thread-safe.
// Requires java.util.concurrent.ConcurrentHashMap.
public static final Map<String, List<WebSocketSession>> sessionMap = new ConcurrentHashMap<>();
public static Set<String> onlineInfo = ConcurrentHashMap.newKeySet();
```

###### 2.5.2 当socket建立连接

```java
@Override
public void afterConnectionEstablished(WebSocketSession session) {
    try {
        System.out.println("链接webSocket成功" + session.toString() + parseUri(session.getUri()));
        // Connection established: store the session under the sender's
        // session id. computeIfAbsent makes "create list if missing, then add"
        // atomic. Requires java.util.concurrent.CopyOnWriteArrayList.
        String fromSessionId = getFromSessionId(session.getUri());
        sessionMap.computeIfAbsent(fromSessionId, k -> new CopyOnWriteArrayList<>()).add(session);
        webSocketService.onlinePub(MagicString.DOCTOR_LOGIN, getFromId(session.getUri()));
        System.out.println("连接成功");
        //session.sendMessage(new TextMessage(MessageUtil.wrapMessage("连接成功", MessageType.TEXT)));
    } catch (Exception e) {
        e.printStackTrace();
    }
}
```

###### 2.5.3 发送消息

```java
@Override
public void handleMessage(WebSocketSession wss, WebSocketMessage<?> wsm) throws Exception {
    try {
        System.out.println(wss.getId() + " " + wsm.getPayload() + " " + wss.toString() + wsm.toString());
        JSONObject json = JSONObject.parseObject(wsm.getPayload().toString());
        String contentId = snowFlakeUtil.getUUID();
        String customerID = json.get("customerID").toString();
        String doctorID = json.get("doctorID").toString();
        String dcType = json.get("dcType").toString();
        String content = json.get("content").toString();
        String messageType = json.get("messageType").toString();
        String msgTimestamp = "";
        if (json.containsKey("msgTimestamp")) {
            msgTimestamp = json.get("msgTimestamp").toString();
        }
        MessageType msgType = MessageType.getMessageType(messageType);
        if (msgType == null) {
            // Unknown message types fall back to plain text.
            msgType = MessageType.TEXT;
            messageType = msgType.getMessageType();
        }
        /* Persist the chat record. */
        communicationService.addChatContent(contentId, customerID, doctorID, dcType, content, messageType, msgTimestamp);
        String toId = getToId(wss.getUri());
        // The concurrent online set rejects null lookups, hence the null check.
        if (toId != null && onlineInfo.contains(toId)) {
            try {
                String textMessage = MessageUtil.wrapMessage(contentId, content, msgType);
                // Publish to every node; the node holding the recipient's session delivers it.
                webSocketService.messagePub(getToSessionId(wss.getUri()), textMessage);
            } catch (Exception e) {
                e.printStackTrace();
            }
            /* Update the recipient's unread-message flag. */
            communicationService.updateReadFlag(customerID, doctorID, MagicString.READ, MagicString.READ);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
```

---

Last modification: March 8, 2022 © Allow specification reprint Support Appreciate the author AliPayWeChat Like 0 感谢大佬投喂 啾咪~