diff --git a/pom.xml b/pom.xml index abba098..d3c2f14 100644 --- a/pom.xml +++ b/pom.xml @@ -62,6 +62,7 @@ io.netty netty-all + 4.1.50.Final diff --git a/src/main/java/cn/com/tenlion/pollutantdata/PollutantDataApplication.java b/src/main/java/cn/com/tenlion/pollutantdata/PollutantDataApplication.java index 170ed7e..9a2e2cd 100644 --- a/src/main/java/cn/com/tenlion/pollutantdata/PollutantDataApplication.java +++ b/src/main/java/cn/com/tenlion/pollutantdata/PollutantDataApplication.java @@ -14,11 +14,8 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.annotation.Scheduled; import springfox.documentation.swagger2.annotations.EnableSwagger2; -import java.util.Date; - @EnableSwagger2 @EnableScheduling @SpringBootApplication(scanBasePackages = {"cn.com.tenlion", "ink.wgink"}) @@ -62,6 +59,7 @@ public class PollutantDataApplication { e.printStackTrace(); } }).start(); +// new Thread(new Server()).start(); }; } diff --git a/src/main/java/cn/com/tenlion/pollutantdata/enums/CnEnums.java b/src/main/java/cn/com/tenlion/pollutantdata/enums/CnEnum.java similarity index 91% rename from src/main/java/cn/com/tenlion/pollutantdata/enums/CnEnums.java rename to src/main/java/cn/com/tenlion/pollutantdata/enums/CnEnum.java index f772b15..6d237d8 100644 --- a/src/main/java/cn/com/tenlion/pollutantdata/enums/CnEnums.java +++ b/src/main/java/cn/com/tenlion/pollutantdata/enums/CnEnum.java @@ -10,7 +10,7 @@ package cn.com.tenlion.pollutantdata.enums; * @Date: 2021/3/15 11:07 下午 * @Version: 1.0 */ -public enum CnEnums { +public enum CnEnum { /** * 实时数据 @@ -23,7 +23,7 @@ public enum CnEnums { private int value; - CnEnums(int value) { + CnEnum(int value) { this.value = value; } diff --git a/src/main/java/cn/com/tenlion/pollutantdata/enums/DataFlagEnums.java b/src/main/java/cn/com/tenlion/pollutantdata/enums/DataFlagEnum.java similarity index 94% rename from src/main/java/cn/com/tenlion/pollutantdata/enums/DataFlagEnums.java rename to src/main/java/cn/com/tenlion/pollutantdata/enums/DataFlagEnum.java index cc26097..056fe16 100644 --- a/src/main/java/cn/com/tenlion/pollutantdata/enums/DataFlagEnums.java +++ b/src/main/java/cn/com/tenlion/pollutantdata/enums/DataFlagEnum.java @@ -10,7 +10,7 @@ package cn.com.tenlion.pollutantdata.enums; * @Date: 2021/3/17 12:36 下午 * @Version: 1.0 */ -public enum DataFlagEnums { +public enum DataFlagEnum { /** * 有包、有应答、2005版本 */ @@ -49,7 +49,7 @@ public enum DataFlagEnums { private int pNum; private int answer; - DataFlagEnums(int value, int version, int pNum, int answer) { + DataFlagEnum(int value, int version, int pNum, int answer) { this.value = value; this.version = version; this.pNum = pNum; diff --git a/src/main/java/cn/com/tenlion/pollutantdata/pojo/ResponseResult.java b/src/main/java/cn/com/tenlion/pollutantdata/pojo/ResponseResult.java index 2c0d665..b7e71c3 100644 --- a/src/main/java/cn/com/tenlion/pollutantdata/pojo/ResponseResult.java +++ b/src/main/java/cn/com/tenlion/pollutantdata/pojo/ResponseResult.java @@ -1,7 +1,7 @@ package cn.com.tenlion.pollutantdata.pojo; -import cn.com.tenlion.pollutantdata.enums.CnEnums; -import cn.com.tenlion.pollutantdata.enums.DataFlagEnums; +import cn.com.tenlion.pollutantdata.enums.CnEnum; +import cn.com.tenlion.pollutantdata.enums.DataFlagEnum; import cn.com.tenlion.pollutantdata.enums.StEnum; import lombok.Data; @@ -19,9 +19,9 @@ import lombok.Data; public class ResponseResult { private String qn; private StEnum st; - private CnEnums cn; + private CnEnum cn; private String mn; - private DataFlagEnums flag; + private DataFlagEnum flag; private CPData cpData; } diff --git a/src/main/java/cn/com/tenlion/pollutantdata/utils/HJ212DataUtil.java b/src/main/java/cn/com/tenlion/pollutantdata/utils/HJ212DataUtil.java index 02a8a80..4ae3131 100644 --- a/src/main/java/cn/com/tenlion/pollutantdata/utils/HJ212DataUtil.java +++ b/src/main/java/cn/com/tenlion/pollutantdata/utils/HJ212DataUtil.java @@ -49,7 +49,7 @@ public class HJ212DataUtil { List results = new ArrayList<>(); String dataCp = respDataCp(data.getDataCp()); int packageCount = (dataCp.length() % 950 == 0) ? (dataCp.length() / 950) : dataCp.length() / 950 + 1; - if (packageCount == 1) { + if (packageCount <= 1) { String respData = String.format("QN=%s;ST=%d;CN=%d;PW=%s;MN=%s;Flag=%d;CP=&&%s&&", data.getQn(), data.getSt(), data.getCn(), data.getPw(), data.getMn(), data.getFlag(), dataCp); char[] respDataChar = respData.toCharArray(); diff --git a/src/main/java/cn/com/tenlion/pollutantdata/utils/core/ResponseDefault.java b/src/main/java/cn/com/tenlion/pollutantdata/utils/core/ResponseDefault.java index 8bfa31d..9e73374 100644 --- a/src/main/java/cn/com/tenlion/pollutantdata/utils/core/ResponseDefault.java +++ b/src/main/java/cn/com/tenlion/pollutantdata/utils/core/ResponseDefault.java @@ -1,14 +1,14 @@ package cn.com.tenlion.pollutantdata.utils.core; -import cn.com.tenlion.pollutantdata.enums.CnEnums; +import cn.com.tenlion.pollutantdata.enums.CnEnum; import cn.com.tenlion.pollutantdata.enums.ExeRtnEnum; import cn.com.tenlion.pollutantdata.enums.QnRtdEnum; import cn.com.tenlion.pollutantdata.enums.StEnum; import cn.com.tenlion.pollutantdata.utils.HJ212DataUtil; import cn.com.tenlion.pollutantdata.utils.config.IResponse; import io.netty.channel.ChannelHandlerContext; -import org.springframework.util.unit.DataUnit; +import java.nio.charset.StandardCharsets; import java.util.List; /** @@ -60,10 +60,16 @@ public class ResponseDefault implements IResponse { // 系统交互 data.setSt(StEnum.SYSTEM_INTERACTION.getValue()); // 系统应答 - data.setCn(CnEnums.RESPONSE.getValue()); + data.setCn(CnEnum.RESPONSE.getValue()); List respDatas = HJ212DataUtil.respData(data); respDatas.forEach((respData) -> { - ctx.writeAndFlush(respData); + if (ctx.isRemoved()) { + return; + } + if (!ctx.channel().isActive()) { + return; + } + ctx.channel().writeAndFlush(respData.getBytes(StandardCharsets.UTF_8)); }); } } diff --git a/src/main/java/cn/com/tenlion/pollutantdata/utils/exception/LengthException.java b/src/main/java/cn/com/tenlion/pollutantdata/utils/exception/LengthException.java new file mode 100644 index 0000000..57f5dc4 --- /dev/null +++ b/src/main/java/cn/com/tenlion/pollutantdata/utils/exception/LengthException.java @@ -0,0 +1,32 @@ +package cn.com.tenlion.pollutantdata.utils.exception; + +/** + * When you feel like quitting. Think about why you started + * 当你想要放弃的时候,想想当初你为何开始 + * + * @ClassName: LengthException + * @Description: 长度异常 + * @Author: wanggeng + * @Date: 2021/3/19 2:28 下午 + * @Version: 1.0 + */ +public class LengthException extends HJ212Exception { + public LengthException() { + } + + public LengthException(String message) { + super(message); + } + + public LengthException(String message, Throwable cause) { + super(message, cause); + } + + public LengthException(Throwable cause) { + super(cause); + } + + public LengthException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/src/main/java/cn/com/tenlion/pollutantdata/utils/net/TCPHandler.java b/src/main/java/cn/com/tenlion/pollutantdata/utils/net/TCPHandler.java index 7c1b3f6..489de72 100644 --- a/src/main/java/cn/com/tenlion/pollutantdata/utils/net/TCPHandler.java +++ b/src/main/java/cn/com/tenlion/pollutantdata/utils/net/TCPHandler.java @@ -11,11 +11,8 @@ import cn.com.tenlion.pollutantdata.utils.core.ResponseDefault; import cn.com.tenlion.pollutantdata.utils.core.T212Parser; import cn.com.tenlion.pollutantdata.utils.exception.*; import cn.com.tenlion.pollutantdata.utils.net.handle.RealDataHandler; -import com.oracle.tools.packager.Log; import ink.wgink.util.RegexUtil; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; @@ -46,13 +43,19 @@ public class TCPHandler extends SimpleChannelInboundHandler { private IDataMinuteService dataMinuteService; private IPollService pollService; - public TCPHandler() { + public TCPHandler(IInstrumentService instrumentService, ICollectorService collectorService, IDataMinuteService dataMinuteService, IPollService pollService) { + this.instrumentService = instrumentService; + this.collectorService = collectorService; + this.dataMinuteService = dataMinuteService; + this.pollService = pollService; } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception { byte[] dataByte = new byte[byteBuf.readableBytes()]; byteBuf.readBytes(dataByte); + ctx.channel().writeAndFlush(dataByte); + byteBuf.readBytes(dataByte); String msg = new String(dataByte, CharsetUtil.UTF_8); SocketAddress address = ctx.channel().remoteAddress(); @@ -62,16 +65,16 @@ public class TCPHandler extends SimpleChannelInboundHandler { return; } // 是否要响应 - boolean needResponse = data.getFlag() == DataFlagEnums.HAS_PNUM_ANSWER_V2005.getValue() || - data.getFlag() == DataFlagEnums.HAS_PNUM_ANSWER_V2017.getValue() || - data.getFlag() == DataFlagEnums.NO_PNUM_ANSWER_V2005.getValue() || - data.getFlag() == DataFlagEnums.NO_PNUM_ANSWER_V2017.getValue(); + boolean needResponse = data.getFlag() == DataFlagEnum.HAS_PNUM_ANSWER_V2005.getValue() || + data.getFlag() == DataFlagEnum.HAS_PNUM_ANSWER_V2017.getValue() || + data.getFlag() == DataFlagEnum.NO_PNUM_ANSWER_V2005.getValue() || + data.getFlag() == DataFlagEnum.NO_PNUM_ANSWER_V2017.getValue(); if (StringUtils.isBlank(data.getQn())) { log.debug("请求编码空"); new ResponseDefault(ctx, QnRtdEnum.QN_ERROR, ExeRtnEnum.COMMAND_ERROR, needResponse).response(data); return; } - if (RegexUtil.isYyyyMmDdHhMmSsZzz(data.getQn())) { + if (!RegexUtil.isYyyyMmDdHhMmSsZzz(data.getQn())) { log.debug("请求编码格式错误"); new ResponseDefault(ctx, QnRtdEnum.QN_ERROR, ExeRtnEnum.COMMAND_ERROR, needResponse).response(data); return; @@ -107,7 +110,7 @@ public class TCPHandler extends SimpleChannelInboundHandler { new ResponseDefault(ctx, QnRtdEnum.ST_ERROR, ExeRtnEnum.COMMAND_ERROR, needResponse).response(data); return; } - if (CnEnums.REAL_TIME.getValue() == data.getCn()) { + if (CnEnum.REAL_TIME.getValue() == data.getCn()) { log.debug("保存实时数据"); RealDataHandler realDataHandler = new RealDataHandler(); realDataHandler.setIpAddress(address.toString()); @@ -134,12 +137,7 @@ public class TCPHandler extends SimpleChannelInboundHandler { char[] dataChar = t212Parser.readData(Integer.parseInt(new String(dataLen))); char[] readCrc = t212Parser.readCrc(); int crc = T212Parser.crc16Checkout(dataChar, dataChar.length); - try { - if (Integer.parseInt((new BigInteger(new String(readCrc), 16)).toString()) != crc) { - log.error("crc error"); - throw new CrcException("CRC Error"); - } - } catch (Exception e) { + if (Integer.parseInt((new BigInteger(new String(readCrc), 16)).toString()) != crc) { log.error("crc error"); throw new CrcException("CRC Error"); } @@ -157,46 +155,24 @@ public class TCPHandler extends SimpleChannelInboundHandler { return data; } - public static void main(String[] args) { - String data = "ST=32;CN=2011;QN=20210318233308001;PW=123456;MN=41018311021510;Flag=7;CP=&&DataTime=20210318233257;011-Rtd=123;011-ZsRtd=123&&"; - char[] dataChar = data.toCharArray(); - System.out.println(Integer.toHexString(T212Parser.crc16Checkout(dataChar, dataChar.length))); - } - - @Override - public void channelRegistered(ChannelHandlerContext ctx) throws Exception { - log.debug("new connection"); - super.channelRegistered(ctx); - } - @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - log.debug("close connection"); - super.channelInactive(ctx); + System.out.println("inactive"); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + System.out.println("channelActive"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error(cause.getMessage(), cause); - if (ctx.channel().isActive()) { - ctx.channel().writeAndFlush(Unpooled.EMPTY_BUFFER);//.addListener(ChannelFutureListener.CLOSE); - } - } - - public void setInstrumentService(IInstrumentService instrumentService) { - this.instrumentService = instrumentService; - } - - public void setCollectorService(ICollectorService collectorService) { - this.collectorService = collectorService; - } - - public void setDataMinuteService(IDataMinuteService dataMinuteService) { - this.dataMinuteService = dataMinuteService; - } - - public void setPollService(IPollService pollService) { - this.pollService = pollService; + ctx.close().addListener(listener -> { + if (listener.isSuccess()) { + System.out.println("channel close"); + } + }); } /** diff --git a/src/main/java/cn/com/tenlion/pollutantdata/utils/net/TCPServer.java b/src/main/java/cn/com/tenlion/pollutantdata/utils/net/TCPServer.java index 268452a..8f149fe 100644 --- a/src/main/java/cn/com/tenlion/pollutantdata/utils/net/TCPServer.java +++ b/src/main/java/cn/com/tenlion/pollutantdata/utils/net/TCPServer.java @@ -10,9 +10,18 @@ import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.handler.timeout.IdleStateHandler; +import io.netty.util.CharsetUtil; import org.springframework.beans.factory.annotation.Autowired; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.TimeUnit; + /** * When you feel like quitting. Think about why you started * 当你想要放弃的时候,想想当初你为何开始 @@ -41,28 +50,50 @@ public class TCPServer { EventLoopGroup boosGroup = new NioEventLoopGroup(); NioEventLoopGroup workGroup = new NioEventLoopGroup(); - // TCP处理器 - TCPHandler tcpHandler = new TCPHandler(); - tcpHandler.setInstrumentService(instrumentService); - tcpHandler.setCollectorService(collectorService); - tcpHandler.setDataMinuteService(dataMinuteService); - try { - ServerBootstrap bootstrap = new ServerBootstrap(); - ((ServerBootstrap) ((ServerBootstrap) bootstrap.group(boosGroup, workGroup).channel(NioServerSocketChannel.class)).childHandler(new ChannelInitializer() { + ServerBootstrap serverBootstrap = new ServerBootstrap(); + serverBootstrap.group(boosGroup, workGroup); + serverBootstrap.channel(NioServerSocketChannel.class); + serverBootstrap.option(ChannelOption.SO_BACKLOG, 128); + serverBootstrap.option(ChannelOption.SO_REUSEADDR, true); + serverBootstrap.option(ChannelOption.SO_KEEPALIVE, true); + serverBootstrap.option(ChannelOption.TCP_NODELAY, true); + serverBootstrap.option(ChannelOption.SO_SNDBUF, 1024 * 1024 * 1024); + serverBootstrap.option(ChannelOption.SO_RCVBUF, 1024 * 1024 * 1024); + serverBootstrap.childHandler(new ChannelInitializer() { + @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); - pipeline.addLast("encoder", new MessageToByteEncoder() { + pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS)); + pipeline.addLast(new ChannelInboundHandlerAdapter() { @Override - protected void encode(ChannelHandlerContext ctx, byte[] msg, ByteBuf out) throws Exception { - out.writeBytes(msg); + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleState idleState = ((IdleStateEvent) evt).state(); + if (idleState == IdleState.READER_IDLE) { + ctx.close(); + } + return; + } + super.userEventTriggered(ctx, evt); } }); - pipeline.addLast("handler", tcpHandler); + pipeline.addLast(new MessageToByteEncoder() { + @Override + protected void encode(ChannelHandlerContext ctx, byte[] bytes, ByteBuf byteBuf) throws Exception { + byteBuf.writeBytes(bytes); + } + }); + // 这里的对象必须new,否则只有第一次有效,其余无法连接,每次执行的后都需要new + pipeline.addLast(new TCPHandler(instrumentService, collectorService, dataMinuteService, pollService)); } - }).option(ChannelOption.SO_BACKLOG, 128)).childOption(ChannelOption.SO_KEEPALIVE, true); - System.out.println("服务启动..."); - ChannelFuture channelFuture = bootstrap.bind(this.port).sync(); + }); + + ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync().addListener(listener -> { + if (listener.isSuccess()) { + System.out.println("服务启动..."); + } + }); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); diff --git a/src/test/java/cn/com/tenlion/pollutantdata/HJ212Test.java b/src/test/java/cn/com/tenlion/pollutantdata/HJ212Test.java new file mode 100644 index 0000000..ab6c0e3 --- /dev/null +++ b/src/test/java/cn/com/tenlion/pollutantdata/HJ212Test.java @@ -0,0 +1,111 @@ +package cn.com.tenlion.pollutantdata; + +import cn.com.tenlion.pollutantdata.enums.CnEnum; +import cn.com.tenlion.pollutantdata.enums.DataFlagEnum; +import cn.com.tenlion.pollutantdata.enums.StEnum; +import cn.com.tenlion.pollutantdata.utils.HJ212DataUtil; +import cn.com.tenlion.pollutantdata.utils.core.T212Parser; +import ink.wgink.util.date.DateUtil; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.util.CharsetUtil; +import org.junit.jupiter.api.Test; +import org.springframework.util.unit.DataUnit; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +/** + * When you feel like quitting. Think about why you started + * 当你想要放弃的时候,想想当初你为何开始 + * + * @ClassName: Demo + * @Description: + * @Author: wanggeng + * @Date: 2021/3/19 10:41 上午 + * @Version: 1.0 + */ +public class HJ212Test { + + public HJ212DataUtil.Data getData() { + HJ212DataUtil.Data data = new HJ212DataUtil.Data(); + data.setQn(DateUtil.formatDate(System.currentTimeMillis(), "yyyyMMddHHmmsszzz")); + data.setSt(StEnum.AIR.getValue()); + data.setCn(CnEnum.REAL_TIME.getValue()); + // 采集器密码 + data.setPw("123456"); + // 采集器编号 + data.setMn("41018311021510"); + data.setFlag(DataFlagEnum.HAS_PNUM_ANSWER_V2017.getValue()); + HJ212DataUtil.DataCp dataCp = new HJ212DataUtil.DataCp(); + // 实时值 + dataCp.setRtd(18.0D); + // 折算实时值 + dataCp.setZsRtd(6.0D); + // 污染因子 + dataCp.setPollId(""); + dataCp.setDataTime(DateUtil.formatDate(System.currentTimeMillis(), "yyyyMMddHHmmsszzz")); + data.setDataCp(dataCp); + return data; + } + + @Test + public void hj212ClientTest() { + Bootstrap bootstrap = new Bootstrap(); + EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); + try { + bootstrap.group(eventLoopGroup); + bootstrap.channel(NioSocketChannel.class); + bootstrap.option(ChannelOption.SO_KEEPALIVE, true); + bootstrap.option(ChannelOption.SO_SNDBUF, 1024 * 1024 * 1024); + bootstrap.option(ChannelOption.SO_RCVBUF, 1024 * 1024 * 1024); + bootstrap.handler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel socketChannel) throws Exception { + socketChannel.pipeline().addLast(new MessageToByteEncoder() { + @Override + protected void encode(ChannelHandlerContext channelHandlerContext, byte[] bytes, ByteBuf byteBuf) throws Exception { + byteBuf.writeBytes(bytes); + } + }); + socketChannel.pipeline().addLast(new SimpleChannelInboundHandler() { + @Override + protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception { + byte[] dataByte = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(dataByte); + String msg = new String(dataByte, CharsetUtil.UTF_8); + System.out.println(msg); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + cause.printStackTrace(); + ctx.close(); + } + }); + } + }); + ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync(); + channelFuture.addListener((ChannelFutureListener) future -> { + if (!future.isSuccess()) { + future.channel().pipeline().fireChannelInactive(); + } + }); + List datas = HJ212DataUtil.respData(getData()); + datas.forEach(data -> { + channelFuture.channel().writeAndFlush(data.getBytes(StandardCharsets.UTF_8)); + }); + channelFuture.channel().closeFuture(); + } catch (Exception e) { + e.printStackTrace(); + } finally { + eventLoopGroup.shutdownGracefully(); + } + } + +}