431 lines
21 KiB
Java
431 lines
21 KiB
Java
package cn.com.tenlion.usercenter.kafka;
|
|
|
|
import cn.com.tenlion.usercenter.config.ParamsConfigProperties;
|
|
import cn.com.tenlion.usercenter.consts.ISmartCityUserCenterConsts;
|
|
import cn.com.tenlion.usercenter.pojo.dtos.kafka.SyncDataDTO;
|
|
import com.alibaba.fastjson.JSON;
|
|
import ink.wgink.interfaces.department.IDepartmentDeleteAfterHandler;
|
|
import ink.wgink.interfaces.department.IDepartmentSaveAfterHandler;
|
|
import ink.wgink.interfaces.department.IDepartmentUpdateAfterHandler;
|
|
import ink.wgink.interfaces.map.*;
|
|
import ink.wgink.interfaces.role.IRoleUserDeleteAfterHandler;
|
|
import ink.wgink.interfaces.role.IRoleUserSaveAfterHandler;
|
|
import ink.wgink.interfaces.user.IUserUpdateAfterHandler;
|
|
import ink.wgink.module.map.pojo.dtos.grid.GridDTO;
|
|
import ink.wgink.module.map.pojo.dtos.grid.GridRelationDTO;
|
|
import ink.wgink.module.map.service.grid.IGridPointService;
|
|
import ink.wgink.module.map.service.grid.IGridRelationService;
|
|
import ink.wgink.module.map.service.grid.IGridService;
|
|
import ink.wgink.pojo.dtos.user.UserDTO;
|
|
import ink.wgink.pojo.pos.DepartmentPO;
|
|
import ink.wgink.service.department.service.IDepartmentService;
|
|
import ink.wgink.service.role.service.IRoleUserService;
|
|
import ink.wgink.service.user.service.IUserService;
|
|
import ink.wgink.util.ArrayListUtil;
|
|
import org.apache.commons.lang3.StringUtils;
|
|
import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.context.ApplicationEventPublisher;
|
|
import org.springframework.context.ApplicationEventPublisherAware;
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
import org.springframework.stereotype.Component;
|
|
|
|
import java.util.Arrays;
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
|
|
/**
|
|
* @ClassName: KafKaPublishConsumer
|
|
* @Description: kafka表同步消费者
|
|
* @Author: wanggeng
|
|
* @Date: 2021/12/23 10:06 AM
|
|
* @Version: 1.0
|
|
*/
|
|
@Component
|
|
public class KafKaSyncTableConsumer implements ApplicationEventPublisherAware {
|
|
private static final Logger LOG = LoggerFactory.getLogger(KafKaSyncTableConsumer.class);
|
|
|
|
private ApplicationEventPublisher applicationEventPublisher;
|
|
@Autowired
|
|
private IGridService gridService;
|
|
@Autowired
|
|
private IGridPointService gridPointService;
|
|
@Autowired
|
|
private IGridRelationService gridRelationService;
|
|
@Autowired
|
|
private IUserService userService;
|
|
@Autowired
|
|
private IDepartmentService departmentService;
|
|
@Autowired
|
|
private IGridSaveAfterHandler gridSaveAfterHandler;
|
|
@Autowired
|
|
private IGridDeleteAfterHandler gridDeleteAfterHandler;
|
|
@Autowired
|
|
private IGridUpdateAfterHandler gridUpdateAfterHandler;
|
|
@Autowired
|
|
private IGridPointSaveAfterHandler gridPointSaveAfterHandler;
|
|
@Autowired
|
|
private IGridPointDeleteAfterHandler gridPointDeleteAfterHandler;
|
|
@Autowired
|
|
private IGridRelationSaveAfterHandler gridRelationSaveAfterHandler;
|
|
@Autowired
|
|
private IGridRelationDeleteAfterHandler gridRelationDeleteAfterHandler;
|
|
@Autowired
|
|
private IUserUpdateAfterHandler userUpdateAfterHandler;
|
|
@Autowired
|
|
private IDepartmentSaveAfterHandler departmentSaveAfterHandler;
|
|
@Autowired
|
|
private IDepartmentUpdateAfterHandler departmentUpdateAfterHandler;
|
|
@Autowired
|
|
private IDepartmentDeleteAfterHandler departmentDeleteAfterHandler;
|
|
@Autowired
|
|
private IRoleUserSaveAfterHandler roleUserSaveAfterHandler;
|
|
@Autowired
|
|
private IRoleUserDeleteAfterHandler roleUserDeleteAfterHandler;
|
|
@Autowired
|
|
private IRoleUserService roleUserService;
|
|
@Autowired
|
|
private ParamsConfigProperties paramsConfigProperties;
|
|
|
|
|
|
public KafKaSyncTableConsumer() {
|
|
}
|
|
|
|
/**
|
|
* 网格关系
|
|
*
|
|
* @param record
|
|
*/
|
|
@KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_RELATION, topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_RELATION)
|
|
public void gridRelation(ConsumerRecord<?, ?> record) {
|
|
String recordValue = record.value().toString();
|
|
LOG.debug("Grid Relation(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_RELATION + ") recordValue: {}", recordValue);
|
|
SyncDataDTO<Map<String, Object>> syncDataDTO = getSyncData(recordValue);
|
|
if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_SAVE)) {
|
|
if (StringUtils.isBlank(syncDataDTO.getUid())) {
|
|
LOG.error("Grid Relation(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_RELATION + ") save error, uid is empty");
|
|
return;
|
|
}
|
|
UserDTO userDTO = userService.get(syncDataDTO.getUid());
|
|
if (userDTO == null) {
|
|
LOG.error("Grid Relation(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_RELATION + ") save error, userDTO is null");
|
|
}
|
|
gridRelationSaveAfterHandler.handle(userDTO);
|
|
} else if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_UPDATE)) {
|
|
if (StringUtils.isBlank(syncDataDTO.getUid())) {
|
|
LOG.error("Grid Relation(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_RELATION + ") update error, uid is empty");
|
|
return;
|
|
}
|
|
userUpdateAfterHandler.handle(syncDataDTO.getUid());
|
|
} else if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_DELETE)) {
|
|
if (StringUtils.isBlank(syncDataDTO.getUid())) {
|
|
LOG.error("Grid Relation(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_RELATION + ") delete error, uid is empty");
|
|
return;
|
|
}
|
|
gridRelationDeleteAfterHandler.handle(Arrays.asList(syncDataDTO.getUid().split("_")));
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 网格关系全量
|
|
*
|
|
* @param record
|
|
*/
|
|
@KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_RELATION + "FullSync", topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_RELATION + "FullSync")
|
|
public void gridRelationFullSync(ConsumerRecord<?, ?> record) {
|
|
Map<String, Object> params = new HashMap<>();
|
|
List<GridRelationDTO> gridRelationDTOs = gridRelationService.list(params);
|
|
if (gridRelationDTOs.isEmpty()) {
|
|
LOG.error("Grid Relation Full Sync(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_RELATION + "FullSync) error, gridRelationDTOs is empty");
|
|
return;
|
|
}
|
|
List<String> relationIds = ArrayListUtil.listBeanStringIdValue(gridRelationDTOs, "relationId", GridRelationDTO.class);
|
|
gridRelationSaveAfterHandler.handle(relationIds);
|
|
}
|
|
|
|
/**
|
|
* 网格
|
|
*
|
|
* @param record
|
|
*/
|
|
@KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID, topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID)
|
|
public void mapGrid(ConsumerRecord<?, ?> record) {
|
|
String recordValue = record.value().toString();
|
|
LOG.debug("Grid(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID + ") recordValue: {}", recordValue);
|
|
SyncDataDTO<Map<String, Object>> syncDataDTO = getSyncData(recordValue);
|
|
if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_SAVE)) {
|
|
String uid = syncDataDTO.getUid();
|
|
if (StringUtils.isBlank(uid)) {
|
|
LOG.error("Grid(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID + ") save error, uid is empty");
|
|
return;
|
|
}
|
|
GridDTO gridDTO = gridService.get(uid);
|
|
if (gridDTO == null) {
|
|
LOG.error("Grid(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID + ") save error, uid is null");
|
|
return;
|
|
}
|
|
gridSaveAfterHandler.handle(gridDTO.getGridId(), gridDTO.getGridCode(), gridDTO.getGridName(), gridDTO.getFillColor(), gridDTO.getAreaCode(), gridDTO.getAreaName());
|
|
} else if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_UPDATE)) {
|
|
String uid = syncDataDTO.getUid();
|
|
if (StringUtils.isBlank(uid)) {
|
|
LOG.error("Grid(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID + ") update error, uid is empty");
|
|
return;
|
|
}
|
|
GridDTO gridDTO = gridService.get(uid);
|
|
if (gridDTO == null) {
|
|
LOG.error("Grid(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID + ") update error, grid is null");
|
|
return;
|
|
}
|
|
gridUpdateAfterHandler.handle(gridDTO.getGridId(), gridDTO.getGridName(), gridDTO.getFillColor());
|
|
} else if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_DELETE)) {
|
|
String uid = syncDataDTO.getUid();
|
|
if (StringUtils.isBlank(uid)) {
|
|
LOG.error("Grid(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID + ") delete error, uid is empty");
|
|
return;
|
|
}
|
|
gridDeleteAfterHandler.handler(Arrays.asList(uid.split("_")));
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 网格全量
|
|
*
|
|
* @param record
|
|
*/
|
|
@KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID + "FullSync", topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID + "FullSync")
|
|
public void mapGridFullSync(ConsumerRecord<?, ?> record) {
|
|
Map<String, Object> params = new HashMap<>(0);
|
|
List<GridDTO> gridDTOs = gridService.list(params);
|
|
if (gridDTOs.isEmpty()) {
|
|
LOG.error("Grid Full Sync(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID + "FullSync) error, gridDTOs is empty");
|
|
return;
|
|
}
|
|
gridDTOs.forEach(gridDTO -> {
|
|
gridSaveAfterHandler.handle(gridDTO.getGridId(), gridDTO.getGridCode(), gridDTO.getGridName(), gridDTO.getFillColor(), gridDTO.getAreaCode(), gridDTO.getAreaName());
|
|
});
|
|
}
|
|
|
|
/**
|
|
* 网格点
|
|
*
|
|
* @param record
|
|
*/
|
|
@KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_POINT, topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_POINT)
|
|
public void mapGridPoint(ConsumerRecord<?, ?> record) {
|
|
String recordValue = record.value().toString();
|
|
LOG.debug("Grid Point(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_POINT + ") recordValue: {}", recordValue);
|
|
SyncDataDTO<Map<String, Object>> syncDataDTO = getSyncData(recordValue);
|
|
if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_SAVE)) {
|
|
String gridId = String.valueOf(syncDataDTO.getData().get("grid_id"));
|
|
if (StringUtils.isBlank(gridId)) {
|
|
LOG.error("Grid Point(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_POINT + ") delete error, gridId is empty");
|
|
return;
|
|
}
|
|
// 删除原有点
|
|
gridPointDeleteAfterHandler.handle(Arrays.asList(gridId));
|
|
// 保存新的点
|
|
gridPointSaveAfterHandler.handle(gridId);
|
|
} else if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_DELETE)) {
|
|
String keyValue = syncDataDTO.getKeyValue();
|
|
if (keyValue.isEmpty()) {
|
|
LOG.error("Grid Point(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_POINT + ") delete error, keyValue is empty");
|
|
return;
|
|
}
|
|
gridPointDeleteAfterHandler.handle(Arrays.asList(keyValue.split("_")));
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 网格点全量
|
|
*
|
|
* @param record
|
|
*/
|
|
@KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_POINT + "FullSync", topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_POINT + "FullSync")
|
|
public void mapGridPointFullSync(ConsumerRecord<?, ?> record) {
|
|
Map<String, Object> params = new HashMap<>(0);
|
|
List<GridDTO> gridDTOs = gridService.list(params);
|
|
if (gridDTOs.isEmpty()) {
|
|
LOG.error("Grid Point Full Sync(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_POINT + "FullSync) error, gridDTOs is empty");
|
|
return;
|
|
}
|
|
gridDTOs.forEach(gridDTO -> {
|
|
gridPointSaveAfterHandler.handle(gridDTO.getGridId());
|
|
});
|
|
}
|
|
|
|
/**
|
|
* 专管机构
|
|
*
|
|
* @param record
|
|
*/
|
|
@KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT, topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT)
|
|
public void department(ConsumerRecord<?, ?> record) {
|
|
String recordValue = record.value().toString();
|
|
LOG.debug("Department(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT + ") recordValue: {}", recordValue);
|
|
SyncDataDTO<Map<String, Object>> syncDataDTO = getSyncData(recordValue);
|
|
if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_SAVE)) {
|
|
String departmentId = String.valueOf(syncDataDTO.getUid());
|
|
if (StringUtils.isBlank(departmentId)) {
|
|
LOG.error("Department(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT + ") save error, uid is empty");
|
|
return;
|
|
}
|
|
DepartmentPO departmentPO = departmentService.getPO(departmentId);
|
|
if (departmentPO == null) {
|
|
LOG.debug("Department(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT + ") save error, department is null");
|
|
return;
|
|
}
|
|
departmentSaveAfterHandler.handle(departmentPO.getDepartmentId(),
|
|
departmentPO.getDepartmentParentId(),
|
|
departmentPO.getDepartmentName(),
|
|
departmentPO.getDepartmentCode(),
|
|
departmentPO.getDepartmentType(),
|
|
departmentPO.getDepartmentAreaCode(),
|
|
departmentPO.getDepartmentAreaName());
|
|
} else if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_UPDATE)) {
|
|
String departmentId = String.valueOf(syncDataDTO.getUid());
|
|
if (StringUtils.isBlank(departmentId)) {
|
|
LOG.error("Department(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT + ") update error, uid is empty");
|
|
return;
|
|
}
|
|
DepartmentPO departmentPO = departmentService.getPO(departmentId);
|
|
if (departmentPO == null) {
|
|
LOG.debug("Department(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT + ") update error, department is null");
|
|
return;
|
|
}
|
|
departmentUpdateAfterHandler.handle(departmentPO.getDepartmentId(),
|
|
departmentPO.getDepartmentParentId(),
|
|
departmentPO.getDepartmentName(),
|
|
departmentPO.getDepartmentType(),
|
|
departmentPO.getDepartmentAreaCode(),
|
|
departmentPO.getDepartmentAreaName());
|
|
} else if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_DELETE)) {
|
|
String uid = syncDataDTO.getUid();
|
|
if (uid.isEmpty()) {
|
|
LOG.error("Department(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT + ") delete error, keyValue is empty");
|
|
return;
|
|
}
|
|
departmentDeleteAfterHandler.handle(Arrays.asList(uid.split("_")));
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 专管机构
|
|
*
|
|
* @param record
|
|
*/
|
|
@KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT + "FullSync", topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT + "FullSync")
|
|
public void departmentFullSync(ConsumerRecord<?, ?> record) {
|
|
List<DepartmentPO> departmentPOs = departmentService.listPO(new HashMap<>(0));
|
|
if (departmentPOs.isEmpty()) {
|
|
LOG.error("Department Full Sync(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT + "FullSync) error, departmentPOs is empty");
|
|
return;
|
|
}
|
|
departmentPOs.forEach(departmentPO -> {
|
|
departmentSaveAfterHandler.handle(departmentPO.getDepartmentId(),
|
|
departmentPO.getDepartmentParentId(),
|
|
departmentPO.getDepartmentName(),
|
|
departmentPO.getDepartmentCode(),
|
|
departmentPO.getDepartmentType(),
|
|
departmentPO.getDepartmentAreaCode(),
|
|
departmentPO.getDepartmentAreaName());
|
|
});
|
|
}
|
|
|
|
/**
|
|
* 专管员
|
|
*
|
|
* @param record
|
|
*/
|
|
@KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_SPECIALIZED_PERSON, topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_SPECIALIZED_PERSON)
|
|
public void specializedPerson(ConsumerRecord<?, ?> record) {
|
|
String recordValue = record.value().toString();
|
|
LOG.debug("Specialized person(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_SPECIALIZED_PERSON + ") recordValue: {}", recordValue);
|
|
SyncDataDTO<Map<String, Object>> syncDataDTO = getSyncData(recordValue);
|
|
if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_SAVE)) {
|
|
List<String> userIds = roleUserService.listUserId(paramsConfigProperties.getRoleSpecializedPerson());
|
|
if (userIds.isEmpty()) {
|
|
LOG.error("Specialized Person Full Sync(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_SPECIALIZED_PERSON + "FullSync) error, userIds is empty");
|
|
return;
|
|
}
|
|
roleUserSaveAfterHandler.handle(paramsConfigProperties.getRoleSpecializedPerson(), userIds);
|
|
} else if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_DELETE)) {
|
|
String uid = syncDataDTO.getUid();
|
|
if (uid.isEmpty()) {
|
|
LOG.error("Specialized person(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_SPECIALIZED_PERSON + ") delete error, keyValue is empty");
|
|
return;
|
|
}
|
|
roleUserDeleteAfterHandler.handle(paramsConfigProperties.getRoleSpecializedPerson(), Arrays.asList(uid.split("\\_")));
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 专管员全量
|
|
*
|
|
* @param record
|
|
*/
|
|
@KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_SPECIALIZED_PERSON + "FullSync", topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_SPECIALIZED_PERSON + "FullSync")
|
|
public void specializedPersonFullSync(ConsumerRecord<?, ?> record) {
|
|
List<String> userIds = roleUserService.listUserId(paramsConfigProperties.getRoleSpecializedPerson());
|
|
if (userIds.isEmpty()) {
|
|
LOG.error("Specialized Person Full Sync(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_SPECIALIZED_PERSON + "FullSync) error, userIds is empty");
|
|
return;
|
|
}
|
|
roleUserSaveAfterHandler.handle(paramsConfigProperties.getRoleSpecializedPerson(), userIds);
|
|
}
|
|
|
|
|
|
/**
|
|
* 专管员
|
|
*
|
|
* @param record
|
|
*/
|
|
@KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER, topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER)
|
|
public void gridMember(ConsumerRecord<?, ?> record) {
|
|
String recordValue = record.value().toString();
|
|
LOG.debug("Specialized person(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER + ") recordValue: {}", recordValue);
|
|
SyncDataDTO<Map<String, Object>> syncDataDTO = getSyncData(recordValue);
|
|
if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_SAVE)) {
|
|
List<String> userIds = roleUserService.listUserId(paramsConfigProperties.getRoleSpecializedPerson());
|
|
if (userIds.isEmpty()) {
|
|
LOG.error("Grid Member Full Sync(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER + ") error, userIds is empty");
|
|
return;
|
|
}
|
|
roleUserSaveAfterHandler.handle(paramsConfigProperties.getRoleSpecializedPerson(), userIds);
|
|
} else if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_DELETE)) {
|
|
String uid = syncDataDTO.getUid();
|
|
if (uid.isEmpty()) {
|
|
LOG.error("Grid Member(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER + ") delete error, keyValue is empty");
|
|
return;
|
|
}
|
|
roleUserDeleteAfterHandler.handle(paramsConfigProperties.getRoleSpecializedPerson(), Arrays.asList(uid.split("\\_")));
|
|
}
|
|
}
|
|
|
|
/**
|
|
* 专管员全量
|
|
*
|
|
* @param record
|
|
*/
|
|
@KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER + "FullSync", topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER + "FullSync")
|
|
public void gridMemberFullSync(ConsumerRecord<?, ?> record) {
|
|
List<String> userIds = roleUserService.listUserId(paramsConfigProperties.getRoleSpecializedPerson());
|
|
if (userIds.isEmpty()) {
|
|
LOG.error("Grid Member Full Sync(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER + "FullSync) error, userIds is empty");
|
|
return;
|
|
}
|
|
roleUserSaveAfterHandler.handle(paramsConfigProperties.getRoleGridMember(), userIds);
|
|
}
|
|
|
|
private SyncDataDTO<Map<String, Object>> getSyncData(String msg) {
|
|
return JSON.parseObject(msg, SyncDataDTO.class);
|
|
}
|
|
|
|
@Override
|
|
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
|
|
this.applicationEventPublisher = applicationEventPublisher;
|
|
}
|
|
}
|