调整了代码结构

This commit is contained in:
wanggeng 2021-11-30 13:59:49 +08:00
parent 87a4d7270b
commit 186d3dcc68
27 changed files with 429 additions and 129 deletions

View File

@ -1,4 +1,4 @@
package ink.wgink.module.instantmessage.websocket.exception; package ink.wgink.exceptions.websocket;
/** /**
* When you feel like quitting. Think about why you started * When you feel like quitting. Think about why you started

View File

@ -1,4 +1,4 @@
package ink.wgink.module.instantmessage.websocket.exception; package ink.wgink.exceptions.websocket;
/** /**
* When you feel like quitting. Think about why you started * When you feel like quitting. Think about why you started

View File

@ -1,4 +1,4 @@
package ink.wgink.module.instantmessage.websocket.exception; package ink.wgink.exceptions.websocket;
/** /**
* @ClassName: CustomHandleException * @ClassName: CustomHandleException

View File

@ -1,4 +1,4 @@
package ink.wgink.module.instantmessage.websocket.exception; package ink.wgink.exceptions.websocket;
/** /**
* When you feel like quitting. Think about why you started * When you feel like quitting. Think about why you started

View File

@ -1,4 +1,4 @@
package ink.wgink.module.instantmessage.websocket.exception; package ink.wgink.exceptions.websocket;
/** /**
* When you feel like quitting. Think about why you started * When you feel like quitting. Think about why you started

View File

@ -1,4 +1,4 @@
package ink.wgink.module.instantmessage.websocket.exception; package ink.wgink.exceptions.websocket;
/** /**
* When you feel like quitting. Think about why you started * When you feel like quitting. Think about why you started

View File

@ -22,6 +22,13 @@
<artifactId>basic-pojo</artifactId> <artifactId>basic-pojo</artifactId>
<version>1.0-SNAPSHOT</version> <version>1.0-SNAPSHOT</version>
</dependency> </dependency>
<!-- netty start -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<scope>provided</scope>
</dependency>
<!-- netty end -->
</dependencies> </dependencies>

View File

@ -41,4 +41,11 @@ public interface IAppManager {
* @return * @return
*/ */
List<AppTokenUser> listCurrentUsers(); List<AppTokenUser> listCurrentUsers();
/**
* 设置redis token 管理器
*
* @param redisAppTokenManager
*/
void setRedisAppTokenManager(IRedisAppTokenManager redisAppTokenManager);
} }

View File

@ -0,0 +1,46 @@
package ink.wgink.interfaces.manager;
import ink.wgink.pojo.app.AppToken;
import ink.wgink.pojo.app.AppTokenUser;
import java.util.List;
/**
* @ClassName: IRedisAppTokenManager
* @Description: redis app token 管理
* @Author: wanggeng
* @Date: 2021/11/29 4:30 下午
* @Version: 1.0
*/
public interface IRedisAppTokenManager {
String TOKEN_HASH_KEY = "app:tokens";
/**
* 获取token
*
* @param token
* @return
*/
AppToken getToken(String token);
/**
* 添加token
*
* @param appToken
*/
void addToken(AppToken appToken);
/**
* app当前在线用户
*
* @return
*/
List<AppTokenUser> listCurrentUsers();
/**
* 清理超时token
* @param clearTokenKeys
*/
void clearTimeoutToken();
}

View File

@ -0,0 +1,19 @@
package ink.wgink.interfaces.manager;
/**
* @ClassName: IWebSocketChannelManager
* @Description: websocket
* @Author: wanggeng
* @Date: 2021/11/29 5:56 下午
* @Version: 1.0
*/
public interface IWebSocketChannelManager {
/**
* 设置redis socket channel 管理器
*
* @param webSocketUserSessionManager
*/
void setWebSocketUserSessionManager(IWebSocketUserSessionManager webSocketUserSessionManager);
}

View File

