完善kafka数据同步消息

This commit is contained in:
wanggeng 2021-12-23 22:40:05 +08:00
parent e6b1b64bd6
commit 898be955bd
7 changed files with 119 additions and 16 deletions

View File

@ -14,4 +14,11 @@ public interface ISmartCityUserCenterConsts {
*/ */
String APP_CLIENT_NAME = "city-governance"; String APP_CLIENT_NAME = "city-governance";
String KAFKA_TOPIC_GRID = "C0021";
String KAFKA_TOPIC_GRID_POINT = "C0022";
String KAFKA_DATA_SAVE = "save";
String KAFKA_DATA_UPDATE = "update";
String KAFKA_DATA_DELETE = "delete";
} }

View File

@ -7,7 +7,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -25,10 +24,8 @@ public class KafKaPublishConsumer implements ApplicationEventPublisherAware {
@Autowired @Autowired
private ITeamMemberGridService teamMemberGridService; private ITeamMemberGridService teamMemberGridService;
private MongoTemplate mongoTemplate;
public KafKaPublishConsumer(MongoTemplate mongoTemplate) { public KafKaPublishConsumer() {
this.mongoTemplate = mongoTemplate;
} }
@KafkaListener(topics = {"C0021"}) @KafkaListener(topics = {"C0021"})

View File

@ -1,9 +1,10 @@
package cn.com.tenlion.usercenter.service.grid.impl; package cn.com.tenlion.usercenter.service.grid.impl;
import cn.com.tenlion.usercenter.consts.ISmartCityUserCenterConsts;
import cn.com.tenlion.usercenter.pojo.dtos.kafka.SyncDataDTO; import cn.com.tenlion.usercenter.pojo.dtos.kafka.SyncDataDTO;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import ink.wgink.interfaces.map.IGridDeleteAfterHandler; import ink.wgink.interfaces.map.IGridDeleteAfterHandler;
import org.apache.commons.lang3.StringUtils; import ink.wgink.util.string.WStringUtil;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -33,13 +34,12 @@ public class GridDeleteAfterHandlerImpl implements IGridDeleteAfterHandler {
if (gridIds.isEmpty()) { if (gridIds.isEmpty()) {
return; return;
} }
String gridIdString = StringUtils.join(gridIds, "_");
SyncDataDTO<Map> syncDataDTO = new SyncDataDTO<>(); SyncDataDTO<Map> syncDataDTO = new SyncDataDTO<>();
syncDataDTO.setUid(gridIdString); syncDataDTO.setUid(WStringUtil.listToStr(gridIds, "_"));
syncDataDTO.setAction("delete"); syncDataDTO.setAction(ISmartCityUserCenterConsts.KAFKA_DATA_DELETE);
syncDataDTO.setData(new HashMap<>()); syncDataDTO.setData(new HashMap<>());
syncDataDTO.setTableNumber(""); syncDataDTO.setTableNumber(ISmartCityUserCenterConsts.KAFKA_TOPIC_GRID);
kafkaTemplate.send("C0021", JSONObject.toJSONString(syncDataDTO)); kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_GRID, JSONObject.toJSONString(syncDataDTO));
} }
} }

View File

@ -0,0 +1,43 @@
package cn.com.tenlion.usercenter.service.grid.impl;
import cn.com.tenlion.usercenter.consts.ISmartCityUserCenterConsts;
import cn.com.tenlion.usercenter.pojo.dtos.kafka.SyncDataDTO;
import ink.wgink.interfaces.map.IGridPointDeleteAfterHandler;
import ink.wgink.module.map.service.grid.IGridPointService;
import ink.wgink.util.string.WStringUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @ClassName: GridPointDeleteAfterHandlerImpl
* @Description: 网格点删除
* @Author: wanggeng
* @Date: 2021/12/23 10:15 PM
* @Version: 1.0
*/
@Service
public class GridPointDeleteAfterHandlerImpl implements IGridPointDeleteAfterHandler {
@Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private IGridPointService gridPointService;
@Override
public void handle(List<String> gridIds) {
if (gridIds.isEmpty()) {
return;
}
SyncDataDTO<Map> syncDataDTO = new SyncDataDTO<>();
syncDataDTO.setKeyId("grid_id");
syncDataDTO.setKeyValue(WStringUtil.listToStr(gridIds, "_"));
syncDataDTO.setAction(ISmartCityUserCenterConsts.KAFKA_DATA_DELETE);
syncDataDTO.setData(new HashMap());
syncDataDTO.setTableNumber(ISmartCityUserCenterConsts.KAFKA_TOPIC_GRID_POINT);
}
}

View File

