新增WebSocket模块

This commit is contained in:
wanggeng 2021-09-12 23:06:35 +08:00
parent ca99fe345b
commit fc75b13e78
28 changed files with 1623 additions and 0 deletions

View File

@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>wg-basic</artifactId>
<groupId>ink.wgink</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>module-instant-message</artifactId>
<description>WebSocket即时通讯</description>
<dependencies>
<dependency>
<groupId>ink.wgink</groupId>
<artifactId>common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<!-- netty start -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<!-- netty end -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,37 @@
package ink.wgink.module.instantmessage.controller.api;
import ink.wgink.interfaces.consts.ISystemConstant;
import ink.wgink.module.instantmessage.service.IWebSocketClientService;
import ink.wgink.pojo.result.ErrorResult;
import ink.wgink.pojo.result.SuccessResultData;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
* @ClassName: WebSocketClientController
* @Description: WebSocket客户端
* @Author: wanggeng
* @Date: 2021/9/12 4:15 下午
* @Version: 1.0
*/
@Api(tags = ISystemConstant.API_TAGS_SYSTEM_PREFIX + "websocket客户端")
@RestController
@RequestMapping(ISystemConstant.API_PREFIX + "/websocket/client")
public class WebSocketClientController {
@Autowired
private IWebSocketClientService webSocketClientService;
@ApiOperation(value = "登录", notes = "登录接口返回Socket认证token")
@ApiResponses({@ApiResponse(code = 400, message = "请求失败", response = ErrorResult.class)})
@GetMapping("login/{clientName}")
public SuccessResultData<String> login(@PathVariable("clientName") String clientName) {
String sessionId = webSocketClientService.login(clientName);
return new SuccessResultData<>(sessionId);
}
}

View File

@ -0,0 +1,37 @@
package ink.wgink.module.instantmessage.controller.app;
import ink.wgink.interfaces.consts.ISystemConstant;
import ink.wgink.module.instantmessage.service.IWebSocketClientService;
import ink.wgink.pojo.result.ErrorResult;
import ink.wgink.pojo.result.SuccessResultData;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
* @ClassName: WebSocketClientAppController
* @Description: WebSocket客户端
* @Author: wanggeng
* @Date: 2021/9/12 4:15 下午
* @Version: 1.0
*/
@Api(tags = ISystemConstant.API_TAGS_APP_PREFIX + "websocket客户端")
@RestController
@RequestMapping(ISystemConstant.APP_PREFIX + "/websocket/client")
public class WebSocketClientAppController {
@Autowired
private IWebSocketClientService webSocketClientService;
@ApiOperation(value = "登录", notes = "登录接口返回userId(sessionId)")
@ApiResponses({@ApiResponse(code = 400, message = "请求失败", response = ErrorResult.class)})
@GetMapping("login/{clientName}")
public SuccessResultData<String> login(@RequestHeader("token") String token, @PathVariable("clientName") String clientName) {
String sessionId = webSocketClientService.login(token, clientName);
return new SuccessResultData<>(sessionId);
}
}

View File

@ -0,0 +1,28 @@
package ink.wgink.module.instantmessage.service;
/**
* @ClassName: IWebSocketClientService
* @Description: websocket客户端
* @Author: wanggeng
* @Date: 2021/9/12 4:44 下午
* @Version: 1.0
*/
public interface IWebSocketClientService {
/**
* 登录
*
* @param clientName 客户端名称
* @return sessionId
*/
String login(String clientName);
/**
* 登录
*
* @param token
* @param clientName 客户端名称
* @return sessionId
*/
String login(String token, String clientName);
}

View File

@ -0,0 +1,46 @@
package ink.wgink.module.instantmessage.service.impl;
import ink.wgink.common.base.DefaultBaseService;
import ink.wgink.module.instantmessage.service.IWebSocketClientService;
import ink.wgink.module.instantmessage.websocket.manager.WebSocketChannelManager;
import ink.wgink.pojo.app.AppTokenUser;
import ink.wgink.util.UUIDUtil;
import org.springframework.stereotype.Service;
/**
* @ClassName: WebSocketClientServiceImpl
* @Description: webSocketk客户端
* @Author: wanggeng
* @Date: 2021/9/12 4:44 下午
* @Version: 1.0
*/
@Service
public class WebSocketClientServiceImpl extends DefaultBaseService implements IWebSocketClientService {
@Override
public String login(String clientName) {
String userId = securityComponent.getCurrentUser().getUserId();
return initSession(userId, clientName);
}
@Override
public String login(String token, String clientName) {
AppTokenUser appTokenUser = getAppTokenUser(token);
return initSession(appTokenUser.getId(), clientName);
}
/**
* 初始化Socket会话
*
* @param userId 用户ID
* @param clientName 客户端名称
* @return
*/
private String initSession(String userId, String clientName) {
String sessionId = UUIDUtil.getUUID();
WebSocketChannelManager.getInstance().init(userId, sessionId, clientName);
return sessionId;
}
}

View File

@ -0,0 +1,27 @@
package ink.wgink.module.instantmessage.startup;
import ink.wgink.module.instantmessage.websocket.server.WebSocketServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
/**
* @ClassName: WebSocketStartUp
* @Description:
* @Author: wanggeng
* @Date: 2021/9/11 10:38 下午
* @Version: 1.0
*/
@Component
public class WebSocketStartUp implements ApplicationRunner {
@Autowired
private WebSocketServer webSocketServer;
@Override
public void run(ApplicationArguments args) throws Exception {
new Thread(webSocketServer).start();
}
}

