新增自定义文本消息业务处理接口

This commit is contained in:
wanggeng 2021-10-09 17:49:20 +08:00
parent 36acebe9b0
commit 80f3cd9732
8 changed files with 92 additions and 4 deletions

View File

@ -0,0 +1,25 @@
package ink.wgink.module.instantmessage.service;
import ink.wgink.module.instantmessage.websocket.exception.CustomHandleException;
import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage;
import io.netty.channel.Channel;
/**
* @ClassName: WebSocketTextCustomHandler
* @Description: 文本自定义处理器
* @Author: wanggeng
* @Date: 2021/10/9 3:02 下午
* @Version: 1.0
*/
public interface IWebSocketTextCustomService {
/**
* 消息处理逻辑
*
* @param channel
* @param webSocketClientMessage
* @throws CustomHandleException
*/
void handle(Channel channel, WebSocketClientMessage webSocketClientMessage) throws CustomHandleException;
}

View File

@ -1,6 +1,7 @@
package ink.wgink.module.instantmessage.websocket.channel; package ink.wgink.module.instantmessage.websocket.channel;
import ink.wgink.module.instantmessage.websocket.handler.WebSocketHandler; import ink.wgink.module.instantmessage.websocket.handler.WebSocketHandler;
import ink.wgink.module.instantmessage.service.IWebSocketTextCustomService;
import ink.wgink.properties.websocket.WebSocketProperties; import ink.wgink.properties.websocket.WebSocketProperties;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
@ -20,6 +21,7 @@ import io.netty.handler.stream.ChunkedWriteHandler;
public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> { public class WebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {
private WebSocketProperties webSocketProperties; private WebSocketProperties webSocketProperties;
private IWebSocketTextCustomService IWebSocketTextCustomService;
@Override @Override
protected void initChannel(SocketChannel socketChannel) throws Exception { protected void initChannel(SocketChannel socketChannel) throws Exception {
@ -34,10 +36,15 @@ public class WebSocketChannelInitializer extends ChannelInitializer<SocketChanne
//自定义的业务handler //自定义的业务handler
WebSocketHandler webSocketHandler = new WebSocketHandler(); WebSocketHandler webSocketHandler = new WebSocketHandler();
webSocketHandler.setWebSocketProperties(webSocketProperties); webSocketHandler.setWebSocketProperties(webSocketProperties);
webSocketHandler.setWebSocketTextCustomHandler(IWebSocketTextCustomService);
socketChannel.pipeline().addLast("handler", webSocketHandler); socketChannel.pipeline().addLast("handler", webSocketHandler);
} }
public void setWebSocketProperties(WebSocketProperties webSocketProperties) { public void setWebSocketProperties(WebSocketProperties webSocketProperties) {
this.webSocketProperties = webSocketProperties; this.webSocketProperties = webSocketProperties;
} }
public void setWebSocketTextCustomHandler(IWebSocketTextCustomService IWebSocketTextCustomService) {
this.IWebSocketTextCustomService = IWebSocketTextCustomService;
}
} }

View File

