diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/BaseSocketException.java b/basic-exception/src/main/java/ink/wgink/exceptions/websocket/BaseSocketException.java similarity index 93% rename from module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/BaseSocketException.java rename to basic-exception/src/main/java/ink/wgink/exceptions/websocket/BaseSocketException.java index 0ce49288..2fa773df 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/BaseSocketException.java +++ b/basic-exception/src/main/java/ink/wgink/exceptions/websocket/BaseSocketException.java @@ -1,4 +1,4 @@ -package ink.wgink.module.instantmessage.websocket.exception; +package ink.wgink.exceptions.websocket; /** * When you feel like quitting. Think about why you started diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/BodyException.java b/basic-exception/src/main/java/ink/wgink/exceptions/websocket/BodyException.java similarity index 92% rename from module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/BodyException.java rename to basic-exception/src/main/java/ink/wgink/exceptions/websocket/BodyException.java index fe1f0722..bc47e19f 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/BodyException.java +++ b/basic-exception/src/main/java/ink/wgink/exceptions/websocket/BodyException.java @@ -1,4 +1,4 @@ -package ink.wgink.module.instantmessage.websocket.exception; +package ink.wgink.exceptions.websocket; /** * When you feel like quitting. Think about why you started diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/CustomHandleException.java b/basic-exception/src/main/java/ink/wgink/exceptions/websocket/CustomHandleException.java similarity index 92% rename from module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/CustomHandleException.java rename to basic-exception/src/main/java/ink/wgink/exceptions/websocket/CustomHandleException.java index 97d81c2b..629fb806 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/CustomHandleException.java +++ b/basic-exception/src/main/java/ink/wgink/exceptions/websocket/CustomHandleException.java @@ -1,4 +1,4 @@ -package ink.wgink.module.instantmessage.websocket.exception; +package ink.wgink.exceptions.websocket; /** * @ClassName: CustomHandleException diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/SessionException.java b/basic-exception/src/main/java/ink/wgink/exceptions/websocket/SessionException.java similarity index 93% rename from module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/SessionException.java rename to basic-exception/src/main/java/ink/wgink/exceptions/websocket/SessionException.java index e2c39646..e2bba129 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/SessionException.java +++ b/basic-exception/src/main/java/ink/wgink/exceptions/websocket/SessionException.java @@ -1,4 +1,4 @@ -package ink.wgink.module.instantmessage.websocket.exception; +package ink.wgink.exceptions.websocket; /** * When you feel like quitting. Think about why you started diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/TypeException.java b/basic-exception/src/main/java/ink/wgink/exceptions/websocket/TypeException.java similarity index 92% rename from module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/TypeException.java rename to basic-exception/src/main/java/ink/wgink/exceptions/websocket/TypeException.java index 749f855d..9468894d 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/TypeException.java +++ b/basic-exception/src/main/java/ink/wgink/exceptions/websocket/TypeException.java @@ -1,4 +1,4 @@ -package ink.wgink.module.instantmessage.websocket.exception; +package ink.wgink.exceptions.websocket; /** * When you feel like quitting. Think about why you started diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/UserException.java b/basic-exception/src/main/java/ink/wgink/exceptions/websocket/UserException.java similarity index 92% rename from module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/UserException.java rename to basic-exception/src/main/java/ink/wgink/exceptions/websocket/UserException.java index 5461e5ef..9fd2c8d5 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/UserException.java +++ b/basic-exception/src/main/java/ink/wgink/exceptions/websocket/UserException.java @@ -1,4 +1,4 @@ -package ink.wgink.module.instantmessage.websocket.exception; +package ink.wgink.exceptions.websocket; /** * When you feel like quitting. Think about why you started diff --git a/basic-interface/pom.xml b/basic-interface/pom.xml index a7703e4d..9bd3f426 100644 --- a/basic-interface/pom.xml +++ b/basic-interface/pom.xml @@ -22,6 +22,13 @@ basic-pojo 1.0-SNAPSHOT + + + io.netty + netty-all + provided + + diff --git a/basic-interface/src/main/java/ink/wgink/interfaces/manager/IAppManager.java b/basic-interface/src/main/java/ink/wgink/interfaces/manager/IAppManager.java index 7d8a538a..a33f04c3 100644 --- a/basic-interface/src/main/java/ink/wgink/interfaces/manager/IAppManager.java +++ b/basic-interface/src/main/java/ink/wgink/interfaces/manager/IAppManager.java @@ -41,4 +41,11 @@ public interface IAppManager { * @return */ List listCurrentUsers(); + + /** + * 设置redis token 管理器 + * + * @param redisAppTokenManager + */ + void setRedisAppTokenManager(IRedisAppTokenManager redisAppTokenManager); } diff --git a/basic-interface/src/main/java/ink/wgink/interfaces/manager/IRedisAppTokenManager.java b/basic-interface/src/main/java/ink/wgink/interfaces/manager/IRedisAppTokenManager.java new file mode 100644 index 00000000..d98167bd --- /dev/null +++ b/basic-interface/src/main/java/ink/wgink/interfaces/manager/IRedisAppTokenManager.java @@ -0,0 +1,46 @@ +package ink.wgink.interfaces.manager; + +import ink.wgink.pojo.app.AppToken; +import ink.wgink.pojo.app.AppTokenUser; + +import java.util.List; + +/** + * @ClassName: IRedisAppTokenManager + * @Description: redis app token 管理 + * @Author: wanggeng + * @Date: 2021/11/29 4:30 下午 + * @Version: 1.0 + */ +public interface IRedisAppTokenManager { + + String TOKEN_HASH_KEY = "app:tokens"; + + /** + * 获取token + * + * @param token + * @return + */ + AppToken getToken(String token); + + /** + * 添加token + * + * @param appToken + */ + void addToken(AppToken appToken); + + /** + * app当前在线用户 + * + * @return + */ + List listCurrentUsers(); + + /** + * 清理超时token + * @param clearTokenKeys + */ + void clearTimeoutToken(); +} diff --git a/basic-interface/src/main/java/ink/wgink/interfaces/manager/IWebSocketChannelManager.java b/basic-interface/src/main/java/ink/wgink/interfaces/manager/IWebSocketChannelManager.java new file mode 100644 index 00000000..a82e2979 --- /dev/null +++ b/basic-interface/src/main/java/ink/wgink/interfaces/manager/IWebSocketChannelManager.java @@ -0,0 +1,19 @@ +package ink.wgink.interfaces.manager; + +/** + * @ClassName: IWebSocketChannelManager + * @Description: websocket + * @Author: wanggeng + * @Date: 2021/11/29 5:56 下午 + * @Version: 1.0 + */ +public interface IWebSocketChannelManager { + + /** + * 设置redis socket channel 管理器 + * + * @param webSocketUserSessionManager + */ + void setWebSocketUserSessionManager(IWebSocketUserSessionManager webSocketUserSessionManager); + +} diff --git a/basic-interface/src/main/java/ink/wgink/interfaces/manager/IWebSocketUserSessionManager.java b/basic-interface/src/main/java/ink/wgink/interfaces/manager/IWebSocketUserSessionManager.java new file mode 100644 index 00000000..8eb125b7 --- /dev/null +++ b/basic-interface/src/main/java/ink/wgink/interfaces/manager/IWebSocketUserSessionManager.java @@ -0,0 +1,97 @@ +package ink.wgink.interfaces.manager; + +import ink.wgink.exceptions.websocket.SessionException; +import ink.wgink.pojo.session.WebSocketSession; +import io.netty.channel.Channel; + +import java.util.List; + +/** + * @ClassName: IRedisWebSocketChannelManager + * @Description: redis websocket channel 管理器 + * @Author: wanggeng + * @Date: 2021/11/29 5:57 下午 + * @Version: 1.0 + */ +public interface IWebSocketUserSessionManager { + + String CHANNEL_HASH_KEY = "websocket:channels"; + + /** + * 初始化 + * + * @param userId + * @param sessionId + * @param clientName + */ + void init(String userId, String sessionId, String clientName); + + /** + * 添加通道 + * + * @param sessionId + * @param userId + * @param channel + * @return + * @throws SessionException + */ + WebSocketSession addChannel(String sessionId, String userId, Channel channel) throws SessionException; + + /** + * 删除通道 + * + * @param channel + */ + void removeChannel(Channel channel); + + /** + * 在线会话列表 + * + * @param userId 用户ID + * @return + */ + List listOnlineUser(String userId); + + /** + * 在线会话 + * + * @param userId 用户ID + * @param clientName 客户端名称 + * @return + */ + WebSocketSession getOnlineUser(String userId, String clientName); + + /** + * 在线会话列表 + * + * @param userIds 用户ID列表 + * @return + */ + List listOnlineUser(List userIds); + + /** + * 在线会话列表 + * + * @param userIds 用户ID + * @param clientName 客户端名称 + * @return + */ + List listOnlineUser(List userIds, String clientName); + + /** + * 在线用户 + * + * @param sessionId 会话ID + * @return + */ + WebSocketSession getOnlineUserBySessionId(String sessionId); + + /** + * 通过通道获取用户 + * + * @param channelId + * @return + */ + WebSocketSession getOnlineUserByChannelId(String channelId); + +} diff --git a/basic-pojo/pom.xml b/basic-pojo/pom.xml index 7a563b0b..967ea9fd 100644 --- a/basic-pojo/pom.xml +++ b/basic-pojo/pom.xml @@ -96,6 +96,14 @@ 1.0-SNAPSHOT + + + + io.netty + netty-all + provided + + \ No newline at end of file diff --git a/basic-pojo/src/main/java/ink/wgink/pojo/app/AppToken.java b/basic-pojo/src/main/java/ink/wgink/pojo/app/AppToken.java index 60978204..d0116150 100644 --- a/basic-pojo/src/main/java/ink/wgink/pojo/app/AppToken.java +++ b/basic-pojo/src/main/java/ink/wgink/pojo/app/AppToken.java @@ -1,5 +1,7 @@ package ink.wgink.pojo.app; +import java.io.Serializable; + /** * When you feel like quitting. Think about why you started * 当你想要放弃的时候,想想当初你为何开始 @@ -10,8 +12,9 @@ package ink.wgink.pojo.app; * @Date: 2019-08-02 11:19 * @Version: 1.0 **/ -public class AppToken { +public class AppToken implements Serializable { + private static final long serialVersionUID = 1338652146163739437L; private String token; private AppTokenTypeEnum appTokenTypeEnum; private long lastTime; diff --git a/basic-pojo/src/main/java/ink/wgink/pojo/app/AppTokenUser.java b/basic-pojo/src/main/java/ink/wgink/pojo/app/AppTokenUser.java index 9967e16c..c38112a8 100644 --- a/basic-pojo/src/main/java/ink/wgink/pojo/app/AppTokenUser.java +++ b/basic-pojo/src/main/java/ink/wgink/pojo/app/AppTokenUser.java @@ -1,5 +1,6 @@ package ink.wgink.pojo.app; +import java.io.Serializable; import java.util.List; /** @@ -12,8 +13,9 @@ import java.util.List; * @Date: 2019-08-02 11:20 * @Version: 1.0 **/ -public class AppTokenUser { +public class AppTokenUser implements Serializable { + private static final long serialVersionUID = -1803419036314192907L; private String id; private String name; private String avatar; diff --git a/basic-pojo/src/main/java/ink/wgink/pojo/app/AppTokenUserDepartment.java b/basic-pojo/src/main/java/ink/wgink/pojo/app/AppTokenUserDepartment.java index e1e98fec..e2dfe22b 100644 --- a/basic-pojo/src/main/java/ink/wgink/pojo/app/AppTokenUserDepartment.java +++ b/basic-pojo/src/main/java/ink/wgink/pojo/app/AppTokenUserDepartment.java @@ -1,5 +1,7 @@ package ink.wgink.pojo.app; +import java.io.Serializable; + /** * When you feel like quitting. Think about why you started * 当你想要放弃的时候,想想当初你为何开始 @@ -10,8 +12,9 @@ package ink.wgink.pojo.app; * @Date: 2019-08-10 14:21 * @Version: 1.0 **/ -public class AppTokenUserDepartment { +public class AppTokenUserDepartment implements Serializable { + private static final long serialVersionUID = -4352969540408749470L; private String departmentId; private String departmentName; diff --git a/basic-pojo/src/main/java/ink/wgink/pojo/app/AppTokenUserGroup.java b/basic-pojo/src/main/java/ink/wgink/pojo/app/AppTokenUserGroup.java index 8645acb0..b4e6cc21 100644 --- a/basic-pojo/src/main/java/ink/wgink/pojo/app/AppTokenUserGroup.java +++ b/basic-pojo/src/main/java/ink/wgink/pojo/app/AppTokenUserGroup.java @@ -1,5 +1,7 @@ package ink.wgink.pojo.app; +import java.io.Serializable; + /** * When you feel like quitting. Think about why you started * 当你想要放弃的时候,想想当初你为何开始 @@ -10,8 +12,9 @@ package ink.wgink.pojo.app; * @Date: 2020/2/10 9:33 下午 * @Version: 1.0 **/ -public class AppTokenUserGroup { +public class AppTokenUserGroup implements Serializable { + private static final long serialVersionUID = -4130999187990640889L; private String groupId; private String groupName; diff --git a/basic-pojo/src/main/java/ink/wgink/pojo/app/AppTokenUserPosition.java b/basic-pojo/src/main/java/ink/wgink/pojo/app/AppTokenUserPosition.java index b07e0ad8..fa1ca20f 100644 --- a/basic-pojo/src/main/java/ink/wgink/pojo/app/AppTokenUserPosition.java +++ b/basic-pojo/src/main/java/ink/wgink/pojo/app/AppTokenUserPosition.java @@ -1,5 +1,7 @@ package ink.wgink.pojo.app; +import java.io.Serializable; + /** * When you feel like quitting. Think about why you started * 当你想要放弃的时候,想想当初你为何开始 @@ -10,8 +12,9 @@ package ink.wgink.pojo.app; * @Date: 2019-08-10 14:22 * @Version: 1.0 **/ -public class AppTokenUserPosition { +public class AppTokenUserPosition implements Serializable { + private static final long serialVersionUID = -3348976168384827012L; private String positionId; private String positionName; diff --git a/basic-pojo/src/main/java/ink/wgink/pojo/app/AppTokenUserRole.java b/basic-pojo/src/main/java/ink/wgink/pojo/app/AppTokenUserRole.java index 06e2cdfd..b8ea536d 100644 --- a/basic-pojo/src/main/java/ink/wgink/pojo/app/AppTokenUserRole.java +++ b/basic-pojo/src/main/java/ink/wgink/pojo/app/AppTokenUserRole.java @@ -1,5 +1,7 @@ package ink.wgink.pojo.app; +import java.io.Serializable; + /** * When you feel like quitting. Think about why you started * 当你想要放弃的时候,想想当初你为何开始 @@ -10,8 +12,9 @@ package ink.wgink.pojo.app; * @Date: 2019-08-10 12:25 * @Version: 1.0 **/ -public class AppTokenUserRole { +public class AppTokenUserRole implements Serializable { + private static final long serialVersionUID = 2153811587508975532L; private String roleId; private String roleName; diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/WebSocketSession.java b/basic-pojo/src/main/java/ink/wgink/pojo/session/WebSocketSession.java similarity index 93% rename from module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/WebSocketSession.java rename to basic-pojo/src/main/java/ink/wgink/pojo/session/WebSocketSession.java index 9a0e2b7b..348162c7 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/WebSocketSession.java +++ b/basic-pojo/src/main/java/ink/wgink/pojo/session/WebSocketSession.java @@ -1,7 +1,9 @@ -package ink.wgink.module.instantmessage.websocket.pojo; +package ink.wgink.pojo.session; import io.netty.channel.Channel; +import java.io.Serializable; + /** * @ClassName: WebSocketUser * @Description: webSocket用户 @@ -9,7 +11,7 @@ import io.netty.channel.Channel; * @Date: 2021/9/11 11:28 下午 * @Version: 1.0 */ -public class WebSocketSession { +public class WebSocketSession implements Serializable { /** * 会话ID,登录时返回,socket注册时携带 diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/IMessageService.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/IMessageService.java index 29905729..e5b1e463 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/IMessageService.java +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/IMessageService.java @@ -4,7 +4,7 @@ import ink.wgink.module.instantmessage.pojo.bos.NoticeBO; import ink.wgink.module.instantmessage.pojo.bos.NoticeTargetBO; import ink.wgink.module.instantmessage.pojo.vos.NoticeSendVO; import ink.wgink.module.instantmessage.pojo.vos.NoticeVO; -import ink.wgink.module.instantmessage.websocket.pojo.WebSocketSession; +import ink.wgink.pojo.session.WebSocketSession; import java.util.List; diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/IWebSocketTextCustomService.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/IWebSocketTextCustomService.java index b97e7812..1495613f 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/IWebSocketTextCustomService.java +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/IWebSocketTextCustomService.java @@ -1,6 +1,6 @@ package ink.wgink.module.instantmessage.service; -import ink.wgink.module.instantmessage.websocket.exception.CustomHandleException; +import ink.wgink.exceptions.websocket.CustomHandleException; import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage; import io.netty.channel.Channel; diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/impl/MessageServiceImpl.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/impl/MessageServiceImpl.java index d96dac4c..1aadc6d6 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/impl/MessageServiceImpl.java +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/impl/MessageServiceImpl.java @@ -15,7 +15,7 @@ import ink.wgink.module.instantmessage.service.INoticeService; import ink.wgink.module.instantmessage.websocket.enums.MessageTypeEnum; import ink.wgink.module.instantmessage.websocket.manager.WebSocketChannelManager; import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage; -import ink.wgink.module.instantmessage.websocket.pojo.WebSocketSession; +import ink.wgink.pojo.session.WebSocketSession; import ink.wgink.module.instantmessage.websocket.pojo.body.CountNeedToDealWithBody; import ink.wgink.module.instantmessage.websocket.pojo.body.NoticeBody; import ink.wgink.pojo.dtos.user.UserDTO; diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/startup/WebSocketStartUp.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/startup/WebSocketStartUp.java index f850d2e4..5240d4c3 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/startup/WebSocketStartUp.java +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/startup/WebSocketStartUp.java @@ -1,5 +1,7 @@ package ink.wgink.module.instantmessage.startup; +import ink.wgink.interfaces.manager.IWebSocketUserSessionManager; +import ink.wgink.module.instantmessage.websocket.manager.WebSocketChannelManager; import ink.wgink.module.instantmessage.websocket.server.WebSocketServer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; @@ -18,9 +20,12 @@ public class WebSocketStartUp implements ApplicationRunner { @Autowired private WebSocketServer webSocketServer; + @Autowired + private IWebSocketUserSessionManager webSocketUserSessionManager; @Override public void run(ApplicationArguments args) throws Exception { + WebSocketChannelManager.getInstance().setWebSocketUserSessionManager(webSocketUserSessionManager); new Thread(webSocketServer).start(); } diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/handler/text/WebSocketTextHandler.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/handler/text/WebSocketTextHandler.java index 77b90f26..3dc60e71 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/handler/text/WebSocketTextHandler.java +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/handler/text/WebSocketTextHandler.java @@ -2,14 +2,14 @@ package ink.wgink.module.instantmessage.websocket.handler.text; import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONObject; +import ink.wgink.exceptions.websocket.*; import ink.wgink.module.instantmessage.service.IMessageService; import ink.wgink.module.instantmessage.websocket.enums.MessageTypeEnum; import ink.wgink.module.instantmessage.websocket.enums.StatusEnum; -import ink.wgink.module.instantmessage.websocket.exception.*; import ink.wgink.module.instantmessage.service.IWebSocketTextCustomService; import ink.wgink.module.instantmessage.websocket.manager.WebSocketChannelManager; import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage; -import ink.wgink.module.instantmessage.websocket.pojo.WebSocketSession; +import ink.wgink.pojo.session.WebSocketSession; import ink.wgink.module.instantmessage.websocket.pojo.body.IdsBody; import ink.wgink.module.instantmessage.websocket.pojo.body.RegisterBody; import ink.wgink.module.instantmessage.websocket.pojo.body.StatusBody; diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/manager/DefaultWebSocketUserSessionManager.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/manager/DefaultWebSocketUserSessionManager.java new file mode 100644 index 00000000..c071e2b5 --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/manager/DefaultWebSocketUserSessionManager.java @@ -0,0 +1,164 @@ +package ink.wgink.module.instantmessage.websocket.manager; + +import ink.wgink.exceptions.websocket.SessionException; +import ink.wgink.interfaces.manager.IWebSocketUserSessionManager; +import ink.wgink.pojo.session.WebSocketSession; +import io.netty.channel.Channel; +import org.apache.commons.lang3.StringUtils; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * @ClassName: DefaultWebSocketChannelManager + * @Description: 默认websocket管理器 + * @Author: wanggeng + * @Date: 2021/11/30 9:46 上午 + * @Version: 1.0 + */ +@Service +public class DefaultWebSocketUserSessionManager implements IWebSocketUserSessionManager { + + private ConcurrentMap> webSocketUserMap = new ConcurrentHashMap<>(); + + @Override + public void init(String userId, String sessionId, String clientName) { + WebSocketSession onlineUser = getOnlineUser(userId, clientName); + if (onlineUser == null) { + onlineUser = new WebSocketSession(); + onlineUser.setUserId(userId); + onlineUser.setClientName(clientName); + // 更新到session队列中 + List webSocketSessions = listOnlineUser(userId); + webSocketSessions.add(onlineUser); + webSocketUserMap.put(userId, webSocketSessions); + } + // 更新最后时间 + onlineUser.setSessionId(sessionId); + onlineUser.setUpdateTime(System.currentTimeMillis()); + } + + @Override + public WebSocketSession addChannel(String sessionId, String userId, Channel channel) throws SessionException { + WebSocketSession onlineUser = getOnlineUserBySessionId(sessionId); + if (onlineUser == null) { + throw new SessionException("无效会话,请登录"); + } + if (!StringUtils.equals(userId, onlineUser.getUserId())) { + throw new SessionException("from错误"); + } + // 更新会话通道 + onlineUser.setChannel(channel); + onlineUser.setChannelId(channel.id().asLongText()); + return onlineUser; + } + + @Override + public void removeChannel(Channel channel) { + // 获取session + for (Map.Entry> kv : webSocketUserMap.entrySet()) { + List webSocketSessions = kv.getValue(); + for (int i = 0; i < webSocketSessions.size(); i++) { + WebSocketSession webSocketSession = webSocketSessions.get(i); + if (StringUtils.equals(webSocketSession.getChannelId(), channel.id().asLongText())) { + webSocketSessions.remove(i); + return; + } + } + } + } + + @Override + public List listOnlineUser(String userId) { + if (StringUtils.isBlank(userId)) { + return new ArrayList<>(); + } + List webSocketSessions = webSocketUserMap.get(userId); + if (webSocketSessions == null) { + return new ArrayList<>(); + } + return webSocketSessions; + } + + @Override + public WebSocketSession getOnlineUser(String userId, String clientName) { + if (StringUtils.isBlank(userId) || StringUtils.isBlank(clientName)) { + return null; + } + List webSocketSessions = listOnlineUser(userId); + if (webSocketSessions.isEmpty()) { + return null; + } + for (WebSocketSession webSocketSession : webSocketSessions) { + if (StringUtils.equals(clientName, webSocketSession.getClientName())) { + return webSocketSession; + } + } + return null; + } + + @Override + public List listOnlineUser(List userIds) { + if (userIds == null || userIds.isEmpty()) { + return new ArrayList<>(); + } + List webSocketSessions = new ArrayList<>(); + for (String userId : userIds) { + List users = webSocketUserMap.get(userId); + if (users != null) { + webSocketSessions.addAll(users); + } + } + return webSocketSessions; + } + + @Override + public List listOnlineUser(List userIds, String clientName) { + if (userIds == null || userIds.isEmpty() || StringUtils.isBlank(clientName)) { + return new ArrayList<>(); + } + List webSocketSessions = new ArrayList<>(); + for (String userId : userIds) { + List users = webSocketUserMap.get(userId); + if (users == null) { + continue; + } + for (WebSocketSession user : users) { + if (StringUtils.equals(clientName, user.getClientName())) { + webSocketSessions.add(user); + } + } + } + return webSocketSessions; + } + + @Override + public WebSocketSession getOnlineUserBySessionId(String sessionId) { + for (Map.Entry> kv : webSocketUserMap.entrySet()) { + for (WebSocketSession webSocketSession : kv.getValue()) { + if (StringUtils.equals(sessionId, webSocketSession.getSessionId())) { + return webSocketSession; + } + } + } + return null; + } + + @Override + public WebSocketSession getOnlineUserByChannelId(String channelId) { + for (Map.Entry> kv : webSocketUserMap.entrySet()) { + for (WebSocketSession webSocketSession : kv.getValue()) { + if (StringUtils.equals(channelId, webSocketSession.getChannelId())) { + return webSocketSession; + } + } + } + return null; + } + + +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/manager/WebSocketChannelManager.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/manager/WebSocketChannelManager.java index 681b7e04..252844fb 100644 --- a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/manager/WebSocketChannelManager.java +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/manager/WebSocketChannelManager.java @@ -1,21 +1,18 @@ package ink.wgink.module.instantmessage.websocket.manager; import com.alibaba.fastjson.JSONObject; -import ink.wgink.module.instantmessage.websocket.exception.SessionException; +import ink.wgink.exceptions.websocket.SessionException; +import ink.wgink.interfaces.manager.IWebSocketUserSessionManager; +import ink.wgink.interfaces.manager.IWebSocketChannelManager; import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage; -import ink.wgink.module.instantmessage.websocket.pojo.WebSocketSession; +import ink.wgink.pojo.session.WebSocketSession; import io.netty.channel.Channel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; -import org.apache.commons.lang3.StringUtils; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; /** * @ClassName: WebSocketChannelManager @@ -24,14 +21,13 @@ import java.util.concurrent.ConcurrentMap; * @Date: 2021/9/11 11:56 上午 * @Version: 1.0 */ -public class WebSocketChannelManager { +public class WebSocketChannelManager implements IWebSocketChannelManager { public static final String FORM_SYSTEM = "SYSTEM"; private static final WebSocketChannelManager webSocketChannelManager = WebSocketChannelManagerBuilder.webSocketChannelManager; private ChannelGroup globalGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); - private ConcurrentMap> webSocketUserMap = new ConcurrentHashMap<>(); + private IWebSocketUserSessionManager webSocketUserSessionManager; - private WebSocketChannelManager() { - } + private WebSocketChannelManager() {} public static WebSocketChannelManager getInstance() { return webSocketChannelManager; @@ -45,23 +41,12 @@ public class WebSocketChannelManager { * @param clientName 客户端名称 */ public void init(String userId, String sessionId, String clientName) { - WebSocketSession onlineUser = getOnlineUser(userId, clientName); - if (onlineUser == null) { - onlineUser = new WebSocketSession(); - onlineUser.setUserId(userId); - onlineUser.setClientName(clientName); - // 更新到session队列中 - List webSocketSessions = listOnlineUser(userId); - webSocketSessions.add(onlineUser); - webSocketUserMap.put(userId, webSocketSessions); - } - // 更新最后时间 - onlineUser.setSessionId(sessionId); - onlineUser.setUpdateTime(System.currentTimeMillis()); + webSocketUserSessionManager.init(userId, sessionId, clientName); } /** * 添加通道,绑定通道与会话 + * * @param sessionId * @param userId * @param channel @@ -69,32 +54,19 @@ public class WebSocketChannelManager { * @throws SessionException */ public WebSocketSession addChannel(String sessionId, String userId, Channel channel) throws SessionException { - WebSocketSession onlineUser = getOnlineUserBySessionId(sessionId); - if (onlineUser == null) { - throw new SessionException("无效会话,请登录"); - } - if (!StringUtils.equals(userId, onlineUser.getUserId())) { - throw new SessionException("from错误"); - } - // 更新会话通道 - onlineUser.setChannel(channel); - onlineUser.setChannelId(channel.id().asLongText()); + WebSocketSession onlineUser = webSocketUserSessionManager.addChannel(sessionId, userId, channel); globalGroup.add(channel); return onlineUser; } + /** + * 删除通道 + * + * @param channel + */ public void removeChannel(Channel channel) { + webSocketUserSessionManager.removeChannel(channel); globalGroup.remove(channel); - for (Map.Entry> kv : webSocketUserMap.entrySet()) { - List webSocketSessions = kv.getValue(); - for (int i = 0; i < webSocketSessions.size(); i++) { - WebSocketSession webSocketSession = webSocketSessions.get(i); - if (StringUtils.equals(webSocketSession.getChannelId(), channel.id().asLongText())) { - webSocketSessions.remove(i); - return; - } - } - } } /** @@ -104,17 +76,9 @@ public class WebSocketChannelManager { * @return */ public List listOnlineUser(String userId) { - if (StringUtils.isBlank(userId)) { - return new ArrayList<>(); - } - List webSocketSessions = webSocketUserMap.get(userId); - if (webSocketSessions == null) { - return new ArrayList<>(); - } - return webSocketSessions; + return webSocketUserSessionManager.listOnlineUser(userId); } - /** * 获取在线用户 * @@ -123,19 +87,7 @@ public class WebSocketChannelManager { * @return */ public WebSocketSession getOnlineUser(String userId, String clientName) { - if (StringUtils.isBlank(userId) || StringUtils.isBlank(clientName)) { - return null; - } - List webSocketSessions = listOnlineUser(userId); - if (webSocketSessions.isEmpty()) { - return null; - } - for (WebSocketSession webSocketSession : webSocketSessions) { - if (StringUtils.equals(clientName, webSocketSession.getClientName())) { - return webSocketSession; - } - } - return null; + return webSocketUserSessionManager.getOnlineUser(userId, clientName); } /** @@ -145,17 +97,7 @@ public class WebSocketChannelManager { * @return */ public List listOnlineUser(List userIds) { - if (userIds == null || userIds.isEmpty()) { - return new ArrayList<>(); - } - List webSocketSessions = new ArrayList<>(); - for (String userId : userIds) { - List users = webSocketUserMap.get(userId); - if (users != null) { - webSocketSessions.addAll(users); - } - } - return webSocketSessions; + return webSocketUserSessionManager.listOnlineUser(userIds); } /** @@ -166,22 +108,7 @@ public class WebSocketChannelManager { * @return */ public List listOnlineUser(List userIds, String clientName) { - if (userIds == null || userIds.isEmpty() || StringUtils.isBlank(clientName)) { - return new ArrayList<>(); - } - List webSocketSessions = new ArrayList<>(); - for (String userId : userIds) { - List users = webSocketUserMap.get(userId); - if (users == null) { - continue; - } - for (WebSocketSession user : users) { - if (StringUtils.equals(clientName, user.getClientName())) { - webSocketSessions.add(user); - } - } - } - return webSocketSessions; + return webSocketUserSessionManager.listOnlineUser(userIds, clientName); } /** @@ -191,14 +118,7 @@ public class WebSocketChannelManager { * @return */ private WebSocketSession getOnlineUserBySessionId(String sessionId) { - for (Map.Entry> kv : webSocketUserMap.entrySet()) { - for (WebSocketSession webSocketSession : kv.getValue()) { - if (StringUtils.equals(sessionId, webSocketSession.getSessionId())) { - return webSocketSession; - } - } - } - return null; + return webSocketUserSessionManager.getOnlineUserBySessionId(sessionId); } /** @@ -208,14 +128,7 @@ public class WebSocketChannelManager { * @return */ private WebSocketSession getOnlineUserByChannelId(String channelId) { - for (Map.Entry> kv : webSocketUserMap.entrySet()) { - for (WebSocketSession webSocketSession : kv.getValue()) { - if (StringUtils.equals(channelId, webSocketSession.getChannelId())) { - return webSocketSession; - } - } - } - return null; + return webSocketUserSessionManager.getOnlineUserByChannelId(channelId); } /** @@ -253,6 +166,11 @@ public class WebSocketChannelManager { } } + @Override + public void setWebSocketUserSessionManager(IWebSocketUserSessionManager webSocketUserSessionManager) { + this.webSocketUserSessionManager = webSocketUserSessionManager; + } + private static class WebSocketChannelManagerBuilder { public static final WebSocketChannelManager webSocketChannelManager = new WebSocketChannelManager(); } diff --git a/pom.xml b/pom.xml index 1962ed0f..a83a67bc 100644 --- a/pom.xml +++ b/pom.xml @@ -41,6 +41,7 @@ login-oauth2-client module-oauth2-client mongo-module-dictionary + redis-cache pom @@ -70,9 +71,9 @@ 3.4 2.11.0 3.3 + 1.18 1.3.1 1.12 - 1.18 1.9.4 4.5.13 1.3.18 @@ -92,6 +93,7 @@ 1.10 7.0.2 3.2.5 + 2.5.5 2.3.31 @@ -521,6 +523,14 @@ + + + org.springframework.data + spring-data-redis + ${redis.version} + + + org.freemarker