From 630211df12f29fb60d0305c7909b8272ff33a243 Mon Sep 17 00:00:00 2001 From: wanggeng <450292408@qq.com> Date: Thu, 20 Jan 2022 11:18:10 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../consts/ISmartCityUserCenterConsts.java | 3 ++- .../grid/impl/GridDeleteAfterHandlerImpl.java | 2 +- .../grid/impl/GridPointDeleteAfterHandlerImpl.java | 2 +- .../grid/impl/GridPointSaveAfterHandlerImpl.java | 2 +- .../impl/GridRelationDeleteAfterHandlerImpl.java | 2 +- .../grid/impl/GridRelationSaveAfterHandlerImpl.java | 2 +- .../service/grid/impl/GridSaveAfterHandlerImpl.java | 2 +- .../grid/impl/GridUpdateAfterHandlerImpl.java | 2 +- .../grid/impl/UserUpdateAfterHandlerImpl.java | 2 +- .../impl/UserRealtimeLocationServiceImpl.java | 12 ++++++++++++ src/main/resources/application-dev.yml | 3 ++- 11 files changed, 24 insertions(+), 10 deletions(-) diff --git a/src/main/java/cn/com/tenlion/usercenter/consts/ISmartCityUserCenterConsts.java b/src/main/java/cn/com/tenlion/usercenter/consts/ISmartCityUserCenterConsts.java index 01204f3..3d8eb5a 100644 --- a/src/main/java/cn/com/tenlion/usercenter/consts/ISmartCityUserCenterConsts.java +++ b/src/main/java/cn/com/tenlion/usercenter/consts/ISmartCityUserCenterConsts.java @@ -14,7 +14,8 @@ public interface ISmartCityUserCenterConsts { */ String APP_CLIENT_NAME = "city-governance"; - String KAFKA_TABLE_SYNC_TOPIC = "tableSync"; + String KAFKA_TOPIC_TABLE_SYNC = "tableSync"; + String KAFKA_TOPIC_GRID_MEMBER_REALTIME_LOCATION = "gridMemberRealtimeLocation"; String KAFKA_TABLE_SYNC_GRID_MEMBER = "C0021"; String KAFKA_TABLE_SYNC_GRID = "C0022"; diff --git a/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridDeleteAfterHandlerImpl.java b/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridDeleteAfterHandlerImpl.java index aea4002..23e691a 100644 --- a/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridDeleteAfterHandlerImpl.java +++ b/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridDeleteAfterHandlerImpl.java @@ -39,7 +39,7 @@ public class GridDeleteAfterHandlerImpl implements IGridDeleteAfterHandler { syncDataDTO.setAction(ISmartCityUserCenterConsts.KAFKA_DATA_DELETE); syncDataDTO.setData(new HashMap<>()); syncDataDTO.setTableNumber(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID); - kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_TOPIC, JSONObject.toJSONString(syncDataDTO)); + kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_TABLE_SYNC, JSONObject.toJSONString(syncDataDTO)); } } diff --git a/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridPointDeleteAfterHandlerImpl.java b/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridPointDeleteAfterHandlerImpl.java index 1636e71..616634c 100644 --- a/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridPointDeleteAfterHandlerImpl.java +++ b/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridPointDeleteAfterHandlerImpl.java @@ -36,6 +36,6 @@ public class GridPointDeleteAfterHandlerImpl implements IGridPointDeleteAfterHan syncDataDTO.setAction(ISmartCityUserCenterConsts.KAFKA_DATA_DELETE); syncDataDTO.setData(new HashMap()); syncDataDTO.setTableNumber(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_POINT); - kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_TOPIC, JSONObject.toJSONString(syncDataDTO)); + kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_TABLE_SYNC, JSONObject.toJSONString(syncDataDTO)); } } diff --git a/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridPointSaveAfterHandlerImpl.java b/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridPointSaveAfterHandlerImpl.java index 81a46b8..d239855 100644 --- a/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridPointSaveAfterHandlerImpl.java +++ b/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridPointSaveAfterHandlerImpl.java @@ -47,7 +47,7 @@ public class GridPointSaveAfterHandlerImpl implements IGridPointSaveAfterHandler syncDataDTO.setAction(ISmartCityUserCenterConsts.KAFKA_DATA_SAVE); syncDataDTO.setData(data); syncDataDTO.setTableNumber(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_POINT); - kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_TOPIC, JSONObject.toJSONString(syncDataDTO)); + kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_TABLE_SYNC, JSONObject.toJSONString(syncDataDTO)); }); } diff --git a/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridRelationDeleteAfterHandlerImpl.java b/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridRelationDeleteAfterHandlerImpl.java index 68d2f32..899aac7 100644 --- a/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridRelationDeleteAfterHandlerImpl.java +++ b/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridRelationDeleteAfterHandlerImpl.java @@ -34,7 +34,7 @@ public class GridRelationDeleteAfterHandlerImpl implements IGridRelationDeleteAf syncDataDTO.setUid(WStringUtil.listToStr(relationIds, "_")); syncDataDTO.setAction(ISmartCityUserCenterConsts.KAFKA_DATA_DELETE); syncDataDTO.setTableNumber(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER); - kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_TOPIC, JSONObject.toJSONString(syncDataDTO)); + kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_TABLE_SYNC, JSONObject.toJSONString(syncDataDTO)); } } diff --git a/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridRelationSaveAfterHandlerImpl.java b/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridRelationSaveAfterHandlerImpl.java index 284838a..479e8c9 100644 --- a/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridRelationSaveAfterHandlerImpl.java +++ b/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridRelationSaveAfterHandlerImpl.java @@ -82,7 +82,7 @@ public class GridRelationSaveAfterHandlerImpl implements IGridRelationSaveAfterH syncDataDTO.setData(data); syncDataDTO.setAction(ISmartCityUserCenterConsts.KAFKA_DATA_SAVE); syncDataDTO.setTableNumber(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER); - kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_TOPIC, JSONObject.toJSONString(syncDataDTO)); + kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_TABLE_SYNC, JSONObject.toJSONString(syncDataDTO)); } } diff --git a/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridSaveAfterHandlerImpl.java b/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridSaveAfterHandlerImpl.java index 1371714..16a06fa 100644 --- a/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridSaveAfterHandlerImpl.java +++ b/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridSaveAfterHandlerImpl.java @@ -41,7 +41,7 @@ public class GridSaveAfterHandlerImpl implements IGridSaveAfterHandler { syncDataDTO.setAction(ISmartCityUserCenterConsts.KAFKA_DATA_SAVE); syncDataDTO.setData(gridMap); syncDataDTO.setTableNumber(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID); - kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_TOPIC, JSONObject.toJSONString(syncDataDTO)); + kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_TABLE_SYNC, JSONObject.toJSONString(syncDataDTO)); } } diff --git a/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridUpdateAfterHandlerImpl.java b/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridUpdateAfterHandlerImpl.java index e64c6e0..157b0d8 100644 --- a/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridUpdateAfterHandlerImpl.java +++ b/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/GridUpdateAfterHandlerImpl.java @@ -41,7 +41,7 @@ public class GridUpdateAfterHandlerImpl implements IGridUpdateAfterHandler { syncDataDTO.setAction(ISmartCityUserCenterConsts.KAFKA_DATA_UPDATE); syncDataDTO.setData(gridMap); syncDataDTO.setTableNumber(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID); - kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_TOPIC, JSONObject.toJSONString(syncDataDTO)); + kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_TABLE_SYNC, JSONObject.toJSONString(syncDataDTO)); } } diff --git a/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/UserUpdateAfterHandlerImpl.java b/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/UserUpdateAfterHandlerImpl.java index 0a64cef..c1afacf 100644 --- a/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/UserUpdateAfterHandlerImpl.java +++ b/src/main/java/cn/com/tenlion/usercenter/service/grid/impl/UserUpdateAfterHandlerImpl.java @@ -45,7 +45,7 @@ public class UserUpdateAfterHandlerImpl implements IUserUpdateAfterHandler { syncDataDTO.setData(data); syncDataDTO.setAction(ISmartCityUserCenterConsts.KAFKA_DATA_UPDATE); syncDataDTO.setTableNumber(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_GRID_MEMBER); - kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TABLE_SYNC_TOPIC, JSONObject.toJSONString(syncDataDTO)); + kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_TABLE_SYNC, JSONObject.toJSONString(syncDataDTO)); } } diff --git a/src/main/java/cn/com/tenlion/usercenter/service/userrealtimelocation/impl/UserRealtimeLocationServiceImpl.java b/src/main/java/cn/com/tenlion/usercenter/service/userrealtimelocation/impl/UserRealtimeLocationServiceImpl.java index 7429637..71d730e 100644 --- a/src/main/java/cn/com/tenlion/usercenter/service/userrealtimelocation/impl/UserRealtimeLocationServiceImpl.java +++ b/src/main/java/cn/com/tenlion/usercenter/service/userrealtimelocation/impl/UserRealtimeLocationServiceImpl.java @@ -1,7 +1,9 @@ package cn.com.tenlion.usercenter.service.userrealtimelocation.impl; +import cn.com.tenlion.usercenter.consts.ISmartCityUserCenterConsts; import cn.com.tenlion.usercenter.enums.mongo.MongoCollectionEnum; import cn.com.tenlion.usercenter.service.userrealtimelocation.IUserRealtimeLocationService; +import com.alibaba.fastjson.JSONObject; import ink.wgink.app.AppTokenManager; import ink.wgink.common.base.DefaultBaseService; import ink.wgink.module.map.pojo.dtos.userlocation.UserLocationDTO; @@ -16,11 +18,15 @@ import org.springframework.data.domain.Pageable; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Query; +import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; /** @@ -37,6 +43,9 @@ public class UserRealtimeLocationServiceImpl extends DefaultBaseService implemen private IUserLocationService userLocationService; @Autowired private MongoTemplate mongoTemplate; + @Autowired + private KafkaTemplate kafkaTemplate; + private static ThreadPoolExecutor THREAD_POLL = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingDeque<>()); @Override public void save(UserLocationVO userLocationVO) { @@ -44,6 +53,9 @@ public class UserRealtimeLocationServiceImpl extends DefaultBaseService implemen String userLocationId = userLocationService.saveAndReturnId(userLocationVO.getCreator(), userLocationVO); userLocationVO.setUserLocationId(userLocationId); mongoTemplate.insert(userLocationVO, MongoCollectionEnum.USER_REALTIME_LOCATION.getCollection()); + THREAD_POLL.execute(() -> { + kafkaTemplate.send(ISmartCityUserCenterConsts.KAFKA_TOPIC_GRID_MEMBER_REALTIME_LOCATION, JSONObject.toJSONString(userLocationVO)); + }); } @Override diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 5bd8d4c..9a86914 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -86,7 +86,7 @@ spring: # 设置自动提交offset enable-auto-commit: true # 如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。 - auto-commit-interval: 100 + auto-commit-interval: 5000 max-poll-records: 5 mybatis: @@ -152,6 +152,7 @@ logging: name: C:\Users\wenc0\Desktop\UploadFiles\logs\usercenter-logs.log level: root: error + org.apache.kafka: info org.springframework.data.mongodb: debug org.springframework.boot.autoconfigure.security.servlet: debug ink.wgink: debug