境外人员、流动人口全量推送
This commit is contained in:
parent
efc3d46e3a
commit
74b3bbdfab
@ -1,6 +1,8 @@
|
||||
package cn.com.tenlion.commonpopulation.kafkalistener;
|
||||
|
||||
import cn.com.tenlion.commonpopulation.service.basepopulationinfo.IBasePopulationInfoService;
|
||||
import cn.com.tenlion.commonpopulation.service.floatingpopulation.IFloatingPopulationService;
|
||||
import cn.com.tenlion.commonpopulation.service.overseaspersonnel.IOverseasPersonnelService;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
||||
@ -25,6 +27,10 @@ public class KafKaPublishConsumer implements ApplicationEventPublisherAware {
|
||||
private MongoTemplate mongoTemplate;
|
||||
@Autowired
|
||||
private IBasePopulationInfoService baseInfoService;
|
||||
@Autowired
|
||||
private IOverseasPersonnelService overseasPersonnelService;
|
||||
@Autowired
|
||||
private IFloatingPopulationService floatingPopulationService;
|
||||
|
||||
public KafKaPublishConsumer(MongoTemplate mongoTemplate) {
|
||||
this.mongoTemplate = mongoTemplate;
|
||||
@ -39,6 +45,12 @@ public class KafKaPublishConsumer implements ApplicationEventPublisherAware {
|
||||
case "C0007" :
|
||||
baseInfoService.kafkaFullPullSync(tableNumber);
|
||||
break;
|
||||
case "C0008" :
|
||||
overseasPersonnelService.kafkaFullPullSync(tableNumber);
|
||||
break;
|
||||
case "C0009" :
|
||||
floatingPopulationService.kafkaFullPullSync(tableNumber);
|
||||
break;
|
||||
default :
|
||||
break;
|
||||
}
|
||||
|
@ -592,16 +592,15 @@ public class BasePopulationInfoServiceImpl extends DefaultBaseService implements
|
||||
jObj.put("action", "save");
|
||||
List<BasePopulationInfoDTO> fullList = basePopulationInfoDao.listBasePopulationInfo(null);
|
||||
int forIndex = 0;
|
||||
PopulationInfo info = new PopulationInfo();
|
||||
for(BasePopulationInfoDTO item : fullList){
|
||||
try {
|
||||
jObj.put("uid", item.getBasePopulationInfoId());
|
||||
PopulationInfo info = new PopulationInfo();
|
||||
info.setfull_name(item.getFullName());
|
||||
info.setname_used_before(item.getNameUsedBefore());
|
||||
String gender = IdCardVerifyUtil.getIdCardGender(item.getIdCardNumber());
|
||||
info.setgender("1".equals(gender) ? "男" : "女");
|
||||
info.setage(IdCardVerifyUtil.getAgeFromIdCardNumber(item.getIdCardNumber()));
|
||||
//info.setage(15);
|
||||
if(item.getNation() != null && !"".equals(item.getNation())){
|
||||
DataDTO dataDTO = mongoDataService.get(item.getNation());
|
||||
info.setnation(dataDTO == null ? "" : dataDTO.getDataName());
|
||||
|
@ -148,4 +148,6 @@ public interface IFloatingPopulationService {
|
||||
void updateKafkaFloatingPopulation(Map<String, Object> params, String action);
|
||||
|
||||
SuccessResultData<Integer> countFloatingPopulation(Map<String, Object> params);
|
||||
|
||||
void kafkaFullPullSync(String tableNumber);
|
||||
}
|
@ -395,6 +395,51 @@ public class FloatingPopulationServiceImpl extends DefaultBaseService implements
|
||||
kafkaTemplate.send("tableSync", jObj.toJSONString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void kafkaFullPullSync(String tableNumber) {
|
||||
JSONObject jObj = new JSONObject();
|
||||
jObj.put("tableNumber", "C0009");
|
||||
jObj.put("action", "save");
|
||||
int forIndex = 0;
|
||||
List<FloatingPopulationDTO> fullList = floatingPopulationDao.listFloatingPopulation(null);
|
||||
FloatingPopulation info = new FloatingPopulation();
|
||||
for(FloatingPopulationDTO item : fullList){
|
||||
try{
|
||||
jObj.put("uid", item.getFloatingPopulationId());
|
||||
info.setid_card_number(IdCardVerifyUtil.sensitiveDataReplace(item.getIdCardNumber()));
|
||||
info.setfull_name(item.getFullName());
|
||||
info.setlocation_all(item.getAreaNames());
|
||||
info.setlocation_code(item.getAreaCode());
|
||||
if(item.getAreaCode() != null && !"".equals(item.getAreaCode())){
|
||||
AreaDTO areaDTO = areaService.getByCode(item.getAreaCode());
|
||||
info.setlocation_area(areaDTO == null ? "" : areaDTO.getAreaName());
|
||||
}
|
||||
info.setinflow_reason(item.getInflowReason() == null ? "" : item.getInflowReason());
|
||||
if(item.getInflowReason() != null && !"".equals(item.getInflowReason())){
|
||||
DataDTO dataDTO = dataService.get(item.getInflowReason());
|
||||
info.setinflow_reason(dataDTO == null ? "" : dataDTO.getDataName());
|
||||
}
|
||||
info.setregister_date(item.getRegisterDate() == null ? "" : item.getRegisterDate());
|
||||
info.setcard_expire_date(item.getCardExpireDate() == null ? "" : item.getCardExpireDate());
|
||||
if(item.getRegistrationType() != null && !"".equals(item.getRegistrationType())){
|
||||
DataDTO dataDTO = dataService.get(item.getRegistrationType());
|
||||
info.setresidence_type(dataDTO == null ? "" : dataDTO.getDataName());
|
||||
}
|
||||
info.setkey_of_follow("否");
|
||||
if("1".equals(item.getKeyOfFollow())){
|
||||
info.setkey_of_follow("是");
|
||||
}
|
||||
jObj.put("data", info.saveCheckToJson());
|
||||
kafkaTemplate.send("tableSync", jObj.toJSONString());
|
||||
if(forIndex != 0 && forIndex % 10 == 0){
|
||||
System.out.println("================= PULL_COUNT_" + forIndex + " =================");
|
||||
}
|
||||
} catch (Exception e){
|
||||
System.out.println(item);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public SuccessResultData<Integer> countFloatingPopulation(Map<String, Object> params) {
|
||||
Integer count = floatingPopulationDao.countFloatingPopulation(params);
|
||||
|
@ -164,4 +164,6 @@ public interface IOverseasPersonnelService {
|
||||
void sendKafkaOverseasPersonnel(Map<String, Object> params, String action);
|
||||
|
||||
void updateKafkaOverseasPersonnel(Map<String, Object> params, String action);
|
||||
|
||||
void kafkaFullPullSync(String tableNumber);
|
||||
}
|
@ -316,7 +316,10 @@ public class OverseasPersonnelServiceImpl extends DefaultBaseService implements
|
||||
info.setsex(dataDTO == null ? "" : dataDTO.getDataName());
|
||||
}
|
||||
info.setbirthday(params.get("birthday") == null ? "" : params.get("birthday").toString());
|
||||
info.setkey_of_follow(params.get("keyOfFollow") == null ? "否" : params.get("keyOfFollow").toString());
|
||||
info.setkey_of_follow("否");
|
||||
if("1".equals(params.get("keyOfFollow").toString())){
|
||||
info.setkey_of_follow("是");
|
||||
}
|
||||
jObj.put("data", info.saveCheckToJson());
|
||||
kafkaTemplate.send("tableSync", jObj.toJSONString());
|
||||
}
|
||||
@ -351,11 +354,60 @@ public class OverseasPersonnelServiceImpl extends DefaultBaseService implements
|
||||
info.setsex(dataDTO == null ? null : dataDTO.getDataName());
|
||||
}
|
||||
info.setbirthday(params.get("birthday") == null ? null : params.get("birthday").toString());
|
||||
info.setkey_of_follow(params.get("keyOfFollow") == null ? "否" : params.get("keyOfFollow").toString());
|
||||
info.setkey_of_follow("否");
|
||||
if("1".equals(params.get("keyOfFollow").toString())){
|
||||
info.setkey_of_follow("是");
|
||||
}
|
||||
jObj.put("data", info.updateToJson());
|
||||
kafkaTemplate.send("tableSync", jObj.toJSONString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void kafkaFullPullSync(String tableNumber) {
|
||||
JSONObject jObj = new JSONObject();
|
||||
jObj.put("tableNumber", "C0008");
|
||||
jObj.put("action", "save");
|
||||
List<OverseasPersonnelDTO> fullList = overseasPersonnelDao.listOverseasPersonnel(null);
|
||||
OverseasPersonnel info = new OverseasPersonnel();
|
||||
int forIndex = 0;
|
||||
for(OverseasPersonnelDTO item : fullList){
|
||||
try {
|
||||
jObj.put("uid", item.getOverseasPersonnelId());
|
||||
info.setlocation_all(item.getAreaNames());
|
||||
info.setlocation_code(item.getAreaCode());
|
||||
if(item.getAreaCode() != null && !"".equals(item.getAreaCode())){
|
||||
AreaDTO areaDTO = areaService.getByCode(item.getAreaCode());
|
||||
info.setlocation_area(areaDTO == null ? "" : areaDTO.getAreaName());
|
||||
}
|
||||
info.setfamily_name(item.getFamilyName() == null ? "" : item.getFamilyName());
|
||||
info.setgiven_name(item.getGivenName() == null ? "" : item.getGivenName());
|
||||
info.setchinese_name(item.getChineseName() == null ? "" : item.getChineseName());
|
||||
info.setnationality(item.getNationality() == null ? "" : item.getNationality());
|
||||
if(item.getNationality() != null && !"".equals(item.getNationality())){
|
||||
DataDTO dataDTO = dataService.get(item.getNationality());
|
||||
info.setnationality(dataDTO == null ? "" : dataDTO.getDataName());
|
||||
}
|
||||
if(item.getSex() != null && !"".equals(item.getSex())){
|
||||
DataDTO dataDTO = dataService.get(item.getSex());
|
||||
info.setsex(dataDTO == null ? "" : dataDTO.getDataName());
|
||||
}
|
||||
info.setbirthday(item.getBirthday() == null ? "" : item.getBirthday());
|
||||
info.setkey_of_follow("否");
|
||||
if("1".equals(item.getKeyOfFollow())){
|
||||
info.setkey_of_follow("是");
|
||||
}
|
||||
jObj.put("data", info.saveCheckToJson());
|
||||
kafkaTemplate.send("tableSync", jObj.toJSONString());
|
||||
if(forIndex != 0 && forIndex % 10 == 0){
|
||||
System.out.println("================= PULL_COUNT_" + forIndex + " =================");
|
||||
}
|
||||
} catch (Exception e){
|
||||
System.out.println(item);
|
||||
}
|
||||
forIndex++;
|
||||
}
|
||||
}
|
||||
|
||||
private void changeDTOFromDict(OverseasPersonnelDTO dto){
|
||||
// 性别
|
||||
if(dto.getSex() != null && !"".equals(dto.getSex())){
|
||||
|
Loading…
Reference in New Issue
Block a user