diff --git a/src/main/java/cn/com/tenlion/systemtask/utils/fullsync/FullSyncC0006.java b/src/main/java/cn/com/tenlion/systemtask/utils/fullsync/FullSyncC0006.java new file mode 100644 index 0000000..be80988 --- /dev/null +++ b/src/main/java/cn/com/tenlion/systemtask/utils/fullsync/FullSyncC0006.java @@ -0,0 +1,94 @@ +package cn.com.tenlion.systemtask.utils.fullsync; + +import cn.com.tenlion.systemtask.pojo.dtos.receiveruser.ReceiverUserDTO; +import cn.com.tenlion.systemtask.pojo.vos.receiveruser.ReceiverUserVO; +import cn.com.tenlion.systemtask.service.receiveruser.IReceiverUserService; +import com.alibaba.fastjson.JSONObject; +import ink.wgink.exceptions.SearchException; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * 表单全量数据同步 + * 接收数据整合系统发来的全量数据同步命令, 向数据整合系统发送表单的全部数据 + */ +@Component +public class FullSyncC0006 implements ApplicationEventPublisherAware { + + private ApplicationEventPublisher applicationEventPublisher; + @Autowired + private IReceiverUserService receiverUserService; + @Autowired + KafkaTemplate kafkaTemplate; + + public FullSyncC0006() {} + + @KafkaListener(topics = "C0002FullSync") + public void message(ConsumerRecord record) { + System.out.println("收到全量同步命令"); + JSONObject jsonObject = JSONObject.parseObject(record.value().toString()); + System.out.println(jsonObject.toJSONString());// 表单编号 + System.out.println(jsonObject.getString("tableNumber"));// 表单编号 + System.out.println(jsonObject.getInteger("tableDataCount"));// 整合系统数据量 + /** + * 1 . 查询本地数据的数量 , 如果本地数据量 > 整合系统数据量 + * 2 . 查询数据库中所有的事件列表 + * 3 . 遍历事件, 封装数据 + * 4 . 发送新增案件消息 + */ + Map params = new HashMap<>(); + List receiverUserDTOList = receiverUserService.list(params); + System.out.println(receiverUserDTOList.size()); + if(null != receiverUserDTOList && receiverUserDTOList.size() > 0) { + for(ReceiverUserDTO receiverUserDTO: receiverUserDTOList) { + ReceiverUserVO receiverUserVO = setData(receiverUserDTO); + receiverUserService.saveKafka(receiverUserDTO.getReceiverUserId(), receiverUserVO); + } + } + } + + @KafkaListener(topics = "C0002") + public void messageMsg(ConsumerRecord record) { + JSONObject jsonObject = JSONObject.parseObject(record.value().toString()); + Map params = new HashMap<>(); + if(!"delete".equals(jsonObject.getString("action"))) { + ReceiverUserDTO receiverUserDTO = receiverUserService.get(jsonObject.getString("uid")); + ReceiverUserVO receiverUserVO = setData(receiverUserDTO); + receiverUserService.saveKafka(receiverUserDTO.getReceiverUserId(), receiverUserVO); + }else { + receiverUserService.deleteKafka(jsonObject.getString("uid")); + } + } + + private ReceiverUserVO setData(ReceiverUserDTO receiverUserDTO) { + if(null == receiverUserDTO) { + throw new SearchException("当前对象为空,请注意查看"); + } + ReceiverUserVO receiverUserVO = new ReceiverUserVO(); + receiverUserVO.setTaskName(receiverUserDTO.getDistributeTitle()); + receiverUserVO.setTaskSummary(receiverUserDTO.getDistributeSummary()); + receiverUserVO.setDistributeTaskType(receiverUserDTO.getDistributeTaskType()); + receiverUserVO.setDistributeType(receiverUserDTO.getDistributeType()); + receiverUserVO.setDistributeTime(receiverUserDTO.getDistributeTime()); + receiverUserVO.setIsWarning(receiverUserDTO.getIsWarning()); + receiverUserVO.setWarningTime(receiverUserDTO.getWarningTime()); + receiverUserVO.setIsSupervision(receiverUserDTO.getIsSupervision()); + receiverUserVO.setSupervisionTime(receiverUserDTO.getSupervisionTime()); + return receiverUserVO; + } + + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.applicationEventPublisher = applicationEventPublisher; + } + +} \ No newline at end of file