From 22cf857ada0c139c7df18619f312ca61b3056aed Mon Sep 17 00:00:00 2001 From: Renpc-kilig <308442850@qq.com> Date: Thu, 27 Jan 2022 20:13:51 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AF=B9=E6=8E=A5=E5=AE=9E=E6=97=B6=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E5=92=8C=E5=85=A8=E9=87=8F=E5=90=8C=E6=AD=A5=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../utils/fullsync/FullSyncC0006.java | 94 +++++++++++++++++++ 1 file changed, 94 insertions(+) create mode 100644 src/main/java/cn/com/tenlion/systemtask/utils/fullsync/FullSyncC0006.java diff --git a/src/main/java/cn/com/tenlion/systemtask/utils/fullsync/FullSyncC0006.java b/src/main/java/cn/com/tenlion/systemtask/utils/fullsync/FullSyncC0006.java new file mode 100644 index 0000000..be80988 --- /dev/null +++ b/src/main/java/cn/com/tenlion/systemtask/utils/fullsync/FullSyncC0006.java @@ -0,0 +1,94 @@ +package cn.com.tenlion.systemtask.utils.fullsync; + +import cn.com.tenlion.systemtask.pojo.dtos.receiveruser.ReceiverUserDTO; +import cn.com.tenlion.systemtask.pojo.vos.receiveruser.ReceiverUserVO; +import cn.com.tenlion.systemtask.service.receiveruser.IReceiverUserService; +import com.alibaba.fastjson.JSONObject; +import ink.wgink.exceptions.SearchException; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * 表单全量数据同步 + * 接收数据整合系统发来的全量数据同步命令, 向数据整合系统发送表单的全部数据 + */ +@Component +public class FullSyncC0006 implements ApplicationEventPublisherAware { + + private ApplicationEventPublisher applicationEventPublisher; + @Autowired + private IReceiverUserService receiverUserService; + @Autowired + KafkaTemplate kafkaTemplate; + + public FullSyncC0006() {} + + @KafkaListener(topics = "C0002FullSync") + public void message(ConsumerRecord record) { + System.out.println("收到全量同步命令"); + JSONObject jsonObject = JSONObject.parseObject(record.value().toString()); + System.out.println(jsonObject.toJSONString());// 表单编号 + System.out.println(jsonObject.getString("tableNumber"));// 表单编号 + System.out.println(jsonObject.getInteger("tableDataCount"));// 整合系统数据量 + /** + * 1 . 查询本地数据的数量 , 如果本地数据量 > 整合系统数据量 + * 2 . 查询数据库中所有的事件列表 + * 3 . 遍历事件, 封装数据 + * 4 . 发送新增案件消息 + */ + Map params = new HashMap<>(); + List receiverUserDTOList = receiverUserService.list(params); + System.out.println(receiverUserDTOList.size()); + if(null != receiverUserDTOList && receiverUserDTOList.size() > 0) { + for(ReceiverUserDTO receiverUserDTO: receiverUserDTOList) { + ReceiverUserVO receiverUserVO = setData(receiverUserDTO); + receiverUserService.saveKafka(receiverUserDTO.getReceiverUserId(), receiverUserVO); + } + } + } + + @KafkaListener(topics = "C0002") + public void messageMsg(ConsumerRecord record) { + JSONObject jsonObject = JSONObject.parseObject(record.value().toString()); + Map params = new HashMap<>(); + if(!"delete".equals(jsonObject.getString("action"))) { + ReceiverUserDTO receiverUserDTO = receiverUserService.get(jsonObject.getString("uid")); + ReceiverUserVO receiverUserVO = setData(receiverUserDTO); + receiverUserService.saveKafka(receiverUserDTO.getReceiverUserId(), receiverUserVO); + }else { + receiverUserService.deleteKafka(jsonObject.getString("uid")); + } + } + + private ReceiverUserVO setData(ReceiverUserDTO receiverUserDTO) { + if(null == receiverUserDTO) { + throw new SearchException("当前对象为空,请注意查看"); + } + ReceiverUserVO receiverUserVO = new ReceiverUserVO(); + receiverUserVO.setTaskName(receiverUserDTO.getDistributeTitle()); + receiverUserVO.setTaskSummary(receiverUserDTO.getDistributeSummary()); + receiverUserVO.setDistributeTaskType(receiverUserDTO.getDistributeTaskType()); + receiverUserVO.setDistributeType(receiverUserDTO.getDistributeType()); + receiverUserVO.setDistributeTime(receiverUserDTO.getDistributeTime()); + receiverUserVO.setIsWarning(receiverUserDTO.getIsWarning()); + receiverUserVO.setWarningTime(receiverUserDTO.getWarningTime()); + receiverUserVO.setIsSupervision(receiverUserDTO.getIsSupervision()); + receiverUserVO.setSupervisionTime(receiverUserDTO.getSupervisionTime()); + return receiverUserVO; + } + + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.applicationEventPublisher = applicationEventPublisher; + } + +} \ No newline at end of file