新增登录推送历史为推送消息

This commit is contained in:
wanggeng 2021-11-25 19:11:45 +08:00
parent 2453cda0a7
commit f0e181a9bd
11 changed files with 133 additions and 3 deletions

View File

@ -41,6 +41,14 @@ public interface INoticeDao extends IInitBaseTable {
*/
void updateHandle(Map<String, Object> params) throws UpdateException;
/**
* 修改已发送
*
* @param params
* @throws UpdateException
*/
void updateSend(Map<String, Object> params) throws UpdateException;
/**
* 详情
*
@ -119,4 +127,5 @@ public interface INoticeDao extends IInitBaseTable {
* @throws SearchException
*/
List<NoticeDTO> list(Map<String, Object> params) throws SearchException;
}

View File

@ -4,6 +4,7 @@ import ink.wgink.module.instantmessage.pojo.bos.NoticeBO;
import ink.wgink.module.instantmessage.pojo.bos.NoticeTargetBO;
import ink.wgink.module.instantmessage.pojo.vos.NoticeSendVO;
import ink.wgink.module.instantmessage.pojo.vos.NoticeVO;
import ink.wgink.module.instantmessage.websocket.pojo.WebSocketSession;
import java.util.List;
@ -70,4 +71,11 @@ public interface IMessageService {
*/
void delete(List<String> serviceIds);
/**
* 通知未发送消息的用户
*
* @param 当前会话
*/
void noticeUnSendByUserId(WebSocketSession webSocketSession);
}

View File

@ -41,6 +41,14 @@ public interface INoticeService {
*/
void updateHandleStatus(String userId, CountNeedToDealWithBody countNeedToDealWithBody);
/**
* 更新发送状态
*
* @param noticeId
* @param isSend
*/
void updateSendStatus(String noticeId, int isSend);
/**
* 通知详情
*
@ -220,4 +228,14 @@ public interface INoticeService {
* @return
*/
List<NoticePO> listPOByStartTimeAndEndTime(String startTime, String endTime);
/**
* 发送通知列表
*
* @param userId 用户ID
* @param isSend 是否发送
* @return
*/
List<NoticePO> listPOByUserIdAndIsSend(String userId, int isSend);
}

View File

@ -224,4 +224,29 @@ public class MessageServiceImpl extends DefaultBaseService implements IMessageSe
}
}, 3, TimeUnit.SECONDS);
}
@Override
public void noticeUnSendByUserId(WebSocketSession webSocketSession) {
List<NoticePO> noticePOs = noticeService.listPOByUserIdAndIsSend(webSocketSession.getUserId(), 0);
if (noticePOs.isEmpty()) {
return;
}
// 3s后发送通知
scheduledExecutorService.schedule(() -> {
if (webSocketSession.getChannel() == null || !webSocketSession.getChannel().isOpen() || !webSocketSession.getChannel().isActive()) {
return;
}
noticePOs.forEach(noticePO -> {
NoticeBody noticeBody = new NoticeBody(noticePO.getNoticeTitle(), noticePO.getNoticeMsg());
WebSocketClientMessage webSocketClientMessage = new WebSocketClientMessage(MessageTypeEnum.NOTICE.getValue(),
true,
WebSocketChannelManager.FORM_SYSTEM,
webSocketSession.getUserId(),
JSONObject.toJSONString(noticeBody));
WebSocketChannelManager.getInstance().sendText(webSocketSession.getChannel(), webSocketClientMessage);
// 更新发送状态
noticeService.updateSendStatus(noticePO.getNoticeId(), 1);
});
}, 3, TimeUnit.SECONDS);
}
}

View File