@ -0,0 +1,97 @@
package ink.wgink.interfaces.manager;
import ink.wgink.exceptions.websocket.SessionException;
import ink.wgink.pojo.session.WebSocketSession;
import io.netty.channel.Channel;
import java.util.List;
/**
* @ClassName: IRedisWebSocketChannelManager
* @Description: redis websocket channel 管理器
* @Author: wanggeng
* @Date: 2021/11/29 5:57 下午
* @Version: 1.0
*/
public interface IWebSocketUserSessionManager {
String CHANNEL_HASH_KEY = "websocket:channels";
/**
* 初始化
*
* @param userId
* @param sessionId
* @param clientName
*/
void init(String userId, String sessionId, String clientName);
/**
* 添加通道
*
* @param sessionId
* @param userId
* @param channel
* @return
* @throws SessionException
*/
WebSocketSession addChannel(String sessionId, String userId, Channel channel) throws SessionException;
/**
* 删除通道
*
* @param channel
*/
void removeChannel(Channel channel);
/**
* 在线会话列表
*
* @param userId 用户ID
* @return
*/
List<WebSocketSession> listOnlineUser(String userId);
/**
* 在线会话
*
* @param userId 用户ID
* @param clientName 客户端名称
* @return
*/
WebSocketSession getOnlineUser(String userId, String clientName);
/**
* 在线会话列表
*
* @param userIds 用户ID列表
* @return
*/
List<WebSocketSession> listOnlineUser(List<String> userIds);
/**
* 在线会话列表
*
* @param userIds 用户ID
* @param clientName 客户端名称
* @return
*/
List<WebSocketSession> listOnlineUser(List<String> userIds, String clientName);
/**
* 在线用户
*
* @param sessionId 会话ID
* @return
*/
WebSocketSession getOnlineUserBySessionId(String sessionId);
/**
* 通过通道获取用户
*
* @param channelId
* @return
*/
WebSocketSession getOnlineUserByChannelId(String channelId);
}

View File

@ -96,6 +96,14 @@
<version>1.0-SNAPSHOT</version> <version>1.0-SNAPSHOT</version>
</dependency> </dependency>
<!-- wgink end --> <!-- wgink end -->
<!-- netty start -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<scope>provided</scope>
</dependency>
<!-- netty end -->
</dependencies> </dependencies>
</project> </project>

View File

