From f0e181a9bdeefe39b79e96a4fc33b93b9192f683 Mon Sep 17 00:00:00 2001 From: wanggeng <450292408@qq.com> Date: Thu, 25 Nov 2021 19:11:45 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E7=99=BB=E5=BD=95=E6=8E=A8?= =?UTF-8?q?=E9=80=81=E5=8E=86=E5=8F=B2=E4=B8=BA=E6=8E=A8=E9=80=81=E6=B6=88?= =?UTF-8?q?=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../module/instantmessage/dao/INoticeDao.java | 9 +++++++ .../service/IMessageService.java | 8 ++++++ .../service/INoticeService.java | 18 +++++++++++++ .../service/impl/MessageServiceImpl.java | 25 +++++++++++++++++++ .../service/impl/NoticeServiceImpl.java | 17 +++++++++++++ .../channel/WebSocketChannelInitializer.java | 7 ++++++ .../websocket/handler/WebSocketHandler.java | 7 ++++++ .../handler/text/WebSocketTextHandler.java | 19 +++++++++++++- .../manager/WebSocketChannelManager.java | 5 ++-- .../websocket/server/WebSocketServer.java | 5 ++++ .../mybatis/mapper/notice-mapper.xml | 16 ++++++++++++ 11 files changed, 133 insertions(+), 3 deletions(-) diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/dao/INoticeDao.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/dao/INoticeDao.java index c41ad48b..63ae0317 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/dao/INoticeDao.java +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/dao/INoticeDao.java @@ -41,6 +41,14 @@ public interface INoticeDao extends IInitBaseTable { */ void updateHandle(Map params) throws UpdateException; + /** + * 修改已发送 + * + * @param params + * @throws UpdateException + */ + void updateSend(Map params) throws UpdateException; + /** * 详情 * @@ -119,4 +127,5 @@ public interface INoticeDao extends IInitBaseTable { * @throws SearchException */ List list(Map params) throws SearchException; + } diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/IMessageService.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/IMessageService.java index d0a7a307..29905729 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/IMessageService.java +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/IMessageService.java @@ -4,6 +4,7 @@ import ink.wgink.module.instantmessage.pojo.bos.NoticeBO; import ink.wgink.module.instantmessage.pojo.bos.NoticeTargetBO; import ink.wgink.module.instantmessage.pojo.vos.NoticeSendVO; import ink.wgink.module.instantmessage.pojo.vos.NoticeVO; +import ink.wgink.module.instantmessage.websocket.pojo.WebSocketSession; import java.util.List; @@ -70,4 +71,11 @@ public interface IMessageService { */ void delete(List serviceIds); + /** + * 通知未发送消息的用户 + * + * @param 当前会话 + */ + void noticeUnSendByUserId(WebSocketSession webSocketSession); + } diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/INoticeService.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/INoticeService.java index 6376913b..32fc3a38 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/INoticeService.java +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/INoticeService.java @@ -41,6 +41,14 @@ public interface INoticeService { */ void updateHandleStatus(String userId, CountNeedToDealWithBody countNeedToDealWithBody); + /** + * 更新发送状态 + * + * @param noticeId + * @param isSend + */ + void updateSendStatus(String noticeId, int isSend); + /** * 通知详情 * @@ -220,4 +228,14 @@ public interface INoticeService { * @return */ List listPOByStartTimeAndEndTime(String startTime, String endTime); + + /** + * 发送通知列表 + * + * @param userId 用户ID + * @param isSend 是否发送 + * @return + */ + List listPOByUserIdAndIsSend(String userId, int isSend); + } diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/impl/MessageServiceImpl.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/impl/MessageServiceImpl.java index f27d8766..d96dac4c 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/impl/MessageServiceImpl.java +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/impl/MessageServiceImpl.java @@ -224,4 +224,29 @@ public class MessageServiceImpl extends DefaultBaseService implements IMessageSe } }, 3, TimeUnit.SECONDS); } + + @Override + public void noticeUnSendByUserId(WebSocketSession webSocketSession) { + List noticePOs = noticeService.listPOByUserIdAndIsSend(webSocketSession.getUserId(), 0); + if (noticePOs.isEmpty()) { + return; + } + // 3s后发送通知 + scheduledExecutorService.schedule(() -> { + if (webSocketSession.getChannel() == null || !webSocketSession.getChannel().isOpen() || !webSocketSession.getChannel().isActive()) { + return; + } + noticePOs.forEach(noticePO -> { + NoticeBody noticeBody = new NoticeBody(noticePO.getNoticeTitle(), noticePO.getNoticeMsg()); + WebSocketClientMessage webSocketClientMessage = new WebSocketClientMessage(MessageTypeEnum.NOTICE.getValue(), + true, + WebSocketChannelManager.FORM_SYSTEM, + webSocketSession.getUserId(), + JSONObject.toJSONString(noticeBody)); + WebSocketChannelManager.getInstance().sendText(webSocketSession.getChannel(), webSocketClientMessage); + // 更新发送状态 + noticeService.updateSendStatus(noticePO.getNoticeId(), 1); + }); + }, 3, TimeUnit.SECONDS); + } } diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/impl/NoticeServiceImpl.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/impl/NoticeServiceImpl.java index 301ce2bb..25049d99 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/impl/NoticeServiceImpl.java +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/impl/NoticeServiceImpl.java @@ -90,6 +90,15 @@ public class NoticeServiceImpl extends DefaultBaseService implements INoticeServ noticeDao.updateHandle(params); } + @Override + public void updateSendStatus(String noticeId, int isSend) { + Map params = getHashMap(4); + params.put("noticeId", noticeId); + params.put("isSend", isSend); + setUpdateInfoByUserId(params, "1"); + noticeDao.updateSend(params); + } + @Override public NoticePO getPO(Map params) { return noticeDao.getPO(params); @@ -298,6 +307,14 @@ public class NoticeServiceImpl extends DefaultBaseService implements INoticeServ return listPO(params); } + @Override + public List listPOByUserIdAndIsSend(String userId, int isSend) { + Map params = getHashMap(4); + params.put("userId", userId); + params.put("isSend", isSend); + return listPO(params); + } + /** * 系统列表 * diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/channel/WebSocketChannelInitializer.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/channel/WebSocketChannelInitializer.java index 19259cae..59942371 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/channel/WebSocketChannelInitializer.java +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/channel/WebSocketChannelInitializer.java @@ -1,5 +1,6 @@ package ink.wgink.module.instantmessage.websocket.channel; +import ink.wgink.module.instantmessage.service.IMessageService; import ink.wgink.module.instantmessage.websocket.handler.WebSocketHandler; import ink.wgink.module.instantmessage.service.IWebSocketTextCustomService; import ink.wgink.properties.websocket.WebSocketProperties; @@ -22,6 +23,7 @@ public class WebSocketChannelInitializer extends ChannelInitializer { private WebSocketProperties webSocketProperties; private WebSocketServerHandshaker webSocketServerHandshaker; private IWebSocketTextCustomService IWebSocketTextCustomService; + private IMessageService messageService; @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { @@ -81,6 +83,7 @@ public class WebSocketHandler extends SimpleChannelInboundHandler { if (frame instanceof TextWebSocketFrame) { WebSocketTextHandler webSocketTextHandler = new WebSocketTextHandler(); webSocketTextHandler.setWebSocketTextCustomHandler(IWebSocketTextCustomService); + webSocketTextHandler.setMessageService(messageService); webSocketTextHandler.handler(ctx, (TextWebSocketFrame) frame); return; } @@ -150,4 +153,8 @@ public class WebSocketHandler extends SimpleChannelInboundHandler { public void setWebSocketTextCustomHandler(IWebSocketTextCustomService IWebSocketTextCustomService) { this.IWebSocketTextCustomService = IWebSocketTextCustomService; } + + public void setMessageService(IMessageService messageService) { + this.messageService = messageService; + } } diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/handler/text/WebSocketTextHandler.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/handler/text/WebSocketTextHandler.java index 3b23feb7..77b90f26 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/handler/text/WebSocketTextHandler.java +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/handler/text/WebSocketTextHandler.java @@ -2,6 +2,7 @@ package ink.wgink.module.instantmessage.websocket.handler.text; import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONObject; +import ink.wgink.module.instantmessage.service.IMessageService; import ink.wgink.module.instantmessage.websocket.enums.MessageTypeEnum; import ink.wgink.module.instantmessage.websocket.enums.StatusEnum; import ink.wgink.module.instantmessage.websocket.exception.*; @@ -33,6 +34,7 @@ import java.util.Set; public class WebSocketTextHandler { private IWebSocketTextCustomService IWebSocketTextCustomService; + private IMessageService messageService; public void handler(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) { WebSocketClientMessage clientSocketMessage = null; @@ -139,7 +141,8 @@ public class WebSocketTextHandler { private StatusBody clientRegisterSession(Channel channel, WebSocketClientMessage clientSocketMessage) throws SessionException { RegisterBody registerBody = JSONObject.parseObject(clientSocketMessage.getBody(), RegisterBody.class); // 更新通道 - WebSocketChannelManager.getInstance().addChannel(registerBody.getSessionId(), clientSocketMessage.getFrom(), channel); + WebSocketSession webSocketSession = WebSocketChannelManager.getInstance().addChannel(registerBody.getSessionId(), clientSocketMessage.getFrom(), channel); + sendHistory(webSocketSession); return new StatusBody(StatusEnum.SUCCESS.getValue(), StatusEnum.SUCCESS, "OK"); } @@ -219,6 +222,16 @@ public class WebSocketTextHandler { sendText(fromChannel, webSocketClientMessage); } + /** + * 发送历史 + * + * @param webSocketSession + */ + private void sendHistory(WebSocketSession webSocketSession) { + // 3s之后更新,更新历史通知 + messageService.noticeUnSendByUserId(webSocketSession); + } + /** * 发送失败内容 * @@ -235,4 +248,8 @@ public class WebSocketTextHandler { public void setWebSocketTextCustomHandler(IWebSocketTextCustomService IWebSocketTextCustomService) { this.IWebSocketTextCustomService = IWebSocketTextCustomService; } + + public void setMessageService(IMessageService messageService) { + this.messageService = messageService; + } } diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/manager/WebSocketChannelManager.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/manager/WebSocketChannelManager.java index 3cc53f80..681b7e04 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/manager/WebSocketChannelManager.java +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/manager/WebSocketChannelManager.java @@ -62,13 +62,13 @@ public class WebSocketChannelManager { /** * 添加通道,绑定通道与会话 - * * @param sessionId * @param userId * @param channel + * @return 返回创建的会话 * @throws SessionException */ - public void addChannel(String sessionId, String userId, Channel channel) throws SessionException { + public WebSocketSession addChannel(String sessionId, String userId, Channel channel) throws SessionException { WebSocketSession onlineUser = getOnlineUserBySessionId(sessionId); if (onlineUser == null) { throw new SessionException("无效会话,请登录"); @@ -80,6 +80,7 @@ public class WebSocketChannelManager { onlineUser.setChannel(channel); onlineUser.setChannelId(channel.id().asLongText()); globalGroup.add(channel); + return onlineUser; } public void removeChannel(Channel channel) { diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/server/WebSocketServer.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/server/WebSocketServer.java index 083e6f8b..50c8c6a6 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/server/WebSocketServer.java +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/server/WebSocketServer.java @@ -1,5 +1,6 @@ package ink.wgink.module.instantmessage.websocket.server; +import ink.wgink.module.instantmessage.service.IMessageService; import ink.wgink.module.instantmessage.service.IWebSocketTextCustomService; import ink.wgink.module.instantmessage.websocket.channel.WebSocketChannelInitializer; import ink.wgink.properties.websocket.WebSocketProperties; @@ -26,6 +27,8 @@ public class WebSocketServer implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class); @Autowired private WebSocketProperties webSocketProperties; + @Autowired + private IMessageService messageService; @Autowired(required = false) private IWebSocketTextCustomService webSocketTextCustomService; @@ -35,6 +38,8 @@ public class WebSocketServer implements Runnable { WebSocketChannelInitializer webSocketChannelInitializer = new WebSocketChannelInitializer(); webSocketChannelInitializer.setWebSocketProperties(webSocketProperties); webSocketChannelInitializer.setWebSocketTextCustomHandler(webSocketTextCustomService); + webSocketChannelInitializer.setMessageService(messageService); + // server ServerBootstrap serverBootstrap = new ServerBootstrap(); EventLoopGroup bossGroup = new NioEventLoopGroup(); diff --git a/module-instant-message/src/main/resources/mybatis/mapper/notice-mapper.xml b/module-instant-message/src/main/resources/mybatis/mapper/notice-mapper.xml index f48c3541..6b18ae76 100644 --- a/module-instant-message/src/main/resources/mybatis/mapper/notice-mapper.xml +++ b/module-instant-message/src/main/resources/mybatis/mapper/notice-mapper.xml @@ -136,6 +136,18 @@ notice_service_id = #{noticeServiceId} + + + UPDATE + im_notice + SET + is_send = #{isSend}, + gmt_modified = #{gmtModified}, + modifier = #{modifier} + WHERE + notice_id = #{noticeId} + + DELETE FROM @@ -260,6 +272,10 @@ AND user_id = #{userId} + + AND + is_send = #{isSend} + AND is_handle = #{isHandle}