ws 设置session2分钟超时,以及心跳回复

This commit is contained in:
2024-11-11 15:03:21 +08:00
parent 5fe605c130
commit 546d1b4c44

View File

@@ -16,11 +16,14 @@ import java.util.concurrent.ConcurrentHashMap;
@Slf4j @Slf4j
public class NotificationConnection { public class NotificationConnection {
//连接超时
public static final long MAX_TIME_OUT = 2 * 60 * 1000;
static Map<String,Session> sessionMap = new ConcurrentHashMap<>(); static Map<String,Session> sessionMap = new ConcurrentHashMap<>();
private Session session; private Session session;
// 这里用ConcurrentHashMap 因为他是一个线程安全的Map // 这里用ConcurrentHashMap 因为他是一个线程安全的Map
private static ConcurrentHashMap<Long, NotificationConnection> websockets = new ConcurrentHashMap<>(); private static ConcurrentHashMap<NotificationConnection, Long> websockets = new ConcurrentHashMap<>();
//连接建立时执行的操作 //连接建立时执行的操作
/*@OnOpen /*@OnOpen
public void onOpen(Session session){ public void onOpen(Session session){
@@ -31,7 +34,8 @@ public class NotificationConnection {
@OnOpen @OnOpen
public void onOpen(Session session, @PathParam("id") String id) { // 接收到前端传来的用户ID public void onOpen(Session session, @PathParam("id") String id) { // 接收到前端传来的用户ID
this.session = session; this.session = session;
websockets.put(Long.parseLong(id), this); //将ID作为key当前的对象作为Value this.session.setMaxIdleTimeout(MAX_TIME_OUT);
websockets.put(this, Long.parseLong(id)); //将ID作为key当前的对象作为Value
log.info("【建立连接】 用户为:{}", this.session); log.info("【建立连接】 用户为:{}", this.session);
log.info("【建立连接】 用户Id为{}", id); log.info("【建立连接】 用户Id为{}", id);
log.info("【建立连接】 总数为:{}", websockets.size()); log.info("【建立连接】 总数为:{}", websockets.size());
@@ -39,27 +43,34 @@ public class NotificationConnection {
@OnClose @OnClose
public void onClose(CloseReason reason) { public void onClose(CloseReason reason) {
websockets.remove(this); // 将当前的对象从集合中删除 if (websockets.containsKey(this)) {
log.info("【连接断开】 用户为:{}, 原因为{}", this.session, reason); websockets.remove(this); // 将当前的对象从集合中删除
log.info("【连接断开】 用户为:{}, sessionId: {}, 原因为{}", websockets.get(this), this.session.getId(), reason);
}
// log.info("【连接断开】 总数为:{}", websockets.size()); // log.info("【连接断开】 总数为:{}", websockets.size());
} }
/** /**
* 错误时调用 * 错误时调用
* @param session 连接
* @param throwable 异常 * @param throwable 异常
*/ */
@OnError @OnError
public void onError(Session session, Throwable throwable) { public void onError(Throwable throwable) {
log.info("【连接异常】[session({}) 发生异常]", session, throwable); log.info("【连接异常】 用户为:{} , sessionId: {}", websockets.get(this), this.session.getId(), throwable);
websockets.remove(this); // 将当前的对象从集合中删除 websockets.remove(this); // 将当前的对象从集合中删除
} }
//收到了客户端消息执行的操作 //收到了客户端消息执行的操作
@OnMessage @OnMessage
public void onMessage(String text){ public void onMessage(Session session, String text){
log.info("收到了一条消息:"+text); log.info("收到了一条消息:"+text);
// return "已收到你的消息"; // return "已收到你的消息";
if (text.equals("PING")){
sendMsg("PONG", websockets.get(this));
session.setMaxIdleTimeout(MAX_TIME_OUT);
}
} }
/*//连接关闭的时候执行的操作 /*//连接关闭的时候执行的操作
@OnClose @OnClose
@@ -74,27 +85,31 @@ public class NotificationConnection {
} }
}*/ }*/
public void sendMsg(String message, Long userId) throws IOException { public void sendMsg(String message, Long userId) {
if (userId == null) { // 如果等于null则证明是群发 if (userId == null) { // 如果等于null则证明是群发
// 获取当前Map的一个迭代器遍历Map的方式有很多种看着来 // 获取当前Map的一个迭代器遍历Map的方式有很多种看着来
// 这个就是遍历这个集合的过程.... // 这个就是遍历这个集合的过程....
for (Map.Entry<Long, NotificationConnection> entry : websockets.entrySet()) { for (Map.Entry<NotificationConnection, Long> entry : websockets.entrySet()) {
// 获取每一个Entry实例 // 获取每一个Entry实例
// 获取每一个Value而这个Value就是WebSocket的实例 // 获取每一个Value而这个Value就是WebSocket的实例
NotificationConnection webSocket = entry.getValue(); NotificationConnection webSocket = entry.getKey();
// 接下来就是遍历群发 // 接下来就是遍历群发
log.info("广播消息 【给用户】 {}发送消息【{}】", webSocket, message); log.info("广播消息 【给用户】 {}发送消息【{}】", webSocket, message);
webSocket.session.getBasicRemote().sendText(message); // 发送!!!!!!!!! try {
webSocket.session.getBasicRemote().sendText(message); // 发送!!!!!!!!!
} catch (IOException e) {
log.error("Failed to send message to session {}: {}", webSocket.session.getId(), e.getMessage());
}
} }
} else { // 如果不是群发则判断ID其余步骤一致 } else { // 如果不是群发则判断ID其余步骤一致
// 获取当前Map的一个迭代器遍历Map的方式有很多种看着来 // 获取当前Map的一个迭代器遍历Map的方式有很多种看着来
// 这个就是遍历这个集合的过程.... // 这个就是遍历这个集合的过程....
for (Map.Entry<Long, NotificationConnection> entry : websockets.entrySet()) { for (Map.Entry<NotificationConnection, Long> entry : websockets.entrySet()) {
// 获取每一个Entry实例 // 获取每一个Entry实例
// 获取每一个Value而这个Value就是WebSocket的实例 // 获取每一个Value而这个Value就是WebSocket的实例
NotificationConnection webSocket = entry.getValue(); NotificationConnection webSocket = entry.getKey();
// 获取每一个Key这个Key就是用户ID // 获取每一个Key这个Key就是用户ID
Long key = entry.getKey(); Long key = entry.getValue();
// 判断用户ID与当前的Key相等 // 判断用户ID与当前的Key相等
if (userId.equals(key)) { if (userId.equals(key)) {
log.info("私发消息 【给用户】 {}发送消息【{}】", key, message); // 打印 log.info("私发消息 【给用户】 {}发送消息【{}】", key, message); // 打印