From fc75b13e78b49a25f49499e2fe261f563284981c Mon Sep 17 00:00:00 2001 From: wanggeng <450292408@qq.com> Date: Sun, 12 Sep 2021 23:06:35 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9EWebSocket=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- module-instant-message/pom.xml | 37 +++ .../api/WebSocketClientController.java | 37 +++ .../app/WebSocketClientAppController.java | 37 +++ .../service/IWebSocketClientService.java | 28 +++ .../impl/WebSocketClientServiceImpl.java | 46 ++++ .../startup/WebSocketStartUp.java | 27 ++ .../channel/WebSocketChannelInitializer.java | 43 ++++ .../websocket/enums/ClientSocketTypeEnum.java | 52 ++++ .../websocket/enums/SendStatusEnum.java | 64 +++++ .../enums/WebSocketClientTypeEnum.java | 24 ++ .../exception/AppSessionException.java | 33 +++ .../exception/BaseSocketException.java | 33 +++ .../websocket/exception/BodyException.java | 32 +++ .../websocket/exception/SessionException.java | 33 +++ .../exception/TypeErrorException.java | 32 +++ .../exception/UserErrorException.java | 33 +++ .../exception/UserSessionException.java | 33 +++ .../websocket/handler/WebSocketHandler.java | 146 +++++++++++ .../handler/text/WebSocketTextHandler.java | 162 ++++++++++++ .../manager/WebSocketChannelManager.java | 233 ++++++++++++++++++ .../pojo/WebSocketClientMessage.java | 113 +++++++++ .../websocket/pojo/WebSocketSession.java | 86 +++++++ .../websocket/pojo/body/NoticeBody.java | 80 ++++++ .../websocket/pojo/body/RegisterBody.java | 27 ++ .../websocket/pojo/body/SendStatusBody.java | 49 ++++ .../websocket/pojo/body/TargetBody.java | 37 +++ .../websocket/server/WebSocketServer.java | 65 +++++ pom.xml | 1 + 28 files changed, 1623 insertions(+) create mode 100644 module-instant-message/pom.xml create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/controller/api/WebSocketClientController.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/controller/app/WebSocketClientAppController.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/IWebSocketClientService.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/impl/WebSocketClientServiceImpl.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/startup/WebSocketStartUp.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/channel/WebSocketChannelInitializer.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/enums/ClientSocketTypeEnum.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/enums/SendStatusEnum.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/enums/WebSocketClientTypeEnum.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/AppSessionException.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/BaseSocketException.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/BodyException.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/SessionException.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/TypeErrorException.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/UserErrorException.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/UserSessionException.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/handler/WebSocketHandler.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/handler/text/WebSocketTextHandler.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/manager/WebSocketChannelManager.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/WebSocketClientMessage.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/WebSocketSession.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/body/NoticeBody.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/body/RegisterBody.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/body/SendStatusBody.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/body/TargetBody.java create mode 100644 module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/server/WebSocketServer.java diff --git a/module-instant-message/pom.xml b/module-instant-message/pom.xml new file mode 100644 index 00000000..2f48ef3c --- /dev/null +++ b/module-instant-message/pom.xml @@ -0,0 +1,37 @@ + + + + wg-basic + ink.wgink + 1.0-SNAPSHOT + + 4.0.0 + + module-instant-message + WebSocket即时通讯 + + + + ink.wgink + common + 1.0-SNAPSHOT + + + + + io.netty + netty-all + + + + + junit + junit + 4.13.1 + test + + + + \ No newline at end of file diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/controller/api/WebSocketClientController.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/controller/api/WebSocketClientController.java new file mode 100644 index 00000000..ecdcee74 --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/controller/api/WebSocketClientController.java @@ -0,0 +1,37 @@ +package ink.wgink.module.instantmessage.controller.api; + +import ink.wgink.interfaces.consts.ISystemConstant; +import ink.wgink.module.instantmessage.service.IWebSocketClientService; +import ink.wgink.pojo.result.ErrorResult; +import ink.wgink.pojo.result.SuccessResultData; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +/** + * @ClassName: WebSocketClientController + * @Description: WebSocket客户端 + * @Author: wanggeng + * @Date: 2021/9/12 4:15 下午 + * @Version: 1.0 + */ +@Api(tags = ISystemConstant.API_TAGS_SYSTEM_PREFIX + "websocket客户端") +@RestController +@RequestMapping(ISystemConstant.API_PREFIX + "/websocket/client") +public class WebSocketClientController { + + @Autowired + private IWebSocketClientService webSocketClientService; + + @ApiOperation(value = "登录", notes = "登录接口,返回Socket认证token") + @ApiResponses({@ApiResponse(code = 400, message = "请求失败", response = ErrorResult.class)}) + @GetMapping("login/{clientName}") + public SuccessResultData login(@PathVariable("clientName") String clientName) { + String sessionId = webSocketClientService.login(clientName); + return new SuccessResultData<>(sessionId); + } + +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/controller/app/WebSocketClientAppController.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/controller/app/WebSocketClientAppController.java new file mode 100644 index 00000000..f8fb23ac --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/controller/app/WebSocketClientAppController.java @@ -0,0 +1,37 @@ +package ink.wgink.module.instantmessage.controller.app; + +import ink.wgink.interfaces.consts.ISystemConstant; +import ink.wgink.module.instantmessage.service.IWebSocketClientService; +import ink.wgink.pojo.result.ErrorResult; +import ink.wgink.pojo.result.SuccessResultData; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +/** + * @ClassName: WebSocketClientAppController + * @Description: WebSocket客户端 + * @Author: wanggeng + * @Date: 2021/9/12 4:15 下午 + * @Version: 1.0 + */ +@Api(tags = ISystemConstant.API_TAGS_APP_PREFIX + "websocket客户端") +@RestController +@RequestMapping(ISystemConstant.APP_PREFIX + "/websocket/client") +public class WebSocketClientAppController { + + @Autowired + private IWebSocketClientService webSocketClientService; + + @ApiOperation(value = "登录", notes = "登录接口,返回userId(sessionId)") + @ApiResponses({@ApiResponse(code = 400, message = "请求失败", response = ErrorResult.class)}) + @GetMapping("login/{clientName}") + public SuccessResultData login(@RequestHeader("token") String token, @PathVariable("clientName") String clientName) { + String sessionId = webSocketClientService.login(token, clientName); + return new SuccessResultData<>(sessionId); + } + +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/IWebSocketClientService.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/IWebSocketClientService.java new file mode 100644 index 00000000..61173bd7 --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/IWebSocketClientService.java @@ -0,0 +1,28 @@ +package ink.wgink.module.instantmessage.service; + +/** + * @ClassName: IWebSocketClientService + * @Description: websocket客户端 + * @Author: wanggeng + * @Date: 2021/9/12 4:44 下午 + * @Version: 1.0 + */ +public interface IWebSocketClientService { + /** + * 登录 + * + * @param clientName 客户端名称 + * @return sessionId + */ + String login(String clientName); + + /** + * 登录 + * + * @param token + * @param clientName 客户端名称 + * @return sessionId + */ + String login(String token, String clientName); + +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/impl/WebSocketClientServiceImpl.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/impl/WebSocketClientServiceImpl.java new file mode 100644 index 00000000..deed8423 --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/service/impl/WebSocketClientServiceImpl.java @@ -0,0 +1,46 @@ +package ink.wgink.module.instantmessage.service.impl; + +import ink.wgink.common.base.DefaultBaseService; +import ink.wgink.module.instantmessage.service.IWebSocketClientService; +import ink.wgink.module.instantmessage.websocket.manager.WebSocketChannelManager; +import ink.wgink.pojo.app.AppTokenUser; +import ink.wgink.util.UUIDUtil; +import org.springframework.stereotype.Service; + +/** + * @ClassName: WebSocketClientServiceImpl + * @Description: webSocketk客户端 + * @Author: wanggeng + * @Date: 2021/9/12 4:44 下午 + * @Version: 1.0 + */ +@Service +public class WebSocketClientServiceImpl extends DefaultBaseService implements IWebSocketClientService { + + @Override + public String login(String clientName) { + String userId = securityComponent.getCurrentUser().getUserId(); + return initSession(userId, clientName); + } + + @Override + public String login(String token, String clientName) { + AppTokenUser appTokenUser = getAppTokenUser(token); + return initSession(appTokenUser.getId(), clientName); + } + + /** + * 初始化Socket会话 + * + * @param userId 用户ID + * @param clientName 客户端名称 + * @return + */ + private String initSession(String userId, String clientName) { + String sessionId = UUIDUtil.getUUID(); + WebSocketChannelManager.getInstance().init(userId, sessionId, clientName); + return sessionId; + } + + +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/startup/WebSocketStartUp.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/startup/WebSocketStartUp.java new file mode 100644 index 00000000..f850d2e4 --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/startup/WebSocketStartUp.java @@ -0,0 +1,27 @@ +package ink.wgink.module.instantmessage.startup; + +import ink.wgink.module.instantmessage.websocket.server.WebSocketServer; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.stereotype.Component; + +/** + * @ClassName: WebSocketStartUp + * @Description: + * @Author: wanggeng + * @Date: 2021/9/11 10:38 下午 + * @Version: 1.0 + */ +@Component +public class WebSocketStartUp implements ApplicationRunner { + + @Autowired + private WebSocketServer webSocketServer; + + @Override + public void run(ApplicationArguments args) throws Exception { + new Thread(webSocketServer).start(); + } + +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/channel/WebSocketChannelInitializer.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/channel/WebSocketChannelInitializer.java new file mode 100644 index 00000000..7260ba11 --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/channel/WebSocketChannelInitializer.java @@ -0,0 +1,43 @@ +package ink.wgink.module.instantmessage.websocket.channel; + +import ink.wgink.module.instantmessage.websocket.handler.WebSocketHandler; +import ink.wgink.properties.websocket.WebSocketProperties; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.stream.ChunkedWriteHandler; + + +/** + * @ClassName: WebSocketChannelInitializer + * @Description: WebSocket + * @Author: wanggeng + * @Date: 2021/9/11 11:38 上午 + * @Version: 1.0 + */ +public class WebSocketChannelInitializer extends ChannelInitializer { + + private WebSocketProperties webSocketProperties; + + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + //设置log监听器,并且日志级别为debug,方便观察运行流程 + socketChannel.pipeline().addLast("logging", new LoggingHandler("DEBUG")); + //设置解码器 + socketChannel.pipeline().addLast("http-codec", new HttpServerCodec()); + //聚合器,使用websocket会用到 + socketChannel.pipeline().addLast("aggregator", new HttpObjectAggregator(65536)); + //用于大数据的分区传输 + socketChannel.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); + //自定义的业务handler + WebSocketHandler webSocketHandler = new WebSocketHandler(); + webSocketHandler.setWebSocketProperties(webSocketProperties); + socketChannel.pipeline().addLast("handler", webSocketHandler); + } + + public void setWebSocketProperties(WebSocketProperties webSocketProperties) { + this.webSocketProperties = webSocketProperties; + } +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/enums/ClientSocketTypeEnum.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/enums/ClientSocketTypeEnum.java new file mode 100644 index 00000000..61859fbc --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/enums/ClientSocketTypeEnum.java @@ -0,0 +1,52 @@ +package ink.wgink.module.instantmessage.websocket.enums; + +/** + * When you feel like quitting. Think about why you started + * 当你想要放弃的时候,想想当初你为何开始 + * + * @ClassName: AppSocketTypeEnum + * @Description: App Socket 类型 + * @Author: wanggeng + * @Date: 2021/1/13 11:20 下午 + * @Version: 1.0 + */ +public enum ClientSocketTypeEnum { + + REGISTER(100, "注册消息,body 为 RegisterBody 的 JSON 字符串"), + MESSAGE(101, "文本消息"), + GROUP_MESSAGE(102, "群发文本消息"), + SYSTEM_MESSAGE(103, "系统消息"), + SYSTEM_GROUP_MESSAGE(104, "系统群发问呢消息"), + SYSTEM_TARGET_MESSAGE(105, "系统目标消息"), + LAYIM_HREF_MESSAGE(106, "LAYIM 连接消息"), + LAYIM_IMAGE_MESSAGE(107, "LAYIM 图片消息"), + LAYIM_FILE_MESSAGE(108, "LAYIM 文件消息"), + LAYIM_AUDIO_MESSAGE(109, "LAYIM 音频消息"), + LAYIM_VIDEO_MESSAGE(110, "LAYIM 视频消息"), + NOTICE(106, "通知"), + NOTICE_GROUP_MESSAGE(107, "群通知"), + NOTICE_TARGET_MESSAGE(108, "目标通知,用于APP打开特定页面"), + UPDATE_SERVICE_HANDLE_STATUS(501, "更新业务的处理状态"), + SEARCH_ONLINE_USER(600, "查询全部在线用户,body 为查询用户的 userId"), + SEARCH_ONLINE_USER_FRIEND(601, "查询朋友在线用户,body 为查询用户的 userId"), + SEARCH_COUNT_NEED_TO_DEALT_WITH(602, "查询全部待办总数"), + SEND_STATUS(1100, "发送状态,body 为 BaseResult 的 JSON 字符串"), + SEND_STATUS_ONLINE(1101, "发送在线状态,body 为 在线用户的ID JSONArray 字符串"), + SEND_STATUS_OFFLINE(1102, "发送离线状态,body 为 离线用户的ID JSONArray 字符串"); + + private int value; + private String summary; + + ClientSocketTypeEnum(int value, String summary) { + this.value = value; + this.summary = summary; + } + + public int getValue() { + return value; + } + + public String getSummary() { + return summary == null ? "" : summary.trim(); + } +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/enums/SendStatusEnum.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/enums/SendStatusEnum.java new file mode 100644 index 00000000..4812a5be --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/enums/SendStatusEnum.java @@ -0,0 +1,64 @@ +package ink.wgink.module.instantmessage.websocket.enums; + +/** + * When you feel like quitting. Think about why you started + * 当你想要放弃的时候,想想当初你为何开始 + * + * @ClassName: ResultCodeEnum + * @Description: 返回编码类型 + * @Author: wanggeng + * @Date: 2021/1/14 12:59 下午 + * @Version: 1.0 + */ +public enum SendStatusEnum { + /** + * 成功 + */ + SUCCESS(200), + /** + * 失败 + */ + FAILED(300), + /** + * 消息错误 + */ + MESSAGE_ERROR(400), + /** + * 会话错误 + */ + SESSION_ERROR(402), + /** + * APP会话错误 + */ + APP_SESSION_ERROR(403), + /** + * 用户会话错误 + */ + USER_SESSION_ERROR(404), + /** + * 类型错误 + */ + TYPE_ERROR(405), + /** + * 消息体错误 + */ + BODY_ERROR(406), + /** + * 来源错误 + */ + FROM_ERROR(407), + /** + * 接受用户错误 + */ + TO_ERROR(408); + + private int value; + + SendStatusEnum(int value) { + this.value = value; + } + + public int getValue() { + return value; + } +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/enums/WebSocketClientTypeEnum.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/enums/WebSocketClientTypeEnum.java new file mode 100644 index 00000000..e4c8dbf7 --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/enums/WebSocketClientTypeEnum.java @@ -0,0 +1,24 @@ +package ink.wgink.module.instantmessage.websocket.enums; + +/** + * @ClassName: WebSocketClientTypeEnum + * @Description: 客户端类型 + * @Author: wanggeng + * @Date: 2021/9/11 11:59 下午 + * @Version: 1.0 + */ +public enum WebSocketClientTypeEnum { + + APP("app"), + WEB("web"); + + private String value; + + WebSocketClientTypeEnum(String value) { + this.value = value; + } + + public String getValue() { + return value == null ? "" : value.trim(); + } +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/AppSessionException.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/AppSessionException.java new file mode 100644 index 00000000..3c56e2b0 --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/AppSessionException.java @@ -0,0 +1,33 @@ +package ink.wgink.module.instantmessage.websocket.exception; + +/** + * When you feel like quitting. Think about why you started + * 当你想要放弃的时候,想想当初你为何开始 + * + * @ClassName: AppException + * @Description: App异常 + * @Author: wanggeng + * @Date: 2021/1/14 12:23 下午 + * @Version: 1.0 + */ +public class AppSessionException extends SessionException { + + public AppSessionException() { + } + + public AppSessionException(String message) { + super(message); + } + + public AppSessionException(String message, Throwable cause) { + super(message, cause); + } + + public AppSessionException(Throwable cause) { + super(cause); + } + + public AppSessionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/BaseSocketException.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/BaseSocketException.java new file mode 100644 index 00000000..0ce49288 --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/BaseSocketException.java @@ -0,0 +1,33 @@ +package ink.wgink.module.instantmessage.websocket.exception; + +/** + * When you feel like quitting. Think about why you started + * 当你想要放弃的时候,想想当初你为何开始 + * + * @ClassName: BaseSocketException + * @Description: socket基础异常 + * @Author: wanggeng + * @Date: 2021/1/14 12:12 下午 + * @Version: 1.0 + */ +public class BaseSocketException extends Exception{ + + public BaseSocketException() { + } + + public BaseSocketException(String message) { + super(message); + } + + public BaseSocketException(String message, Throwable cause) { + super(message, cause); + } + + public BaseSocketException(Throwable cause) { + super(cause); + } + + public BaseSocketException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/BodyException.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/BodyException.java new file mode 100644 index 00000000..fe1f0722 --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/BodyException.java @@ -0,0 +1,32 @@ +package ink.wgink.module.instantmessage.websocket.exception; + +/** + * When you feel like quitting. Think about why you started + * 当你想要放弃的时候,想想当初你为何开始 + * + * @ClassName: BodyEmptyException + * @Description: 主体为空异常 + * @Author: wanggeng + * @Date: 2021/1/14 12:11 下午 + * @Version: 1.0 + */ +public class BodyException extends BaseSocketException { + public BodyException() { + } + + public BodyException(String message) { + super(message); + } + + public BodyException(String message, Throwable cause) { + super(message, cause); + } + + public BodyException(Throwable cause) { + super(cause); + } + + public BodyException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/SessionException.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/SessionException.java new file mode 100644 index 00000000..e2c39646 --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/SessionException.java @@ -0,0 +1,33 @@ +package ink.wgink.module.instantmessage.websocket.exception; + +/** + * When you feel like quitting. Think about why you started + * 当你想要放弃的时候,想想当初你为何开始 + * + * @ClassName: SessionInvalidException + * @Description: 会话无效异常 + * @Author: wanggeng + * @Date: 2021/1/14 12:04 下午 + * @Version: 1.0 + */ +public class SessionException extends BaseSocketException { + + public SessionException() { + } + + public SessionException(String message) { + super(message); + } + + public SessionException(String message, Throwable cause) { + super(message, cause); + } + + public SessionException(Throwable cause) { + super(cause); + } + + public SessionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/TypeErrorException.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/TypeErrorException.java new file mode 100644 index 00000000..b510bbef --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/TypeErrorException.java @@ -0,0 +1,32 @@ +package ink.wgink.module.instantmessage.websocket.exception; + +/** + * When you feel like quitting. Think about why you started + * 当你想要放弃的时候,想想当初你为何开始 + * + * @ClassName: TypeErrorException + * @Description: 类型错误异常 + * @Author: wanggeng + * @Date: 2021/1/14 12:14 下午 + * @Version: 1.0 + */ +public class TypeErrorException extends BaseSocketException { + public TypeErrorException() { + } + + public TypeErrorException(String message) { + super(message); + } + + public TypeErrorException(String message, Throwable cause) { + super(message, cause); + } + + public TypeErrorException(Throwable cause) { + super(cause); + } + + public TypeErrorException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/UserErrorException.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/UserErrorException.java new file mode 100644 index 00000000..e962687e --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/UserErrorException.java @@ -0,0 +1,33 @@ +package ink.wgink.module.instantmessage.websocket.exception; + +/** + * When you feel like quitting. Think about why you started + * 当你想要放弃的时候,想想当初你为何开始 + * + * @ClassName: UserErrorException + * @Description: 用户错误异常 + * @Author: wanggeng + * @Date: 2021/1/14 12:51 下午 + * @Version: 1.0 + */ +public class UserErrorException extends BaseSocketException { + + public UserErrorException() { + } + + public UserErrorException(String message) { + super(message); + } + + public UserErrorException(String message, Throwable cause) { + super(message, cause); + } + + public UserErrorException(Throwable cause) { + super(cause); + } + + public UserErrorException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/UserSessionException.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/UserSessionException.java new file mode 100644 index 00000000..83735158 --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/exception/UserSessionException.java @@ -0,0 +1,33 @@ +package ink.wgink.module.instantmessage.websocket.exception; + +/** + * When you feel like quitting. Think about why you started + * 当你想要放弃的时候,想想当初你为何开始 + * + * @ClassName: UserException + * @Description: 用户异常 + * @Author: wanggeng + * @Date: 2021/1/14 12:21 下午 + * @Version: 1.0 + */ +public class UserSessionException extends SessionException { + + public UserSessionException() { + } + + public UserSessionException(String message) { + super(message); + } + + public UserSessionException(String message, Throwable cause) { + super(message, cause); + } + + public UserSessionException(Throwable cause) { + super(cause); + } + + public UserSessionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/handler/WebSocketHandler.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/handler/WebSocketHandler.java new file mode 100644 index 00000000..296fe342 --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/handler/WebSocketHandler.java @@ -0,0 +1,146 @@ +package ink.wgink.module.instantmessage.websocket.handler; + +import ink.wgink.module.instantmessage.websocket.handler.text.WebSocketTextHandler; +import ink.wgink.module.instantmessage.websocket.manager.WebSocketChannelManager; +import ink.wgink.properties.websocket.WebSocketProperties; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.websocketx.*; +import io.netty.util.CharsetUtil; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.HttpStatus; + +/** + * @ClassName: WebSocketHandler + * @Description: 1.web发起一次类似是http的请求,并在channelRead0方法中进行处理,并通过instanceof去判断帧对象是FullHttpRequest还是WebSocketFrame,建立连接是时候会是FullHttpRequest + * 2.在handleHttpRequest方法中去创建websocket,首先是判断Upgrade是不是websocket协议,若不是则通过sendHttpResponse将错误信息返回给客户端,紧接着通过WebSocketServerHandshakerFactory创建socket对象并通过handshaker握手创建连接 + * 3.在连接创建好后的所以消息流动都是以WebSocketFrame来体现 + * 4.在handlerWebSocketFrame去处理消息,也可能是客户端发起的关闭指令,ping指令等等 + * @Author: wanggeng + * @Date: 2021/9/11 11:40 上午 + * @Version: 1.0 + */ +public class WebSocketHandler extends SimpleChannelInboundHandler { + private static final Logger LOG = LoggerFactory.getLogger(WebSocketHandler.class); + private WebSocketProperties webSocketProperties; + private WebSocketServerHandshaker webSocketServerHandshaker; + + @Override + protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { + LOG.debug("收到消息:" + msg); + if (msg instanceof FullHttpRequest) { + //以http请求形式接入,但是走的是websocket + handleHttpRequest(ctx, (FullHttpRequest) msg); + } else if (msg instanceof WebSocketFrame) { + //处理websocket客户端的消息 + handlerWebSocketFrame(ctx, (WebSocketFrame) msg); + } + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + LOG.debug("新客户端链接:" + ctx.channel()); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + //断开连接 + LOG.debug("客户端断开连接:" + ctx.channel()); + WebSocketChannelManager.getInstance().removeChannel(ctx.channel()); + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.flush(); + } + + private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { + // 判断是否关闭链路的指令 + if (frame instanceof CloseWebSocketFrame) { + webSocketServerHandshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); + return; + } + // 判断是否ping消息 + if (frame instanceof PingWebSocketFrame) { + ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); + return; + } + // 文本消息 + if (frame instanceof TextWebSocketFrame) { + WebSocketTextHandler webSocketTextHandler = new WebSocketTextHandler(); + webSocketTextHandler.handler(ctx, (TextWebSocketFrame) frame); + return; + } + // 二进制文件,暂不处理 + if (frame instanceof BinaryWebSocketFrame) { + return; + } + } + + /** + * 唯一的一次http请求,用于创建websocket + */ + private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { + //要求Upgrade为websocket,过滤掉get/Post + if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) { + //若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端 + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); + return; + } + WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(String.format("ws://%s:%s/%s", webSocketProperties.getUrl(), webSocketProperties.getPort(), webSocketProperties.getContext()), null, false); + webSocketServerHandshaker = wsFactory.newHandshaker(req); + if (webSocketServerHandshaker == null) { + WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); + } else { + webSocketServerHandshaker.handshake(ctx.channel(), req); + } + } + + /** + * 拒绝不合法的请求,并返回错误信息 + */ + private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) { + // 返回应答给客户端 + if (res.status().code() != HttpStatus.OK.value()) { + ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); + res.content().writeBytes(buf); + buf.release(); + } + ChannelFuture f = ctx.channel().writeAndFlush(res); + // 如果是非Keep-Alive,关闭连接 + if (!isKeepAlive(req) || res.status().code() != HttpStatus.OK.value()) { + f.addListener(ChannelFutureListener.CLOSE); + } + } + + /** + * 判断是不是长连接 + * + * @param req + * @return + */ + private boolean isKeepAlive(FullHttpRequest req) { + String connection = req.headers().get("Connection"); + if (StringUtils.isBlank(connection)) { + return false; + } + if (!StringUtils.equals("keep-alive", connection)) { + return false; + } + return true; + } + + public void setWebSocketProperties(WebSocketProperties webSocketProperties) { + this.webSocketProperties = webSocketProperties; + } +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/handler/text/WebSocketTextHandler.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/handler/text/WebSocketTextHandler.java new file mode 100644 index 00000000..3e7e4b14 --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/handler/text/WebSocketTextHandler.java @@ -0,0 +1,162 @@ +package ink.wgink.module.instantmessage.websocket.handler.text; + +import com.alibaba.fastjson.JSONException; +import com.alibaba.fastjson.JSONObject; +import ink.wgink.module.instantmessage.websocket.enums.ClientSocketTypeEnum; +import ink.wgink.module.instantmessage.websocket.enums.SendStatusEnum; +import ink.wgink.module.instantmessage.websocket.exception.*; +import ink.wgink.module.instantmessage.websocket.manager.WebSocketChannelManager; +import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage; +import ink.wgink.module.instantmessage.websocket.pojo.WebSocketSession; +import ink.wgink.module.instantmessage.websocket.pojo.body.RegisterBody; +import ink.wgink.module.instantmessage.websocket.pojo.body.SendStatusBody; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import org.apache.commons.compress.utils.Sets; +import org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; +import java.util.List; + +/** + * @ClassName: WebSocketTextHandler + * @Description: WebSocket文本处理器 + * @Author: wanggeng + * @Date: 2021/9/12 8:08 下午 + * @Version: 1.0 + */ +public class WebSocketTextHandler { + + public void handler(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) { + WebSocketClientMessage clientSocketMessage = null; + SendStatusBody sendStatusBody = null; + try { + clientSocketMessage = JSONObject.parseObject(textWebSocketFrame.text(), WebSocketClientMessage.class); + if (clientSocketMessage.getType() == null) { + throw new TypeErrorException("Type类型不能为空"); + } + if (StringUtils.isBlank(clientSocketMessage.getBody())) { + throw new BodyException("Body主体不能为空"); + } + if (ClientSocketTypeEnum.REGISTER.getValue() == clientSocketMessage.getType()) { + // 客户端注册消息 + sendStatusBody = clientRegisterSession(ctx.channel(), clientSocketMessage); + } else if (ClientSocketTypeEnum.MESSAGE.getValue() == clientSocketMessage.getType()) { + // 文本消息 + sendText(ctx.channel(), clientSocketMessage); + } else if (ClientSocketTypeEnum.GROUP_MESSAGE.getValue() == clientSocketMessage.getType()) { + // 群发消息 + sendGroupText(ctx.channel(), clientSocketMessage); + } else if (ClientSocketTypeEnum.SEARCH_ONLINE_USER.getValue() == clientSocketMessage.getType()) { + // 用户在线状态 + listOnlineUser(ctx.channel(), clientSocketMessage); + } + } catch (SessionException e) { + // 没有登录时,返回内容,关闭连接 + sendStatusBody = new SendStatusBody(SendStatusEnum.SESSION_ERROR.getValue(), SendStatusEnum.SESSION_ERROR, e.getMessage()); + clientSocketMessage.setBody(JSONObject.toJSONString(sendStatusBody)); + WebSocketChannelManager.getInstance().sendText(ctx.channel(), clientSocketMessage); + ctx.close(); + } catch (TypeErrorException e) { + sendStatusBody = new SendStatusBody(SendStatusEnum.SESSION_ERROR.getValue(), SendStatusEnum.SESSION_ERROR, e.getMessage()); + } catch (BodyException e) { + sendStatusBody = new SendStatusBody(SendStatusEnum.SESSION_ERROR.getValue(), SendStatusEnum.SESSION_ERROR, e.getMessage()); + } catch (UserErrorException e) { + sendStatusBody = new SendStatusBody(SendStatusEnum.SESSION_ERROR.getValue(), SendStatusEnum.SESSION_ERROR, e.getMessage()); + } catch (JSONException e) { + clientSocketMessage = new WebSocketClientMessage(); + clientSocketMessage.setSystem(true); + clientSocketMessage.setFrom(WebSocketChannelManager.FORM_SYSTEM); + sendStatusBody = new SendStatusBody(SendStatusEnum.TO_ERROR.getValue(), SendStatusEnum.MESSAGE_ERROR, e.getMessage()); + } finally { + if (sendStatusBody != null && ctx.channel().isOpen()) { + clientSocketMessage.setBody(JSONObject.toJSONString(sendStatusBody)); + WebSocketChannelManager.getInstance().sendText(ctx.channel(), clientSocketMessage); + } + } + } + + /** + * 客户端会话注册 + * + * @param channel 会话通道 + * @param clientSocketMessage 客户端信息 + * @return + * @throws SessionException + */ + private SendStatusBody clientRegisterSession(Channel channel, WebSocketClientMessage clientSocketMessage) throws SessionException { + RegisterBody registerBody = JSONObject.parseObject(clientSocketMessage.getBody(), RegisterBody.class); + // 更新通道 + WebSocketChannelManager.getInstance().addChannel(registerBody.getSessionId(), clientSocketMessage.getFrom(), channel); + return new SendStatusBody(SendStatusEnum.SUCCESS.getValue(), SendStatusEnum.SUCCESS, "OK"); + } + + /** + * 发送文字消息 + * + * @param channel 通道 + * @param webSocketClientMessage 客户端消息 + * @throws UserErrorException + */ + private void sendText(Channel channel, WebSocketClientMessage webSocketClientMessage) throws UserErrorException, UserSessionException { + if (StringUtils.isBlank(webSocketClientMessage.getTo())) { + throw new UserErrorException("To 值不能为空"); + } + webSocketClientMessage.setSystem(false); + List webSocketSessions = WebSocketChannelManager.getInstance().listOnlineUser(webSocketClientMessage.getTo()); + // 返回失败结果 + if (webSocketSessions.isEmpty()) { + sendTextFailed(channel, webSocketClientMessage, "用户不在线"); + return; + } + // 发送消息 + for (WebSocketSession webSocketSession : webSocketSessions) { + WebSocketChannelManager.getInstance().sendText(webSocketSession.getChannel(), webSocketClientMessage); + } + } + + /** + * 群发文字消息 + * + * @param channel 通道 + * @param webSocketClientMessage 客户端消息 + * @throws UserErrorException + * @throws UserSessionException + */ + private void sendGroupText(Channel channel, WebSocketClientMessage webSocketClientMessage) throws UserErrorException, UserSessionException { + if (StringUtils.isBlank(webSocketClientMessage.getTo())) { + throw new UserErrorException("To 值不能为空"); + } + webSocketClientMessage.setSystem(false); + List toUserIds = new ArrayList<>(Sets.newHashSet(webSocketClientMessage.getTo().split(","))); + List webSocketSessions = WebSocketChannelManager.getInstance().listOnlineUser(toUserIds); + // 返回失败结果 + if (webSocketSessions.isEmpty()) { + sendTextFailed(channel, webSocketClientMessage, "无用户在线"); + return; + } + // 发送消息 + for (WebSocketSession webSocketSession : webSocketSessions) { + WebSocketChannelManager.getInstance().sendText(webSocketSession.getChannel(), webSocketClientMessage); + } + } + + private void listOnlineUser(Channel channel, WebSocketClientMessage clientSocketMessage) { + + } + + /** + * 发送文本失败 + * + * @param fromChannel 来源通道 + * @param webSocketClientMessage webSocket客户端消息 + * @param msg + */ + public static void sendTextFailed(Channel fromChannel, WebSocketClientMessage webSocketClientMessage, String msg) { + webSocketClientMessage.setType(ClientSocketTypeEnum.SEND_STATUS.getValue()); + webSocketClientMessage.setBody(JSONObject.toJSONString(new SendStatusBody(SendStatusEnum.FAILED.getValue(), SendStatusEnum.FAILED, msg))); + WebSocketChannelManager.getInstance().sendText(fromChannel, webSocketClientMessage); + } + +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/manager/WebSocketChannelManager.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/manager/WebSocketChannelManager.java new file mode 100644 index 00000000..151ef70d --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/manager/WebSocketChannelManager.java @@ -0,0 +1,233 @@ +package ink.wgink.module.instantmessage.websocket.manager; + +import com.alibaba.fastjson.JSONObject; +import ink.wgink.module.instantmessage.websocket.exception.SessionException; +import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage; +import ink.wgink.module.instantmessage.websocket.pojo.WebSocketSession; +import io.netty.channel.Channel; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; +import io.netty.util.concurrent.GlobalEventExecutor; +import org.apache.commons.lang3.StringUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * @ClassName: WebSocketChannelManager + * @Description: WebSocket通道管理器 + * @Author: wanggeng + * @Date: 2021/9/11 11:56 上午 + * @Version: 1.0 + */ +public class WebSocketChannelManager { + public static final String FORM_SYSTEM = "SYSTEM"; + private static final WebSocketChannelManager webSocketChannelManager = WebSocketChannelManagerBuilder.webSocketChannelManager; + private ChannelGroup globalGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + private ConcurrentMap> webSocketUserMap = new ConcurrentHashMap<>(); + + private WebSocketChannelManager() { + } + + public static WebSocketChannelManager getInstance() { + return webSocketChannelManager; + } + + /** + * 初始化会话队列 + * + * @param userId 用户ID + * @param sessionId 会话ID + * @param clientName 客户端名称 + */ + public void init(String userId, String sessionId, String clientName) { + WebSocketSession onlineUser = getOnlineUser(userId, clientName); + if (onlineUser == null) { + onlineUser = new WebSocketSession(); + onlineUser.setUserId(userId); + onlineUser.setClientName(clientName); + // 更新到session队列中 + List webSocketSessions = listOnlineUser(userId); + webSocketSessions.add(onlineUser); + webSocketUserMap.put(userId, webSocketSessions); + } + // 更新最后时间 + onlineUser.setSessionId(sessionId); + onlineUser.setUpdateTime(System.currentTimeMillis()); + } + + /** + * 添加通道,绑定通道与会话 + * + * @param sessionId + * @param userId + * @param channel + * @throws SessionException + */ + public void addChannel(String sessionId, String userId, Channel channel) throws SessionException { + WebSocketSession onlineUser = getOnlineUserBySessionId(sessionId); + if (onlineUser == null) { + throw new SessionException("无效会话,请登录"); + } + if (!StringUtils.equals(userId, onlineUser.getUserId())) { + throw new SessionException("from错误"); + } + // 更新会话通道 + onlineUser.setChannel(channel); + onlineUser.setChannelId(channel.id().asLongText()); + globalGroup.add(channel); + } + + public void removeChannel(Channel channel) { + globalGroup.remove(channel); + for (Map.Entry> kv : webSocketUserMap.entrySet()) { + List webSocketSessions = kv.getValue(); + for (int i = 0; i < webSocketSessions.size(); i++) { + WebSocketSession webSocketSession = webSocketSessions.get(i); + if (StringUtils.equals(webSocketSession.getChannelId(), channel.id().asLongText())) { + webSocketSessions.remove(i); + return; + } + } + } + } + + /** + * 获取在线用户客户端列表 + * + * @param userId 用户ID + * @return + */ + public List listOnlineUser(String userId) { + if (StringUtils.isBlank(userId)) { + return new ArrayList<>(); + } + List webSocketSessions = webSocketUserMap.get(userId); + if (webSocketSessions == null) { + return new ArrayList<>(); + } + return webSocketSessions; + } + + + /** + * 获取在线用户 + * + * @param userId 用户ID + * @param clientName 客户端名称 + * @return + */ + public WebSocketSession getOnlineUser(String userId, String clientName) { + if (StringUtils.isBlank(userId) || StringUtils.isBlank(clientName)) { + return null; + } + List webSocketSessions = listOnlineUser(userId); + if (webSocketSessions.isEmpty()) { + return null; + } + for (WebSocketSession webSocketSession : webSocketSessions) { + if (StringUtils.equals(clientName, webSocketSession.getClientName())) { + return webSocketSession; + } + } + return null; + } + + /** + * 获取在线用户客户端列表 + * + * @param userIds 用户ID列表 + * @return + */ + public List listOnlineUser(List userIds) { + if (userIds == null || userIds.isEmpty()) { + return new ArrayList<>(); + } + List webSocketSessions = new ArrayList<>(); + for (String userId : userIds) { + List users = webSocketUserMap.get(userId); + if (users != null) { + webSocketSessions.addAll(users); + } + } + return webSocketSessions; + } + + /** + * 获取在线用户客户端列表 + * + * @param userIds 用户ID列表 + * @param clientName 客户端名称 + * @return + */ + public List listOnlineUser(List userIds, String clientName) { + if (userIds == null || userIds.isEmpty() || StringUtils.isBlank(clientName)) { + return new ArrayList<>(); + } + List webSocketSessions = new ArrayList<>(); + for (String userId : userIds) { + List users = webSocketUserMap.get(userId); + if (users == null) { + continue; + } + for (WebSocketSession user : users) { + if (StringUtils.equals(clientName, user.getClientName())) { + webSocketSessions.add(user); + } + } + } + return webSocketSessions; + } + + /** + * 通过会话获取用户 + * + * @param sessionId 会话ID + * @return + */ + private WebSocketSession getOnlineUserBySessionId(String sessionId) { + for (Map.Entry> kv : webSocketUserMap.entrySet()) { + for (WebSocketSession webSocketSession : kv.getValue()) { + if (StringUtils.equals(sessionId, webSocketSession.getSessionId())) { + return webSocketSession; + } + } + } + return null; + } + + /** + * 通过会话获取用户 + * + * @param channelId 通道ID + * @return + */ + private WebSocketSession getOnlineUserByChannelId(String channelId) { + for (Map.Entry> kv : webSocketUserMap.entrySet()) { + for (WebSocketSession webSocketSession : kv.getValue()) { + if (StringUtils.equals(channelId, webSocketSession.getChannelId())) { + return webSocketSession; + } + } + } + return null; + } + + public void sendAll(TextWebSocketFrame tws) { + globalGroup.writeAndFlush(tws); + } + + public void sendText(Channel channel, WebSocketClientMessage webSocketClientMessage) { + TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame(JSONObject.toJSONString(webSocketClientMessage)); + channel.writeAndFlush(textWebSocketFrame); + } + + private static class WebSocketChannelManagerBuilder { + public static final WebSocketChannelManager webSocketChannelManager = new WebSocketChannelManager(); + } + +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/WebSocketClientMessage.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/WebSocketClientMessage.java new file mode 100644 index 00000000..53f14dbc --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/WebSocketClientMessage.java @@ -0,0 +1,113 @@ +package ink.wgink.module.instantmessage.websocket.pojo; + + +/** + * When you feel like quitting. Think about why you started + * 当你想要放弃的时候,想想当初你为何开始 + * + * @ClassName: AppSocketMessage + * @Description: App Socket 消息 + * @Author: wanggeng + * @Date: 2021/1/13 11:15 下午 + * @Version: 1.0 + */ +public class WebSocketClientMessage { + + /** + * id + */ + private String id; + /** + * AppSocketTypeEnum 消息类型 + */ + private Integer type; + /** + * 是否系统消息 + */ + private Boolean isSystem; + /** + * 来源,系统消息为:SYSTEM,非系统消息为 userId + */ + private String from; + /** + * 去处,系统消息为 clientName, 非系统消息为 userId + */ + private String to; + /** + * 消息体 + */ + private String body; + /** + * 时间戳 + */ + private long timestamp; + + public WebSocketClientMessage() { + } + + public WebSocketClientMessage(Integer type, Boolean isSystem, String from, String to, String body) { + this.type = type; + this.isSystem = isSystem; + this.from = from; + this.to = to; + this.body = body; + } + + public String getId() { + return id == null ? "" : id; + } + + public void setId(String id) { + this.id = id; + } + + public Integer getType() { + return type == null ? 0 : type; + } + + public void setType(Integer type) { + this.type = type; + } + + public Boolean getSystem() { + return isSystem; + } + + public void setSystem(Boolean system) { + isSystem = system; + } + + public String getFrom() { + return from == null ? "" : from; + } + + public void setFrom(String from) { + this.from = from; + } + + public String getTo() { + return to == null ? "" : to; + } + + public void setTo(String to) { + this.to = to; + } + + public String getBody() { + return body == null ? "" : body; + } + + public void setBody(String body) { + this.body = body; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public long getTimestamp() { + timestamp = System.currentTimeMillis(); + return timestamp; + } + +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/WebSocketSession.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/WebSocketSession.java new file mode 100644 index 00000000..9a0e2b7b --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/WebSocketSession.java @@ -0,0 +1,86 @@ +package ink.wgink.module.instantmessage.websocket.pojo; + +import io.netty.channel.Channel; + +/** + * @ClassName: WebSocketUser + * @Description: webSocket用户 + * @Author: wanggeng + * @Date: 2021/9/11 11:28 下午 + * @Version: 1.0 + */ +public class WebSocketSession { + + /** + * 会话ID,登录时返回,socket注册时携带 + */ + private String sessionId; + /** + * 用户ID + */ + private String userId; + /** + * 客户端名称,同一用户的同一客户端只能有一个 + */ + private String clientName; + /** + * 通道ID + */ + private String channelId; + /** + * 通道 + */ + private Channel channel; + /** + * 更新时间 + */ + private Long updateTime; + + public String getSessionId() { + return sessionId == null ? "" : sessionId.trim(); + } + + public void setSessionId(String sessionId) { + this.sessionId = sessionId; + } + + public String getUserId() { + return userId == null ? "" : userId.trim(); + } + + public void setUserId(String userId) { + this.userId = userId; + } + + public String getClientName() { + return clientName == null ? "" : clientName.trim(); + } + + public void setClientName(String clientName) { + this.clientName = clientName; + } + + public String getChannelId() { + return channelId == null ? "" : channelId.trim(); + } + + public void setChannelId(String channelId) { + this.channelId = channelId; + } + + public Channel getChannel() { + return channel; + } + + public void setChannel(Channel channel) { + this.channel = channel; + } + + public Long getUpdateTime() { + return updateTime == null ? 0 : updateTime; + } + + public void setUpdateTime(Long updateTime) { + this.updateTime = updateTime; + } +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/body/NoticeBody.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/body/NoticeBody.java new file mode 100644 index 00000000..1161f7c7 --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/body/NoticeBody.java @@ -0,0 +1,80 @@ +package ink.wgink.module.instantmessage.websocket.pojo.body; + +/** + * @ClassName: NoticeBody + * @Description: 通知 + * @Author: wanggeng + * @Date: 2021/9/12 10:09 下午 + * @Version: 1.0 + */ +public class NoticeBody { + /** + * 标题 + */ + private String title; + /** + * 内容 + */ + private String msg; + /** + * 目标,需要客户端点击触发效果时使用,可以为约定好的 url、page 等等 + */ + private String target; + /** + * 业务ID + */ + private String serviceId; + + public NoticeBody() { + } + + public NoticeBody(String title, String msg) { + this.title = title; + this.msg = msg; + } + + public NoticeBody(String title, String msg, String target) { + this.title = title; + this.msg = msg; + this.target = target; + } + + public NoticeBody(String title, String msg, String target, String serviceId) { + this.title = title; + this.msg = msg; + this.target = target; + this.serviceId = serviceId; + } + + public String getTitle() { + return title == null ? "" : title; + } + + public void setTitle(String title) { + this.title = title; + } + + public String getMsg() { + return msg == null ? "" : msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } + + public String getTarget() { + return target == null ? "" : target; + } + + public void setTarget(String target) { + this.target = target; + } + + public String getServiceId() { + return serviceId; + } + + public void setServiceId(String serviceId) { + this.serviceId = serviceId; + } +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/body/RegisterBody.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/body/RegisterBody.java new file mode 100644 index 00000000..776b3933 --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/body/RegisterBody.java @@ -0,0 +1,27 @@ +package ink.wgink.module.instantmessage.websocket.pojo.body; + +/** + * @ClassName: RegisterBody + * @Description: 注册 + * @Author: wanggeng + * @Date: 2021/9/12 10:07 下午 + * @Version: 1.0 + */ +public class RegisterBody { + + /** + * 会话ID + */ + private String sessionId; + + public RegisterBody() { + } + + public String getSessionId() { + return sessionId == null ? "" : sessionId.trim(); + } + + public void setSessionId(String sessionId) { + this.sessionId = sessionId; + } +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/body/SendStatusBody.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/body/SendStatusBody.java new file mode 100644 index 00000000..22e9e66b --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/body/SendStatusBody.java @@ -0,0 +1,49 @@ +package ink.wgink.module.instantmessage.websocket.pojo.body; + +import ink.wgink.module.instantmessage.websocket.enums.SendStatusEnum; + +/** + * @ClassName: SendStatusBody + * @Description: 发送状态 + * @Author: wanggeng + * @Date: 2021/9/12 10:08 下午 + * @Version: 1.0 + */ +public class SendStatusBody { + private Integer code; + private SendStatusEnum status; + private String msg; + + public SendStatusBody() { + } + + public SendStatusBody(int code, SendStatusEnum status, String msg) { + this.code = code; + this.status = status; + this.msg = msg; + } + + public Integer getCode() { + return code == null ? 0 : code; + } + + public void setCode(Integer code) { + this.code = code; + } + + public SendStatusEnum getStatus() { + return status; + } + + public void setStatus(SendStatusEnum status) { + this.status = status; + } + + public String getMsg() { + return msg == null ? "" : msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/body/TargetBody.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/body/TargetBody.java new file mode 100644 index 00000000..306addb0 --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/pojo/body/TargetBody.java @@ -0,0 +1,37 @@ +package ink.wgink.module.instantmessage.websocket.pojo.body; + +/** + * @ClassName: TargetBody + * @Description: 目标 + * @Author: wanggeng + * @Date: 2021/9/12 10:09 下午 + * @Version: 1.0 + */ +public class TargetBody { + private String msg; + private String target; + + public TargetBody() { + } + + public TargetBody(String msg, String target) { + this.msg = msg; + this.target = target; + } + + public String getMsg() { + return msg == null ? "" : msg; + } + + public void setMsg(String msg) { + this.msg = msg; + } + + public String getTarget() { + return target == null ? "" : target; + } + + public void setTarget(String target) { + this.target = target; + } +} diff --git a/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/server/WebSocketServer.java b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/server/WebSocketServer.java new file mode 100644 index 00000000..e7ee9fa2 --- /dev/null +++ b/module-instant-message/src/main/java/ink/wgink/module/instantmessage/websocket/server/WebSocketServer.java @@ -0,0 +1,65 @@ +package ink.wgink.module.instantmessage.websocket.server; + +import ink.wgink.module.instantmessage.websocket.channel.WebSocketChannelInitializer; +import ink.wgink.properties.websocket.WebSocketProperties; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * @ClassName: WebSocketServer + * @Description: WebSocket服务 + * @Author: wanggeng + * @Date: 2021/9/11 11:00 上午 + * @Version: 1.0 + */ +@Component +public class WebSocketServer implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class); + @Autowired + private WebSocketProperties webSocketProperties; + + @Override + public void run() { + // 通道初始化 + WebSocketChannelInitializer webSocketChannelInitializer = new WebSocketChannelInitializer(); + webSocketChannelInitializer.setWebSocketProperties(webSocketProperties); + // server + ServerBootstrap serverBootstrap = new ServerBootstrap(); + EventLoopGroup bossGroup = new NioEventLoopGroup(); + EventLoopGroup childGroup = new NioEventLoopGroup(); + try { + serverBootstrap.group(bossGroup, childGroup); + serverBootstrap.channel(NioServerSocketChannel.class); + serverBootstrap.childHandler(webSocketChannelInitializer); + ChannelFuture channelFuture = serverBootstrap.bind(webSocketProperties.getPort()).sync().addListener(listener -> { + if (listener.isSuccess()) { + StringBuilder socketOutSB = new StringBuilder(); + socketOutSB.append("******************************\n"); + socketOutSB.append("** **\n"); + socketOutSB.append("** WebSocket Server **\n"); + socketOutSB.append("** Start Successfully! **\n"); + socketOutSB.append("** **\n"); + socketOutSB.append("** Port: " + webSocketProperties.getPort() + " **\n"); + socketOutSB.append("** **\n"); + socketOutSB.append("******************************\n"); + System.out.println(socketOutSB); + } + }); + // 监听通道关闭 + channelFuture.channel().closeFuture().sync(); + } catch (InterruptedException e) { + LOG.error(e.getMessage(), e); + } finally { + bossGroup.shutdownGracefully(); + childGroup.shutdownGracefully(); + } + } +} diff --git a/pom.xml b/pom.xml index e1870e52..c879486f 100644 --- a/pom.xml +++ b/pom.xml @@ -37,6 +37,7 @@ module-file-media module-map module-activiti + module-instant-message pom