修改配置

This commit is contained in:
wanggeng 2022-01-20 11:18:10 +08:00
parent d9701a668b
commit 630211df12
11 changed files with 24 additions and 10 deletions

View File

@ -14,7 +14,8 @@ public interface ISmartCityUserCenterConsts {
*/
String APP_CLIENT_NAME = "city-governance";
String KAFKA_TABLE_SYNC_TOPIC = "tableSync";
String KAFKA_TOPIC_TABLE_SYNC = "tableSync";
String KAFKA_TOPIC_GRID_MEMBER_REALTIME_LOCATION = "gridMemberRealtimeLocation";
String KAFKA_TABLE_SYNC_GRID_MEMBER = "C0021";
String KAFKA_TABLE_SYNC_GRID = "C0022";

View File

@ -39,7 +39,7 @@ public class GridDeleteAfterHandlerImpl implements IGridDeleteAfterHandler {
syncDataDTO.setAction(ISmartCityUserCenterConsts.KAFKA_DATA_DELETE);
syncDataDTO.setData(new HashMap<>());
syncDataDTO.setTableNumber(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID);
kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_TOPIC, JSONObject.toJSONString(syncDataDTO));
kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_TABLE_SYNC, JSONObject.toJSONString(syncDataDTO));
}
}

View File

@ -36,6 +36,6 @@ public class GridPointDeleteAfterHandlerImpl implements IGridPointDeleteAfterHan
syncDataDTO.setAction(ISmartCityUserCenterConsts.KAFKA_DATA_DELETE);
syncDataDTO.setData(new HashMap());
syncDataDTO.setTableNumber(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_POINT);
kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_TOPIC, JSONObject.toJSONString(syncDataDTO));
kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_TABLE_SYNC, JSONObject.toJSONString(syncDataDTO));
}
}

View File

@ -47,7 +47,7 @@ public class GridPointSaveAfterHandlerImpl implements IGridPointSaveAfterHandler
syncDataDTO.setAction(ISmartCityUserCenterConsts.KAFKA_DATA_SAVE);
syncDataDTO.setData(data);
syncDataDTO.setTableNumber(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_POINT);
kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_TOPIC, JSONObject.toJSONString(syncDataDTO));
kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_TABLE_SYNC, JSONObject.toJSONString(syncDataDTO));
});
}

View File

@ -34,7 +34,7 @@ public class GridRelationDeleteAfterHandlerImpl implements IGridRelationDeleteAf
syncDataDTO.setUid(WStringUtil.listToStr(relationIds, "_"));
syncDataDTO.setAction(ISmartCityUserCenterConsts.KAFKA_DATA_DELETE);
syncDataDTO.setTableNumber(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER);
kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_TOPIC, JSONObject.toJSONString(syncDataDTO));
kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_TABLE_SYNC, JSONObject.toJSONString(syncDataDTO));
}
}

View File

@ -82,7 +82,7 @@ public class GridRelationSaveAfterHandlerImpl implements IGridRelationSaveAfterH
syncDataDTO.setData(data);
syncDataDTO.setAction(ISmartCityUserCenterConsts.KAFKA_DATA_SAVE);
syncDataDTO.setTableNumber(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER);
kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_TOPIC, JSONObject.toJSONString(syncDataDTO));
kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_TABLE_SYNC, JSONObject.toJSONString(syncDataDTO));
}
}

View File

@ -41,7 +41,7 @@ public class GridSaveAfterHandlerImpl implements IGridSaveAfterHandler {
syncDataDTO.setAction(ISmartCityUserCenterConsts.KAFKA_DATA_SAVE);
syncDataDTO.setData(gridMap);
syncDataDTO.setTableNumber(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID);
kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_TOPIC, JSONObject.toJSONString(syncDataDTO));
kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_TABLE_SYNC, JSONObject.toJSONString(syncDataDTO));
}
}

View File

@ -41,7 +41,7 @@ public class GridUpdateAfterHandlerImpl implements IGridUpdateAfterHandler {
syncDataDTO.setAction(ISmartCityUserCenterConsts.KAFKA_DATA_UPDATE);
syncDataDTO.setData(gridMap);
syncDataDTO.setTableNumber(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID);
kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_TOPIC, JSONObject.toJSONString(syncDataDTO));
kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_TABLE_SYNC, JSONObject.toJSONString(syncDataDTO));
}
}

View File

@ -45,7 +45,7 @@ public class UserUpdateAfterHandlerImpl implements IUserUpdateAfterHandler {
syncDataDTO.setData(data);
syncDataDTO.setAction(ISmartCityUserCenterConsts.KAFKA_DATA_UPDATE);
syncDataDTO.setTableNumber(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER);
kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_TOPIC, JSONObject.toJSONString(syncDataDTO));
kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_TABLE_SYNC, JSONObject.toJSONString(syncDataDTO));
}
}

View File

@ -1,7 +1,9 @@
package cn.com.tenlion.usercenter.service.userrealtimelocation.impl;
import cn.com.tenlion.usercenter.consts.ISmartCityUserCenterConsts;
import cn.com.tenlion.usercenter.enums.mongo.MongoCollectionEnum;
import cn.com.tenlion.usercenter.service.userrealtimelocation.IUserRealtimeLocationService;
import com.alibaba.fastjson.JSONObject;
import ink.wgink.app.AppTokenManager;
import ink.wgink.common.base.DefaultBaseService;
import ink.wgink.module.map.pojo.dtos.userlocation.UserLocationDTO;
@ -16,11 +18,15 @@ import org.springframework.data.domain.Pageable;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
/**
@ -37,6 +43,9 @@ public class UserRealtimeLocationServiceImpl extends DefaultBaseService implemen
private IUserLocationService userLocationService;
@Autowired
private MongoTemplate mongoTemplate;
@Autowired
private KafkaTemplate kafkaTemplate;
private static ThreadPoolExecutor THREAD_POLL = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
@Override
public void save(UserLocationVO userLocationVO) {
@ -44,6 +53,9 @@ public class UserRealtimeLocationServiceImpl extends DefaultBaseService implemen
String userLocationId = userLocationService.saveAndReturnId(userLocationVO.getCreator(), userLocationVO);
userLocationVO.setUserLocationId(userLocationId);
mongoTemplate.insert(userLocationVO, MongoCollectionEnum.USER_REALTIME_LOCATION.getCollection());
THREAD_POLL.execute(() -> {
kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_GRID_MEMBER_REALTIME_LOCATION, JSONObject.toJSONString(userLocationVO));
});
}
@Override

View File

@ -86,7 +86,7 @@ spring:
# 设置自动提交offset
enable-auto-commit: true
# 如果'enable.auto.commit'为true则消费者偏移自动提交给Kafka的频率以毫秒为单位默认值为5000。
auto-commit-interval: 100
auto-commit-interval: 5000
max-poll-records: 5
mybatis:
@ -152,6 +152,7 @@ logging:
name: C:\Users\wenc0\Desktop\UploadFiles\logs\usercenter-logs.log
level:
root: error
org.apache.kafka: info
org.springframework.data.mongodb: debug
org.springframework.boot.autoconfigure.security.servlet: debug
ink.wgink: debug