完善数据采集功能代码

This commit is contained in:
wanggeng888 2021-03-19 21:08:24 +08:00
parent a426baaf58
commit 41a1fee5bc
11 changed files with 235 additions and 80 deletions

View File

@ -62,6 +62,7 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.50.Final</version>
</dependency>
<dependency>

View File

@ -14,11 +14,8 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
import java.util.Date;
@EnableSwagger2
@EnableScheduling
@SpringBootApplication(scanBasePackages = {"cn.com.tenlion", "ink.wgink"})
@ -62,6 +59,7 @@ public class PollutantDataApplication {
e.printStackTrace();
}
}).start();
// new Thread(new Server()).start();
};
}

View File

@ -10,7 +10,7 @@ package cn.com.tenlion.pollutantdata.enums;
* @Date: 2021/3/15 11:07 下午
* @Version: 1.0
*/
public enum CnEnums {
public enum CnEnum {
/**
* 实时数据
@ -23,7 +23,7 @@ public enum CnEnums {
private int value;
CnEnums(int value) {
CnEnum(int value) {
this.value = value;
}

View File

@ -10,7 +10,7 @@ package cn.com.tenlion.pollutantdata.enums;
* @Date: 2021/3/17 12:36 下午
* @Version: 1.0
*/
public enum DataFlagEnums {
public enum DataFlagEnum {
/**
* 有包有应答2005版本
*/
@ -49,7 +49,7 @@ public enum DataFlagEnums {
private int pNum;
private int answer;
DataFlagEnums(int value, int version, int pNum, int answer) {
DataFlagEnum(int value, int version, int pNum, int answer) {
this.value = value;
this.version = version;
this.pNum = pNum;

View File

@ -1,7 +1,7 @@
package cn.com.tenlion.pollutantdata.pojo;
import cn.com.tenlion.pollutantdata.enums.CnEnums;
import cn.com.tenlion.pollutantdata.enums.DataFlagEnums;
import cn.com.tenlion.pollutantdata.enums.CnEnum;
import cn.com.tenlion.pollutantdata.enums.DataFlagEnum;
import cn.com.tenlion.pollutantdata.enums.StEnum;
import lombok.Data;
@ -19,9 +19,9 @@ import lombok.Data;
public class ResponseResult<CPData> {
private String qn;
private StEnum st;
private CnEnums cn;
private CnEnum cn;
private String mn;
private DataFlagEnums flag;
private DataFlagEnum flag;
private CPData cpData;
}

View File

@ -49,7 +49,7 @@ public class HJ212DataUtil {
List<String> results = new ArrayList<>();
String dataCp = respDataCp(data.getDataCp());
int packageCount = (dataCp.length() % 950 == 0) ? (dataCp.length() / 950) : dataCp.length() / 950 + 1;
if (packageCount == 1) {
if (packageCount <= 1) {
String respData = String.format("QN=%s;ST=%d;CN=%d;PW=%s;MN=%s;Flag=%d;CP=&&%s&&",
data.getQn(), data.getSt(), data.getCn(), data.getPw(), data.getMn(), data.getFlag(), dataCp);
char[] respDataChar = respData.toCharArray();

View File

@ -1,14 +1,14 @@
package cn.com.tenlion.pollutantdata.utils.core;
import cn.com.tenlion.pollutantdata.enums.CnEnums;
import cn.com.tenlion.pollutantdata.enums.CnEnum;
import cn.com.tenlion.pollutantdata.enums.ExeRtnEnum;
import cn.com.tenlion.pollutantdata.enums.QnRtdEnum;
import cn.com.tenlion.pollutantdata.enums.StEnum;
import cn.com.tenlion.pollutantdata.utils.HJ212DataUtil;
import cn.com.tenlion.pollutantdata.utils.config.IResponse;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.util.unit.DataUnit;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
@ -60,10 +60,16 @@ public class ResponseDefault implements IResponse<HJ212DataUtil.Data> {
// 系统交互
data.setSt(StEnum.SYSTEM_INTERACTION.getValue());
// 系统应答
data.setCn(CnEnums.RESPONSE.getValue());
data.setCn(CnEnum.RESPONSE.getValue());
List<String> respDatas = HJ212DataUtil.respData(data);
respDatas.forEach((respData) -> {
ctx.writeAndFlush(respData);
if (ctx.isRemoved()) {
return;
}
if (!ctx.channel().isActive()) {
return;
}
ctx.channel().writeAndFlush(respData.getBytes(StandardCharsets.UTF_8));
});
}
}

View File

@ -0,0 +1,32 @@
package cn.com.tenlion.pollutantdata.utils.exception;
/**
* When you feel like quitting. Think about why you started
* 当你想要放弃的时候想想当初你为何开始
*
* @ClassName: LengthException
* @Description: 长度异常
* @Author: wanggeng
* @Date: 2021/3/19 2:28 下午
* @Version: 1.0
*/
public class LengthException extends HJ212Exception {
public LengthException() {
}
public LengthException(String message) {
super(message);
}
public LengthException(String message, Throwable cause) {
super(message, cause);
}
public LengthException(Throwable cause) {
super(cause);
}
public LengthException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

View File

@ -11,11 +11,8 @@ import cn.com.tenlion.pollutantdata.utils.core.ResponseDefault;
import cn.com.tenlion.pollutantdata.utils.core.T212Parser;
import cn.com.tenlion.pollutantdata.utils.exception.*;
import cn.com.tenlion.pollutantdata.utils.net.handle.RealDataHandler;
import com.oracle.tools.packager.Log;
import ink.wgink.util.RegexUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
@ -46,13 +43,19 @@ public class TCPHandler extends SimpleChannelInboundHandler<ByteBuf> {
private IDataMinuteService dataMinuteService;
private IPollService pollService;
public TCPHandler() {
public TCPHandler(IInstrumentService instrumentService, ICollectorService collectorService, IDataMinuteService dataMinuteService, IPollService pollService) {
this.instrumentService = instrumentService;
this.collectorService = collectorService;
this.dataMinuteService = dataMinuteService;
this.pollService = pollService;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf byteBuf) throws Exception {
byte[] dataByte = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(dataByte);
ctx.channel().writeAndFlush(dataByte);
byteBuf.readBytes(dataByte);
String msg = new String(dataByte, CharsetUtil.UTF_8);
SocketAddress address = ctx.channel().remoteAddress();
@ -62,16 +65,16 @@ public class TCPHandler extends SimpleChannelInboundHandler<ByteBuf> {
return;
}
// 是否要响应
boolean needResponse = data.getFlag() == DataFlagEnums.HAS_PNUM_ANSWER_V2005.getValue() ||
data.getFlag() == DataFlagEnums.HAS_PNUM_ANSWER_V2017.getValue() ||
data.getFlag() == DataFlagEnums.NO_PNUM_ANSWER_V2005.getValue() ||
data.getFlag() == DataFlagEnums.NO_PNUM_ANSWER_V2017.getValue();
boolean needResponse = data.getFlag() == DataFlagEnum.HAS_PNUM_ANSWER_V2005.getValue() ||
data.getFlag() == DataFlagEnum.HAS_PNUM_ANSWER_V2017.getValue() ||
data.getFlag() == DataFlagEnum.NO_PNUM_ANSWER_V2005.getValue() ||
data.getFlag() == DataFlagEnum.NO_PNUM_ANSWER_V2017.getValue();
if (StringUtils.isBlank(data.getQn())) {
log.debug("请求编码空");
new ResponseDefault(ctx, QnRtdEnum.QN_ERROR, ExeRtnEnum.COMMAND_ERROR, needResponse).response(data);
return;
}
if (RegexUtil.isYyyyMmDdHhMmSsZzz(data.getQn())) {
if (!RegexUtil.isYyyyMmDdHhMmSsZzz(data.getQn())) {
log.debug("请求编码格式错误");
new ResponseDefault(ctx, QnRtdEnum.QN_ERROR, ExeRtnEnum.COMMAND_ERROR, needResponse).response(data);
return;
@ -107,7 +110,7 @@ public class TCPHandler extends SimpleChannelInboundHandler<ByteBuf> {
new ResponseDefault(ctx, QnRtdEnum.ST_ERROR, ExeRtnEnum.COMMAND_ERROR, needResponse).response(data);
return;
}
if (CnEnums.REAL_TIME.getValue() == data.getCn()) {
if (CnEnum.REAL_TIME.getValue() == data.getCn()) {
log.debug("保存实时数据");
RealDataHandler realDataHandler = new RealDataHandler();
realDataHandler.setIpAddress(address.toString());
@ -134,12 +137,7 @@ public class TCPHandler extends SimpleChannelInboundHandler<ByteBuf> {
char[] dataChar = t212Parser.readData(Integer.parseInt(new String(dataLen)));
char[] readCrc = t212Parser.readCrc();
int crc = T212Parser.crc16Checkout(dataChar, dataChar.length);
try {
if (Integer.parseInt((new BigInteger(new String(readCrc), 16)).toString()) != crc) {
log.error("crc error");
throw new CrcException("CRC Error");
}
} catch (Exception e) {
if (Integer.parseInt((new BigInteger(new String(readCrc), 16)).toString()) != crc) {
log.error("crc error");
throw new CrcException("CRC Error");
}
@ -157,46 +155,24 @@ public class TCPHandler extends SimpleChannelInboundHandler<ByteBuf> {
return data;
}
public static void main(String[] args) {
String data = "ST=32;CN=2011;QN=20210318233308001;PW=123456;MN=41018311021510;Flag=7;CP=&&DataTime=20210318233257;011-Rtd=123;011-ZsRtd=123&&";
char[] dataChar = data.toCharArray();
System.out.println(Integer.toHexString(T212Parser.crc16Checkout(dataChar, dataChar.length)));
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
log.debug("new connection");
super.channelRegistered(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.debug("close connection");
super.channelInactive(ctx);
System.out.println("inactive");
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channelActive");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error(cause.getMessage(), cause);
if (ctx.channel().isActive()) {
ctx.channel().writeAndFlush(Unpooled.EMPTY_BUFFER);//.addListener(ChannelFutureListener.CLOSE);
}
}
public void setInstrumentService(IInstrumentService instrumentService) {
this.instrumentService = instrumentService;
}
public void setCollectorService(ICollectorService collectorService) {
this.collectorService = collectorService;
}
public void setDataMinuteService(IDataMinuteService dataMinuteService) {
this.dataMinuteService = dataMinuteService;
}
public void setPollService(IPollService pollService) {
this.pollService = pollService;
ctx.close().addListener(listener -> {
if (listener.isSuccess()) {
System.out.println("channel close");
}
});
}
/**

View File

@ -10,9 +10,18 @@ import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import org.springframework.beans.factory.annotation.Autowired;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* When you feel like quitting. Think about why you started
* 当你想要放弃的时候想想当初你为何开始
@ -41,28 +50,50 @@ public class TCPServer {
EventLoopGroup boosGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
// TCP处理器
TCPHandler tcpHandler = new TCPHandler();
tcpHandler.setInstrumentService(instrumentService);
tcpHandler.setCollectorService(collectorService);
tcpHandler.setDataMinuteService(dataMinuteService);
try {
ServerBootstrap bootstrap = new ServerBootstrap();
((ServerBootstrap) ((ServerBootstrap) bootstrap.group(boosGroup, workGroup).channel(NioServerSocketChannel.class)).childHandler(new ChannelInitializer<SocketChannel>() {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boosGroup, workGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
serverBootstrap.option(ChannelOption.SO_REUSEADDR, true);
serverBootstrap.option(ChannelOption.SO_KEEPALIVE, true);
serverBootstrap.option(ChannelOption.TCP_NODELAY, true);
serverBootstrap.option(ChannelOption.SO_SNDBUF, 1024 * 1024 * 1024);
serverBootstrap.option(ChannelOption.SO_RCVBUF, 1024 * 1024 * 1024);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", new MessageToByteEncoder<byte[]>() {
pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
pipeline.addLast(new ChannelInboundHandlerAdapter() {
@Override
protected void encode(ChannelHandlerContext ctx, byte[] msg, ByteBuf out) throws Exception {
out.writeBytes(msg);
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState idleState = ((IdleStateEvent) evt).state();
if (idleState == IdleState.READER_IDLE) {
ctx.close();
}
return;
}
super.userEventTriggered(ctx, evt);
}
});
pipeline.addLast("handler", tcpHandler);
pipeline.addLast(new MessageToByteEncoder<byte[]>() {
@Override
protected void encode(ChannelHandlerContext ctx, byte[] bytes, ByteBuf byteBuf) throws Exception {
byteBuf.writeBytes(bytes);
}
});
// 这里的对象必须new否则只有第一次有效其余无法连接每次执行的后都需要new
pipeline.addLast(new TCPHandler(instrumentService, collectorService, dataMinuteService, pollService));
}
}).option(ChannelOption.SO_BACKLOG, 128)).childOption(ChannelOption.SO_KEEPALIVE, true);
System.out.println("服务启动...");
ChannelFuture channelFuture = bootstrap.bind(this.port).sync();
});
ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync().addListener(listener -> {
if (listener.isSuccess()) {
System.out.println("服务启动...");
}
});
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();

View File

@ -0,0 +1,111 @@
package cn.com.tenlion.pollutantdata;
import cn.com.tenlion.pollutantdata.enums.CnEnum;
import cn.com.tenlion.pollutantdata.enums.DataFlagEnum;
import cn.com.tenlion.pollutantdata.enums.StEnum;
import cn.com.tenlion.pollutantdata.utils.HJ212DataUtil;
import cn.com.tenlion.pollutantdata.utils.core.T212Parser;
import ink.wgink.util.date.DateUtil;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.util.CharsetUtil;
import org.junit.jupiter.api.Test;
import org.springframework.util.unit.DataUnit;
import java.nio.charset.StandardCharsets;
import java.util.List;
/**
* When you feel like quitting. Think about why you started
* 当你想要放弃的时候想想当初你为何开始
*
* @ClassName: Demo
* @Description:
* @Author: wanggeng
* @Date: 2021/3/19 10:41 上午
* @Version: 1.0
*/
public class HJ212Test {
public HJ212DataUtil.Data getData() {
HJ212DataUtil.Data data = new HJ212DataUtil.Data();
data.setQn(DateUtil.formatDate(System.currentTimeMillis(), "yyyyMMddHHmmsszzz"));
data.setSt(StEnum.AIR.getValue());
data.setCn(CnEnum.REAL_TIME.getValue());
// 采集器密码
data.setPw("123456");
// 采集器编号
data.setMn("41018311021510");
data.setFlag(DataFlagEnum.HAS_PNUM_ANSWER_V2017.getValue());
HJ212DataUtil.DataCp dataCp = new HJ212DataUtil.DataCp();
// 实时值
dataCp.setRtd(18.0D);
// 折算实时值
dataCp.setZsRtd(6.0D);
// 污染因子
dataCp.setPollId("");
dataCp.setDataTime(DateUtil.formatDate(System.currentTimeMillis(), "yyyyMMddHHmmsszzz"));
data.setDataCp(dataCp);
return data;
}
@Test
public void hj212ClientTest() {
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try {
bootstrap.group(eventLoopGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.SO_SNDBUF, 1024 * 1024 * 1024);
bootstrap.option(ChannelOption.SO_RCVBUF, 1024 * 1024 * 1024);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new MessageToByteEncoder<byte[]>() {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, byte[] bytes, ByteBuf byteBuf) throws Exception {
byteBuf.writeBytes(bytes);
}
});
socketChannel.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
byte[] dataByte = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(dataByte);
String msg = new String(dataByte, CharsetUtil.UTF_8);
System.out.println(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
channelFuture.addListener((ChannelFutureListener) future -> {
if (!future.isSuccess()) {
future.channel().pipeline().fireChannelInactive();
}
});
List<String> datas = HJ212DataUtil.respData(getData());
datas.forEach(data -> {
channelFuture.channel().writeAndFlush(data.getBytes(StandardCharsets.UTF_8));
});
channelFuture.channel().closeFuture();
} catch (Exception e) {
e.printStackTrace();
} finally {
eventLoopGroup.shutdownGracefully();
}
}
}