diff --git a/src/main/java/cn/com/tenlion/commonpopulation/kafkalistener/KafKaPublishConsumer.java b/src/main/java/cn/com/tenlion/commonpopulation/kafkalistener/KafKaPublishConsumer.java index 2cbf17d..4f309c7 100644 --- a/src/main/java/cn/com/tenlion/commonpopulation/kafkalistener/KafKaPublishConsumer.java +++ b/src/main/java/cn/com/tenlion/commonpopulation/kafkalistener/KafKaPublishConsumer.java @@ -1,6 +1,8 @@ package cn.com.tenlion.commonpopulation.kafkalistener; import cn.com.tenlion.commonpopulation.service.basepopulationinfo.IBasePopulationInfoService; +import cn.com.tenlion.commonpopulation.service.floatingpopulation.IFloatingPopulationService; +import cn.com.tenlion.commonpopulation.service.overseaspersonnel.IOverseasPersonnelService; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -25,6 +27,10 @@ public class KafKaPublishConsumer implements ApplicationEventPublisherAware { private MongoTemplate mongoTemplate; @Autowired private IBasePopulationInfoService baseInfoService; + @Autowired + private IOverseasPersonnelService overseasPersonnelService; + @Autowired + private IFloatingPopulationService floatingPopulationService; public KafKaPublishConsumer(MongoTemplate mongoTemplate) { this.mongoTemplate = mongoTemplate; @@ -39,6 +45,12 @@ public class KafKaPublishConsumer implements ApplicationEventPublisherAware { case "C0007" : baseInfoService.kafkaFullPullSync(tableNumber); break; + case "C0008" : + overseasPersonnelService.kafkaFullPullSync(tableNumber); + break; + case "C0009" : + floatingPopulationService.kafkaFullPullSync(tableNumber); + break; default : break; } diff --git a/src/main/java/cn/com/tenlion/commonpopulation/service/basepopulationinfo/impl/BasePopulationInfoServiceImpl.java b/src/main/java/cn/com/tenlion/commonpopulation/service/basepopulationinfo/impl/BasePopulationInfoServiceImpl.java index 75edb2d..6289d9a 100644 --- a/src/main/java/cn/com/tenlion/commonpopulation/service/basepopulationinfo/impl/BasePopulationInfoServiceImpl.java +++ b/src/main/java/cn/com/tenlion/commonpopulation/service/basepopulationinfo/impl/BasePopulationInfoServiceImpl.java @@ -592,16 +592,15 @@ public class BasePopulationInfoServiceImpl extends DefaultBaseService implements jObj.put("action", "save"); List fullList = basePopulationInfoDao.listBasePopulationInfo(null); int forIndex = 0; + PopulationInfo info = new PopulationInfo(); for(BasePopulationInfoDTO item : fullList){ try { jObj.put("uid", item.getBasePopulationInfoId()); - PopulationInfo info = new PopulationInfo(); info.setfull_name(item.getFullName()); info.setname_used_before(item.getNameUsedBefore()); String gender = IdCardVerifyUtil.getIdCardGender(item.getIdCardNumber()); info.setgender("1".equals(gender) ? "男" : "女"); info.setage(IdCardVerifyUtil.getAgeFromIdCardNumber(item.getIdCardNumber())); - //info.setage(15); if(item.getNation() != null && !"".equals(item.getNation())){ DataDTO dataDTO = mongoDataService.get(item.getNation()); info.setnation(dataDTO == null ? "" : dataDTO.getDataName()); diff --git a/src/main/java/cn/com/tenlion/commonpopulation/service/floatingpopulation/IFloatingPopulationService.java b/src/main/java/cn/com/tenlion/commonpopulation/service/floatingpopulation/IFloatingPopulationService.java index f690be4..9da8775 100644 --- a/src/main/java/cn/com/tenlion/commonpopulation/service/floatingpopulation/IFloatingPopulationService.java +++ b/src/main/java/cn/com/tenlion/commonpopulation/service/floatingpopulation/IFloatingPopulationService.java @@ -148,4 +148,6 @@ public interface IFloatingPopulationService { void updateKafkaFloatingPopulation(Map params, String action); SuccessResultData countFloatingPopulation(Map params); + + void kafkaFullPullSync(String tableNumber); } \ No newline at end of file diff --git a/src/main/java/cn/com/tenlion/commonpopulation/service/floatingpopulation/impl/FloatingPopulationServiceImpl.java b/src/main/java/cn/com/tenlion/commonpopulation/service/floatingpopulation/impl/FloatingPopulationServiceImpl.java index 419fce9..5a74106 100644 --- a/src/main/java/cn/com/tenlion/commonpopulation/service/floatingpopulation/impl/FloatingPopulationServiceImpl.java +++ b/src/main/java/cn/com/tenlion/commonpopulation/service/floatingpopulation/impl/FloatingPopulationServiceImpl.java @@ -395,6 +395,51 @@ public class FloatingPopulationServiceImpl extends DefaultBaseService implements kafkaTemplate.send("tableSync", jObj.toJSONString()); } + @Override + public void kafkaFullPullSync(String tableNumber) { + JSONObject jObj = new JSONObject(); + jObj.put("tableNumber", "C0009"); + jObj.put("action", "save"); + int forIndex = 0; + List fullList = floatingPopulationDao.listFloatingPopulation(null); + FloatingPopulation info = new FloatingPopulation(); + for(FloatingPopulationDTO item : fullList){ + try{ + jObj.put("uid", item.getFloatingPopulationId()); + info.setid_card_number(IdCardVerifyUtil.sensitiveDataReplace(item.getIdCardNumber())); + info.setfull_name(item.getFullName()); + info.setlocation_all(item.getAreaNames()); + info.setlocation_code(item.getAreaCode()); + if(item.getAreaCode() != null && !"".equals(item.getAreaCode())){ + AreaDTO areaDTO = areaService.getByCode(item.getAreaCode()); + info.setlocation_area(areaDTO == null ? "" : areaDTO.getAreaName()); + } + info.setinflow_reason(item.getInflowReason() == null ? "" : item.getInflowReason()); + if(item.getInflowReason() != null && !"".equals(item.getInflowReason())){ + DataDTO dataDTO = dataService.get(item.getInflowReason()); + info.setinflow_reason(dataDTO == null ? "" : dataDTO.getDataName()); + } + info.setregister_date(item.getRegisterDate() == null ? "" : item.getRegisterDate()); + info.setcard_expire_date(item.getCardExpireDate() == null ? "" : item.getCardExpireDate()); + if(item.getRegistrationType() != null && !"".equals(item.getRegistrationType())){ + DataDTO dataDTO = dataService.get(item.getRegistrationType()); + info.setresidence_type(dataDTO == null ? "" : dataDTO.getDataName()); + } + info.setkey_of_follow("否"); + if("1".equals(item.getKeyOfFollow())){ + info.setkey_of_follow("是"); + } + jObj.put("data", info.saveCheckToJson()); + kafkaTemplate.send("tableSync", jObj.toJSONString()); + if(forIndex != 0 && forIndex % 10 == 0){ + System.out.println("================= PULL_COUNT_" + forIndex + " ================="); + } + } catch (Exception e){ + System.out.println(item); + } + } + } + @Override public SuccessResultData countFloatingPopulation(Map params) { Integer count = floatingPopulationDao.countFloatingPopulation(params); diff --git a/src/main/java/cn/com/tenlion/commonpopulation/service/overseaspersonnel/IOverseasPersonnelService.java b/src/main/java/cn/com/tenlion/commonpopulation/service/overseaspersonnel/IOverseasPersonnelService.java index 608ffa4..df0838c 100644 --- a/src/main/java/cn/com/tenlion/commonpopulation/service/overseaspersonnel/IOverseasPersonnelService.java +++ b/src/main/java/cn/com/tenlion/commonpopulation/service/overseaspersonnel/IOverseasPersonnelService.java @@ -164,4 +164,6 @@ public interface IOverseasPersonnelService { void sendKafkaOverseasPersonnel(Map params, String action); void updateKafkaOverseasPersonnel(Map params, String action); + + void kafkaFullPullSync(String tableNumber); } \ No newline at end of file diff --git a/src/main/java/cn/com/tenlion/commonpopulation/service/overseaspersonnel/impl/OverseasPersonnelServiceImpl.java b/src/main/java/cn/com/tenlion/commonpopulation/service/overseaspersonnel/impl/OverseasPersonnelServiceImpl.java index 8c8d5a1..18a464c 100644 --- a/src/main/java/cn/com/tenlion/commonpopulation/service/overseaspersonnel/impl/OverseasPersonnelServiceImpl.java +++ b/src/main/java/cn/com/tenlion/commonpopulation/service/overseaspersonnel/impl/OverseasPersonnelServiceImpl.java @@ -316,7 +316,10 @@ public class OverseasPersonnelServiceImpl extends DefaultBaseService implements info.setsex(dataDTO == null ? "" : dataDTO.getDataName()); } info.setbirthday(params.get("birthday") == null ? "" : params.get("birthday").toString()); - info.setkey_of_follow(params.get("keyOfFollow") == null ? "否" : params.get("keyOfFollow").toString()); + info.setkey_of_follow("否"); + if("1".equals(params.get("keyOfFollow").toString())){ + info.setkey_of_follow("是"); + } jObj.put("data", info.saveCheckToJson()); kafkaTemplate.send("tableSync", jObj.toJSONString()); } @@ -351,11 +354,60 @@ public class OverseasPersonnelServiceImpl extends DefaultBaseService implements info.setsex(dataDTO == null ? null : dataDTO.getDataName()); } info.setbirthday(params.get("birthday") == null ? null : params.get("birthday").toString()); - info.setkey_of_follow(params.get("keyOfFollow") == null ? "否" : params.get("keyOfFollow").toString()); + info.setkey_of_follow("否"); + if("1".equals(params.get("keyOfFollow").toString())){ + info.setkey_of_follow("是"); + } jObj.put("data", info.updateToJson()); kafkaTemplate.send("tableSync", jObj.toJSONString()); } + @Override + public void kafkaFullPullSync(String tableNumber) { + JSONObject jObj = new JSONObject(); + jObj.put("tableNumber", "C0008"); + jObj.put("action", "save"); + List fullList = overseasPersonnelDao.listOverseasPersonnel(null); + OverseasPersonnel info = new OverseasPersonnel(); + int forIndex = 0; + for(OverseasPersonnelDTO item : fullList){ + try { + jObj.put("uid", item.getOverseasPersonnelId()); + info.setlocation_all(item.getAreaNames()); + info.setlocation_code(item.getAreaCode()); + if(item.getAreaCode() != null && !"".equals(item.getAreaCode())){ + AreaDTO areaDTO = areaService.getByCode(item.getAreaCode()); + info.setlocation_area(areaDTO == null ? "" : areaDTO.getAreaName()); + } + info.setfamily_name(item.getFamilyName() == null ? "" : item.getFamilyName()); + info.setgiven_name(item.getGivenName() == null ? "" : item.getGivenName()); + info.setchinese_name(item.getChineseName() == null ? "" : item.getChineseName()); + info.setnationality(item.getNationality() == null ? "" : item.getNationality()); + if(item.getNationality() != null && !"".equals(item.getNationality())){ + DataDTO dataDTO = dataService.get(item.getNationality()); + info.setnationality(dataDTO == null ? "" : dataDTO.getDataName()); + } + if(item.getSex() != null && !"".equals(item.getSex())){ + DataDTO dataDTO = dataService.get(item.getSex()); + info.setsex(dataDTO == null ? "" : dataDTO.getDataName()); + } + info.setbirthday(item.getBirthday() == null ? "" : item.getBirthday()); + info.setkey_of_follow("否"); + if("1".equals(item.getKeyOfFollow())){ + info.setkey_of_follow("是"); + } + jObj.put("data", info.saveCheckToJson()); + kafkaTemplate.send("tableSync", jObj.toJSONString()); + if(forIndex != 0 && forIndex % 10 == 0){ + System.out.println("================= PULL_COUNT_" + forIndex + " ================="); + } + } catch (Exception e){ + System.out.println(item); + } + forIndex++; + } + } + private void changeDTOFromDict(OverseasPersonnelDTO dto){ // 性别 if(dto.getSex() != null && !"".equals(dto.getSex())){