统一处理异常,更改消息发送状态名称

This commit is contained in:
wanggeng 2021-12-07 15:06:25 +08:00
parent 22e82dfa75
commit b9ac3f1191
5 changed files with 80 additions and 71 deletions

View File

@ -1,5 +1,6 @@
package ink.wgink.module.instantmessage.service; package ink.wgink.module.instantmessage.service;
import ink.wgink.exceptions.websocket.BaseSocketException;
import ink.wgink.exceptions.websocket.CustomHandleException; import ink.wgink.exceptions.websocket.CustomHandleException;
import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage; import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage;
import io.netty.channel.Channel; import io.netty.channel.Channel;
@ -16,10 +17,10 @@ public interface IWebSocketTextCustomService {
/** /**
* 消息处理逻辑 * 消息处理逻辑
* *
* @param channel * @param fromChannel
* @param webSocketClientMessage * @param webSocketClientMessage
* @throws CustomHandleException * @throws CustomHandleException
*/ */
void handle(Channel channel, WebSocketClientMessage webSocketClientMessage) throws CustomHandleException; void handle(Channel fromChannel, WebSocketClientMessage webSocketClientMessage) throws BaseSocketException;
} }

View File

@ -4,13 +4,13 @@ package ink.wgink.module.instantmessage.websocket.enums;
* When you feel like quitting. Think about why you started * When you feel like quitting. Think about why you started
* 当你想要放弃的时候想想当初你为何开始 * 当你想要放弃的时候想想当初你为何开始
* *
* @ClassName: ResultCodeEnum * @ClassName: MessageSendStatusEnum
* @Description: 返回编码类型 * @Description: 消息发送状态
* @Author: wanggeng * @Author: wanggeng
* @Date: 2021/1/14 12:59 下午 * @Date: 2021/1/14 12:59 下午
* @Version: 1.0 * @Version: 1.0
*/ */
public enum StatusEnum { public enum MessageSendStatusEnum {
SUCCESS(200, "成功"), SUCCESS(200, "成功"),
FAILED(400, "失败"), FAILED(400, "失败"),
MESSAGE_ERROR(401, "消息错误"), MESSAGE_ERROR(401, "消息错误"),
@ -20,12 +20,17 @@ public enum StatusEnum {
FROM_ERROR(405, "来源错误"), FROM_ERROR(405, "来源错误"),
TO_ERROR(406, "接收人错误"), TO_ERROR(406, "接收人错误"),
RECEIVE_ERROR(407, "接收错误"), RECEIVE_ERROR(407, "接收错误"),
CUSTOM_HANDLE_ERROR(408, "自定义处理异常");
USER_OFFLINE(410, "用户离线"),
TEXT_MESSAGE_USER_OFFLINE(411, "文本消息用户离线"),
RTC_MESSAGE_USER_OFFLINE(412, "实时语音用户离线"),
CUSTOM_HANDLE_ERROR(500, "自定义处理异常");
private int value; private int value;
private String summary; private String summary;
StatusEnum(int value, String summary) { MessageSendStatusEnum(int value, String summary) {
this.value = value; this.value = value;
this.summary = summary; this.summary = summary;
} }

View File

@ -46,11 +46,12 @@ public enum MessageTypeEnum {
WEBRTC_REFUSE(5003, "webrtc拒绝"), WEBRTC_REFUSE(5003, "webrtc拒绝"),
WEBRTC_JOIN(5004, "webrtc加入"), WEBRTC_JOIN(5004, "webrtc加入"),
STATUS_SEND(9001, "消息发送状态body 为 StatusBody 的 JSON 字符串"), // 消息状态
STATUS_SEND(9001, "消息发送状态body 为 MessageSendStatusBody 的 JSON 字符串"),
STATUS_SEND_ONLINE(9002, "发送在线状态body 为 在线用户的ID JSONArray 字符串"), STATUS_SEND_ONLINE(9002, "发送在线状态body 为 在线用户的ID JSONArray 字符串"),
STATUS_SEND_OFFLINE(9003, "发送离线状态body 为 离线用户的ID JSONArray 字符串"), STATUS_SEND_OFFLINE(9003, "发送离线状态body 为 离线用户的ID JSONArray 字符串"),
STATUS_RECEIVE(9101, "消息接受状态body 自定义的交互内容");
STATUS_RECEIVE(9101, "消息接受状态body 为 StatusBody 的 JSON 字符串");
private int value; private int value;
private String summary; private String summary;

View File

@ -3,21 +3,26 @@ package ink.wgink.module.instantmessage.websocket.handler.text;
import com.alibaba.fastjson.JSONException; import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import ink.wgink.exceptions.websocket.*; import ink.wgink.exceptions.websocket.*;
import ink.wgink.exceptions.websocket.useroffline.TextUserOfflineException;
import ink.wgink.exceptions.websocket.useroffline.RtcUserOfflineException;
import ink.wgink.module.instantmessage.service.IMessageService; import ink.wgink.module.instantmessage.service.IMessageService;
import ink.wgink.module.instantmessage.websocket.enums.MessageTypeEnum;
import ink.wgink.module.instantmessage.websocket.enums.StatusEnum;
import ink.wgink.module.instantmessage.service.IWebSocketTextCustomService; import ink.wgink.module.instantmessage.service.IWebSocketTextCustomService;
import ink.wgink.module.instantmessage.websocket.enums.MessageSendStatusEnum;
import ink.wgink.module.instantmessage.websocket.enums.MessageTypeEnum;
import ink.wgink.module.instantmessage.websocket.manager.WebSocketChannelManager; import ink.wgink.module.instantmessage.websocket.manager.WebSocketChannelManager;
import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage; import ink.wgink.module.instantmessage.websocket.pojo.WebSocketClientMessage;
import ink.wgink.pojo.session.WebSocketSession;
import ink.wgink.module.instantmessage.websocket.pojo.body.IdsBody; import ink.wgink.module.instantmessage.websocket.pojo.body.IdsBody;
import ink.wgink.module.instantmessage.websocket.pojo.body.RegisterBody; import ink.wgink.module.instantmessage.websocket.pojo.body.RegisterBody;
import ink.wgink.module.instantmessage.websocket.pojo.body.StatusBody; import ink.wgink.module.instantmessage.websocket.pojo.body.MessageSendStatusBody;
import ink.wgink.pojo.session.WebSocketSession;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.apache.commons.compress.utils.Sets; import org.apache.commons.compress.utils.Sets;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.omg.CORBA.UserException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
@ -33,12 +38,13 @@ import java.util.Set;
*/ */
public class WebSocketTextHandler { public class WebSocketTextHandler {
private static final Logger LOG = LoggerFactory.getLogger(WebSocketTextHandler.class);
private IWebSocketTextCustomService IWebSocketTextCustomService; private IWebSocketTextCustomService IWebSocketTextCustomService;
private IMessageService messageService; private IMessageService messageService;
public void handler(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) { public void handler(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) {
WebSocketClientMessage clientSocketMessage = null; WebSocketClientMessage clientSocketMessage = null;
StatusBody statusBody = null; MessageSendStatusBody messageSendStatusBody = null;
try { try {
clientSocketMessage = JSONObject.parseObject(textWebSocketFrame.text(), WebSocketClientMessage.class); clientSocketMessage = JSONObject.parseObject(textWebSocketFrame.text(), WebSocketClientMessage.class);
if (clientSocketMessage.getType() == null) { if (clientSocketMessage.getType() == null) {
@ -49,7 +55,7 @@ public class WebSocketTextHandler {
} }
if (MessageTypeEnum.REGISTER.getValue() == clientSocketMessage.getType()) { if (MessageTypeEnum.REGISTER.getValue() == clientSocketMessage.getType()) {
// 客户端注册消息 // 客户端注册消息
statusBody = clientRegisterSession(ctx.channel(), clientSocketMessage); messageSendStatusBody = clientRegisterSession(ctx.channel(), clientSocketMessage);
} else if (MessageTypeEnum.MESSAGE.getValue() == clientSocketMessage.getType()) { } else if (MessageTypeEnum.MESSAGE.getValue() == clientSocketMessage.getType()) {
sendText(ctx.channel(), clientSocketMessage); sendText(ctx.channel(), clientSocketMessage);
} else if (MessageTypeEnum.MESSAGE_HREF.getValue() == clientSocketMessage.getType()) { } else if (MessageTypeEnum.MESSAGE_HREF.getValue() == clientSocketMessage.getType()) {
@ -89,29 +95,40 @@ public class WebSocketTextHandler {
throw new TypeException("请求类型错误"); throw new TypeException("请求类型错误");
} }
} }
} catch (SessionException e) { } catch (BaseSocketException e) {
LOG.error(e.getMessage(), e);
if (e instanceof SessionException) {
// 没有登录时返回内容关闭连接 // 没有登录时返回内容关闭连接
statusBody = new StatusBody(StatusEnum.SESSION_ERROR.getValue(), StatusEnum.SESSION_ERROR, e.getMessage()); messageSendStatusBody = new MessageSendStatusBody(MessageSendStatusEnum.SESSION_ERROR.getValue(), MessageSendStatusEnum.SESSION_ERROR, e.getMessage());
clientSocketMessage.setBody(JSONObject.toJSONString(statusBody)); clientSocketMessage.setBody(JSONObject.toJSONString(messageSendStatusBody));
WebSocketChannelManager.getInstance().sendText(ctx.channel(), clientSocketMessage); WebSocketChannelManager.getInstance().sendText(ctx.channel(), clientSocketMessage);
ctx.close(); ctx.close();
} catch (TypeException e) { } else if (e instanceof TypeException) {
statusBody = new StatusBody(StatusEnum.TYPE_ERROR.getValue(), StatusEnum.SESSION_ERROR, e.getMessage()); messageSendStatusBody = new MessageSendStatusBody(MessageSendStatusEnum.TYPE_ERROR.getValue(), MessageSendStatusEnum.TYPE_ERROR, e.getMessage());
} catch (BodyException e) { } else if (e instanceof BodyException) {
statusBody = new StatusBody(StatusEnum.BODY_ERROR.getValue(), StatusEnum.SESSION_ERROR, e.getMessage()); messageSendStatusBody = new MessageSendStatusBody(MessageSendStatusEnum.BODY_ERROR.getValue(), MessageSendStatusEnum.BODY_ERROR, e.getMessage());
} catch (UserException e) { } else if (e instanceof ToException) {
statusBody = new StatusBody(StatusEnum.SESSION_ERROR.getValue(), StatusEnum.SESSION_ERROR, e.getMessage()); messageSendStatusBody = new MessageSendStatusBody(MessageSendStatusEnum.TO_ERROR.getValue(), MessageSendStatusEnum.TO_ERROR, e.getMessage());
} catch (CustomHandleException e) { } else if (e instanceof UserOfflineException) {
statusBody = new StatusBody(StatusEnum.CUSTOM_HANDLE_ERROR.getValue(), StatusEnum.SESSION_ERROR, e.getMessage()); if (e instanceof TextUserOfflineException) {
messageSendStatusBody = new MessageSendStatusBody(MessageSendStatusEnum.TEXT_MESSAGE_USER_OFFLINE.getValue(), MessageSendStatusEnum.TEXT_MESSAGE_USER_OFFLINE, e.getMessage());
} else if (e instanceof RtcUserOfflineException) {
messageSendStatusBody = new MessageSendStatusBody(MessageSendStatusEnum.RTC_MESSAGE_USER_OFFLINE.getValue(), MessageSendStatusEnum.RTC_MESSAGE_USER_OFFLINE, e.getMessage());
} else {
messageSendStatusBody = new MessageSendStatusBody(MessageSendStatusEnum.USER_OFFLINE.getValue(), MessageSendStatusEnum.USER_OFFLINE, e.getMessage());
}
} else {
messageSendStatusBody = new MessageSendStatusBody(MessageSendStatusEnum.CUSTOM_HANDLE_ERROR.getValue(), MessageSendStatusEnum.CUSTOM_HANDLE_ERROR, e.getMessage());
}
} catch (JSONException e) { } catch (JSONException e) {
clientSocketMessage = new WebSocketClientMessage(); clientSocketMessage = new WebSocketClientMessage();
clientSocketMessage.setSystem(true); messageSendStatusBody = new MessageSendStatusBody(MessageSendStatusEnum.TO_ERROR.getValue(), MessageSendStatusEnum.MESSAGE_ERROR, e.getMessage());
clientSocketMessage.setType(MessageTypeEnum.MESSAGE_SYSTEM.getValue());
clientSocketMessage.setFrom(WebSocketChannelManager.FORM_SYSTEM);
statusBody = new StatusBody(StatusEnum.TO_ERROR.getValue(), StatusEnum.MESSAGE_ERROR, e.getMessage());
} finally { } finally {
if (statusBody != null && ctx.channel().isOpen()) { if (messageSendStatusBody != null && ctx.channel().isOpen()) {
clientSocketMessage.setBody(JSONObject.toJSONString(statusBody)); clientSocketMessage.setType(MessageTypeEnum.STATUS_SEND.getValue());
clientSocketMessage.setSystem(true);
clientSocketMessage.setFrom(WebSocketChannelManager.FORM_SYSTEM);
clientSocketMessage.setBody(JSONObject.toJSONString(messageSendStatusBody));
WebSocketChannelManager.getInstance().sendText(ctx.channel(), clientSocketMessage); WebSocketChannelManager.getInstance().sendText(ctx.channel(), clientSocketMessage);
} }
} }
@ -138,12 +155,12 @@ public class WebSocketTextHandler {
* @return * @return
* @throws SessionException * @throws SessionException
*/ */
private StatusBody clientRegisterSession(Channel channel, WebSocketClientMessage clientSocketMessage) throws SessionException { private MessageSendStatusBody clientRegisterSession(Channel channel, WebSocketClientMessage clientSocketMessage) throws SessionException {
RegisterBody registerBody = JSONObject.parseObject(clientSocketMessage.getBody(), RegisterBody.class); RegisterBody registerBody = JSONObject.parseObject(clientSocketMessage.getBody(), RegisterBody.class);
// 更新通道 // 更新通道
WebSocketSession webSocketSession = WebSocketChannelManager.getInstance().addChannel(registerBody.getSessionId(), clientSocketMessage.getFrom(), channel); WebSocketSession webSocketSession = WebSocketChannelManager.getInstance().addChannel(registerBody.getSessionId(), clientSocketMessage.getFrom(), channel);
sendHistory(webSocketSession); sendHistory(webSocketSession);
return new StatusBody(StatusEnum.SUCCESS.getValue(), StatusEnum.SUCCESS, "OK"); return new MessageSendStatusBody(MessageSendStatusEnum.SUCCESS.getValue(), MessageSendStatusEnum.SUCCESS, "OK");
} }
/** /**
@ -151,18 +168,17 @@ public class WebSocketTextHandler {
* *
* @param channel 通道 * @param channel 通道
* @param webSocketClientMessage 客户端消息 * @param webSocketClientMessage 客户端消息
* @throws UserException * @throws ToException
*/ */
private void sendText(Channel channel, WebSocketClientMessage webSocketClientMessage) throws UserException { private void sendText(Channel channel, WebSocketClientMessage webSocketClientMessage) throws ToException, TextUserOfflineException {
if (StringUtils.isBlank(webSocketClientMessage.getTo())) { if (StringUtils.isBlank(webSocketClientMessage.getTo())) {
throw new UserException("To 值不能为空"); throw new ToException("To 值不能为空");
} }
webSocketClientMessage.setSystem(false); webSocketClientMessage.setSystem(false);
List<WebSocketSession> webSocketSessions = WebSocketChannelManager.getInstance().listOnlineUser(webSocketClientMessage.getTo()); List<WebSocketSession> webSocketSessions = WebSocketChannelManager.getInstance().listOnlineUser(webSocketClientMessage.getTo());
// 返回失败结果 // 返回失败结果
if (webSocketSessions.isEmpty()) { if (webSocketSessions.isEmpty()) {
sendTextFailed(channel, webSocketClientMessage, "用户不在线"); throw new TextUserOfflineException("用户离线");
return;
} }
// 发送消息 // 发送消息
for (WebSocketSession webSocketSession : webSocketSessions) { for (WebSocketSession webSocketSession : webSocketSessions) {
@ -175,19 +191,19 @@ public class WebSocketTextHandler {
* *
* @param fromChannel 发送通道 * @param fromChannel 发送通道
* @param webSocketClientMessage 客户端消息 * @param webSocketClientMessage 客户端消息
* @throws UserException * @throws ToException
* @throws TextUserOfflineException
*/ */
private void sendGroupText(Channel fromChannel, WebSocketClientMessage webSocketClientMessage) throws UserException { private void sendGroupText(Channel fromChannel, WebSocketClientMessage webSocketClientMessage) throws ToException, TextUserOfflineException {
if (StringUtils.isBlank(webSocketClientMessage.getTo())) { if (StringUtils.isBlank(webSocketClientMessage.getTo())) {
throw new UserException("To 值不能为空"); throw new ToException("To 值不能为空");
} }
webSocketClientMessage.setSystem(false); webSocketClientMessage.setSystem(false);
List<String> toUserIds = new ArrayList<>(Sets.newHashSet(webSocketClientMessage.getTo().split(","))); List<String> toUserIds = new ArrayList<>(Sets.newHashSet(webSocketClientMessage.getTo().split(",")));
List<WebSocketSession> webSocketSessions = WebSocketChannelManager.getInstance().listOnlineUser(toUserIds); List<WebSocketSession> webSocketSessions = WebSocketChannelManager.getInstance().listOnlineUser(toUserIds);
// 返回失败结果 // 返回失败结果
if (webSocketSessions.isEmpty()) { if (webSocketSessions.isEmpty()) {
sendTextFailed(fromChannel, webSocketClientMessage, "无用户在线"); throw new TextUserOfflineException("无用户在线");
return;
} }
// 发送消息 // 发送消息
for (WebSocketSession webSocketSession : webSocketSessions) { for (WebSocketSession webSocketSession : webSocketSessions) {
@ -202,13 +218,12 @@ public class WebSocketTextHandler {
* @param webSocketClientMessage 消息体 * @param webSocketClientMessage 消息体
* @throws UserException * @throws UserException
*/ */
private void listOnlineUser(Channel fromChannel, WebSocketClientMessage webSocketClientMessage) throws UserException { private void listOnlineUser(Channel fromChannel, WebSocketClientMessage webSocketClientMessage) throws BodyException, ToException, UserOfflineException {
webSocketClientMessage.setSystem(true); webSocketClientMessage.setSystem(true);
webSocketClientMessage.setTo(webSocketClientMessage.getFrom()); webSocketClientMessage.setTo(webSocketClientMessage.getFrom());
IdsBody idsBody = JSONObject.parseObject(webSocketClientMessage.getBody(), IdsBody.class); IdsBody idsBody = JSONObject.parseObject(webSocketClientMessage.getBody(), IdsBody.class);
if (idsBody.getIds() == null || idsBody.getIds().isEmpty()) { if (idsBody.getIds() == null || idsBody.getIds().isEmpty()) {
sendTextFailed(fromChannel, webSocketClientMessage, "用户ID列表为空"); throw new BodyException("用户ID列表为空");
return;
} }
List<WebSocketSession> webSocketSessions = WebSocketChannelManager.getInstance().listOnlineUser(idsBody.getIds()); List<WebSocketSession> webSocketSessions = WebSocketChannelManager.getInstance().listOnlineUser(idsBody.getIds());
Set<String> idSets = new HashSet<>(); Set<String> idSets = new HashSet<>();
@ -232,19 +247,6 @@ public class WebSocketTextHandler {
messageService.noticeUnSendByUserId(webSocketSession); messageService.noticeUnSendByUserId(webSocketSession);
} }
/**
* 发送失败内容
*
* @param fromChannel 来源通道
* @param webSocketClientMessage webSocket客户端消息
* @param msg
*/
public static void sendTextFailed(Channel fromChannel, WebSocketClientMessage webSocketClientMessage, String msg) {
webSocketClientMessage.setType(MessageTypeEnum.STATUS_SEND.getValue());
webSocketClientMessage.setBody(JSONObject.toJSONString(new StatusBody(StatusEnum.FAILED.getValue(), StatusEnum.FAILED, msg)));
WebSocketChannelManager.getInstance().sendText(fromChannel, webSocketClientMessage);
}
public void setWebSocketTextCustomHandler(IWebSocketTextCustomService IWebSocketTextCustomService) { public void setWebSocketTextCustomHandler(IWebSocketTextCustomService IWebSocketTextCustomService) {
this.IWebSocketTextCustomService = IWebSocketTextCustomService; this.IWebSocketTextCustomService = IWebSocketTextCustomService;
} }

View File

@ -1,23 +1,23 @@
package ink.wgink.module.instantmessage.websocket.pojo.body; package ink.wgink.module.instantmessage.websocket.pojo.body;
import ink.wgink.module.instantmessage.websocket.enums.StatusEnum; import ink.wgink.module.instantmessage.websocket.enums.MessageSendStatusEnum;
/** /**
* @ClassName: SendStatusBody * @ClassName: MessageSendStatusBody
* @Description: 发送状态 * @Description: 发送状态
* @Author: wanggeng * @Author: wanggeng
* @Date: 2021/9/12 10:08 下午 * @Date: 2021/9/12 10:08 下午
* @Version: 1.0 * @Version: 1.0
*/ */
public class StatusBody { public class MessageSendStatusBody {
private Integer code; private Integer code;
private StatusEnum status; private MessageSendStatusEnum status;
private String msg; private String msg;
public StatusBody() { public MessageSendStatusBody() {
} }
public StatusBody(int code, StatusEnum status, String msg) { public MessageSendStatusBody(int code, MessageSendStatusEnum status, String msg) {
this.code = code; this.code = code;
this.status = status; this.status = status;
this.msg = msg; this.msg = msg;
@ -31,11 +31,11 @@ public class StatusBody {
this.code = code; this.code = code;
} }
public StatusEnum getStatus() { public MessageSendStatusEnum getStatus() {
return status; return status;
} }
public void setStatus(StatusEnum status) { public void setStatus(MessageSendStatusEnum status) {
this.status = status; this.status = status;
} }