View File

@ -0,0 +1,43 @@
package ink.wgink.module.instantmessage.websocket.channel;
import ink.wgink.module.instantmessage.websocket.handler.WebSocketHandler;
import ink.wgink.properties.websocket.WebSocketProperties;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
/**
* @ClassName: WebSocketChannelInitializer
* @Description: WebSocket
* @Author: wanggeng
* @Date: 2021/9/11 11:38 上午
* @Version: 1.0
*/
public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
private WebSocketProperties webSocketProperties;
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//设置log监听器并且日志级别为debug方便观察运行流程
socketChannel.pipeline().addLast("logging", new LoggingHandler("DEBUG"));
//设置解码器
socketChannel.pipeline().addLast("http-codec", new HttpServerCodec());
//聚合器使用websocket会用到
socketChannel.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
//用于大数据的分区传输
socketChannel.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
//自定义的业务handler
WebSocketHandler webSocketHandler = new WebSocketHandler();
webSocketHandler.setWebSocketProperties(webSocketProperties);
socketChannel.pipeline().addLast("handler", webSocketHandler);
}
public void setWebSocketProperties(WebSocketProperties webSocketProperties) {
this.webSocketProperties = webSocketProperties;
}
}

View File

@ -0,0 +1,52 @@
package ink.wgink.module.instantmessage.websocket.enums;
/**
* When you feel like quitting. Think about why you started
* 当你想要放弃的时候想想当初你为何开始
*
* @ClassName: AppSocketTypeEnum
* @Description: App Socket 类型
* @Author: wanggeng
* @Date: 2021/1/13 11:20 下午
* @Version: 1.0
*/
public enum ClientSocketTypeEnum {
REGISTER(100, "注册消息body 为 RegisterBody 的 JSON 字符串"),
MESSAGE(101, "文本消息"),
GROUP_MESSAGE(102, "群发文本消息"),
SYSTEM_MESSAGE(103, "系统消息"),
SYSTEM_GROUP_MESSAGE(104, "系统群发问呢消息"),
SYSTEM_TARGET_MESSAGE(105, "系统目标消息"),
LAYIM_HREF_MESSAGE(106, "LAYIM 连接消息"),
LAYIM_IMAGE_MESSAGE(107, "LAYIM 图片消息"),
LAYIM_FILE_MESSAGE(108, "LAYIM 文件消息"),
LAYIM_AUDIO_MESSAGE(109, "LAYIM 音频消息"),
LAYIM_VIDEO_MESSAGE(110, "LAYIM 视频消息"),
NOTICE(106, "通知"),
NOTICE_GROUP_MESSAGE(107, "群通知"),
NOTICE_TARGET_MESSAGE(108, "目标通知用于APP打开特定页面"),
UPDATE_SERVICE_HANDLE_STATUS(501, "更新业务的处理状态"),
SEARCH_ONLINE_USER(600, "查询全部在线用户body 为查询用户的 userId"),
SEARCH_ONLINE_USER_FRIEND(601, "查询朋友在线用户body 为查询用户的 userId"),
SEARCH_COUNT_NEED_TO_DEALT_WITH(602, "查询全部待办总数"),
SEND_STATUS(1100, "发送状态body 为 BaseResult 的 JSON 字符串"),
SEND_STATUS_ONLINE(1101, "发送在线状态body 为 在线用户的ID JSONArray 字符串"),
SEND_STATUS_OFFLINE(1102, "发送离线状态body 为 离线用户的ID JSONArray 字符串");
private int value;
private String summary;
ClientSocketTypeEnum(int value, String summary) {
this.value = value;
this.summary = summary;
}
public int getValue() {
return value;
}
public String getSummary() {
return summary == null ? "" : summary.trim();
}
}

View File

@ -0,0 +1,64 @@
package ink.wgink.module.instantmessage.websocket.enums;
/**
* When you feel like quitting. Think about why you started
* 当你想要放弃的时候想想当初你为何开始
*
* @ClassName: ResultCodeEnum
* @Description: 返回编码类型
* @Author: wanggeng
* @Date: 2021/1/14 12:59 下午
* @Version: 1.0
*/
public enum SendStatusEnum {
/**
* 成功
*/
SUCCESS(200),
/**
* 失败
*/
FAILED(300),
/**
* 消息错误
*/
MESSAGE_ERROR(400),
/**
* 会话错误
*/
SESSION_ERROR(402),
/**
* APP会话错误
*/
APP_SESSION_ERROR(403),
/**
* 用户会话错误
*/
USER_SESSION_ERROR(404),
/**
* 类型错误
*/
TYPE_ERROR(405),
/**
* 消息体错误
*/
BODY_ERROR(406),
/**
* 来源错误
*/
FROM_ERROR(407),
/**
* 接受用户错误
*/
TO_ERROR(408);
private int value;
SendStatusEnum(int value) {
this.value = value;
}
public int getValue() {
return value;
}
}

View File

