From 9f22f9c095e00ef8fb59fc0c4b192a920f58ab7a Mon Sep 17 00:00:00 2001 From: wans <747101512@qq.com> Date: Wed, 26 Jan 2022 16:27:01 +0800 Subject: [PATCH] =?UTF-8?q?=E7=9B=91=E5=90=AC=E6=95=B0=E6=8D=AE=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../kafkalistener/KafKaPublishConsumer.java | 53 +++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 src/main/java/cn/com/tenlion/commonpopulation/kafkalistener/KafKaPublishConsumer.java diff --git a/src/main/java/cn/com/tenlion/commonpopulation/kafkalistener/KafKaPublishConsumer.java b/src/main/java/cn/com/tenlion/commonpopulation/kafkalistener/KafKaPublishConsumer.java new file mode 100644 index 0000000..2cbf17d --- /dev/null +++ b/src/main/java/cn/com/tenlion/commonpopulation/kafkalistener/KafKaPublishConsumer.java @@ -0,0 +1,53 @@ +package cn.com.tenlion.commonpopulation.kafkalistener; + +import cn.com.tenlion.commonpopulation.service.basepopulationinfo.IBasePopulationInfoService; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +/** + * @ClassName: ApiPublishConsumer + * @Description: api发布消费者 + * @Author: wanggeng + * @Date: 2021/8/22 9:31 下午 + * @Version: 1.0 + */ +@Component +public class KafKaPublishConsumer implements ApplicationEventPublisherAware { + + private ApplicationEventPublisher applicationEventPublisher; + private MongoTemplate mongoTemplate; + @Autowired + private IBasePopulationInfoService baseInfoService; + + public KafKaPublishConsumer(MongoTemplate mongoTemplate) { + this.mongoTemplate = mongoTemplate; + } + + @KafkaListener(topics = {"C0007FullSync", "C0008FullSync", "C0009FullSync"}) + public void dataT(ConsumerRecord record) { + JSONObject jsonObject = JSON.parseObject(record.value().toString()); + String tableNumber = jsonObject.getString("tableNumber"); + System.out.println("================= FULL_PULL_DATA_START ================="); + switch(tableNumber){ + case "C0007" : + baseInfoService.kafkaFullPullSync(tableNumber); + break; + default : + break; + } + System.out.println("================= FULL_PULL_DATA_FINISH ================="); + } + + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.applicationEventPublisher = applicationEventPublisher; + } + +}