From 546d1b4c44002a9a3d79a693e1d17f07bf701034 Mon Sep 17 00:00:00 2001 From: xupei Date: Mon, 11 Nov 2024 15:03:21 +0800 Subject: [PATCH] =?UTF-8?q?ws=20=E8=AE=BE=E7=BD=AEsession2=E5=88=86?= =?UTF-8?q?=E9=92=9F=E8=B6=85=E6=97=B6=EF=BC=8C=E4=BB=A5=E5=8F=8A=E5=BF=83?= =?UTF-8?q?=E8=B7=B3=E5=9B=9E=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../websocket/NotificationConnection.java | 45 ++++++++++++------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/ai/da/common/websocket/NotificationConnection.java b/src/main/java/com/ai/da/common/websocket/NotificationConnection.java index 55e99ef3..5f2c244d 100644 --- a/src/main/java/com/ai/da/common/websocket/NotificationConnection.java +++ b/src/main/java/com/ai/da/common/websocket/NotificationConnection.java @@ -16,11 +16,14 @@ import java.util.concurrent.ConcurrentHashMap; @Slf4j public class NotificationConnection { + //连接超时 + public static final long MAX_TIME_OUT = 2 * 60 * 1000; + static Map sessionMap = new ConcurrentHashMap<>(); private Session session; // 这里用ConcurrentHashMap 因为他是一个线程安全的Map - private static ConcurrentHashMap websockets = new ConcurrentHashMap<>(); + private static ConcurrentHashMap websockets = new ConcurrentHashMap<>(); //连接建立时执行的操作 /*@OnOpen public void onOpen(Session session){ @@ -31,7 +34,8 @@ public class NotificationConnection { @OnOpen public void onOpen(Session session, @PathParam("id") String id) { // 接收到前端传来的用户ID this.session = session; - websockets.put(Long.parseLong(id), this); //将ID作为key,当前的对象作为Value + this.session.setMaxIdleTimeout(MAX_TIME_OUT); + websockets.put(this, Long.parseLong(id)); //将ID作为key,当前的对象作为Value log.info("【建立连接】 用户为:{}", this.session); log.info("【建立连接】 用户Id为:{}", id); log.info("【建立连接】 总数为:{}", websockets.size()); @@ -39,27 +43,34 @@ public class NotificationConnection { @OnClose public void onClose(CloseReason reason) { - websockets.remove(this); // 将当前的对象从集合中删除 - log.info("【连接断开】 用户为:{}, 原因为{}", this.session, reason); + if (websockets.containsKey(this)) { + websockets.remove(this); // 将当前的对象从集合中删除 + log.info("【连接断开】 用户为:{}, sessionId: {}, 原因为{}", websockets.get(this), this.session.getId(), reason); + } + // log.info("【连接断开】 总数为:{}", websockets.size()); } /** * 错误时调用 - * @param session 连接 * @param throwable 异常 */ @OnError - public void onError(Session session, Throwable throwable) { - log.info("【连接异常】[session({}) 发生异常]", session, throwable); + public void onError(Throwable throwable) { + log.info("【连接异常】 用户为:{} , sessionId: {}", websockets.get(this), this.session.getId(), throwable); websockets.remove(this); // 将当前的对象从集合中删除 } //收到了客户端消息执行的操作 @OnMessage - public void onMessage(String text){ + public void onMessage(Session session, String text){ log.info("收到了一条消息:"+text); // return "已收到你的消息"; + if (text.equals("PING")){ + sendMsg("PONG", websockets.get(this)); + session.setMaxIdleTimeout(MAX_TIME_OUT); + } + } /*//连接关闭的时候执行的操作 @OnClose @@ -74,27 +85,31 @@ public class NotificationConnection { } }*/ - public void sendMsg(String message, Long userId) throws IOException { + public void sendMsg(String message, Long userId) { if (userId == null) { // 如果等于null则证明是群发 // 获取当前Map的一个迭代器,遍历Map的方式有很多种,看着来 // 这个就是遍历这个集合的过程.... - for (Map.Entry entry : websockets.entrySet()) { + for (Map.Entry entry : websockets.entrySet()) { // 获取每一个Entry实例 // 获取每一个Value,而这个Value就是WebSocket的实例 - NotificationConnection webSocket = entry.getValue(); + NotificationConnection webSocket = entry.getKey(); // 接下来就是遍历群发 log.info("广播消息 【给用户】 :{}发送消息【{}】", webSocket, message); - webSocket.session.getBasicRemote().sendText(message); // 发送!!!!!!!!! + try { + webSocket.session.getBasicRemote().sendText(message); // 发送!!!!!!!!! + } catch (IOException e) { + log.error("Failed to send message to session {}: {}", webSocket.session.getId(), e.getMessage()); + } } } else { // 如果不是群发,则判断ID,其余步骤一致 // 获取当前Map的一个迭代器,遍历Map的方式有很多种,看着来 // 这个就是遍历这个集合的过程.... - for (Map.Entry entry : websockets.entrySet()) { + for (Map.Entry entry : websockets.entrySet()) { // 获取每一个Entry实例 // 获取每一个Value,而这个Value就是WebSocket的实例 - NotificationConnection webSocket = entry.getValue(); + NotificationConnection webSocket = entry.getKey(); // 获取每一个Key,这个Key就是用户ID - Long key = entry.getKey(); + Long key = entry.getValue(); // 判断用户ID与当前的Key相等 if (userId.equals(key)) { log.info("私发消息 【给用户】 :{}发送消息【{}】", key, message); // 打印