@ -1,5 +1,7 @@
package ink.wgink.pojo.app; package ink.wgink.pojo.app;
import java.io.Serializable;
/** /**
* When you feel like quitting. Think about why you started * When you feel like quitting. Think about why you started
* 当你想要放弃的时候想想当初你为何开始 * 当你想要放弃的时候想想当初你为何开始
@ -10,8 +12,9 @@ package ink.wgink.pojo.app;
* @Date: 2019-08-02 11:19 * @Date: 2019-08-02 11:19
* @Version: 1.0 * @Version: 1.0
**/ **/
public class AppToken { public class AppToken implements Serializable {
private static final long serialVersionUID = 1338652146163739437L;
private String token; private String token;
private AppTokenTypeEnum appTokenTypeEnum; private AppTokenTypeEnum appTokenTypeEnum;
private long lastTime; private long lastTime;

View File

@ -1,5 +1,6 @@
package ink.wgink.pojo.app; package ink.wgink.pojo.app;
import java.io.Serializable;
import java.util.List; import java.util.List;
/** /**
@ -12,8 +13,9 @@ import java.util.List;
* @Date: 2019-08-02 11:20 * @Date: 2019-08-02 11:20
* @Version: 1.0 * @Version: 1.0
**/ **/
public class AppTokenUser { public class AppTokenUser implements Serializable {
private static final long serialVersionUID = -1803419036314192907L;
private String id; private String id;
private String name; private String name;
private String avatar; private String avatar;

View File

@ -1,5 +1,7 @@
package ink.wgink.pojo.app; package ink.wgink.pojo.app;
import java.io.Serializable;
/** /**
* When you feel like quitting. Think about why you started * When you feel like quitting. Think about why you started
* 当你想要放弃的时候想想当初你为何开始 * 当你想要放弃的时候想想当初你为何开始
@ -10,8 +12,9 @@ package ink.wgink.pojo.app;
* @Date: 2019-08-10 14:21 * @Date: 2019-08-10 14:21
* @Version: 1.0 * @Version: 1.0
**/ **/
public class AppTokenUserDepartment { public class AppTokenUserDepartment implements Serializable {
private static final long serialVersionUID = -4352969540408749470L;
private String departmentId; private String departmentId;
private String departmentName; private String departmentName;

View File

@ -1,5 +1,7 @@
package ink.wgink.pojo.app; package ink.wgink.pojo.app;
import java.io.Serializable;
/** /**
* When you feel like quitting. Think about why you started * When you feel like quitting. Think about why you started
* 当你想要放弃的时候想想当初你为何开始 * 当你想要放弃的时候想想当初你为何开始
@ -10,8 +12,9 @@ package ink.wgink.pojo.app;
* @Date: 2020/2/10 9:33 下午 * @Date: 2020/2/10 9:33 下午
* @Version: 1.0 * @Version: 1.0
**/ **/
public class AppTokenUserGroup { public class AppTokenUserGroup implements Serializable {
private static final long serialVersionUID = -4130999187990640889L;
private String groupId; private String groupId;
private String groupName; private String groupName;

View File

@ -1,5 +1,7 @@
package ink.wgink.pojo.app; package ink.wgink.pojo.app;
import java.io.Serializable;
/** /**
* When you feel like quitting. Think about why you started * When you feel like quitting. Think about why you started
* 当你想要放弃的时候想想当初你为何开始 * 当你想要放弃的时候想想当初你为何开始
@ -10,8 +12,9 @@ package ink.wgink.pojo.app;
* @Date: 2019-08-10 14:22 * @Date: 2019-08-10 14:22
* @Version: 1.0 * @Version: 1.0
**/ **/
public class AppTokenUserPosition { public class AppTokenUserPosition implements Serializable {
private static final long serialVersionUID = -3348976168384827012L;
private String positionId; private String positionId;
private String positionName; private String positionName;

View File

@ -1,5 +1,7 @@
package ink.wgink.pojo.app; package ink.wgink.pojo.app;
import java.io.Serializable;
/** /**
* When you feel like quitting. Think about why you started * When you feel like quitting. Think about why you started
* 当你想要放弃的时候想想当初你为何开始 * 当你想要放弃的时候想想当初你为何开始
@ -10,8 +12,9 @@ package ink.wgink.pojo.app;
* @Date: 2019-08-10 12:25 * @Date: 2019-08-10 12:25
* @Version: 1.0 * @Version: 1.0
**/ **/
public class AppTokenUserRole { public class AppTokenUserRole implements Serializable {
private static final long serialVersionUID = 2153811587508975532L;
private String roleId; private String roleId;
private String roleName; private String roleName;

View File

@ -1,7 +1,9 @@
package ink.wgink.module.instantmessage.websocket.pojo; package ink.wgink.pojo.session;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import java.io.Serializable;
/** /**
* @ClassName: WebSocketUser * @ClassName: WebSocketUser
* @Description: webSocket用户 * @Description: webSocket用户
@ -9,7 +11,7 @@ import io.netty.channel.Channel;
* @Date: 2021/9/11 11:28 下午 * @Date: 2021/9/11 11:28 下午
* @Version: 1.0 * @Version: 1.0
*/ */
public class WebSocketSession { public class WebSocketSession implements Serializable {
/** /**
* 会话ID登录时返回socket注册时携带 * 会话ID登录时返回socket注册时携带

View File

@ -4,7 +4,7 @@ import ink.wgink.module.instantmessage.pojo.bos.NoticeBO;
import ink.wgink.module.instantmessage.pojo.bos.NoticeTargetBO; import ink.wgink.module.instantmessage.pojo.bos.NoticeTargetBO;
import ink.wgink.module.instantmessage.pojo.vos.NoticeSendVO; import ink.wgink.module.instantmessage.pojo.vos.NoticeSendVO;
import ink.wgink.module.instantmessage.pojo.vos.NoticeVO; import ink.wgink.module.instantmessage.pojo.vos.NoticeVO;
import ink.wgink.module.instantmessage.websocket.pojo.WebSocketSession; import ink.wgink.pojo.session.WebSocketSession;
import java.util.List; import java.util.List;

View File

@ -1,6 +1,6 @@
package ink.wgink.module.instantmessage.service; package ink.wgink.module.instantmessage.service;
import ink.wgink.module.instantmessage.websocket.exception.CustomHandleException; import ink.wgink.exceptions.websocket.CustomHandleException;
import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage; import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage;
import io.netty.channel.Channel; import io.netty.channel.Channel;

View File

@ -15,7 +15,7 @@ import ink.wgink.module.instantmessage.service.INoticeService;
import ink.wgink.module.instantmessage.websocket.enums.MessageTypeEnum; import ink.wgink.module.instantmessage.websocket.enums.MessageTypeEnum;
import ink.wgink.module.instantmessage.websocket.manager.WebSocketChannelManager; import ink.wgink.module.instantmessage.websocket.manager.WebSocketChannelManager;
import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage; import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage;
import ink.wgink.module.instantmessage.websocket.pojo.WebSocketSession; import ink.wgink.pojo.session.WebSocketSession;
import ink.wgink.module.instantmessage.websocket.pojo.body.CountNeedToDealWithBody; import ink.wgink.module.instantmessage.websocket.pojo.body.CountNeedToDealWithBody;
import ink.wgink.module.instantmessage.websocket.pojo.body.NoticeBody; import ink.wgink.module.instantmessage.websocket.pojo.body.NoticeBody;
import ink.wgink.pojo.dtos.user.UserDTO; import ink.wgink.pojo.dtos.user.UserDTO;

View File

@ -1,5 +1,7 @@
package ink.wgink.module.instantmessage.startup; package ink.wgink.module.instantmessage.startup;
import ink.wgink.interfaces.manager.IWebSocketUserSessionManager;
import ink.wgink.module.instantmessage.websocket.manager.WebSocketChannelManager;
import ink.wgink.module.instantmessage.websocket.server.WebSocketServer; import ink.wgink.module.instantmessage.websocket.server.WebSocketServer;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationArguments;
@ -18,9 +20,12 @@ public class WebSocketStartUp implements ApplicationRunner {
@Autowired @Autowired
private WebSocketServer webSocketServer; private WebSocketServer webSocketServer;
@Autowired
private IWebSocketUserSessionManager webSocketUserSessionManager;
@Override @Override
public void run(ApplicationArguments args) throws Exception { public void run(ApplicationArguments args) throws Exception {
WebSocketChannelManager.getInstance().setWebSocketUserSessionManager(webSocketUserSessionManager);
new Thread(webSocketServer).start(); new Thread(webSocketServer).start();
} }

View File

@ -2,14 +2,14 @@ package ink.wgink.module.instantmessage.websocket.handler.text;
import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import ink.wgink.exceptions.websocket.*;
import ink.wgink.module.instantmessage.service.IMessageService; import ink.wgink.module.instantmessage.service.IMessageService;
import ink.wgink.module.instantmessage.websocket.enums.MessageTypeEnum; import ink.wgink.module.instantmessage.websocket.enums.MessageTypeEnum;
import ink.wgink.module.instantmessage.websocket.enums.StatusEnum; import ink.wgink.module.instantmessage.websocket.enums.StatusEnum;
import ink.wgink.module.instantmessage.websocket.exception.*;
import ink.wgink.module.instantmessage.service.IWebSocketTextCustomService; import ink.wgink.module.instantmessage.service.IWebSocketTextCustomService;
import ink.wgink.module.instantmessage.websocket.manager.WebSocketChannelManager; import ink.wgink.module.instantmessage.websocket.manager.WebSocketChannelManager;
import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage; import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage;
import ink.wgink.module.instantmessage.websocket.pojo.WebSocketSession; import ink.wgink.pojo.session.WebSocketSession;
import ink.wgink.module.instantmessage.websocket.pojo.body.IdsBody; import ink.wgink.module.instantmessage.websocket.pojo.body.IdsBody;
import ink.wgink.module.instantmessage.websocket.pojo.body.RegisterBody; import ink.wgink.module.instantmessage.websocket.pojo.body.RegisterBody;
import ink.wgink.module.instantmessage.websocket.pojo.body.StatusBody; import ink.wgink.module.instantmessage.websocket.pojo.body.StatusBody;

View File

@ -0,0 +1,164 @@
package ink.wgink.module.instantmessage.websocket.manager;
import ink.wgink.exceptions.websocket.SessionException;
import ink.wgink.interfaces.manager.IWebSocketUserSessionManager;
import ink.wgink.pojo.session.WebSocketSession;
import io.netty.channel.Channel;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* @ClassName: DefaultWebSocketChannelManager
* @Description: 默认websocket管理器
* @Author: wanggeng
* @Date: 2021/11/30 9:46 上午
* @Version: 1.0
*/
@Service
public class DefaultWebSocketUserSessionManager implements IWebSocketUserSessionManager {
private ConcurrentMap<String, List<WebSocketSession>> webSocketUserMap = new ConcurrentHashMap<>();
@Override
public void init(String userId, String sessionId, String clientName) {
WebSocketSession onlineUser = getOnlineUser(userId, clientName);
if (onlineUser == null) {
onlineUser = new WebSocketSession();
onlineUser.setUserId(userId);
onlineUser.setClientName(clientName);
// 更新到session队列中
List<WebSocketSession> webSocketSessions = listOnlineUser(userId);
webSocketSessions.add(onlineUser);
webSocketUserMap.put(userId, webSocketSessions);
}
// 更新最后时间
onlineUser.setSessionId(sessionId);
onlineUser.setUpdateTime(System.currentTimeMillis());
}
@Override
public WebSocketSession addChannel(String sessionId, String userId, Channel channel) throws SessionException {
WebSocketSession onlineUser = getOnlineUserBySessionId(sessionId);
if (onlineUser == null) {
throw new SessionException("无效会话,请登录");
}
if (!StringUtils.equals(userId, onlineUser.getUserId())) {
throw new SessionException("from错误");
}
// 更新会话通道
onlineUser.setChannel(channel);
onlineUser.setChannelId(channel.id().asLongText());
return onlineUser;
}
@Override
public void removeChannel(Channel channel) {
// 获取session
for (Map.Entry<String, List<WebSocketSession>> kv : webSocketUserMap.entrySet()) {
List<WebSocketSession> webSocketSessions = kv.getValue();
for (int i = 0; i < webSocketSessions.size(); i++) {
WebSocketSession webSocketSession = webSocketSessions.get(i);
if (StringUtils.equals(webSocketSession.getChannelId(), channel.id().asLongText())) {
webSocketSessions.remove(i);
return;
}
}
}
}
@Override
public List<WebSocketSession> listOnlineUser(String userId) {
if (StringUtils.isBlank(userId)) {
return new ArrayList<>();
}
List<WebSocketSession> webSocketSessions = webSocketUserMap.get(userId);
if (webSocketSessions == null) {
return new ArrayList<>();
}
return webSocketSessions;
}
@Override
public WebSocketSession getOnlineUser(String userId, String clientName) {
if (StringUtils.isBlank(userId) || StringUtils.isBlank(clientName)) {
return null;
}
List<WebSocketSession> webSocketSessions = listOnlineUser(userId);
if (webSocketSessions.isEmpty()) {
return null;
}
for (WebSocketSession webSocketSession : webSocketSessions) {
if (StringUtils.equals(clientName, webSocketSession.getClientName())) {
return webSocketSession;
}
}
return null;
}
@Override
public List<WebSocketSession> listOnlineUser(List<String> userIds) {
if (userIds == null || userIds.isEmpty()) {
return new ArrayList<>();
}
List<WebSocketSession> webSocketSessions = new ArrayList<>();
for (String userId : userIds) {
List<WebSocketSession> users = webSocketUserMap.get(userId);
if (users != null) {
webSocketSessions.addAll(users);
}
}
return webSocketSessions;
}
@Override
public List<WebSocketSession> listOnlineUser(List<String> userIds, String clientName) {
if (userIds == null || userIds.isEmpty() || StringUtils.isBlank(clientName)) {
return new ArrayList<>();
}
List<WebSocketSession> webSocketSessions = new ArrayList<>();
for (String userId : userIds) {
List<WebSocketSession> users = webSocketUserMap.get(userId);
if (users == null) {
continue;
}
for (WebSocketSession user : users) {
if (StringUtils.equals(clientName, user.getClientName())) {
webSocketSessions.add(user);
}
}
}
return webSocketSessions;
}
@Override
public WebSocketSession getOnlineUserBySessionId(String sessionId) {
for (Map.Entry<String, List<WebSocketSession>> kv : webSocketUserMap.entrySet()) {
for (WebSocketSession webSocketSession : kv.getValue()) {
if (StringUtils.equals(sessionId, webSocketSession.getSessionId())) {
return webSocketSession;
}
}
}
return null;
}
@Override
public WebSocketSession getOnlineUserByChannelId(String channelId) {
for (Map.Entry<String, List<WebSocketSession>> kv : webSocketUserMap.entrySet()) {
for (WebSocketSession webSocketSession : kv.getValue()) {
if (StringUtils.equals(channelId, webSocketSession.getChannelId())) {
return webSocketSession;
}
}
}
return null;
}
}

View File

@ -1,21 +1,18 @@
package ink.wgink.module.instantmessage.websocket.manager; package ink.wgink.module.instantmessage.websocket.manager;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import ink.wgink.module.instantmessage.websocket.exception.SessionException; import ink.wgink.exceptions.websocket.SessionException;
import ink.wgink.interfaces.manager.IWebSocketUserSessionManager;
import ink.wgink.interfaces.manager.IWebSocketChannelManager;
import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage; import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage;
import ink.wgink.module.instantmessage.websocket.pojo.WebSocketSession; import ink.wgink.pojo.session.WebSocketSession;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/** /**
* @ClassName: WebSocketChannelManager * @ClassName: WebSocketChannelManager
@ -24,14 +21,13 @@ import java.util.concurrent.ConcurrentMap;
* @Date: 2021/9/11 11:56 上午 * @Date: 2021/9/11 11:56 上午
* @Version: 1.0 * @Version: 1.0
*/ */
public class WebSocketChannelManager { public class WebSocketChannelManager implements IWebSocketChannelManager {
public static final String FORM_SYSTEM = "SYSTEM"; public static final String FORM_SYSTEM = "SYSTEM";
private static final WebSocketChannelManager webSocketChannelManager = WebSocketChannelManagerBuilder.webSocketChannelManager; private static final WebSocketChannelManager webSocketChannelManager = WebSocketChannelManagerBuilder.webSocketChannelManager;
private ChannelGroup globalGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private ChannelGroup globalGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private ConcurrentMap<String, List<WebSocketSession>> webSocketUserMap = new ConcurrentHashMap<>(); private IWebSocketUserSessionManager webSocketUserSessionManager;
private WebSocketChannelManager() { private WebSocketChannelManager() {}
}
public static WebSocketChannelManager getInstance() { public static WebSocketChannelManager getInstance() {
return webSocketChannelManager; return webSocketChannelManager;
@ -45,23 +41,12 @@ public class WebSocketChannelManager {
* @param clientName 客户端名称 * @param clientName 客户端名称
*/ */
public void init(String userId, String sessionId, String clientName) { public void init(String userId, String sessionId, String clientName) {
WebSocketSession onlineUser = getOnlineUser(userId, clientName); webSocketUserSessionManager.init(userId, sessionId, clientName);
if (onlineUser == null) {
onlineUser = new WebSocketSession();
onlineUser.setUserId(userId);
onlineUser.setClientName(clientName);
// 更新到session队列中
List<WebSocketSession> webSocketSessions = listOnlineUser(userId);
webSocketSessions.add(onlineUser);
webSocketUserMap.put(userId, webSocketSessions);
}
// 更新最后时间
onlineUser.setSessionId(sessionId);
onlineUser.setUpdateTime(System.currentTimeMillis());
} }
/** /**
* 添加通道绑定通道与会话 * 添加通道绑定通道与会话
*
* @param sessionId * @param sessionId
* @param userId * @param userId
* @param channel * @param channel
@ -69,32 +54,19 @@ public class WebSocketChannelManager {
* @throws SessionException * @throws SessionException
*/ */
public WebSocketSession addChannel(String sessionId, String userId, Channel channel) throws SessionException { public WebSocketSession addChannel(String sessionId, String userId, Channel channel) throws SessionException {
WebSocketSession onlineUser = getOnlineUserBySessionId(sessionId); WebSocketSession onlineUser = webSocketUserSessionManager.addChannel(sessionId, userId, channel);
if (onlineUser == null) {
throw new SessionException("无效会话,请登录");
}
if (!StringUtils.equals(userId, onlineUser.getUserId())) {
throw new SessionException("from错误");
}
// 更新会话通道
onlineUser.setChannel(channel);
onlineUser.setChannelId(channel.id().asLongText());
globalGroup.add(channel); globalGroup.add(channel);
return onlineUser; return onlineUser;
} }
/**
* 删除通道
*
* @param channel
*/
public void removeChannel(Channel channel) { public void removeChannel(Channel channel) {
webSocketUserSessionManager.removeChannel(channel);
globalGroup.remove(channel); globalGroup.remove(channel);
for (Map.Entry<String, List<WebSocketSession>> kv : webSocketUserMap.entrySet()) {
List<WebSocketSession> webSocketSessions = kv.getValue();
for (int i = 0; i < webSocketSessions.size(); i++) {
WebSocketSession webSocketSession = webSocketSessions.get(i);
if (StringUtils.equals(webSocketSession.getChannelId(), channel.id().asLongText())) {
webSocketSessions.remove(i);
return;
}
}
}
} }
/** /**
@ -104,17 +76,9 @@ public class WebSocketChannelManager {
* @return * @return
*/ */
public List<WebSocketSession> listOnlineUser(String userId) { public List<WebSocketSession> listOnlineUser(String userId) {
if (StringUtils.isBlank(userId)) { return webSocketUserSessionManager.listOnlineUser(userId);
return new ArrayList<>();
}
List<WebSocketSession> webSocketSessions = webSocketUserMap.get(userId);
if (webSocketSessions == null) {
return new ArrayList<>();
}
return webSocketSessions;
} }
/** /**
* 获取在线用户 * 获取在线用户
* *
@ -123,19 +87,7 @@ public class WebSocketChannelManager {
* @return * @return
*/ */
public WebSocketSession getOnlineUser(String userId, String clientName) { public WebSocketSession getOnlineUser(String userId, String clientName) {
if (StringUtils.isBlank(userId) || StringUtils.isBlank(clientName)) { return webSocketUserSessionManager.getOnlineUser(userId, clientName);
return null;
}
List<WebSocketSession> webSocketSessions = listOnlineUser(userId);
if (webSocketSessions.isEmpty()) {
return null;
}
for (WebSocketSession webSocketSession : webSocketSessions) {
if (StringUtils.equals(clientName, webSocketSession.getClientName())) {
return webSocketSession;
}
}
return null;
} }
/** /**
@ -145,17 +97,7 @@ public class WebSocketChannelManager {
* @return * @return
*/ */
public List<WebSocketSession> listOnlineUser(List<String> userIds) { public List<WebSocketSession> listOnlineUser(List<String> userIds) {
if (userIds == null || userIds.isEmpty()) { return webSocketUserSessionManager.listOnlineUser(userIds);
return new ArrayList<>();
}
List<WebSocketSession> webSocketSessions = new ArrayList<>();
for (String userId : userIds) {
List<WebSocketSession> users = webSocketUserMap.get(userId);
if (users != null) {
webSocketSessions.addAll(users);
}
}
return webSocketSessions;
} }
/** /**
@ -166,22 +108,7 @@ public class WebSocketChannelManager {
* @return * @return
*/ */
public List<WebSocketSession> listOnlineUser(List<String> userIds, String clientName) { public List<WebSocketSession> listOnlineUser(List<String> userIds, String clientName) {
if (userIds == null || userIds.isEmpty() || StringUtils.isBlank(clientName)) { return webSocketUserSessionManager.listOnlineUser(userIds, clientName);
return new ArrayList<>();
}
List<WebSocketSession> webSocketSessions = new ArrayList<>();
for (String userId : userIds) {
List<WebSocketSession> users = webSocketUserMap.get(userId);
if (users == null) {
continue;
}
for (WebSocketSession user : users) {
if (StringUtils.equals(clientName, user.getClientName())) {
webSocketSessions.add(user);
}
}
}
return webSocketSessions;
} }
/** /**
@ -191,14 +118,7 @@ public class WebSocketChannelManager {
* @return * @return
*/ */
private WebSocketSession getOnlineUserBySessionId(String sessionId) { private WebSocketSession getOnlineUserBySessionId(String sessionId) {
for (Map.Entry<String, List<WebSocketSession>> kv : webSocketUserMap.entrySet()) { return webSocketUserSessionManager.getOnlineUserBySessionId(sessionId);
for (WebSocketSession webSocketSession : kv.getValue()) {
if (StringUtils.equals(sessionId, webSocketSession.getSessionId())) {
return webSocketSession;
}
}
}
return null;
} }
/** /**
@ -208,14 +128,7 @@ public class WebSocketChannelManager {
* @return * @return
*/ */
private WebSocketSession getOnlineUserByChannelId(String channelId) { private WebSocketSession getOnlineUserByChannelId(String channelId) {
for (Map.Entry<String, List<WebSocketSession>> kv : webSocketUserMap.entrySet()) { return webSocketUserSessionManager.getOnlineUserByChannelId(channelId);
for (WebSocketSession webSocketSession : kv.getValue()) {
if (StringUtils.equals(channelId, webSocketSession.getChannelId())) {
return webSocketSession;
}
}
}
return null;
} }
/** /**
@ -253,6 +166,11 @@ public class WebSocketChannelManager {
} }
} }
@Override
public void setWebSocketUserSessionManager(IWebSocketUserSessionManager webSocketUserSessionManager) {
this.webSocketUserSessionManager = webSocketUserSessionManager;
}
private static class WebSocketChannelManagerBuilder { private static class WebSocketChannelManagerBuilder {
public static final WebSocketChannelManager webSocketChannelManager = new WebSocketChannelManager(); public static final WebSocketChannelManager webSocketChannelManager = new WebSocketChannelManager();
} }

12
pom.xml
View File

@ -41,6 +41,7 @@
<module>login-oauth2-client</module> <module>login-oauth2-client</module>
<module>module-oauth2-client</module> <module>module-oauth2-client</module>
<module>mongo-module-dictionary</module> <module>mongo-module-dictionary</module>
<module>redis-cache</module>
</modules> </modules>
<packaging>pom</packaging> <packaging>pom</packaging>
@ -70,9 +71,9 @@
<commonLang3.version>3.4</commonLang3.version> <commonLang3.version>3.4</commonLang3.version>
<common-io.version>2.11.0</common-io.version> <common-io.version>2.11.0</common-io.version>
<common-net.version>3.3</common-net.version> <common-net.version>3.3</common-net.version>
<common-compress>1.18</common-compress>
<common-fileupload.version>1.3.1</common-fileupload.version> <common-fileupload.version>1.3.1</common-fileupload.version>
<common-codec.version>1.12</common-codec.version> <common-codec.version>1.12</common-codec.version>
<common-compress>1.18</common-compress>
<common-beanutils.version>1.9.4</common-beanutils.version> <common-beanutils.version>1.9.4</common-beanutils.version>
<httpclient.version>4.5.13</httpclient.version> <httpclient.version>4.5.13</httpclient.version>
<saaj.version>1.3.18</saaj.version> <saaj.version>1.3.18</saaj.version>
@ -92,6 +93,7 @@
<xmlgraphics.version>1.10</xmlgraphics.version> <xmlgraphics.version>1.10</xmlgraphics.version>
<minio.version>7.0.2</minio.version> <minio.version>7.0.2</minio.version>
<mongo.version>3.2.5</mongo.version> <mongo.version>3.2.5</mongo.version>
<redis.version>2.5.5</redis.version>
<freemarker.version>2.3.31</freemarker.version> <freemarker.version>2.3.31</freemarker.version>
</properties> </properties>
@ -521,6 +523,14 @@
</dependency> </dependency>
<!-- mongo end --> <!-- mongo end -->
<!-- redis start -->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>${redis.version}</version>
</dependency>
<!-- redis end -->
<!-- freemarker start --> <!-- freemarker start -->
<dependency> <dependency>
<groupId>org.freemarker</groupId> <groupId>org.freemarker</groupId>