@ -19,7 +19,8 @@ public enum StatusEnum {
BODY_ERROR(404, "消息体错误"), BODY_ERROR(404, "消息体错误"),
FROM_ERROR(405, "来源错误"), FROM_ERROR(405, "来源错误"),
TO_ERROR(406, "接收人错误"), TO_ERROR(406, "接收人错误"),
RECEIVE_ERROR(407, "接收错误"); RECEIVE_ERROR(407, "接收错误"),
CUSTOM_HANDLE_ERROR(408, "自定义处理异常");
private int value; private int value;
private String summary; private String summary;

View File

@ -0,0 +1,31 @@
package ink.wgink.module.instantmessage.websocket.exception;
/**
* @ClassName: CustomHandleException
* @Description: 自定义处理异常
* @Author: wanggeng
* @Date: 2021/10/9 4:09 下午
* @Version: 1.0
*/
public class CustomHandleException extends BaseSocketException {
public CustomHandleException() {
super();
}
public CustomHandleException(String message) {
super(message);
}
public CustomHandleException(String message, Throwable cause) {
super(message, cause);
}
public CustomHandleException(Throwable cause) {
super(cause);
}
public CustomHandleException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -1,5 +1,6 @@
package ink.wgink.module.instantmessage.websocket.handler; package ink.wgink.module.instantmessage.websocket.handler;
import ink.wgink.module.instantmessage.service.IWebSocketTextCustomService;
import ink.wgink.module.instantmessage.websocket.handler.text.WebSocketTextHandler; import ink.wgink.module.instantmessage.websocket.handler.text.WebSocketTextHandler;
import ink.wgink.module.instantmessage.websocket.manager.WebSocketChannelManager; import ink.wgink.module.instantmessage.websocket.manager.WebSocketChannelManager;
import ink.wgink.properties.websocket.WebSocketProperties; import ink.wgink.properties.websocket.WebSocketProperties;
@ -34,6 +35,7 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {
private static final Logger LOG = LoggerFactory.getLogger(WebSocketHandler.class); private static final Logger LOG = LoggerFactory.getLogger(WebSocketHandler.class);
private WebSocketProperties webSocketProperties; private WebSocketProperties webSocketProperties;
private WebSocketServerHandshaker webSocketServerHandshaker; private WebSocketServerHandshaker webSocketServerHandshaker;
private IWebSocketTextCustomService IWebSocketTextCustomService;
@Override @Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
@ -78,6 +80,7 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {
// 文本消息 // 文本消息
if (frame instanceof TextWebSocketFrame) { if (frame instanceof TextWebSocketFrame) {
WebSocketTextHandler webSocketTextHandler = new WebSocketTextHandler(); WebSocketTextHandler webSocketTextHandler = new WebSocketTextHandler();
webSocketTextHandler.setWebSocketTextCustomHandler(IWebSocketTextCustomService);
webSocketTextHandler.handler(ctx, (TextWebSocketFrame) frame); webSocketTextHandler.handler(ctx, (TextWebSocketFrame) frame);
return; return;
} }
@ -143,4 +146,8 @@ public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {
public void setWebSocketProperties(WebSocketProperties webSocketProperties) { public void setWebSocketProperties(WebSocketProperties webSocketProperties) {
this.webSocketProperties = webSocketProperties; this.webSocketProperties = webSocketProperties;
} }
public void setWebSocketTextCustomHandler(IWebSocketTextCustomService IWebSocketTextCustomService) {
this.IWebSocketTextCustomService = IWebSocketTextCustomService;
}
} }

View File

@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSONObject;
import ink.wgink.module.instantmessage.websocket.enums.MessageTypeEnum; import ink.wgink.module.instantmessage.websocket.enums.MessageTypeEnum;
import ink.wgink.module.instantmessage.websocket.enums.StatusEnum; import ink.wgink.module.instantmessage.websocket.enums.StatusEnum;
import ink.wgink.module.instantmessage.websocket.exception.*; import ink.wgink.module.instantmessage.websocket.exception.*;
import ink.wgink.module.instantmessage.service.IWebSocketTextCustomService;
import ink.wgink.module.instantmessage.websocket.manager.WebSocketChannelManager; import ink.wgink.module.instantmessage.websocket.manager.WebSocketChannelManager;
import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage; import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage;
import ink.wgink.module.instantmessage.websocket.pojo.WebSocketSession; import ink.wgink.module.instantmessage.websocket.pojo.WebSocketSession;
@ -31,6 +32,8 @@ import java.util.Set;
*/ */
public class WebSocketTextHandler { public class WebSocketTextHandler {
private IWebSocketTextCustomService IWebSocketTextCustomService;
public void handler(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) { public void handler(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) {
WebSocketClientMessage clientSocketMessage = null; WebSocketClientMessage clientSocketMessage = null;
StatusBody statusBody = null; StatusBody statusBody = null;
@ -58,7 +61,12 @@ public class WebSocketTextHandler {
// 消息接收状态 // 消息接收状态
sendReceiveStatus(clientSocketMessage); sendReceiveStatus(clientSocketMessage);
} else { } else {
throw new TypeException("请求类型错误"); if (IWebSocketTextCustomService != null) {
// 自定义文本处理
IWebSocketTextCustomService.handle(ctx.channel(), clientSocketMessage);
} else {
throw new TypeException("请求类型错误");
}
} }
} catch (SessionException e) { } catch (SessionException e) {
// 没有登录时返回内容关闭连接 // 没有登录时返回内容关闭连接
@ -72,7 +80,9 @@ public class WebSocketTextHandler {
statusBody = new StatusBody(StatusEnum.BODY_ERROR.getValue(), StatusEnum.SESSION_ERROR, e.getMessage()); statusBody = new StatusBody(StatusEnum.BODY_ERROR.getValue(), StatusEnum.SESSION_ERROR, e.getMessage());
} catch (UserException e) { } catch (UserException e) {
statusBody = new StatusBody(StatusEnum.SESSION_ERROR.getValue(), StatusEnum.SESSION_ERROR, e.getMessage()); statusBody = new StatusBody(StatusEnum.SESSION_ERROR.getValue(), StatusEnum.SESSION_ERROR, e.getMessage());
} catch (JSONException e) { } catch (CustomHandleException e) {
statusBody = new StatusBody(StatusEnum.CUSTOM_HANDLE_ERROR.getValue(), StatusEnum.SESSION_ERROR, e.getMessage());
}catch (JSONException e) {
clientSocketMessage = new WebSocketClientMessage(); clientSocketMessage = new WebSocketClientMessage();
clientSocketMessage.setSystem(true); clientSocketMessage.setSystem(true);
clientSocketMessage.setType(MessageTypeEnum.MESSAGE_SYSTEM.getValue()); clientSocketMessage.setType(MessageTypeEnum.MESSAGE_SYSTEM.getValue());
@ -203,4 +213,7 @@ public class WebSocketTextHandler {
WebSocketChannelManager.getInstance().sendText(fromChannel, webSocketClientMessage); WebSocketChannelManager.getInstance().sendText(fromChannel, webSocketClientMessage);
} }
public void setWebSocketTextCustomHandler(IWebSocketTextCustomService IWebSocketTextCustomService) {
this.IWebSocketTextCustomService = IWebSocketTextCustomService;
}
} }

View File

@ -201,7 +201,7 @@ public class WebSocketChannelManager {
} }
/** /**
* 通过会话获取用户 * 通过通道获取用户
* *
* @param channelId 通道ID * @param channelId 通道ID
* @return * @return

View File

@ -1,5 +1,6 @@
package ink.wgink.module.instantmessage.websocket.server; package ink.wgink.module.instantmessage.websocket.server;
import ink.wgink.module.instantmessage.service.IWebSocketTextCustomService;
import ink.wgink.module.instantmessage.websocket.channel.WebSocketChannelInitializer; import ink.wgink.module.instantmessage.websocket.channel.WebSocketChannelInitializer;
import ink.wgink.properties.websocket.WebSocketProperties; import ink.wgink.properties.websocket.WebSocketProperties;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
@ -25,12 +26,15 @@ public class WebSocketServer implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class); private static final Logger LOG = LoggerFactory.getLogger(WebSocketServer.class);
@Autowired @Autowired
private WebSocketProperties webSocketProperties; private WebSocketProperties webSocketProperties;
@Autowired(required = false)
private IWebSocketTextCustomService webSocketTextCustomService;
@Override @Override
public void run() { public void run() {
// 通道初始化 // 通道初始化
WebSocketChannelInitializer webSocketChannelInitializer = new WebSocketChannelInitializer(); WebSocketChannelInitializer webSocketChannelInitializer = new WebSocketChannelInitializer();
webSocketChannelInitializer.setWebSocketProperties(webSocketProperties); webSocketChannelInitializer.setWebSocketProperties(webSocketProperties);
webSocketChannelInitializer.setWebSocketTextCustomHandler(webSocketTextCustomService);
// server // server
ServerBootstrap serverBootstrap = new ServerBootstrap(); ServerBootstrap serverBootstrap = new ServerBootstrap();
EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup bossGroup = new NioEventLoopGroup();