@ -0,0 +1,24 @@
package ink.wgink.module.instantmessage.websocket.enums;
/**
* @ClassName: WebSocketClientTypeEnum
* @Description: 客户端类型
* @Author: wanggeng
* @Date: 2021/9/11 11:59 下午
* @Version: 1.0
*/
public enum WebSocketClientTypeEnum {
APP("app"),
WEB("web");
private String value;
WebSocketClientTypeEnum(String value) {
this.value = value;
}
public String getValue() {
return value == null ? "" : value.trim();
}
}

View File

@ -0,0 +1,33 @@
package ink.wgink.module.instantmessage.websocket.exception;
/**
* When you feel like quitting. Think about why you started
* 当你想要放弃的时候想想当初你为何开始
*
* @ClassName: AppException
* @Description: App异常
* @Author: wanggeng
* @Date: 2021/1/14 12:23 下午
* @Version: 1.0
*/
public class AppSessionException extends SessionException {
public AppSessionException() {
}
public AppSessionException(String message) {
super(message);
}
public AppSessionException(String message, Throwable cause) {
super(message, cause);
}
public AppSessionException(Throwable cause) {
super(cause);
}
public AppSessionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -0,0 +1,33 @@
package ink.wgink.module.instantmessage.websocket.exception;
/**
* When you feel like quitting. Think about why you started
* 当你想要放弃的时候想想当初你为何开始
*
* @ClassName: BaseSocketException
* @Description: socket基础异常
* @Author: wanggeng
* @Date: 2021/1/14 12:12 下午
* @Version: 1.0
*/
public class BaseSocketException extends Exception{
public BaseSocketException() {
}
public BaseSocketException(String message) {
super(message);
}
public BaseSocketException(String message, Throwable cause) {
super(message, cause);
}
public BaseSocketException(Throwable cause) {
super(cause);
}
public BaseSocketException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -0,0 +1,32 @@
package ink.wgink.module.instantmessage.websocket.exception;
/**
* When you feel like quitting. Think about why you started
* 当你想要放弃的时候想想当初你为何开始
*
* @ClassName: BodyEmptyException
* @Description: 主体为空异常
* @Author: wanggeng
* @Date: 2021/1/14 12:11 下午
* @Version: 1.0
*/
public class BodyException extends BaseSocketException {
public BodyException() {
}
public BodyException(String message) {
super(message);
}
public BodyException(String message, Throwable cause) {
super(message, cause);
}
public BodyException(Throwable cause) {
super(cause);
}
public BodyException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -0,0 +1,33 @@
package ink.wgink.module.instantmessage.websocket.exception;
/**
* When you feel like quitting. Think about why you started
* 当你想要放弃的时候想想当初你为何开始
*
* @ClassName: SessionInvalidException
* @Description: 会话无效异常
* @Author: wanggeng
* @Date: 2021/1/14 12:04 下午
* @Version: 1.0
*/
public class SessionException extends BaseSocketException {
public SessionException() {
}
public SessionException(String message) {
super(message);
}
public SessionException(String message, Throwable cause) {
super(message, cause);
}
public SessionException(Throwable cause) {
super(cause);
}
public SessionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -0,0 +1,32 @@
package ink.wgink.module.instantmessage.websocket.exception;
/**
* When you feel like quitting. Think about why you started
* 当你想要放弃的时候想想当初你为何开始
*
* @ClassName: TypeErrorException
* @Description: 类型错误异常
* @Author: wanggeng
* @Date: 2021/1/14 12:14 下午
* @Version: 1.0
*/
public class TypeErrorException extends BaseSocketException {
public TypeErrorException() {
}
public TypeErrorException(String message) {
super(message);
}
public TypeErrorException(String message, Throwable cause) {
super(message, cause);
}
public TypeErrorException(Throwable cause) {
super(cause);
}
public TypeErrorException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -0,0 +1,33 @@
package ink.wgink.module.instantmessage.websocket.exception;
/**
* When you feel like quitting. Think about why you started
* 当你想要放弃的时候想想当初你为何开始
*
* @ClassName: UserErrorException
* @Description: 用户错误异常
* @Author: wanggeng
* @Date: 2021/1/14 12:51 下午
* @Version: 1.0
*/
public class UserErrorException extends BaseSocketException {
public UserErrorException() {
}
public UserErrorException(String message) {
super(message);
}
public UserErrorException(String message, Throwable cause) {
super(message, cause);
}
public UserErrorException(Throwable cause) {
super(cause);
}
public UserErrorException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -0,0 +1,33 @@
package ink.wgink.module.instantmessage.websocket.exception;
/**
* When you feel like quitting. Think about why you started
* 当你想要放弃的时候想想当初你为何开始
*
* @ClassName: UserException
* @Description: 用户异常
* @Author: wanggeng
* @Date: 2021/1/14 12:21 下午
* @Version: 1.0
*/
public class UserSessionException extends SessionException {
public UserSessionException() {
}
public UserSessionException(String message) {
super(message);
}
public UserSessionException(String message, Throwable cause) {
super(message, cause);
}
public UserSessionException(Throwable cause) {
super(cause);
}
public UserSessionException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -0,0 +1,146 @@
package ink.wgink.module.instantmessage.websocket.handler;
import ink.wgink.module.instantmessage.websocket.handler.text.WebSocketTextHandler;
import ink.wgink.module.instantmessage.websocket.manager.WebSocketChannelManager;
import ink.wgink.properties.websocket.WebSocketProperties;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
/**
* @ClassName: WebSocketHandler
* @Description: 1.web发起一次类似是http的请求并在channelRead0方法中进行处理并通过instanceof去判断帧对象是FullHttpRequest还是WebSocketFrame建立连接是时候会是FullHttpRequest
* 2.在handleHttpRequest方法中去创建websocket首先是判断Upgrade是不是websocket协议若不是则通过sendHttpResponse将错误信息返回给客户端紧接着通过WebSocketServerHandshakerFactory创建socket对象并通过handshaker握手创建连接
* 3.在连接创建好后的所以消息流动都是以WebSocketFrame来体现
* 4.在handlerWebSocketFrame去处理消息也可能是客户端发起的关闭指令ping指令等等
* @Author: wanggeng
* @Date: 2021/9/11 11:40 上午
* @Version: 1.0
*/
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger LOG = LoggerFactory.getLogger(WebSocketHandler.class);
private WebSocketProperties webSocketProperties;
private WebSocketServerHandshaker webSocketServerHandshaker;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
LOG.debug("收到消息:" + msg);
if (msg instanceof FullHttpRequest) {
//以http请求形式接入但是走的是websocket
handleHttpRequest(ctx, (FullHttpRequest) msg);
} else if (msg instanceof WebSocketFrame) {
//处理websocket客户端的消息
handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
}
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
LOG.debug("新客户端链接:" + ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//断开连接
LOG.debug("客户端断开连接:" + ctx.channel());
WebSocketChannelManager.getInstance().removeChannel(ctx.channel());
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
// 判断是否关闭链路的指令
if (frame instanceof CloseWebSocketFrame) {
webSocketServerHandshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
// 判断是否ping消息
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
// 文本消息
if (frame instanceof TextWebSocketFrame) {
WebSocketTextHandler webSocketTextHandler = new WebSocketTextHandler();
webSocketTextHandler.handler(ctx, (TextWebSocketFrame) frame);
return;
}
// 二进制文件暂不处理
if (frame instanceof BinaryWebSocketFrame) {
return;
}
}
/**
* 唯一的一次http请求用于创建websocket
*/
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
//要求Upgrade为websocket过滤掉get/Post
if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
//若不是websocket方式则创建BAD_REQUEST的req返回给客户端
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(String.format("ws://%s:%s/%s", webSocketProperties.getUrl(), webSocketProperties.getPort(), webSocketProperties.getContext()), null, false);
webSocketServerHandshaker = wsFactory.newHandshaker(req);
if (webSocketServerHandshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
webSocketServerHandshaker.handshake(ctx.channel(), req);
}
}
/**
* 拒绝不合法的请求并返回错误信息
*/
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
// 返回应答给客户端
if (res.status().code() != HttpStatus.OK.value()) {
ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
}
ChannelFuture f = ctx.channel().writeAndFlush(res);
// 如果是非Keep-Alive关闭连接
if (!isKeepAlive(req) || res.status().code() != HttpStatus.OK.value()) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
/**
* 判断是不是长连接
*
* @param req
* @return
*/
private boolean isKeepAlive(FullHttpRequest req) {
String connection = req.headers().get("Connection");
if (StringUtils.isBlank(connection)) {
return false;
}
if (!StringUtils.equals("keep-alive", connection)) {
return false;
}
return true;
}
public void setWebSocketProperties(WebSocketProperties webSocketProperties) {
this.webSocketProperties = webSocketProperties;
}
}

View File

@ -0,0 +1,162 @@
package ink.wgink.module.instantmessage.websocket.handler.text;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import ink.wgink.module.instantmessage.websocket.enums.ClientSocketTypeEnum;
import ink.wgink.module.instantmessage.websocket.enums.SendStatusEnum;
import ink.wgink.module.instantmessage.websocket.exception.*;
import ink.wgink.module.instantmessage.websocket.manager.WebSocketChannelManager;
import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage;
import ink.wgink.module.instantmessage.websocket.pojo.WebSocketSession;
import ink.wgink.module.instantmessage.websocket.pojo.body.RegisterBody;
import ink.wgink.module.instantmessage.websocket.pojo.body.SendStatusBody;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.apache.commons.compress.utils.Sets;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.List;
/**
* @ClassName: WebSocketTextHandler
* @Description: WebSocket文本处理器
* @Author: wanggeng
* @Date: 2021/9/12 8:08 下午
* @Version: 1.0
*/
public class WebSocketTextHandler {
public void handler(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) {
WebSocketClientMessage clientSocketMessage = null;
SendStatusBody sendStatusBody = null;
try {
clientSocketMessage = JSONObject.parseObject(textWebSocketFrame.text(), WebSocketClientMessage.class);
if (clientSocketMessage.getType() == null) {
throw new TypeErrorException("Type类型不能为空");
}
if (StringUtils.isBlank(clientSocketMessage.getBody())) {
throw new BodyException("Body主体不能为空");
}
if (ClientSocketTypeEnum.REGISTER.getValue() == clientSocketMessage.getType()) {
// 客户端注册消息
sendStatusBody = clientRegisterSession(ctx.channel(), clientSocketMessage);
} else if (ClientSocketTypeEnum.MESSAGE.getValue() == clientSocketMessage.getType()) {
// 文本消息
sendText(ctx.channel(), clientSocketMessage);
} else if (ClientSocketTypeEnum.GROUP_MESSAGE.getValue() == clientSocketMessage.getType()) {
// 群发消息
sendGroupText(ctx.channel(), clientSocketMessage);
} else if (ClientSocketTypeEnum.SEARCH_ONLINE_USER.getValue() == clientSocketMessage.getType()) {
// 用户在线状态
listOnlineUser(ctx.channel(), clientSocketMessage);
}
} catch (SessionException e) {
// 没有登录时返回内容关闭连接
sendStatusBody = new SendStatusBody(SendStatusEnum.SESSION_ERROR.getValue(), SendStatusEnum.SESSION_ERROR, e.getMessage());
clientSocketMessage.setBody(JSONObject.toJSONString(sendStatusBody));
WebSocketChannelManager.getInstance().sendText(ctx.channel(), clientSocketMessage);
ctx.close();
} catch (TypeErrorException e) {
sendStatusBody = new SendStatusBody(SendStatusEnum.SESSION_ERROR.getValue(), SendStatusEnum.SESSION_ERROR, e.getMessage());
} catch (BodyException e) {
sendStatusBody = new SendStatusBody(SendStatusEnum.SESSION_ERROR.getValue(), SendStatusEnum.SESSION_ERROR, e.getMessage());
} catch (UserErrorException e) {
sendStatusBody = new SendStatusBody(SendStatusEnum.SESSION_ERROR.getValue(), SendStatusEnum.SESSION_ERROR, e.getMessage());
} catch (JSONException e) {
clientSocketMessage = new WebSocketClientMessage();
clientSocketMessage.setSystem(true);
clientSocketMessage.setFrom(WebSocketChannelManager.FORM_SYSTEM);
sendStatusBody = new SendStatusBody(SendStatusEnum.TO_ERROR.getValue(), SendStatusEnum.MESSAGE_ERROR, e.getMessage());
} finally {
if (sendStatusBody != null && ctx.channel().isOpen()) {
clientSocketMessage.setBody(JSONObject.toJSONString(sendStatusBody));
WebSocketChannelManager.getInstance().sendText(ctx.channel(), clientSocketMessage);
}
}
}
/**
* 客户端会话注册
*
* @param channel 会话通道
* @param clientSocketMessage 客户端信息
* @return
* @throws SessionException
*/
private SendStatusBody clientRegisterSession(Channel channel, WebSocketClientMessage clientSocketMessage) throws SessionException {
RegisterBody registerBody = JSONObject.parseObject(clientSocketMessage.getBody(), RegisterBody.class);
// 更新通道
WebSocketChannelManager.getInstance().addChannel(registerBody.getSessionId(), clientSocketMessage.getFrom(), channel);
return new SendStatusBody(SendStatusEnum.SUCCESS.getValue(), SendStatusEnum.SUCCESS, "OK");
}
/**
* 发送文字消息
*
* @param channel 通道
* @param webSocketClientMessage 客户端消息
* @throws UserErrorException
*/
private void sendText(Channel channel, WebSocketClientMessage webSocketClientMessage) throws UserErrorException, UserSessionException {
if (StringUtils.isBlank(webSocketClientMessage.getTo())) {
throw new UserErrorException("To 值不能为空");
}
webSocketClientMessage.setSystem(false);
List<WebSocketSession> webSocketSessions = WebSocketChannelManager.getInstance().listOnlineUser(webSocketClientMessage.getTo());
// 返回失败结果
if (webSocketSessions.isEmpty()) {
sendTextFailed(channel, webSocketClientMessage, "用户不在线");
return;
}
// 发送消息
for (WebSocketSession webSocketSession : webSocketSessions) {
WebSocketChannelManager.getInstance().sendText(webSocketSession.getChannel(), webSocketClientMessage);
}
}
/**
* 群发文字消息
*
* @param channel 通道
* @param webSocketClientMessage 客户端消息
* @throws UserErrorException
* @throws UserSessionException
*/
private void sendGroupText(Channel channel, WebSocketClientMessage webSocketClientMessage) throws UserErrorException, UserSessionException {
if (StringUtils.isBlank(webSocketClientMessage.getTo())) {
throw new UserErrorException("To 值不能为空");
}
webSocketClientMessage.setSystem(false);
List<String> toUserIds = new ArrayList<>(Sets.newHashSet(webSocketClientMessage.getTo().split(",")));
List<WebSocketSession> webSocketSessions = WebSocketChannelManager.getInstance().listOnlineUser(toUserIds);
// 返回失败结果
if (webSocketSessions.isEmpty()) {
sendTextFailed(channel, webSocketClientMessage, "无用户在线");
return;
}
// 发送消息
for (WebSocketSession webSocketSession : webSocketSessions) {
WebSocketChannelManager.getInstance().sendText(webSocketSession.getChannel(), webSocketClientMessage);
}
}
private void listOnlineUser(Channel channel, WebSocketClientMessage clientSocketMessage) {
}
/**
* 发送文本失败
*
* @param fromChannel 来源通道
* @param webSocketClientMessage webSocket客户端消息
* @param msg
*/
public static void sendTextFailed(Channel fromChannel, WebSocketClientMessage webSocketClientMessage, String msg) {
webSocketClientMessage.setType(ClientSocketTypeEnum.SEND_STATUS.getValue());
webSocketClientMessage.setBody(JSONObject.toJSONString(new SendStatusBody(SendStatusEnum.FAILED.getValue(), SendStatusEnum.FAILED, msg)));
WebSocketChannelManager.getInstance().sendText(fromChannel, webSocketClientMessage);
}
}

View File

@ -0,0 +1,233 @@
package ink.wgink.module.instantmessage.websocket.manager;
import com.alibaba.fastjson.JSONObject;
import ink.wgink.module.instantmessage.websocket.exception.SessionException;
import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage;
import ink.wgink.module.instantmessage.websocket.pojo.WebSocketSession;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.commons.lang3.StringUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* @ClassName: WebSocketChannelManager
* @Description: WebSocket通道管理器
* @Author: wanggeng
* @Date: 2021/9/11 11:56 上午
* @Version: 1.0
*/
public class WebSocketChannelManager {
public static final String FORM_SYSTEM = "SYSTEM";
private static final WebSocketChannelManager webSocketChannelManager = WebSocketChannelManagerBuilder.webSocketChannelManager;
private ChannelGroup globalGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private ConcurrentMap<String, List<WebSocketSession>> webSocketUserMap = new ConcurrentHashMap<>();
private WebSocketChannelManager() {
}
public static WebSocketChannelManager getInstance() {
return webSocketChannelManager;
}
/**
* 初始化会话队列
*
* @param userId 用户ID
* @param sessionId 会话ID
* @param clientName 客户端名称
*/
public void init(String userId, String sessionId, String clientName) {
WebSocketSession onlineUser = getOnlineUser(userId, clientName);
if (onlineUser == null) {
onlineUser = new WebSocketSession();
onlineUser.setUserId(userId);
onlineUser.setClientName(clientName);
// 更新到session队列中
List<WebSocketSession> webSocketSessions = listOnlineUser(userId);
webSocketSessions.add(onlineUser);
webSocketUserMap.put(userId, webSocketSessions);
}
// 更新最后时间
onlineUser.setSessionId(sessionId);
onlineUser.setUpdateTime(System.currentTimeMillis());
}
/**
* 添加通道绑定通道与会话
*
* @param sessionId
* @param userId
* @param channel
* @throws SessionException
*/
public void addChannel(String sessionId, String userId, Channel channel) throws SessionException {
WebSocketSession onlineUser = getOnlineUserBySessionId(sessionId);
if (onlineUser == null) {
throw new SessionException("无效会话,请登录");
}
if (!StringUtils.equals(userId, onlineUser.getUserId())) {
throw new SessionException("from错误");
}
// 更新会话通道
onlineUser.setChannel(channel);
onlineUser.setChannelId(channel.id().asLongText());
globalGroup.add(channel);
}
public void removeChannel(Channel channel) {
globalGroup.remove(channel);
for (Map.Entry<String, List<WebSocketSession>> kv : webSocketUserMap.entrySet()) {
List<WebSocketSession> webSocketSessions = kv.getValue();
for (int i = 0; i < webSocketSessions.size(); i++) {
WebSocketSession webSocketSession = webSocketSessions.get(i);
if (StringUtils.equals(webSocketSession.getChannelId(), channel.id().asLongText())) {
webSocketSessions.remove(i);
return;
}
}
}
}
/**
* 获取在线用户客户端列表
*
* @param userId 用户ID
* @return
*/
public List<WebSocketSession> listOnlineUser(String userId) {
if (StringUtils.isBlank(userId)) {
return new ArrayList<>();
}
List<WebSocketSession> webSocketSessions = webSocketUserMap.get(userId);
if (webSocketSessions == null) {
return new ArrayList<>();
}
return webSocketSessions;
}
/**
* 获取在线用户
*
* @param userId 用户ID
* @param clientName 客户端名称
* @return
*/
public WebSocketSession getOnlineUser(String userId, String clientName) {
if (StringUtils.isBlank(userId) || StringUtils.isBlank(clientName)) {
return null;
}
List<WebSocketSession> webSocketSessions = listOnlineUser(userId);
if (webSocketSessions.isEmpty()) {
return null;
}
for (WebSocketSession webSocketSession : webSocketSessions) {
if (StringUtils.equals(clientName, webSocketSession.getClientName())) {
return webSocketSession;
}
}
return null;
}
/**
* 获取在线用户客户端列表
*
* @param userIds 用户ID列表
* @return
*/
public List<WebSocketSession> listOnlineUser(List<String> userIds) {
if (userIds == null || userIds.isEmpty()) {
return new ArrayList<>();
}
List<WebSocketSession> webSocketSessions = new ArrayList<>();
for (String userId : userIds) {
List<WebSocketSession> users = webSocketUserMap.get(userId);
if (users != null) {
webSocketSessions.addAll(users);
}
}
return webSocketSessions;
}
/**
* 获取在线用户客户端列表
*
* @param userIds 用户ID列表
* @param clientName 客户端名称
* @return
*/
public List<WebSocketSession> listOnlineUser(List<String> userIds, String clientName) {
if (userIds == null || userIds.isEmpty() || StringUtils.isBlank(clientName)) {
return new ArrayList<>();
}
List<WebSocketSession> webSocketSessions = new ArrayList<>();
for (String userId : userIds) {
List<WebSocketSession> users = webSocketUserMap.get(userId);
if (users == null) {
continue;
}
for (WebSocketSession user : users) {
if (StringUtils.equals(clientName, user.getClientName())) {
webSocketSessions.add(user);
}
}
}
return webSocketSessions;
}
/**
* 通过会话获取用户
*
* @param sessionId 会话ID
* @return
*/
private WebSocketSession getOnlineUserBySessionId(String sessionId) {
for (Map.Entry<String, List<WebSocketSession>> kv : webSocketUserMap.entrySet()) {
for (WebSocketSession webSocketSession : kv.getValue()) {
if (StringUtils.equals(sessionId, webSocketSession.getSessionId())) {
return webSocketSession;
}
}
}
return null;
}
/**
* 通过会话获取用户
*
* @param channelId 通道ID
* @return
*/
private WebSocketSession getOnlineUserByChannelId(String channelId) {
for (Map.Entry<String, List<WebSocketSession>> kv : webSocketUserMap.entrySet()) {
for (WebSocketSession webSocketSession : kv.getValue()) {
if (StringUtils.equals(channelId, webSocketSession.getChannelId())) {
return webSocketSession;
}
}
}
return null;
}
public void sendAll(TextWebSocketFrame tws) {
globalGroup.writeAndFlush(tws);
}
public void sendText(Channel channel, WebSocketClientMessage webSocketClientMessage) {
TextWebSocketFrame textWebSocketFrame = new TextWebSocketFrame(JSONObject.toJSONString(webSocketClientMessage));
channel.writeAndFlush(textWebSocketFrame);
}
private static class WebSocketChannelManagerBuilder {
public static final WebSocketChannelManager webSocketChannelManager = new WebSocketChannelManager();
}
}

View File

@ -0,0 +1,113 @@
package ink.wgink.module.instantmessage.websocket.pojo;
/**
* When you feel like quitting. Think about why you started
* 当你想要放弃的时候想想当初你为何开始
*
* @ClassName: AppSocketMessage
* @Description: App Socket 消息
* @Author: wanggeng
* @Date: 2021/1/13 11:15 下午
* @Version: 1.0
*/
public class WebSocketClientMessage {
/**
* id
*/
private String id;
/**
* AppSocketTypeEnum 消息类型
*/
private Integer type;
/**
* 是否系统消息
*/
private Boolean isSystem;
/**
* 来源系统消息为SYSTEM非系统消息为 userId
*/
private String from;
/**
* 去处系统消息为 clientName, 非系统消息为 userId
*/
private String to;
/**
* 消息体
*/
private String body;
/**
* 时间戳
*/
private long timestamp;
public WebSocketClientMessage() {
}
public WebSocketClientMessage(Integer type, Boolean isSystem, String from, String to, String body) {
this.type = type;
this.isSystem = isSystem;
this.from = from;
this.to = to;
this.body = body;
}
public String getId() {
return id == null ? "" : id;
}
public void setId(String id) {
this.id = id;
}
public Integer getType() {
return type == null ? 0 : type;
}
public void setType(Integer type) {
this.type = type;
}
public Boolean getSystem() {
return isSystem;
}
public void setSystem(Boolean system) {
isSystem = system;
}
public String getFrom() {
return from == null ? "" : from;
}
public void setFrom(String from) {
this.from = from;
}
public String getTo() {
return to == null ? "" : to;
}
public void setTo(String to) {
this.to = to;
}
public String getBody() {
return body == null ? "" : body;
}
public void setBody(String body) {
this.body = body;
}
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
public long getTimestamp() {
timestamp = System.currentTimeMillis();
return timestamp;
}
}

View File

@ -0,0 +1,86 @@
package ink.wgink.module.instantmessage.websocket.pojo;
import io.netty.channel.Channel;
/**
* @ClassName: WebSocketUser
* @Description: webSocket用户
* @Author: wanggeng
* @Date: 2021/9/11 11:28 下午
* @Version: 1.0
*/
public class WebSocketSession {
/**
* 会话ID登录时返回socket注册时携带
*/
private String sessionId;
/**
* 用户ID
*/
private String userId;
/**
* 客户端名称同一用户的同一客户端只能有一个
*/
private String clientName;
/**
* 通道ID
*/
private String channelId;
/**
* 通道
*/
private Channel channel;
/**
* 更新时间
*/
private Long updateTime;
public String getSessionId() {
return sessionId == null ? "" : sessionId.trim();
}
public void setSessionId(String sessionId) {
this.sessionId = sessionId;
}
public String getUserId() {
return userId == null ? "" : userId.trim();
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getClientName() {
return clientName == null ? "" : clientName.trim();
}
public void setClientName(String clientName) {
this.clientName = clientName;
}
public String getChannelId() {
return channelId == null ? "" : channelId.trim();
}
public void setChannelId(String channelId) {
this.channelId = channelId;
}
public Channel getChannel() {
return channel;
}
public void setChannel(Channel channel) {
this.channel = channel;
}
public Long getUpdateTime() {
return updateTime == null ? 0 : updateTime;
}
public void setUpdateTime(Long updateTime) {
this.updateTime = updateTime;
}
}

View File

@ -0,0 +1,80 @@
package ink.wgink.module.instantmessage.websocket.pojo.body;
/**
* @ClassName: NoticeBody
* @Description: 通知
* @Author: wanggeng
* @Date: 2021/9/12 10:09 下午
* @Version: 1.0
*/
public class NoticeBody {
/**
* 标题
*/
private String title;
/**
* 内容
*/
private String msg;
/**
* 目标需要客户端点击触发效果时使用可以为约定好的 urlpage 等等
*/
private String target;
/**
* 业务ID
*/
private String serviceId;
public NoticeBody() {
}
public NoticeBody(String title, String msg) {
this.title = title;
this.msg = msg;
}
public NoticeBody(String title, String msg, String target) {
this.title = title;
this.msg = msg;
this.target = target;
}
public NoticeBody(String title, String msg, String target, String serviceId) {
this.title = title;
this.msg = msg;
this.target = target;
this.serviceId = serviceId;
}
public String getTitle() {
return title == null ? "" : title;
}
public void setTitle(String title) {
this.title = title;
}
public String getMsg() {
return msg == null ? "" : msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public String getTarget() {
return target == null ? "" : target;
}
public void setTarget(String target) {
this.target = target;
}
public String getServiceId() {
return serviceId;
}
public void setServiceId(String serviceId) {
this.serviceId = serviceId;
}
}

View File

@ -0,0 +1,27 @@
package ink.wgink.module.instantmessage.websocket.pojo.body;
/**
* @ClassName: RegisterBody
* @Description: 注册
* @Author: wanggeng
* @Date: 2021/9/12 10:07 下午
* @Version: 1.0
*/
public class RegisterBody {
/**
* 会话ID
*/
private String sessionId;
public RegisterBody() {
}
public String getSessionId() {
return sessionId == null ? "" : sessionId.trim();
}
public void setSessionId(String sessionId) {
this.sessionId = sessionId;
}
}

View File

@ -0,0 +1,49 @@
package ink.wgink.module.instantmessage.websocket.pojo.body;
import ink.wgink.module.instantmessage.websocket.enums.SendStatusEnum;
/**
* @ClassName: SendStatusBody
* @Description: 发送状态
* @Author: wanggeng
* @Date: 2021/9/12 10:08 下午
* @Version: 1.0
*/
public class SendStatusBody {
private Integer code;
private SendStatusEnum status;
private String msg;
public SendStatusBody() {
}
public SendStatusBody(int code, SendStatusEnum status, String msg) {
this.code = code;
this.status = status;
this.msg = msg;
}
public Integer getCode() {
return code == null ? 0 : code;
}
public void setCode(Integer code) {
this.code = code;
}
public SendStatusEnum getStatus() {
return status;
}
public void setStatus(SendStatusEnum status) {
this.status = status;
}
public String getMsg() {
return msg == null ? "" : msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
}

View File

@ -0,0 +1,37 @@
package ink.wgink.module.instantmessage.websocket.pojo.body;
/**
* @ClassName: TargetBody
* @Description: 目标
* @Author: wanggeng
* @Date: 2021/9/12 10:09 下午
* @Version: 1.0
*/
public class TargetBody {
private String msg;
private String target;
public TargetBody() {
}
public TargetBody(String msg, String target) {
this.msg = msg;
this.target = target;
}
public String getMsg() {
return msg == null ? "" : msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public String getTarget() {
return target == null ? "" : target;
}
public void setTarget(String target) {
this.target = target;
}
}

View File

@ -0,0 +1,65 @@
package ink.wgink.module.instantmessage.websocket.server;
import ink.wgink.module.instantmessage.websocket.channel.WebSocketChannelInitializer;
import ink.wgink.properties.websocket.WebSocketProperties;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @ClassName: WebSocketServer
* @Description: WebSocket服务
* @Author: wanggeng
* @Date: 2021/9/11 11:00 上午
* @Version: 1.0
*/
@Component
public class WebSocketServer implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class);
@Autowired
private WebSocketProperties webSocketProperties;
@Override
public void run() {
// 通道初始化
WebSocketChannelInitializer webSocketChannelInitializer = new WebSocketChannelInitializer();
webSocketChannelInitializer.setWebSocketProperties(webSocketProperties);
// server
ServerBootstrap serverBootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
serverBootstrap.group(bossGroup, childGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(webSocketChannelInitializer);
ChannelFuture channelFuture = serverBootstrap.bind(webSocketProperties.getPort()).sync().addListener(listener -> {
if (listener.isSuccess()) {
StringBuilder socketOutSB = new StringBuilder();
socketOutSB.append("******************************\n");
socketOutSB.append("** **\n");
socketOutSB.append("** WebSocket Server **\n");
socketOutSB.append("** Start Successfully! **\n");
socketOutSB.append("** **\n");
socketOutSB.append("** Port: " + webSocketProperties.getPort() + " **\n");
socketOutSB.append("** **\n");
socketOutSB.append("******************************\n");
System.out.println(socketOutSB);
}
});
// 监听通道关闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
LOG.error(e.getMessage(), e);
} finally {
bossGroup.shutdownGracefully();
childGroup.shutdownGracefully();
}
}
}

View File

@ -37,6 +37,7 @@
<module>module-file-media</module>
<module>module-map</module>
<module>module-activiti</module>
<module>module-instant-message</module>
</modules>
<packaging>pom</packaging>