监听数据同步方法

This commit is contained in:
wans 2022-01-26 16:27:01 +08:00
parent f8f5ea74a8
commit 9f22f9c095

View File

@ -0,0 +1,53 @@
package cn.com.tenlion.commonpopulation.kafkalistener;
import cn.com.tenlion.commonpopulation.service.basepopulationinfo.IBasePopulationInfoService;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* @ClassName: ApiPublishConsumer
* @Description: api发布消费者
* @Author: wanggeng
* @Date: 2021/8/22 9:31 下午
* @Version: 1.0
*/
@Component
public class KafKaPublishConsumer implements ApplicationEventPublisherAware {
private ApplicationEventPublisher applicationEventPublisher;
private MongoTemplate mongoTemplate;
@Autowired
private IBasePopulationInfoService baseInfoService;
public KafKaPublishConsumer(MongoTemplate mongoTemplate) {
this.mongoTemplate = mongoTemplate;
}
@KafkaListener(topics = {"C0007FullSync", "C0008FullSync", "C0009FullSync"})
public void dataT(ConsumerRecord<?, ?> record) {
JSONObject jsonObject = JSON.parseObject(record.value().toString());
String tableNumber = jsonObject.getString("tableNumber");
System.out.println("================= FULL_PULL_DATA_START =================");
switch(tableNumber){
case "C0007" :
baseInfoService.kafkaFullPullSync(tableNumber);
break;
default :
break;
}
System.out.println("================= FULL_PULL_DATA_FINISH =================");
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
}