Merge remote-tracking branch 'origin/dev/dev' into dev/dev
This commit is contained in:
@@ -4,40 +4,104 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.websocket.*;
|
||||
import javax.websocket.server.PathParam;
|
||||
import javax.websocket.server.ServerEndpoint;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
|
||||
@ServerEndpoint(value = "/notification")
|
||||
@ServerEndpoint(value = "/notification/{id}")
|
||||
@Component
|
||||
@Slf4j
|
||||
public class NotificationConnection {
|
||||
|
||||
static Map<String,Session> sessionMap = new ConcurrentHashMap<>();
|
||||
|
||||
private Session session;
|
||||
// 这里用ConcurrentHashMap 因为他是一个线程安全的Map
|
||||
private static ConcurrentHashMap<Long, NotificationConnection> websockets = new ConcurrentHashMap<>();
|
||||
//连接建立时执行的操作
|
||||
@OnOpen
|
||||
/*@OnOpen
|
||||
public void onOpen(Session session){
|
||||
sessionMap.put(session.getId(),session);
|
||||
log.info("websocket is open");
|
||||
log.info("websocket is open, sessionId: {}",session.getId());
|
||||
}*/
|
||||
|
||||
@OnOpen
|
||||
public void onOpen(Session session, @PathParam("id") String id) { // 接收到前端传来的用户ID
|
||||
this.session = session;
|
||||
websockets.put(Long.parseLong(id), this); //将ID作为key,当前的对象作为Value
|
||||
log.info("【建立连接】 用户为:{}", this.session);
|
||||
log.info("【建立连接】 用户Id为:{}", id);
|
||||
log.info("【建立连接】 总数为:{}", websockets.size());
|
||||
}
|
||||
|
||||
@OnClose
|
||||
public void onClose() {
|
||||
websockets.remove(this); // 将当前的对象从集合中删除
|
||||
log.info("【连接断开】 用户为:{}", this.session);
|
||||
// log.info("【连接断开】 总数为:{}", websockets.size());
|
||||
}
|
||||
|
||||
//收到了客户端消息执行的操作
|
||||
@OnMessage
|
||||
public void onMessage(String text){
|
||||
log.info("收到了一条消息:"+text);
|
||||
// return "已收到你的消息";
|
||||
}
|
||||
//连接关闭的时候执行的操作
|
||||
/*//连接关闭的时候执行的操作
|
||||
@OnClose
|
||||
public void onClose(Session session){
|
||||
sessionMap.remove(session.getId());
|
||||
log.info("websocket is close");
|
||||
log.info("websocket is close, sessionId: {}",session.getId());
|
||||
}
|
||||
|
||||
public void sendMsg(String message) throws IOException {
|
||||
for(String key:sessionMap.keySet()){
|
||||
sessionMap.get(key).getBasicRemote().sendText(message);
|
||||
}
|
||||
}*/
|
||||
|
||||
public void sendMsg(String message, Long userId) throws IOException {
|
||||
if (userId == null) { // 如果等于null则证明是群发
|
||||
// 获取当前Map的一个迭代器,遍历Map的方式有很多种,看着来
|
||||
// 这个就是遍历这个集合的过程....
|
||||
for (Map.Entry<Long, NotificationConnection> entry : websockets.entrySet()) {
|
||||
// 获取每一个Entry实例
|
||||
// 获取每一个Value,而这个Value就是WebSocket的实例
|
||||
NotificationConnection webSocket = entry.getValue();
|
||||
// 接下来就是遍历群发
|
||||
log.info("广播消息 【给用户】 :{}发送消息【{}】", webSocket, message);
|
||||
webSocket.session.getBasicRemote().sendText(message); // 发送!!!!!!!!!
|
||||
}
|
||||
} else { // 如果不是群发,则判断ID,其余步骤一致
|
||||
// 获取当前Map的一个迭代器,遍历Map的方式有很多种,看着来
|
||||
// 这个就是遍历这个集合的过程....
|
||||
for (Map.Entry<Long, NotificationConnection> entry : websockets.entrySet()) {
|
||||
// 获取每一个Entry实例
|
||||
// 获取每一个Value,而这个Value就是WebSocket的实例
|
||||
NotificationConnection webSocket = entry.getValue();
|
||||
// 获取每一个Key,这个Key就是用户ID
|
||||
Long key = entry.getKey();
|
||||
// 判断用户ID与当前的Key相等
|
||||
if (userId.equals(key)) {
|
||||
log.info("私发消息 【给用户】 :{}发送消息【{}】", key, message); // 打印
|
||||
if (webSocket.session.isOpen()){
|
||||
// 避免因为网络问题或其他原因导致连接突然关闭而报错
|
||||
try {
|
||||
webSocket.session.getBasicRemote().sendText(message); // 则发送给当前的用户即可
|
||||
} catch (IOException e) {
|
||||
log.error("Failed to send message to session {}: {}", webSocket.session.getId(), e.getMessage());
|
||||
// 这里可以选择移除关闭的 session
|
||||
websockets.remove(entry.getKey());
|
||||
}
|
||||
}else {
|
||||
log.info("连接已关闭,sessionId:{}, userId:{}", webSocket.session.getId(), key);
|
||||
websockets.remove(entry.getKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -93,6 +93,7 @@ public class MessageCenterServiceImpl extends ServiceImpl<NotificationMapper, No
|
||||
}
|
||||
|
||||
QueryWrapper<Notification> queryWrapper = new QueryWrapper<>();
|
||||
queryWrapper.orderByDesc("create_time");
|
||||
if (!StringUtils.isNullOrEmpty(getNotificationDTO.getType())) {
|
||||
queryWrapper.eq("type", getNotificationDTO.getType());
|
||||
}
|
||||
@@ -107,7 +108,16 @@ public class MessageCenterServiceImpl extends ServiceImpl<NotificationMapper, No
|
||||
Account senderAccount = accountService.getById(notificationVO.getSenderId());
|
||||
notificationVO.setUserName(senderAccount.getUserName());
|
||||
// notificationVO.setSenderUserAvatar(StringUtils.isNullOrEmpty(senderAccount.getAvatar()) ? null : minioUtil.getPreSignedUrl(senderAccount.getAvatar(), CommonConstant.MINIO_IMAGE_EXPIRE_TIME));
|
||||
notificationVO.setPortfolioName(Objects.isNull(notificationVO.getPortfolioId()) ? null : portfolioService.getById(notificationVO.getPortfolioId()).getPortfolioName());
|
||||
if (Objects.isNull(notificationVO.getPortfolioId())){
|
||||
notificationVO.setPortfolioId(null);
|
||||
}else {
|
||||
Portfolio byId = portfolioService.getById(notificationVO.getPortfolioId());
|
||||
if (Objects.isNull(byId.getPortfolioName())){
|
||||
notificationVO.setPortfolioName(null);
|
||||
}else {
|
||||
notificationVO.setPortfolioName(byId.getPortfolioName());
|
||||
}
|
||||
}
|
||||
// 设置单个人 系统消息是否已读
|
||||
if (notificationVO.getType().equals("system")){
|
||||
if (unreadSysNotificationIds.contains(notificationVO.getId())){
|
||||
@@ -136,7 +146,7 @@ public class MessageCenterServiceImpl extends ServiceImpl<NotificationMapper, No
|
||||
save(notification);
|
||||
}
|
||||
// 推送消息
|
||||
pushMessage(notification.getType(), notification.getSenderId());
|
||||
pushMessage(notification.getType(), notification.getReceiverId());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -168,7 +178,6 @@ public class MessageCenterServiceImpl extends ServiceImpl<NotificationMapper, No
|
||||
ArrayList<Map<String, Object>> resp = new ArrayList<>();
|
||||
HashMap<String, Object> data = new HashMap<>();
|
||||
Long count;
|
||||
Long accountId = UserContext.getUserHolder().getId();
|
||||
if (!type.equals("system")) {
|
||||
// 个人未读消息
|
||||
count = getUnreadCountByType(type, receiverId);
|
||||
@@ -179,21 +188,19 @@ public class MessageCenterServiceImpl extends ServiceImpl<NotificationMapper, No
|
||||
if (type.equals("follow")){
|
||||
HashMap<String, Object> followee = new HashMap<>();
|
||||
HashMap<String, Object> follower = new HashMap<>();
|
||||
follower.put("followerCount",portfolioService.getFollowerCount(accountId));
|
||||
followee.put("followeeCount",portfolioService.getFolloweeCount(accountId));
|
||||
follower.put("followerCount",portfolioService.getFollowerCount(receiverId));
|
||||
followee.put("followeeCount",portfolioService.getFolloweeCount(receiverId));
|
||||
resp.add(followee);
|
||||
resp.add(follower);
|
||||
}
|
||||
|
||||
data.put(type, count);
|
||||
if (count != 0){
|
||||
resp.add(data);
|
||||
}
|
||||
resp.add(data);
|
||||
String jsonString = JSON.toJSONString(resp);
|
||||
log.info("消息推送 : {}", jsonString);
|
||||
// log.info("消息推送 : {}", jsonString);
|
||||
|
||||
try {
|
||||
notificationConnection.sendMsg(jsonString);
|
||||
notificationConnection.sendMsg(jsonString, receiverId);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
@@ -1031,15 +1031,15 @@ public class PortfolioServiceImpl extends ServiceImpl<PortfolioMapper, Portfolio
|
||||
// 按 size 倒序排序
|
||||
portfolioSizes.sort((e1, e2) -> e2.getValue().compareTo(e1.getValue()));
|
||||
// 获取 size 最大的前三个
|
||||
List<Long> top3PortfolioIds = portfolioSizes.stream()
|
||||
// 去掉前缀,获取 portfolioId
|
||||
// 输出 top3PortfolioIds
|
||||
// top3PortfolioIds.forEach(System.out::println);
|
||||
return portfolioSizes.stream()
|
||||
.limit(3)
|
||||
.map(Map.Entry::getKey)
|
||||
.map(key -> key.replace(prefix, "")) // 去掉前缀,获取 portfolioId
|
||||
.map(Long::valueOf)
|
||||
.collect(Collectors.toList());
|
||||
// 输出 top3PortfolioIds
|
||||
top3PortfolioIds.forEach(System.out::println);
|
||||
return top3PortfolioIds;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
@@ -1067,14 +1067,13 @@ public class PortfolioServiceImpl extends ServiceImpl<PortfolioMapper, Portfolio
|
||||
|
||||
// Step 3: Find the top 3 portfolio IDs with the largest values
|
||||
if (!portfolioViews.isEmpty()){
|
||||
List<Long> top3PortfolioIds = portfolioViews.entrySet().stream()
|
||||
|
||||
// System.out.println("Top 3 Portfolio IDs after exclusion: " + top3PortfolioIds);
|
||||
return portfolioViews.entrySet().stream()
|
||||
.sorted(Map.Entry.<Long, Long>comparingByValue().reversed())
|
||||
.limit(3)
|
||||
.map(Map.Entry::getKey)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
System.out.println("Top 3 Portfolio IDs after exclusion: " + top3PortfolioIds);
|
||||
return top3PortfolioIds;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
@@ -1098,6 +1097,7 @@ public class PortfolioServiceImpl extends ServiceImpl<PortfolioMapper, Portfolio
|
||||
userFollowMapper.insert(newFollower);
|
||||
// 4、推送消息
|
||||
messageCenterService.prePushMessage(new Notification("follow", accountId, followeeId));
|
||||
messageCenterService.pushMessage("follow", accountId);
|
||||
}
|
||||
|
||||
// 取消关注
|
||||
@@ -1117,6 +1117,7 @@ public class PortfolioServiceImpl extends ServiceImpl<PortfolioMapper, Portfolio
|
||||
}
|
||||
// 3、更新关注人数
|
||||
messageCenterService.pushMessage("follow", accountId);
|
||||
messageCenterService.pushMessage("follow", followeeId);
|
||||
}
|
||||
|
||||
public Long getFolloweeCount(Long accountId) {
|
||||
|
||||
Reference in New Issue
Block a user