@ -90,6 +90,15 @@ public class NoticeServiceImpl extends DefaultBaseService implements INoticeServ
noticeDao.updateHandle(params);
}
@Override
public void updateSendStatus(String noticeId, int isSend) {
Map<String, Object> params = getHashMap(4);
params.put("noticeId", noticeId);
params.put("isSend", isSend);
setUpdateInfoByUserId(params, "1");
noticeDao.updateSend(params);
}
@Override
public NoticePO getPO(Map<String, Object> params) {
return noticeDao.getPO(params);
@ -298,6 +307,14 @@ public class NoticeServiceImpl extends DefaultBaseService implements INoticeServ
return listPO(params);
}
@Override
public List<NoticePO> listPOByUserIdAndIsSend(String userId, int isSend) {
Map<String, Object> params = getHashMap(4);
params.put("userId", userId);
params.put("isSend", isSend);
return listPO(params);
}
/**
* 系统列表
*

View File

@ -1,5 +1,6 @@
package ink.wgink.module.instantmessage.websocket.channel;
import ink.wgink.module.instantmessage.service.IMessageService;
import ink.wgink.module.instantmessage.websocket.handler.WebSocketHandler;
import ink.wgink.module.instantmessage.service.IWebSocketTextCustomService;
import ink.wgink.properties.websocket.WebSocketProperties;
@ -22,6 +23,7 @@ public class WebSocketChannelInitializer extends ChannelInitializer<SocketChanne
private WebSocketProperties webSocketProperties;
private IWebSocketTextCustomService IWebSocketTextCustomService;
private IMessageService messageService;
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
@ -37,6 +39,7 @@ public class WebSocketChannelInitializer extends ChannelInitializer<SocketChanne
WebSocketHandler webSocketHandler = new WebSocketHandler();
webSocketHandler.setWebSocketProperties(webSocketProperties);
webSocketHandler.setWebSocketTextCustomHandler(IWebSocketTextCustomService);
webSocketHandler.setMessageService(messageService);
socketChannel.pipeline().addLast("handler", webSocketHandler);
}
@ -47,4 +50,8 @@ public class WebSocketChannelInitializer extends ChannelInitializer<SocketChanne
public void setWebSocketTextCustomHandler(IWebSocketTextCustomService IWebSocketTextCustomService) {
this.IWebSocketTextCustomService = IWebSocketTextCustomService;
}
public void setMessageService(IMessageService messageService) {
this.messageService = messageService;
}
}

View File

@ -1,5 +1,6 @@
package ink.wgink.module.instantmessage.websocket.handler;
import ink.wgink.module.instantmessage.service.IMessageService;
import ink.wgink.module.instantmessage.service.IWebSocketTextCustomService;
import ink.wgink.module.instantmessage.websocket.handler.text.WebSocketTextHandler;
import ink.wgink.module.instantmessage.websocket.manager.WebSocketChannelManager;
@ -36,6 +37,7 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {
private WebSocketProperties webSocketProperties;
private WebSocketServerHandshaker webSocketServerHandshaker;
private IWebSocketTextCustomService IWebSocketTextCustomService;
private IMessageService messageService;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
@ -81,6 +83,7 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {
if (frame instanceof TextWebSocketFrame) {
WebSocketTextHandler webSocketTextHandler = new WebSocketTextHandler();
webSocketTextHandler.setWebSocketTextCustomHandler(IWebSocketTextCustomService);
webSocketTextHandler.setMessageService(messageService);
webSocketTextHandler.handler(ctx, (TextWebSocketFrame) frame);
return;
}
@ -150,4 +153,8 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {
public void setWebSocketTextCustomHandler(IWebSocketTextCustomService IWebSocketTextCustomService) {
this.IWebSocketTextCustomService = IWebSocketTextCustomService;
}
public void setMessageService(IMessageService messageService) {
this.messageService = messageService;
}
}

View File

@ -2,6 +2,7 @@ package ink.wgink.module.instantmessage.websocket.handler.text;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import ink.wgink.module.instantmessage.service.IMessageService;
import ink.wgink.module.instantmessage.websocket.enums.MessageTypeEnum;
import ink.wgink.module.instantmessage.websocket.enums.StatusEnum;
import ink.wgink.module.instantmessage.websocket.exception.*;
@ -33,6 +34,7 @@ import java.util.Set;
public class WebSocketTextHandler {
private IWebSocketTextCustomService IWebSocketTextCustomService;
private IMessageService messageService;
public void handler(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) {
WebSocketClientMessage clientSocketMessage = null;
@ -139,7 +141,8 @@ public class WebSocketTextHandler {
private StatusBody clientRegisterSession(Channel channel, WebSocketClientMessage clientSocketMessage) throws SessionException {
RegisterBody registerBody = JSONObject.parseObject(clientSocketMessage.getBody(), RegisterBody.class);
// 更新通道
WebSocketChannelManager.getInstance().addChannel(registerBody.getSessionId(), clientSocketMessage.getFrom(), channel);
WebSocketSession webSocketSession = WebSocketChannelManager.getInstance().addChannel(registerBody.getSessionId(), clientSocketMessage.getFrom(), channel);
sendHistory(webSocketSession);
return new StatusBody(StatusEnum.SUCCESS.getValue(), StatusEnum.SUCCESS, "OK");
}
@ -219,6 +222,16 @@ public class WebSocketTextHandler {
sendText(fromChannel, webSocketClientMessage);
}
/**
* 发送历史
*
* @param webSocketSession
*/
private void sendHistory(WebSocketSession webSocketSession) {
// 3s之后更新更新历史通知
messageService.noticeUnSendByUserId(webSocketSession);
}
/**
* 发送失败内容
*
@ -235,4 +248,8 @@ public class WebSocketTextHandler {
public void setWebSocketTextCustomHandler(IWebSocketTextCustomService IWebSocketTextCustomService) {
this.IWebSocketTextCustomService = IWebSocketTextCustomService;
}
public void setMessageService(IMessageService messageService) {
this.messageService = messageService;
}
}

View File

@ -62,13 +62,13 @@ public class WebSocketChannelManager {
/**
* 添加通道绑定通道与会话
*
* @param sessionId
* @param userId
* @param channel
* @return 返回创建的会话
* @throws SessionException
*/
public void addChannel(String sessionId, String userId, Channel channel) throws SessionException {
public WebSocketSession addChannel(String sessionId, String userId, Channel channel) throws SessionException {
WebSocketSession onlineUser = getOnlineUserBySessionId(sessionId);
if (onlineUser == null) {
throw new SessionException("无效会话,请登录");
@ -80,6 +80,7 @@ public class WebSocketChannelManager {
onlineUser.setChannel(channel);
onlineUser.setChannelId(channel.id().asLongText());
globalGroup.add(channel);
return onlineUser;
}
public void removeChannel(Channel channel) {

View File

@ -1,5 +1,6 @@
package ink.wgink.module.instantmessage.websocket.server;
import ink.wgink.module.instantmessage.service.IMessageService;
import ink.wgink.module.instantmessage.service.IWebSocketTextCustomService;
import ink.wgink.module.instantmessage.websocket.channel.WebSocketChannelInitializer;
import ink.wgink.properties.websocket.WebSocketProperties;
@ -26,6 +27,8 @@ public class WebSocketServer implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class);
@Autowired
private WebSocketProperties webSocketProperties;
@Autowired
private IMessageService messageService;
@Autowired(required = false)
private IWebSocketTextCustomService webSocketTextCustomService;
@ -35,6 +38,8 @@ public class WebSocketServer implements Runnable {
WebSocketChannelInitializer webSocketChannelInitializer = new WebSocketChannelInitializer();
webSocketChannelInitializer.setWebSocketProperties(webSocketProperties);
webSocketChannelInitializer.setWebSocketTextCustomHandler(webSocketTextCustomService);
webSocketChannelInitializer.setMessageService(messageService);
// server
ServerBootstrap serverBootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup();

View File

@ -136,6 +136,18 @@
notice_service_id = #{noticeServiceId}
</update>
<!-- 修改发送状态 -->
<update id="updateSend" parameterType="map" flushCache="true">
UPDATE
im_notice
SET
is_send = #{isSend},
gmt_modified = #{gmtModified},
modifier = #{modifier}
WHERE
notice_id = #{noticeId}
</update>
<!-- 删除 -->
<delete id="delete" parameterType="map" flushCache="true">
DELETE FROM
@ -260,6 +272,10 @@
AND
user_id = #{userId}
</if>
<if test="isSend != null">
AND
is_send = #{isSend}
</if>
<if test="isHandle != null">
AND
is_handle = #{isHandle}