增加kafka全量同步功能

This commit is contained in:
wanggeng 2022-02-09 10:07:56 +08:00
parent dfd36f3c39
commit 16c4f78c0d
5 changed files with 33 additions and 18 deletions

View File

@ -31,7 +31,7 @@ public interface ISmartCityUserCenterConsts {
/**
* 网格员角色
*/
String KAFKA_TABLE_SYNC_GRID_MEMBER = "C0032";
String KAFKA_TABLE_SYNC_GRID_MEMBER = "C0036";
String KAFKA_DATA_SAVE = "save";
String KAFKA_DATA_UPDATE = "update";

View File

@ -378,7 +378,7 @@ public class KafKaSyncTableConsumer implements ApplicationEventPublisherAware {
/**
* 专管
* 网格
*
* @param record
*/
@ -388,30 +388,30 @@ public class KafKaSyncTableConsumer implements ApplicationEventPublisherAware {
LOG.debug("Specialized person(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER + ") recordValue: {}", recordValue);
SyncDataDTO<Map<String, Object>> syncDataDTO = getSyncData(recordValue);
if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_SAVE)) {
List<String> userIds = roleUserService.listUserId(paramsConfigProperties.getRoleSpecializedPerson());
List<String> userIds = roleUserService.listUserId(paramsConfigProperties.getRoleGridMember());
if (userIds.isEmpty()) {
LOG.error("Grid Member Full Sync(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER + ") error, userIds is empty");
return;
}
roleUserSaveAfterHandler.handle(paramsConfigProperties.getRoleSpecializedPerson(), userIds);
roleUserSaveAfterHandler.handle(paramsConfigProperties.getRoleGridMember(), userIds);
} else if (StringUtils.equals(syncDataDTO.getAction(), ISmartCityUserCenterConsts.KAFKA_DATA_DELETE)) {
String uid = syncDataDTO.getUid();
if (uid.isEmpty()) {
LOG.error("Grid Member(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER + ") delete error, keyValue is empty");
return;
}
roleUserDeleteAfterHandler.handle(paramsConfigProperties.getRoleSpecializedPerson(), Arrays.asList(uid.split("\\_")));
roleUserDeleteAfterHandler.handle(paramsConfigProperties.getRoleGridMember(), Arrays.asList(uid.split("\\_")));
}
}
/**
* 专管员全量
* 网格员全量
*
* @param record
*/
@KafkaListener(id = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER + "FullSync", topics = ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER + "FullSync")
public void gridMemberFullSync(ConsumerRecord<?, ?> record) {
List<String> userIds = roleUserService.listUserId(paramsConfigProperties.getRoleSpecializedPerson());
List<String> userIds = roleUserService.listUserId(paramsConfigProperties.getRoleGridMember());
if (userIds.isEmpty()) {
LOG.error("Grid Member Full Sync(" + ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER + "FullSync) error, userIds is empty");
return;

View File

@ -9,7 +9,6 @@ import com.alibaba.fastjson.JSONObject;
import ink.wgink.exceptions.base.SystemException;
import ink.wgink.interfaces.role.IRoleUserSaveAfterHandler;
import ink.wgink.service.role.service.IRoleUserService;
import ink.wgink.service.user.service.IUserService;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
@ -34,8 +33,6 @@ public class RoleUserSaveAfterHandlerImpl implements IRoleUserSaveAfterHandler {
@Autowired
private IRoleUserService roleUserService;
@Autowired
private IUserService userService;
@Autowired
private IUserExpandService userExpandService;
@Autowired
private ParamsConfigProperties paramsConfigProperties;

View File

@ -68,7 +68,7 @@ spring:
max-idle: 8
min-idle: 0
kafka:
bootstrap-servers: 127.0.0.1:9092
bootstrap-servers: 10.1.5.97:9092
producer:
# 写入失败时重试次数。当leader节点失效一个repli节点会替代成为leader节点此时可能出现写入失败
# 当retris为0时produce不会重复。retirs重发此时repli节点完全成为leader节点不会产生消息丢失。

View File

@ -87,6 +87,7 @@ public class BaseGridMemberTest {
private String SQL_GET_AREA_BY_AREA_CODE = "SELECT area_id, area_parent_id, area_name, area_code FROM data_area WHERE area_code = ?";
private String SQL_GET_AREA_BY_AREA_ID = "SELECT area_id, area_parent_id, area_name, area_code FROM data_area WHERE area_id = ?";
private static Integer TOTAL_COUNT = 0;
private static Integer SAVE_USER_COUNT = 0;
private static Integer SAVE_USER_EXPAND_COUNT = 0;
private static Integer SAVE_USER_ROLE_COUNT = 0;
@ -162,6 +163,7 @@ public class BaseGridMemberTest {
public void doSave(List<ExcelGridMember> excelGridMembers) {
try {
for (ExcelGridMember excelGridMember : excelGridMembers) {
TOTAL_COUNT++;
// 查询用户
getUserPS.setString(1, excelGridMember.getUserPhone());
Map<String, Object> userMap = JdbcUtil.getResult(getUserPS.executeQuery());
@ -179,6 +181,12 @@ public class BaseGridMemberTest {
}
connection.commit();
} catch (Exception e) {
try {
connection.rollback();
} catch (SQLException ex) {
ex.printStackTrace();
}
System.err.println(e.getMessage());
e.printStackTrace();
}
}
@ -187,6 +195,7 @@ public class BaseGridMemberTest {
long endTime = System.currentTimeMillis();
System.out.println("耗时:" + (endTime - startTime) + "ms");
System.out.println("****************************************");
System.out.println("总量: " + TOTAL_COUNT);
System.out.println("导入用户: " + SAVE_USER_COUNT);
System.out.println("导入用户拓展: " + SAVE_USER_EXPAND_COUNT);
System.out.println("导入用户角色: " + SAVE_USER_ROLE_COUNT);
@ -284,13 +293,13 @@ public class BaseGridMemberTest {
*/
private String getFullArea(String fullName, String areaParentId, PreparedStatement getAreaByParentIdPS) throws SQLException {
getAreaByParentIdPS.setString(1, areaParentId);
System.out.println(getAreaByParentIdPS);
// System.out.println(getAreaByParentIdPS);
Map<String, Object> areaMap = JdbcUtil.getResult(getAreaByParentIdPS.executeQuery());
if (areaMap == null) {
System.out.println(fullName);
return fullName;
}
fullName = areaMap.get("area_name").toString() + " / " + fullName;
System.out.println(fullName);
return getFullArea(fullName, areaMap.get("area_parent_id").toString(), getAreaByParentIdPS);
}
@ -304,11 +313,20 @@ public class BaseGridMemberTest {
if (StringUtils.isBlank(birth)) {
return "";
}
System.out.println(birth);
String[] birthArray = birth.split("\\.");
if (birthArray.length <= 1) {
return "";
}
return String.format("%04d-%02d", Integer.parseInt(birthArray[0]), Integer.parseInt(birthArray[1]));
int day;
if (StringUtils.equals(birthArray[1], "1")) {
day = 10;
} else {
day = Integer.parseInt(birthArray[1]);
}
String birthMonth = String.format("%04d-%02d", Integer.parseInt(birthArray[0]), day);
System.out.println(birthMonth);
return birthMonth;
}
/**
@ -433,10 +451,10 @@ public class BaseGridMemberTest {
@Override
public void invoke(ExcelGridMember excelGridMember, AnalysisContext analysisContext) {
if (excelGridMembers.size() > 100) {
doSave(excelGridMembers);
excelGridMembers.clear();
}
// if (excelGridMembers.size() > 100) {
// doSave(excelGridMembers);
// excelGridMembers.clear();
// }
excelGridMembers.add(excelGridMember);
}