From b3c3c492c68ecaa9b0bcf380dcbd4bee40bc9c4a Mon Sep 17 00:00:00 2001 From: wans <747101512@qq.com> Date: Thu, 27 Jan 2022 16:53:06 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4kafka=E9=94=99=E8=AF=AF?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E5=9B=9E=E8=B0=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kafkalistener/KafKaErrorConsumer.java | 67 +++++++++++++++++++ .../IBasePopulationInfoService.java | 2 + .../impl/BasePopulationInfoServiceImpl.java | 24 ++++++- .../IFloatingPopulationService.java | 2 + .../impl/FloatingPopulationServiceImpl.java | 23 +++++++ .../IOverseasPersonnelService.java | 2 + .../impl/OverseasPersonnelServiceImpl.java | 23 +++++++ 7 files changed, 142 insertions(+), 1 deletion(-) create mode 100644 src/main/java/cn/com/tenlion/commonpopulation/kafkalistener/KafKaErrorConsumer.java diff --git a/src/main/java/cn/com/tenlion/commonpopulation/kafkalistener/KafKaErrorConsumer.java b/src/main/java/cn/com/tenlion/commonpopulation/kafkalistener/KafKaErrorConsumer.java new file mode 100644 index 0000000..6a74754 --- /dev/null +++ b/src/main/java/cn/com/tenlion/commonpopulation/kafkalistener/KafKaErrorConsumer.java @@ -0,0 +1,67 @@ +package cn.com.tenlion.commonpopulation.kafkalistener; + +import cn.com.tenlion.commonpopulation.service.basepopulationinfo.IBasePopulationInfoService; +import cn.com.tenlion.commonpopulation.service.floatingpopulation.IFloatingPopulationService; +import cn.com.tenlion.commonpopulation.service.overseaspersonnel.IOverseasPersonnelService; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +/** + * @ClassName: ApiPublishConsumer + * @Description: api发布消费者 + * @Author: wanggeng + * @Date: 2021/8/22 9:31 下午 + * @Version: 1.0 + */ +@Component +public class KafKaErrorConsumer implements ApplicationEventPublisherAware { + + private ApplicationEventPublisher applicationEventPublisher; + private MongoTemplate mongoTemplate; + @Autowired + private IBasePopulationInfoService baseInfoService; + @Autowired + private IOverseasPersonnelService overseasPersonnelService; + @Autowired + private IFloatingPopulationService floatingPopulationService; + + public KafKaErrorConsumer(MongoTemplate mongoTemplate) { + this.mongoTemplate = mongoTemplate; + } + + @KafkaListener(topics = {"C0007", "C0008", "C0009"}) + public void dataT(ConsumerRecord record) { + JSONObject jsonObject = JSON.parseObject(record.value().toString()); + String tableNumber = jsonObject.getString("tableNumber"); + String uid = jsonObject.getString("uid"); + String action = jsonObject.getString("action"); + System.out.println("================= ERROR_PULL_DATA_START ================="); + switch(tableNumber){ + case "C0007" : + baseInfoService.kafkaErrorPullSync(tableNumber, uid, action); + break; + case "C0008" : + overseasPersonnelService.kafkaErrorPullSync(tableNumber, uid, action); + break; + case "C0009" : + floatingPopulationService.kafkaErrorPullSync(tableNumber, uid, action); + break; + default : + break; + } + System.out.println("================= ERROR_PULL_DATA_FINISH ================="); + } + + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.applicationEventPublisher = applicationEventPublisher; + } + +} diff --git a/src/main/java/cn/com/tenlion/commonpopulation/service/basepopulationinfo/IBasePopulationInfoService.java b/src/main/java/cn/com/tenlion/commonpopulation/service/basepopulationinfo/IBasePopulationInfoService.java index b0080cd..8d6f095 100644 --- a/src/main/java/cn/com/tenlion/commonpopulation/service/basepopulationinfo/IBasePopulationInfoService.java +++ b/src/main/java/cn/com/tenlion/commonpopulation/service/basepopulationinfo/IBasePopulationInfoService.java @@ -184,4 +184,6 @@ public interface IBasePopulationInfoService { List> getDefaultPageNation(Map params); void kafkaFullPullSync(String tableNumber); + + void kafkaErrorPullSync(String tableNumber, String uid, String action); } \ No newline at end of file diff --git a/src/main/java/cn/com/tenlion/commonpopulation/service/basepopulationinfo/impl/BasePopulationInfoServiceImpl.java b/src/main/java/cn/com/tenlion/commonpopulation/service/basepopulationinfo/impl/BasePopulationInfoServiceImpl.java index 6289d9a..5f53894 100644 --- a/src/main/java/cn/com/tenlion/commonpopulation/service/basepopulationinfo/impl/BasePopulationInfoServiceImpl.java +++ b/src/main/java/cn/com/tenlion/commonpopulation/service/basepopulationinfo/impl/BasePopulationInfoServiceImpl.java @@ -595,7 +595,7 @@ public class BasePopulationInfoServiceImpl extends DefaultBaseService implements PopulationInfo info = new PopulationInfo(); for(BasePopulationInfoDTO item : fullList){ try { - jObj.put("uid", item.getBasePopulationInfoId()); + jObj.put("uid", item.getIdCardNumber()); info.setfull_name(item.getFullName()); info.setname_used_before(item.getNameUsedBefore()); String gender = IdCardVerifyUtil.getIdCardGender(item.getIdCardNumber()); @@ -661,4 +661,26 @@ public class BasePopulationInfoServiceImpl extends DefaultBaseService implements } } + @Override + public void kafkaErrorPullSync(String tableNumber, String uid, String action) { + Map params = new HashMap<>(8); + params.put("idCardNumber", uid); + BasePopulationInfoDTO oData = basePopulationInfoDao.getBasePopulationInfo(params); + if(oData == null){ + return; + } + Map dataMap = HashMapUtil.beanToMap(oData); + switch (action){ + case "save" : + sendKafkaBasePopulationInfo(dataMap,"save"); + break; + case "update" : + sendKafkaBasePopulationInfo(dataMap,"save"); + break; + case "delete" : + updateKafkaBasePopulationInfo(dataMap, "delete"); + default: + break; + } + } } \ No newline at end of file diff --git a/src/main/java/cn/com/tenlion/commonpopulation/service/floatingpopulation/IFloatingPopulationService.java b/src/main/java/cn/com/tenlion/commonpopulation/service/floatingpopulation/IFloatingPopulationService.java index 9da8775..7932af9 100644 --- a/src/main/java/cn/com/tenlion/commonpopulation/service/floatingpopulation/IFloatingPopulationService.java +++ b/src/main/java/cn/com/tenlion/commonpopulation/service/floatingpopulation/IFloatingPopulationService.java @@ -150,4 +150,6 @@ public interface IFloatingPopulationService { SuccessResultData countFloatingPopulation(Map params); void kafkaFullPullSync(String tableNumber); + + void kafkaErrorPullSync(String tableNumber, String uid, String action); } \ No newline at end of file diff --git a/src/main/java/cn/com/tenlion/commonpopulation/service/floatingpopulation/impl/FloatingPopulationServiceImpl.java b/src/main/java/cn/com/tenlion/commonpopulation/service/floatingpopulation/impl/FloatingPopulationServiceImpl.java index 5a74106..d78e104 100644 --- a/src/main/java/cn/com/tenlion/commonpopulation/service/floatingpopulation/impl/FloatingPopulationServiceImpl.java +++ b/src/main/java/cn/com/tenlion/commonpopulation/service/floatingpopulation/impl/FloatingPopulationServiceImpl.java @@ -440,6 +440,29 @@ public class FloatingPopulationServiceImpl extends DefaultBaseService implements } } + @Override + public void kafkaErrorPullSync(String tableNumber, String uid, String action) { + Map params = new HashMap<>(8); + params.put("floatingPopulationId", uid); + FloatingPopulationDTO oData = floatingPopulationDao.getFloatingPopulation(params); + if(oData == null){ + return; + } + Map dataMap = HashMapUtil.beanToMap(oData); + switch (action){ + case "save" : + sendKafkaFloatingPopulation(dataMap,"save"); + break; + case "update" : + sendKafkaFloatingPopulation(dataMap,"save"); + break; + case "delete" : + updateKafkaFloatingPopulation(dataMap, "delete"); + default: + break; + } + } + @Override public SuccessResultData countFloatingPopulation(Map params) { Integer count = floatingPopulationDao.countFloatingPopulation(params); diff --git a/src/main/java/cn/com/tenlion/commonpopulation/service/overseaspersonnel/IOverseasPersonnelService.java b/src/main/java/cn/com/tenlion/commonpopulation/service/overseaspersonnel/IOverseasPersonnelService.java index df0838c..93b52d3 100644 --- a/src/main/java/cn/com/tenlion/commonpopulation/service/overseaspersonnel/IOverseasPersonnelService.java +++ b/src/main/java/cn/com/tenlion/commonpopulation/service/overseaspersonnel/IOverseasPersonnelService.java @@ -166,4 +166,6 @@ public interface IOverseasPersonnelService { void updateKafkaOverseasPersonnel(Map params, String action); void kafkaFullPullSync(String tableNumber); + + void kafkaErrorPullSync(String tableNumber, String uid, String action); } \ No newline at end of file diff --git a/src/main/java/cn/com/tenlion/commonpopulation/service/overseaspersonnel/impl/OverseasPersonnelServiceImpl.java b/src/main/java/cn/com/tenlion/commonpopulation/service/overseaspersonnel/impl/OverseasPersonnelServiceImpl.java index 18a464c..7ba9ba8 100644 --- a/src/main/java/cn/com/tenlion/commonpopulation/service/overseaspersonnel/impl/OverseasPersonnelServiceImpl.java +++ b/src/main/java/cn/com/tenlion/commonpopulation/service/overseaspersonnel/impl/OverseasPersonnelServiceImpl.java @@ -408,6 +408,29 @@ public class OverseasPersonnelServiceImpl extends DefaultBaseService implements } } + @Override + public void kafkaErrorPullSync(String tableNumber, String uid, String action) { + Map params = new HashMap<>(8); + params.put("overseasPersonnelId", uid); + OverseasPersonnelDTO oData = overseasPersonnelDao.getOverseasPersonnel(params); + if(oData == null){ + return; + } + Map dataMap = HashMapUtil.beanToMap(oData); + switch (action){ + case "save" : + sendKafkaOverseasPersonnel(dataMap,"save"); + break; + case "update" : + sendKafkaOverseasPersonnel(dataMap,"save"); + break; + case "delete" : + updateKafkaOverseasPersonnel(dataMap, "delete"); + default: + break; + } + } + private void changeDTOFromDict(OverseasPersonnelDTO dto){ // 性别 if(dto.getSex() != null && !"".equals(dto.getSex())){