提交kafka错误消息回调

This commit is contained in:
wans 2022-01-27 16:53:06 +08:00
parent 74b3bbdfab
commit b3c3c492c6
7 changed files with 142 additions and 1 deletions

View File

@ -0,0 +1,67 @@
package cn.com.tenlion.commonpopulation.kafkalistener;
import cn.com.tenlion.commonpopulation.service.basepopulationinfo.IBasePopulationInfoService;
import cn.com.tenlion.commonpopulation.service.floatingpopulation.IFloatingPopulationService;
import cn.com.tenlion.commonpopulation.service.overseaspersonnel.IOverseasPersonnelService;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* @ClassName: ApiPublishConsumer
* @Description: api发布消费者
* @Author: wanggeng
* @Date: 2021/8/22 9:31 下午
* @Version: 1.0
*/
@Component
public class KafKaErrorConsumer implements ApplicationEventPublisherAware {
private ApplicationEventPublisher applicationEventPublisher;
private MongoTemplate mongoTemplate;
@Autowired
private IBasePopulationInfoService baseInfoService;
@Autowired
private IOverseasPersonnelService overseasPersonnelService;
@Autowired
private IFloatingPopulationService floatingPopulationService;
public KafKaErrorConsumer(MongoTemplate mongoTemplate) {
this.mongoTemplate = mongoTemplate;
}
@KafkaListener(topics = {"C0007", "C0008", "C0009"})
public void dataT(ConsumerRecord<?, ?> record) {
JSONObject jsonObject = JSON.parseObject(record.value().toString());
String tableNumber = jsonObject.getString("tableNumber");
String uid = jsonObject.getString("uid");
String action = jsonObject.getString("action");
System.out.println("================= ERROR_PULL_DATA_START =================");
switch(tableNumber){
case "C0007" :
baseInfoService.kafkaErrorPullSync(tableNumber, uid, action);
break;
case "C0008" :
overseasPersonnelService.kafkaErrorPullSync(tableNumber, uid, action);
break;
case "C0009" :
floatingPopulationService.kafkaErrorPullSync(tableNumber, uid, action);
break;
default :
break;
}
System.out.println("================= ERROR_PULL_DATA_FINISH =================");
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
}

View File

@ -184,4 +184,6 @@ public interface IBasePopulationInfoService {
List<Map<String, Object>> getDefaultPageNation(Map<String, Object> params); List<Map<String, Object>> getDefaultPageNation(Map<String, Object> params);
void kafkaFullPullSync(String tableNumber); void kafkaFullPullSync(String tableNumber);
void kafkaErrorPullSync(String tableNumber, String uid, String action);
} }

View File

@ -595,7 +595,7 @@ public class BasePopulationInfoServiceImpl extends DefaultBaseService implements
PopulationInfo info = new PopulationInfo(); PopulationInfo info = new PopulationInfo();
for(BasePopulationInfoDTO item : fullList){ for(BasePopulationInfoDTO item : fullList){
try { try {
jObj.put("uid", item.getBasePopulationInfoId()); jObj.put("uid", item.getIdCardNumber());
info.setfull_name(item.getFullName()); info.setfull_name(item.getFullName());
info.setname_used_before(item.getNameUsedBefore()); info.setname_used_before(item.getNameUsedBefore());
String gender = IdCardVerifyUtil.getIdCardGender(item.getIdCardNumber()); String gender = IdCardVerifyUtil.getIdCardGender(item.getIdCardNumber());
@ -661,4 +661,26 @@ public class BasePopulationInfoServiceImpl extends DefaultBaseService implements
} }
} }
@Override
public void kafkaErrorPullSync(String tableNumber, String uid, String action) {
Map<String, Object> params = new HashMap<>(8);
params.put("idCardNumber", uid);
BasePopulationInfoDTO oData = basePopulationInfoDao.getBasePopulationInfo(params);
if(oData == null){
return;
}
Map<String, Object> dataMap = HashMapUtil.beanToMap(oData);
switch (action){
case "save" :
sendKafkaBasePopulationInfo(dataMap,"save");
break;
case "update" :
sendKafkaBasePopulationInfo(dataMap,"save");
break;
case "delete" :
updateKafkaBasePopulationInfo(dataMap, "delete");
default:
break;
}
}
} }

View File

@ -150,4 +150,6 @@ public interface IFloatingPopulationService {
SuccessResultData<Integer> countFloatingPopulation(Map<String, Object> params); SuccessResultData<Integer> countFloatingPopulation(Map<String, Object> params);
void kafkaFullPullSync(String tableNumber); void kafkaFullPullSync(String tableNumber);
void kafkaErrorPullSync(String tableNumber, String uid, String action);
} }

View File

@ -440,6 +440,29 @@ public class FloatingPopulationServiceImpl extends DefaultBaseService implements
} }
} }
@Override
public void kafkaErrorPullSync(String tableNumber, String uid, String action) {
Map<String, Object> params = new HashMap<>(8);
params.put("floatingPopulationId", uid);
FloatingPopulationDTO oData = floatingPopulationDao.getFloatingPopulation(params);
if(oData == null){
return;
}
Map<String, Object> dataMap = HashMapUtil.beanToMap(oData);
switch (action){
case "save" :
sendKafkaFloatingPopulation(dataMap,"save");
break;
case "update" :
sendKafkaFloatingPopulation(dataMap,"save");
break;
case "delete" :
updateKafkaFloatingPopulation(dataMap, "delete");
default:
break;
}
}
@Override @Override
public SuccessResultData<Integer> countFloatingPopulation(Map<String, Object> params) { public SuccessResultData<Integer> countFloatingPopulation(Map<String, Object> params) {
Integer count = floatingPopulationDao.countFloatingPopulation(params); Integer count = floatingPopulationDao.countFloatingPopulation(params);

View File

@ -166,4 +166,6 @@ public interface IOverseasPersonnelService {
void updateKafkaOverseasPersonnel(Map<String, Object> params, String action); void updateKafkaOverseasPersonnel(Map<String, Object> params, String action);
void kafkaFullPullSync(String tableNumber); void kafkaFullPullSync(String tableNumber);
void kafkaErrorPullSync(String tableNumber, String uid, String action);
} }

View File

@ -408,6 +408,29 @@ public class OverseasPersonnelServiceImpl extends DefaultBaseService implements
} }
} }
@Override
public void kafkaErrorPullSync(String tableNumber, String uid, String action) {
Map<String, Object> params = new HashMap<>(8);
params.put("overseasPersonnelId", uid);
OverseasPersonnelDTO oData = overseasPersonnelDao.getOverseasPersonnel(params);
if(oData == null){
return;
}
Map<String, Object> dataMap = HashMapUtil.beanToMap(oData);
switch (action){
case "save" :
sendKafkaOverseasPersonnel(dataMap,"save");
break;
case "update" :
sendKafkaOverseasPersonnel(dataMap,"save");
break;
case "delete" :
updateKafkaOverseasPersonnel(dataMap, "delete");
default:
break;
}
}
private void changeDTOFromDict(OverseasPersonnelDTO dto){ private void changeDTOFromDict(OverseasPersonnelDTO dto){
// 性别 // 性别
if(dto.getSex() != null && !"".equals(dto.getSex())){ if(dto.getSex() != null && !"".equals(dto.getSex())){