package cn.com.tenlion.usercenter.kafka; import cn.com.tenlion.usercenter.config.ParamsConfigProperties; import cn.com.tenlion.usercenter.consts.ISmartCityUserCenterConsts; import cn.com.tenlion.usercenter.pojo.dtos.kafka.SyncDataDTO; import cn.com.tenlion.usercenter.pojo.pos.area.user.GridUserPO; import cn.com.tenlion.usercenter.service.area.user.IAreaUserService; import cn.com.tenlion.usercenter.service.userexpand.IUserExpandService; import com.alibaba.fastjson.JSON; import ink.wgink.interfaces.department.IDepartmentDeleteAfterHandler; import ink.wgink.interfaces.department.IDepartmentSaveAfterHandler; import ink.wgink.interfaces.department.IDepartmentUpdateAfterHandler; import ink.wgink.interfaces.map.*; import ink.wgink.interfaces.role.IRoleUserDeleteAfterHandler; import ink.wgink.interfaces.role.IRoleUserSaveAfterHandler; import ink.wgink.interfaces.user.IUserUpdateAfterHandler; import ink.wgink.module.map.pojo.dtos.grid.GridDTO; import ink.wgink.module.map.pojo.dtos.grid.GridRelationDTO; import ink.wgink.module.map.pojo.dtos.grid.GridUserDTO; import ink.wgink.module.map.service.grid.IGridPointService; import ink.wgink.module.map.service.grid.IGridRelationService; import ink.wgink.module.map.service.grid.IGridService; import ink.wgink.module.map.service.grid.IGridUserService; import ink.wgink.pojo.dtos.user.UserDTO; import ink.wgink.pojo.pos.DepartmentPO; import ink.wgink.service.department.service.IDepartmentService; import ink.wgink.service.role.service.IRoleUserService; import ink.wgink.service.user.service.IUserService; import ink.wgink.util.ArrayListUtil; import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.checkerframework.checker.units.qual.A; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.ApplicationEventPublisherAware; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; /** * @ClassName: KafKaPublishConsumer * @Description: kafka表同步消费者 * @Author: wanggeng * @Date: 2021/12/23 10:06 AM * @Version: 1.0 */ @Component public class KafKaSyncTableConsumer implements ApplicationEventPublisherAware { private static final Logger LOG = LoggerFactory.getLogger(KafKaSyncTableConsumer.class); private ApplicationEventPublisher applicationEventPublisher; @Autowired private IGridService gridService; @Autowired private IGridPointService gridPointService; @Autowired private IGridRelationService gridRelationService; @Autowired private IUserService userService; @Autowired private IDepartmentService departmentService; @Autowired private IGridSaveAfterHandler gridSaveAfterHandler; @Autowired private IGridDeleteAfterHandler gridDeleteAfterHandler; @Autowired private IGridUpdateAfterHandler gridUpdateAfterHandler; @Autowired private IGridPointSaveAfterHandler gridPointSaveAfterHandler; @Autowired private IGridPointDeleteAfterHandler gridPointDeleteAfterHandler; @Autowired private IGridRelationSaveAfterHandler gridRelationSaveAfterHandler; @Autowired private IGridRelationDeleteAfterHandler gridRelationDeleteAfterHandler; @Autowired private IUserUpdateAfterHandler userUpdateAfterHandler; @Autowired private IDepartmentSaveAfterHandler departmentSaveAfterHandler; @Autowired private IDepartmentUpdateAfterHandler departmentUpdateAfterHandler; @Autowired private IDepartmentDeleteAfterHandler departmentDeleteAfterHandler; @Autowired private IRoleUserSaveAfterHandler roleUserSaveAfterHandler; @Autowired private IRoleUserDeleteAfterHandler roleUserDeleteAfterHandler; @Autowired private IRoleUserService roleUserService; @Autowired private ParamsConfigProperties paramsConfigProperties; @Autowired private IUserExpandService userExpandService; @Autowired private IAreaUserService areaUserService; @Autowired private IGridUserService gridUserService; public KafKaSyncTableConsumer() { } /** * 网格关系 * * @param record */ @KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_RELATION, topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_RELATION) public void gridRelation(ConsumerRecord record) { String recordValue = record.value().toString(); LOG.debug("Grid Relation(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_RELATION + ") recordValue: {}", recordValue); SyncDataDTO> syncDataDTO = getSyncData(recordValue); if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_SAVE)) { if (StringUtils.isBlank(syncDataDTO.getUid())) { LOG.error("Grid Relation(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_RELATION + ") save error, uid is empty"); return; } UserDTO userDTO = userService.get(syncDataDTO.getUid()); if (userDTO == null) { LOG.error("Grid Relation(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_RELATION + ") save error, userDTO is null"); } gridRelationSaveAfterHandler.handle(userDTO); } else if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_UPDATE)) { if (StringUtils.isBlank(syncDataDTO.getUid())) { LOG.error("Grid Relation(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_RELATION + ") update error, uid is empty"); return; } userUpdateAfterHandler.handle(syncDataDTO.getUid()); } else if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_DELETE)) { if (StringUtils.isBlank(syncDataDTO.getUid())) { LOG.error("Grid Relation(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_RELATION + ") delete error, uid is empty"); return; } gridRelationDeleteAfterHandler.handle(Arrays.asList(syncDataDTO.getUid().split("_"))); } } /** * 网格关系全量 * * @param record */ @KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_RELATION + "FullSync", topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_RELATION + "FullSync") public void gridRelationFullSync(ConsumerRecord record) { Map params = new HashMap<>(); List userDTOS = gridUserService.listUser(params); if (userDTOS.isEmpty()) { LOG.error("Grid Relation Full Sync(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_RELATION + "FullSync) error, userDTOS is empty"); return; } userDTOS.forEach(userDTO -> { gridRelationSaveAfterHandler.handle(userDTO); }); } /** * 网格 * * @param record */ @KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID, topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID) public void mapGrid(ConsumerRecord record) { String recordValue = record.value().toString(); LOG.debug("Grid(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID + ") recordValue: {}", recordValue); SyncDataDTO> syncDataDTO = getSyncData(recordValue); if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_SAVE)) { String uid = syncDataDTO.getUid(); if (StringUtils.isBlank(uid)) { LOG.error("Grid(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID + ") save error, uid is empty"); return; } GridDTO gridDTO = gridService.get(uid); if (gridDTO == null) { LOG.error("Grid(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID + ") save error, uid is null"); return; } gridSaveAfterHandler.handle(gridDTO.getGridId(), gridDTO.getGridCode(), gridDTO.getGridName(), gridDTO.getFillColor(), gridDTO.getAreaCode(), gridDTO.getAreaName()); } else if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_UPDATE)) { String uid = syncDataDTO.getUid(); if (StringUtils.isBlank(uid)) { LOG.error("Grid(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID + ") update error, uid is empty"); return; } GridDTO gridDTO = gridService.get(uid); if (gridDTO == null) { LOG.error("Grid(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID + ") update error, grid is null"); return; } gridUpdateAfterHandler.handle(gridDTO.getGridId(), gridDTO.getGridName(), gridDTO.getFillColor()); } else if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_DELETE)) { String uid = syncDataDTO.getUid(); if (StringUtils.isBlank(uid)) { LOG.error("Grid(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID + ") delete error, uid is empty"); return; } gridDeleteAfterHandler.handler(Arrays.asList(uid.split("_"))); } } /** * 网格全量 * * @param record */ @KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID + "FullSync", topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID + "FullSync") public void mapGridFullSync(ConsumerRecord record) { Map params = new HashMap<>(0); List gridDTOs = gridService.list(params); if (gridDTOs.isEmpty()) { LOG.error("Grid Full Sync(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID + "FullSync) error, gridDTOs is empty"); return; } gridDTOs.forEach(gridDTO -> { gridSaveAfterHandler.handle(gridDTO.getGridId(), gridDTO.getGridCode(), gridDTO.getGridName(), gridDTO.getFillColor(), gridDTO.getAreaCode(), gridDTO.getAreaName()); }); } /** * 网格点 * * @param record */ @KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_POINT, topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_POINT) public void mapGridPoint(ConsumerRecord record) { String recordValue = record.value().toString(); LOG.debug("Grid Point(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_POINT + ") recordValue: {}", recordValue); SyncDataDTO> syncDataDTO = getSyncData(recordValue); if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_SAVE)) { String gridId = String.valueOf(syncDataDTO.getData().get("grid_id")); if (StringUtils.isBlank(gridId)) { LOG.error("Grid Point(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_POINT + ") delete error, gridId is empty"); return; } // 删除原有点 gridPointDeleteAfterHandler.handle(Arrays.asList(gridId)); // 保存新的点 gridPointSaveAfterHandler.handle(gridId); } else if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_DELETE)) { String keyValue = syncDataDTO.getKeyValue(); if (keyValue.isEmpty()) { LOG.error("Grid Point(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_POINT + ") delete error, keyValue is empty"); return; } gridPointDeleteAfterHandler.handle(Arrays.asList(keyValue.split("_"))); } } /** * 网格点全量 * * @param record */ @KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_POINT + "FullSync", topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_POINT + "FullSync") public void mapGridPointFullSync(ConsumerRecord record) { Map params = new HashMap<>(0); List gridDTOs = gridService.list(params); if (gridDTOs.isEmpty()) { LOG.error("Grid Point Full Sync(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_POINT + "FullSync) error, gridDTOs is empty"); return; } for(GridDTO gridDTO : gridDTOs) { gridPointSaveAfterHandler.handle(gridDTO.getGridId()); } } /** * 专管机构 * * @param record */ @KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT, topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT) public void department(ConsumerRecord record) { String recordValue = record.value().toString(); LOG.debug("Department(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT + ") recordValue: {}", recordValue); SyncDataDTO> syncDataDTO = getSyncData(recordValue); if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_SAVE)) { String departmentId = String.valueOf(syncDataDTO.getUid()); if (StringUtils.isBlank(departmentId)) { LOG.error("Department(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT + ") save error, uid is empty"); return; } DepartmentPO departmentPO = departmentService.getPO(departmentId); if (departmentPO == null) { LOG.debug("Department(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT + ") save error, department is null"); return; } departmentSaveAfterHandler.handle(departmentPO.getDepartmentId(), departmentPO.getDepartmentParentId(), departmentPO.getDepartmentName(), departmentPO.getDepartmentCode(), departmentPO.getDepartmentType(), departmentPO.getDepartmentAreaCode(), departmentPO.getDepartmentAreaName()); } else if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_UPDATE)) { String departmentId = String.valueOf(syncDataDTO.getUid()); if (StringUtils.isBlank(departmentId)) { LOG.error("Department(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT + ") update error, uid is empty"); return; } DepartmentPO departmentPO = departmentService.getPO(departmentId); if (departmentPO == null) { LOG.debug("Department(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT + ") update error, department is null"); return; } departmentUpdateAfterHandler.handle(departmentPO.getDepartmentId(), departmentPO.getDepartmentParentId(), departmentPO.getDepartmentName(), departmentPO.getDepartmentType(), departmentPO.getDepartmentAreaCode(), departmentPO.getDepartmentAreaName()); } else if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_DELETE)) { String uid = syncDataDTO.getUid(); if (uid.isEmpty()) { LOG.error("Department(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT + ") delete error, keyValue is empty"); return; } departmentDeleteAfterHandler.handle(Arrays.asList(uid.split("_"))); } } /** * 专管机构 * * @param record */ @KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT + "FullSync", topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT + "FullSync") public void departmentFullSync(ConsumerRecord record) { List departmentPOs = departmentService.listPO(new HashMap<>(0)); if (departmentPOs.isEmpty()) { LOG.error("Department Full Sync(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_DEPARTMENT + "FullSync) error, departmentPOs is empty"); return; } departmentPOs.forEach(departmentPO -> { departmentSaveAfterHandler.handle(departmentPO.getDepartmentId(), departmentPO.getDepartmentParentId(), departmentPO.getDepartmentName(), departmentPO.getDepartmentCode(), departmentPO.getDepartmentType(), departmentPO.getDepartmentAreaCode(), departmentPO.getDepartmentAreaName()); }); } /** * 专管员 * * @param record */ @KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_SPECIALIZED_PERSON, topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_SPECIALIZED_PERSON) public void specializedPerson(ConsumerRecord record) { String recordValue = record.value().toString(); LOG.debug("Specialized person(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_SPECIALIZED_PERSON + ") recordValue: {}", recordValue); SyncDataDTO> syncDataDTO = getSyncData(recordValue); if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_SAVE)) { List userIds = roleUserService.listUserId(paramsConfigProperties.getRoleSpecializedPerson()); if (userIds.isEmpty()) { LOG.error("Specialized Person Full Sync(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_SPECIALIZED_PERSON + "FullSync) error, userIds is empty"); return; } roleUserSaveAfterHandler.handle(paramsConfigProperties.getRoleSpecializedPerson(), userIds); } else if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_DELETE)) { String uid = syncDataDTO.getUid(); if (uid.isEmpty()) { LOG.error("Specialized person(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_SPECIALIZED_PERSON + ") delete error, keyValue is empty"); return; } roleUserDeleteAfterHandler.handle(paramsConfigProperties.getRoleSpecializedPerson(), Arrays.asList(uid.split("\\_"))); } } /** * 专管员全量 * * @param record */ @KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_SPECIALIZED_PERSON + "FullSync", topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_SPECIALIZED_PERSON + "FullSync") public void specializedPersonFullSync(ConsumerRecord record) { List userDTOS = roleUserService.listUserByRoleId(paramsConfigProperties.getRoleSpecializedPerson()); if (userDTOS.isEmpty()) { LOG.error("Specialized Person Full Sync(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_SPECIALIZED_PERSON + "FullSync) error, userDTOS is empty"); return; } userDTOS.forEach(userDTO -> { roleUserSaveAfterHandler.handle(paramsConfigProperties.getRoleSpecializedPerson(), userDTO); }); } /** * 网格员 * * @param record */ @KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER, topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER) public void gridMember(ConsumerRecord record) { String recordValue = record.value().toString(); LOG.debug("Specialized person(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER + ") recordValue: {}", recordValue); SyncDataDTO> syncDataDTO = getSyncData(recordValue); if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_SAVE)) { List userIds = roleUserService.listUserId(paramsConfigProperties.getRoleGridMember()); if (userIds.isEmpty()) { LOG.error("Grid Member Full Sync(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER + ") error, userIds is empty"); return; } roleUserSaveAfterHandler.handle(paramsConfigProperties.getRoleGridMember(), userIds); } else if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_DELETE)) { String uid = syncDataDTO.getUid(); if (uid.isEmpty()) { LOG.error("Grid Member(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER + ") delete error, keyValue is empty"); return; } roleUserDeleteAfterHandler.handle(paramsConfigProperties.getRoleGridMember(), Arrays.asList(uid.split("\\_"))); } } /** * 网格员全量 * * @param record */ @KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER + "FullSync", topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER + "FullSync") public void gridMemberFullSync(ConsumerRecord record) { List userDTOS = roleUserService.listUserByRoleId(paramsConfigProperties.getRoleGridMember()); if (userDTOS.isEmpty()) { LOG.error("Grid Member Full Sync(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER + "FullSync) error, userDTOS is empty"); return; } userDTOS.forEach(userDTO -> { roleUserSaveAfterHandler.handle(paramsConfigProperties.getRoleGridMember(), userDTO); }); } private SyncDataDTO> getSyncData(String msg) { return JSON.parseObject(msg, SyncDataDTO.class); } @Override public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { this.applicationEventPublisher = applicationEventPublisher; } }