@ -0,0 +1,54 @@
package cn.com.tenlion.usercenter.service.grid.impl;
import cn.com.tenlion.usercenter.consts.ISmartCityUserCenterConsts;
import cn.com.tenlion.usercenter.pojo.dtos.kafka.SyncDataDTO;
import com.alibaba.fastjson.JSONObject;
import ink.wgink.interfaces.map.IGridPointSaveAfterHandler;
import ink.wgink.module.map.pojo.dtos.grid.GridPointDTO;
import ink.wgink.module.map.service.grid.IGridPointService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @ClassName: GridPointSaveAfterHandlerImpl
* @Description: 网格点
* @Author: wanggeng
* @Date: 2021/12/23 10:03 PM
* @Version: 1.0
*/
@Service
public class GridPointSaveAfterHandlerImpl implements IGridPointSaveAfterHandler {
@Autowired
private KafkaTemplate kafkaTemplate;
@Autowired
private IGridPointService gridPointService;
@Override
public void handle(String gridId) {
if (StringUtils.isBlank(gridId)) {
return;
}
List<GridPointDTO> gridPointDTOs = gridPointService.list(gridId);
gridPointDTOs.forEach(gridPointDTO -> {
Map<String, Object> data = new HashMap<>(6);
data.put("lng", gridPointDTO.getLng());
data.put("lat", gridPointDTO.getLat());
SyncDataDTO<Map> syncDataDTO = new SyncDataDTO<>();
syncDataDTO.setKeyId("grid_id");
syncDataDTO.setKeyValue(gridId);
syncDataDTO.setAction(ISmartCityUserCenterConsts.KAFKA_DATA_SAVE);
syncDataDTO.setData(data);
syncDataDTO.setTableNumber(ISmartCityUserCenterConsts.KAFKA_TOPIC_GRID_POINT);
kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_GRID_POINT, JSONObject.toJSONString(syncDataDTO));
});
}
}

View File

@ -1,5 +1,6 @@
package cn.com.tenlion.usercenter.service.grid.impl; package cn.com.tenlion.usercenter.service.grid.impl;
import cn.com.tenlion.usercenter.consts.ISmartCityUserCenterConsts;
import cn.com.tenlion.usercenter.pojo.dtos.kafka.SyncDataDTO; import cn.com.tenlion.usercenter.pojo.dtos.kafka.SyncDataDTO;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import ink.wgink.interfaces.map.IGridSaveAfterHandler; import ink.wgink.interfaces.map.IGridSaveAfterHandler;
@ -37,10 +38,10 @@ public class GridSaveAfterHandlerImpl implements IGridSaveAfterHandler {
gridMap.put("areaCode", areaCode); gridMap.put("areaCode", areaCode);
gridMap.put("areaName", areaName); gridMap.put("areaName", areaName);
syncDataDTO.setUid(gridId); syncDataDTO.setUid(gridId);
syncDataDTO.setAction("save"); syncDataDTO.setAction(ISmartCityUserCenterConsts.KAFKA_DATA_SAVE);
syncDataDTO.setData(gridMap); syncDataDTO.setData(gridMap);
syncDataDTO.setTableNumber(""); syncDataDTO.setTableNumber(ISmartCityUserCenterConsts.KAFKA_TOPIC_GRID);
kafkaTemplate.send("C0021", JSONObject.toJSONString(syncDataDTO)); kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_GRID, JSONObject.toJSONString(syncDataDTO));
} }
} }

View File

@ -1,5 +1,6 @@
package cn.com.tenlion.usercenter.service.grid.impl; package cn.com.tenlion.usercenter.service.grid.impl;
import cn.com.tenlion.usercenter.consts.ISmartCityUserCenterConsts;
import cn.com.tenlion.usercenter.pojo.dtos.kafka.SyncDataDTO; import cn.com.tenlion.usercenter.pojo.dtos.kafka.SyncDataDTO;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import ink.wgink.interfaces.map.IGridUpdateAfterHandler; import ink.wgink.interfaces.map.IGridUpdateAfterHandler;
@ -37,10 +38,10 @@ public class GridUpdateAfterHandlerImpl implements IGridUpdateAfterHandler {
gridMap.put("gridName", gridName); gridMap.put("gridName", gridName);
gridMap.put("fillColor", fillColor); gridMap.put("fillColor", fillColor);
syncDataDTO.setUid(gridId); syncDataDTO.setUid(gridId);
syncDataDTO.setAction("update"); syncDataDTO.setAction(ISmartCityUserCenterConsts.KAFKA_DATA_UPDATE);
syncDataDTO.setData(gridMap); syncDataDTO.setData(gridMap);
syncDataDTO.setTableNumber(""); syncDataDTO.setTableNumber(ISmartCityUserCenterConsts.KAFKA_TOPIC_GRID);
kafkaTemplate.send("C0021", JSONObject.toJSONString(syncDataDTO)); kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_GRID, JSONObject.toJSONString(syncDataDTO));
} }
} }