diff --git a/src/main/java/com/ai/da/common/websocket/NotificationConnection.java b/src/main/java/com/ai/da/common/websocket/NotificationConnection.java index f9a9cb52..490acdb2 100644 --- a/src/main/java/com/ai/da/common/websocket/NotificationConnection.java +++ b/src/main/java/com/ai/da/common/websocket/NotificationConnection.java @@ -4,40 +4,92 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.websocket.*; +import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -@ServerEndpoint(value = "/notification") +@ServerEndpoint(value = "/notification/{id}") @Component @Slf4j public class NotificationConnection { static Map sessionMap = new ConcurrentHashMap<>(); + + private Session session; + // 这里用ConcurrentHashMap 因为他是一个线程安全的Map + private static ConcurrentHashMap websockets = new ConcurrentHashMap<>(); //连接建立时执行的操作 - @OnOpen + /*@OnOpen public void onOpen(Session session){ sessionMap.put(session.getId(),session); - log.info("websocket is open"); + log.info("websocket is open, sessionId: {}",session.getId()); + }*/ + + @OnOpen + public void onOpen(Session session, @PathParam("id") String id) { // 接收到前端传来的用户ID + this.session = session; + websockets.put(Long.parseLong(id), this); //将ID作为key,当前的对象作为Value + log.info("【建立连接】 用户为:{}", this.session); + log.info("【建立连接】 用户Id为:{}", id); + log.info("【建立连接】 总数为:{}", websockets.size()); } + + @OnClose + public void onClose() { + websockets.remove(this); // 将当前的对象从集合中删除 + log.info("【连接断开】 用户为:{}", this.session); +// log.info("【连接断开】 总数为:{}", websockets.size()); + } + //收到了客户端消息执行的操作 @OnMessage public void onMessage(String text){ log.info("收到了一条消息:"+text); // return "已收到你的消息"; } - //连接关闭的时候执行的操作 + /*//连接关闭的时候执行的操作 @OnClose public void onClose(Session session){ sessionMap.remove(session.getId()); - log.info("websocket is close"); + log.info("websocket is close, sessionId: {}",session.getId()); } public void sendMsg(String message) throws IOException { for(String key:sessionMap.keySet()){ sessionMap.get(key).getBasicRemote().sendText(message); } + }*/ + + public void sendMsg(String message, Long userId) throws IOException { + if (userId == null) { // 如果等于null则证明是群发 + // 获取当前Map的一个迭代器,遍历Map的方式有很多种,看着来 + // 这个就是遍历这个集合的过程.... + for (Map.Entry entry : websockets.entrySet()) { + // 获取每一个Entry实例 + // 获取每一个Value,而这个Value就是WebSocket的实例 + NotificationConnection webSocket = entry.getValue(); + // 接下来就是遍历群发 + log.info("广播消息 【给用户】 :{}发送消息【{}】", webSocket, message); + webSocket.session.getBasicRemote().sendText(message); // 发送!!!!!!!!! + } + } else { // 如果不是群发,则判断ID,其余步骤一致 + // 获取当前Map的一个迭代器,遍历Map的方式有很多种,看着来 + // 这个就是遍历这个集合的过程.... + for (Map.Entry entry : websockets.entrySet()) { + // 获取每一个Entry实例 + // 获取每一个Value,而这个Value就是WebSocket的实例 + NotificationConnection webSocket = entry.getValue(); + // 获取每一个Key,这个Key就是用户ID + Long key = entry.getKey(); + // 判断用户ID与当前的Key相等 + if (userId.equals(key)) { + log.info("私发消息 【给用户】 :{}发送消息【{}】", key, message); // 打印 + webSocket.session.getBasicRemote().sendText(message); // 则发送给当前的用户即可 + } + } + } } } diff --git a/src/main/java/com/ai/da/service/impl/MessageCenterServiceImpl.java b/src/main/java/com/ai/da/service/impl/MessageCenterServiceImpl.java index 463df6c0..47b169c1 100644 --- a/src/main/java/com/ai/da/service/impl/MessageCenterServiceImpl.java +++ b/src/main/java/com/ai/da/service/impl/MessageCenterServiceImpl.java @@ -107,7 +107,16 @@ public class MessageCenterServiceImpl extends ServiceImpl> resp = new ArrayList<>(); HashMap data = new HashMap<>(); Long count; - Long accountId = UserContext.getUserHolder().getId(); if (!type.equals("system")) { // 个人未读消息 count = getUnreadCountByType(type, receiverId); @@ -179,21 +187,19 @@ public class MessageCenterServiceImpl extends ServiceImpl followee = new HashMap<>(); HashMap follower = new HashMap<>(); - follower.put("followerCount",portfolioService.getFollowerCount(accountId)); - followee.put("followeeCount",portfolioService.getFolloweeCount(accountId)); + follower.put("followerCount",portfolioService.getFollowerCount(receiverId)); + followee.put("followeeCount",portfolioService.getFolloweeCount(receiverId)); resp.add(followee); resp.add(follower); } data.put(type, count); - if (count != 0){ - resp.add(data); - } + resp.add(data); String jsonString = JSON.toJSONString(resp); log.info("消息推送 : {}", jsonString); try { - notificationConnection.sendMsg(jsonString); + notificationConnection.sendMsg(jsonString, receiverId); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/src/main/java/com/ai/da/service/impl/PortfolioServiceImpl.java b/src/main/java/com/ai/da/service/impl/PortfolioServiceImpl.java index f3128c6e..acf06ff3 100644 --- a/src/main/java/com/ai/da/service/impl/PortfolioServiceImpl.java +++ b/src/main/java/com/ai/da/service/impl/PortfolioServiceImpl.java @@ -1031,15 +1031,15 @@ public class PortfolioServiceImpl extends ServiceImpl e2.getValue().compareTo(e1.getValue())); // 获取 size 最大的前三个 - List top3PortfolioIds = portfolioSizes.stream() + // 去掉前缀,获取 portfolioId + // 输出 top3PortfolioIds +// top3PortfolioIds.forEach(System.out::println); + return portfolioSizes.stream() .limit(3) .map(Map.Entry::getKey) .map(key -> key.replace(prefix, "")) // 去掉前缀,获取 portfolioId .map(Long::valueOf) .collect(Collectors.toList()); - // 输出 top3PortfolioIds - top3PortfolioIds.forEach(System.out::println); - return top3PortfolioIds; } return null; } @@ -1067,14 +1067,13 @@ public class PortfolioServiceImpl extends ServiceImpl top3PortfolioIds = portfolioViews.entrySet().stream() + +// System.out.println("Top 3 Portfolio IDs after exclusion: " + top3PortfolioIds); + return portfolioViews.entrySet().stream() .sorted(Map.Entry.comparingByValue().reversed()) .limit(3) .map(Map.Entry::getKey) .collect(Collectors.toList()); - - System.out.println("Top 3 Portfolio IDs after exclusion: " + top3PortfolioIds); - return top3PortfolioIds; } return null; } @@ -1098,6 +1097,7 @@ public class PortfolioServiceImpl extends ServiceImpl