对接实时同步和全量同步。

This commit is contained in:
Renpc-kilig 2022-01-27 20:10:18 +08:00
parent 71f12613f9
commit f6bcb45d9b
12 changed files with 541 additions and 38 deletions

View File

@ -197,4 +197,12 @@ public interface IBuildingService {
* @return
*/
List<Map<String, Object>> dataCount(Map<String, Object> params);
List<BuildingDTO> listForKafKa(Map<String, Object> params);
void saveKafka(String uid, BuildingVO buildingVO);
void updateKafka(String uid, BuildingVO buildingVO);
void deleteKafka(String uid);
}

View File

@ -3,40 +3,38 @@ package cn.com.tenlion.systemhouse.service.building.impl;
import cn.com.tenlion.systemhouse.dao.building.IBuildingDao;
import cn.com.tenlion.systemhouse.pojo.bos.building.BuildingBO;
import cn.com.tenlion.systemhouse.pojo.dtos.bigdata.HouseBuilding;
import cn.com.tenlion.systemhouse.pojo.dtos.bigdata.HouseCommunity;
import cn.com.tenlion.systemhouse.pojo.dtos.building.BuildingDTO;
import cn.com.tenlion.systemhouse.pojo.dtos.building.HouseCountDTO;
import cn.com.tenlion.systemhouse.pojo.dtos.community.CommunityDTO;
import cn.com.tenlion.systemhouse.pojo.dtos.grid.GridDTO;
import cn.com.tenlion.systemhouse.pojo.dtos.residential.ResidentialDTO;
import cn.com.tenlion.systemhouse.pojo.pos.building.BuildingPO;
import cn.com.tenlion.systemhouse.pojo.vos.building.BuildingVO;
import cn.com.tenlion.systemhouse.pojo.vos.buildinghouse.BuildingHouseVO;
import cn.com.tenlion.systemhouse.pojo.vos.community.CommunityVO;
import cn.com.tenlion.systemhouse.service.building.IBuildingService;
import cn.com.tenlion.systemhouse.service.buildinghouse.IBuildingHouseService;
import cn.com.tenlion.systemhouse.service.community.ICommunityService;
import cn.com.tenlion.systemhouse.service.grid.IGridService;
import cn.com.tenlion.systemhouse.service.residential.IResidentialService;
import cn.com.tenlion.systemhouse.utils.CheckDataUtils;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import ink.wgink.common.base.DefaultBaseService;
import ink.wgink.exceptions.SaveException;
import ink.wgink.module.dictionary.pojo.dtos.AreaDTO;
import ink.wgink.mongo.module.dictionary.service.IMongoAreaService;
import ink.wgink.pojo.ListPage;
import ink.wgink.pojo.result.SuccessResultList;
import ink.wgink.util.map.HashMapUtil;
import ink.wgink.util.UUIDUtil;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import ink.wgink.util.map.HashMapUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @ClassName: BuildingServiceImpl
@ -421,6 +419,11 @@ public class BuildingServiceImpl extends DefaultBaseService implements IBuilding
return buildingDao.list(params);
}
@Override
public List<BuildingDTO> listForKafKa(Map<String, Object> params) {
return buildingDao.list(params);
}
@Override
public List<BuildingBO> listBO(Map<String, Object> params) {
return buildingDao.listBO(params);
@ -501,7 +504,8 @@ public class BuildingServiceImpl extends DefaultBaseService implements IBuilding
buildingDao.updateGrid(params);
}
private void saveKafka(String uid, BuildingVO buildingVO) {
@Override
public void saveKafka(String uid, BuildingVO buildingVO) {
HouseBuilding houseBuilding = setKafkaData(buildingVO);
JSONObject jsonObject = new JSONObject();
// 实体模板编码
@ -516,7 +520,8 @@ public class BuildingServiceImpl extends DefaultBaseService implements IBuilding
kafkaTemplate.send("tableSync", jsonObject.toJSONString());
}
private void updateKafka(String uid, BuildingVO buildingVO) {
@Override
public void updateKafka(String uid, BuildingVO buildingVO) {
HouseBuilding houseBuilding = setKafkaData(buildingVO);
JSONObject jsonObject = new JSONObject();
// 实体模板编码
@ -531,7 +536,8 @@ public class BuildingServiceImpl extends DefaultBaseService implements IBuilding
kafkaTemplate.send("tableSync", jsonObject.toJSONString());
}
private void deleteKafka(String uid) {
@Override
public void deleteKafka(String uid) {
JSONObject jsonObject = new JSONObject();
// 实体模板编码
jsonObject.put("tableNumber", "C0004");
@ -541,7 +547,7 @@ public class BuildingServiceImpl extends DefaultBaseService implements IBuilding
// action: save/updete/delete
jsonObject.put("action", "delete");
// 实体
jsonObject.put("data", new HouseCommunity());
jsonObject.put("data", new HouseBuilding());
kafkaTemplate.send("tableSync", jsonObject.toJSONString());
}

View File

@ -1,7 +1,9 @@
package cn.com.tenlion.systemhouse.service.buildinghouse;
import cn.com.tenlion.systemhouse.pojo.bos.buildinghouse.BuildingHouseBO;
import cn.com.tenlion.systemhouse.pojo.dtos.building.BuildingDTO;
import cn.com.tenlion.systemhouse.pojo.dtos.buildinghouse.BuildingHouseDTO;
import cn.com.tenlion.systemhouse.pojo.dtos.residential.ResidentialDTO;
import cn.com.tenlion.systemhouse.pojo.pos.buildinghouse.BuildingHousePO;
import cn.com.tenlion.systemhouse.pojo.vos.buildinghouse.BuildingHouseVO;
import ink.wgink.pojo.ListPage;
@ -203,4 +205,12 @@ public interface IBuildingHouseService {
* @return
*/
List<Map<String, Object>> dataCount(Map<String, Object> params);
void saveKafka(String uid, BuildingHouseVO buildingHouseVO, BuildingDTO buildingDTO, ResidentialDTO residentialDTO);
void updateKafka(String uid, BuildingHouseVO buildingHouseVO, BuildingDTO buildingDTO, ResidentialDTO residentialDTO);
void deleteKafka(String uid);
List<BuildingHouseDTO> listForKafKa(Map<String, Object> params);
}

View File

@ -3,16 +3,13 @@ package cn.com.tenlion.systemhouse.service.buildinghouse.impl;
import cn.com.tenlion.systemhouse.dao.buildinghouse.IBuildingHouseDao;
import cn.com.tenlion.systemhouse.pojo.bos.buildinghouse.BuildingHouseBO;
import cn.com.tenlion.systemhouse.pojo.dtos.bigdata.HouseBuildingHouse;
import cn.com.tenlion.systemhouse.pojo.dtos.bigdata.HouseCommunity;
import cn.com.tenlion.systemhouse.pojo.dtos.building.BuildingDTO;
import cn.com.tenlion.systemhouse.pojo.dtos.buildinghouse.BuildingHouseDTO;
import cn.com.tenlion.systemhouse.pojo.dtos.grid.GridDTO;
import cn.com.tenlion.systemhouse.pojo.dtos.houseuser.HouseUserDTO;
import cn.com.tenlion.systemhouse.pojo.dtos.residential.ResidentialDTO;
import cn.com.tenlion.systemhouse.pojo.pos.buildinghouse.BuildingHousePO;
import cn.com.tenlion.systemhouse.pojo.vos.building.BuildingVO;
import cn.com.tenlion.systemhouse.pojo.vos.buildinghouse.BuildingHouseVO;
import cn.com.tenlion.systemhouse.pojo.vos.community.CommunityVO;
import cn.com.tenlion.systemhouse.service.building.IBuildingService;
import cn.com.tenlion.systemhouse.service.buildinghouse.IBuildingHouseService;
import cn.com.tenlion.systemhouse.service.grid.IGridService;
@ -20,21 +17,20 @@ import cn.com.tenlion.systemhouse.service.houseuser.IHouseUserService;
import cn.com.tenlion.systemhouse.service.residential.IResidentialService;
import cn.com.tenlion.systemhouse.utils.CheckDataUtils;
import com.alibaba.fastjson.JSONObject;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import ink.wgink.common.base.DefaultBaseService;
import ink.wgink.module.dictionary.pojo.dtos.AreaDTO;
import ink.wgink.mongo.module.dictionary.service.IMongoAreaService;
import ink.wgink.pojo.ListPage;
import ink.wgink.pojo.result.SuccessResultList;
import ink.wgink.util.map.HashMapUtil;
import ink.wgink.util.UUIDUtil;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import ink.wgink.util.map.HashMapUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.io.File;
import java.util.*;
/**
@ -235,6 +231,33 @@ public class BuildingHouseServiceImpl extends DefaultBaseService implements IBui
return buildingHouseDTOList;
}
@Override
public List<BuildingHouseDTO> listForKafKa(Map<String, Object> params) {
if(!com.alibaba.excel.util.StringUtils.isEmpty(params.get("keywords"))) {
List<HouseUserDTO> houseUserDTOS = houseUserService.list(params);
if(null != houseUserDTOS && houseUserDTOS.size() > 0) {
String ids = "";
for(int i=0;i<houseUserDTOS.size();i++) {
ids += houseUserDTOS.get(i).getBuildingHouseId() + ",";
}
ids = ids.substring(0, ids.length()-1);
params.put("buildingHouseIds", Arrays.asList(ids.split(",")));
params.remove("keywords");
}
}
List<BuildingHouseDTO> buildingHouseDTOList = buildingHouseDao.list(params);
if(null != buildingHouseDTOList && buildingHouseDTOList.size() > 0) {
for(BuildingHouseDTO buildingHouseDTO: buildingHouseDTOList) {
buildingHouseDTO.setHouseName(buildingHouseDTO.getBuildingName() + buildingHouseDTO.getHouseNumber());
BuildingDTO buildingDTO = buildingService.get(buildingHouseDTO.getBuildingId());
if(null != buildingDTO) {
buildingHouseDTO.setGrid(buildingDTO.getGrid());
}
}
}
return buildingHouseDTOList;
}
@Override
public List<BuildingHouseBO> listBO(Map<String, Object> params) {
return buildingHouseDao.listBO(params);
@ -365,7 +388,8 @@ public class BuildingHouseServiceImpl extends DefaultBaseService implements IBui
buildingHouseDao.updateGrid(params);
}
private void saveKafka(String uid, BuildingHouseVO buildingHouseVO, BuildingDTO buildingDTO, ResidentialDTO residentialDTO) {
@Override
public void saveKafka(String uid, BuildingHouseVO buildingHouseVO, BuildingDTO buildingDTO, ResidentialDTO residentialDTO) {
HouseBuildingHouse houseBuildingHouse = setKafkaData(buildingHouseVO, buildingDTO, residentialDTO);
JSONObject jsonObject = new JSONObject();
// 实体模板编码
@ -380,7 +404,8 @@ public class BuildingHouseServiceImpl extends DefaultBaseService implements IBui
kafkaTemplate.send("tableSync", jsonObject.toJSONString());
}
private void updateKafka(String uid, BuildingHouseVO buildingHouseVO, BuildingDTO buildingDTO, ResidentialDTO residentialDTO) {
@Override
public void updateKafka(String uid, BuildingHouseVO buildingHouseVO, BuildingDTO buildingDTO, ResidentialDTO residentialDTO) {
HouseBuildingHouse houseBuildingHouse = setKafkaData(buildingHouseVO, buildingDTO, residentialDTO);
JSONObject jsonObject = new JSONObject();
// 实体模板编码
@ -395,7 +420,8 @@ public class BuildingHouseServiceImpl extends DefaultBaseService implements IBui
kafkaTemplate.send("tableSync", jsonObject.toJSONString());
}
private void deleteKafka(String uid) {
@Override
public void deleteKafka(String uid) {
JSONObject jsonObject = new JSONObject();
// 实体模板编码
jsonObject.put("tableNumber", "C0005");
@ -405,7 +431,7 @@ public class BuildingHouseServiceImpl extends DefaultBaseService implements IBui
// action: save/updete/delete
jsonObject.put("action", "delete");
// 实体
jsonObject.put("data", new HouseCommunity());
jsonObject.put("data", new HouseBuildingHouse());
kafkaTemplate.send("tableSync", jsonObject.toJSONString());
}

View File

@ -185,4 +185,10 @@ public interface ICommunityService {
*/
Integer count(Map<String, Object> params);
void saveKafka(String uid, CommunityVO communityVO);
void updateKafka(String uid, CommunityVO communityVO);
void deleteKafka(String uid);
}

View File

@ -189,7 +189,8 @@ public class CommunityServiceImpl extends DefaultBaseService implements ICommuni
return count == null ? 0 : count;
}
private void saveKafka(String uid, CommunityVO communityVO) {
@Override
public void saveKafka(String uid, CommunityVO communityVO) {
HouseCommunity houseCommunity = setKafkaData(communityVO);
JSONObject jsonObject = new JSONObject();
// 实体模板编码
@ -204,7 +205,8 @@ public class CommunityServiceImpl extends DefaultBaseService implements ICommuni
kafkaTemplate.send("tableSync", jsonObject.toJSONString());
}
private void updateKafka(String uid, CommunityVO communityVO) {
@Override
public void updateKafka(String uid, CommunityVO communityVO) {
HouseCommunity houseCommunity = setKafkaData(communityVO);
JSONObject jsonObject = new JSONObject();
// 实体模板编码
@ -219,7 +221,8 @@ public class CommunityServiceImpl extends DefaultBaseService implements ICommuni
kafkaTemplate.send("tableSync", jsonObject.toJSONString());
}
private void deleteKafka(String uid) {
@Override
public void deleteKafka(String uid) {
JSONObject jsonObject = new JSONObject();
// 实体模板编码
jsonObject.put("tableNumber", "C0002");

View File

@ -1,6 +1,7 @@
package cn.com.tenlion.systemhouse.service.residential;
import cn.com.tenlion.systemhouse.pojo.bos.residential.ResidentialBO;
import cn.com.tenlion.systemhouse.pojo.dtos.community.CommunityDTO;
import cn.com.tenlion.systemhouse.pojo.dtos.residential.ResidentialDTO;
import cn.com.tenlion.systemhouse.pojo.pos.residential.ResidentialPO;
import cn.com.tenlion.systemhouse.pojo.vos.residential.ResidentialVO;
@ -153,6 +154,8 @@ public interface IResidentialService {
*/
List<ResidentialDTO> list(Map<String, Object> params);
List<ResidentialDTO> listForKafKa(Map<String, Object> params);
/**
* 小区管理表列表
*
@ -191,4 +194,10 @@ public interface IResidentialService {
* @return
*/
List<Map<String, Object>> dataCount(Map<String, Object> params);
void saveKafka(String uid, ResidentialVO residentialVO, CommunityDTO communityDTO);
void updateKafka(String uid, ResidentialVO residentialVO, CommunityDTO communityDTO);
void deleteKafka(String uid);
}

View File

@ -2,37 +2,37 @@ package cn.com.tenlion.systemhouse.service.residential.impl;
import cn.com.tenlion.systemhouse.dao.residential.IResidentialDao;
import cn.com.tenlion.systemhouse.pojo.bos.residential.ResidentialBO;
import cn.com.tenlion.systemhouse.pojo.dtos.bigdata.HouseCommunity;
import cn.com.tenlion.systemhouse.pojo.dtos.bigdata.HouseResidential;
import cn.com.tenlion.systemhouse.pojo.dtos.community.CommunityDTO;
import cn.com.tenlion.systemhouse.pojo.dtos.grid.GridDTO;
import cn.com.tenlion.systemhouse.pojo.dtos.residential.ResidentialDTO;
import cn.com.tenlion.systemhouse.pojo.pos.residential.ResidentialPO;
import cn.com.tenlion.systemhouse.pojo.vos.community.CommunityVO;
import cn.com.tenlion.systemhouse.pojo.vos.residential.ResidentialVO;
import cn.com.tenlion.systemhouse.service.building.IBuildingService;
import cn.com.tenlion.systemhouse.service.community.ICommunityService;
import cn.com.tenlion.systemhouse.service.grid.IGridService;
import cn.com.tenlion.systemhouse.service.grid.impl.GridServiceImpl;
import cn.com.tenlion.systemhouse.service.residential.IResidentialService;
import cn.com.tenlion.systemhouse.utils.CheckDataUtils;
import com.alibaba.fastjson.JSONObject;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import ink.wgink.common.base.DefaultBaseService;
import ink.wgink.exceptions.SearchException;
import ink.wgink.module.dictionary.pojo.dtos.AreaDTO;
import ink.wgink.mongo.module.dictionary.service.IMongoAreaService;
import ink.wgink.pojo.ListPage;
import ink.wgink.pojo.result.SuccessResultList;
import ink.wgink.util.map.HashMapUtil;
import ink.wgink.util.UUIDUtil;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import ink.wgink.util.map.HashMapUtil;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @ClassName: ResidentialServiceImpl
@ -242,6 +242,20 @@ public class ResidentialServiceImpl extends DefaultBaseService implements IResid
return residentialDTOList;
}
@Override
public List<ResidentialDTO> listForKafKa(Map<String, Object> params) {
List<ResidentialDTO> residentialDTOList =residentialDao.list(params);
if(null != residentialDTOList && residentialDTOList.size() > 0) {
for(ResidentialDTO residentialDTO: residentialDTOList) {
CommunityDTO communityDTO = communityService.get(residentialDTO.getCommunity());
if(null != communityDTO) {
residentialDTO.setCommunityName(communityDTO.getCommunityName());
}
}
}
return residentialDTOList;
}
@Override
public List<ResidentialBO> listBO(Map<String, Object> params) {
return residentialDao.listBO(params);
@ -330,7 +344,8 @@ public class ResidentialServiceImpl extends DefaultBaseService implements IResid
}
}
private void saveKafka(String uid, ResidentialVO residentialVO, CommunityDTO communityDTO) {
@Override
public void saveKafka(String uid, ResidentialVO residentialVO, CommunityDTO communityDTO) {
HouseResidential houseResidential = setKafkaData(residentialVO, communityDTO);
JSONObject jsonObject = new JSONObject();
// 实体模板编码
@ -345,7 +360,8 @@ public class ResidentialServiceImpl extends DefaultBaseService implements IResid
kafkaTemplate.send("tableSync", jsonObject.toJSONString());
}
private void updateKafka(String uid, ResidentialVO residentialVO, CommunityDTO communityDTO) {
@Override
public void updateKafka(String uid, ResidentialVO residentialVO, CommunityDTO communityDTO) {
HouseResidential houseResidential = setKafkaData(residentialVO, communityDTO);
JSONObject jsonObject = new JSONObject();
// 实体模板编码
@ -360,7 +376,8 @@ public class ResidentialServiceImpl extends DefaultBaseService implements IResid
kafkaTemplate.send("tableSync", jsonObject.toJSONString());
}
private void deleteKafka(String uid) {
@Override
public void deleteKafka(String uid) {
JSONObject jsonObject = new JSONObject();
// 实体模板编码
jsonObject.put("tableNumber", "C0003");
@ -370,7 +387,7 @@ public class ResidentialServiceImpl extends DefaultBaseService implements IResid
// action: save/updete/delete
jsonObject.put("action", "delete");
// 实体
jsonObject.put("data", new HouseCommunity());
jsonObject.put("data", new HouseResidential());
kafkaTemplate.send("tableSync", jsonObject.toJSONString());
}

View File

@ -0,0 +1,91 @@
package cn.com.tenlion.systemhouse.utils.fullsync;
import cn.com.tenlion.systemhouse.pojo.dtos.community.CommunityDTO;
import cn.com.tenlion.systemhouse.pojo.vos.community.CommunityVO;
import cn.com.tenlion.systemhouse.service.community.ICommunityService;
import com.alibaba.fastjson.JSONObject;
import ink.wgink.exceptions.SearchException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 表单全量数据同步
* 接收数据整合系统发来的全量数据同步命令, 向数据整合系统发送表单的全部数据
*/
@Component
public class FullSyncC0002 implements ApplicationEventPublisherAware {
private ApplicationEventPublisher applicationEventPublisher;
@Autowired
private ICommunityService communityService;
@Autowired
KafkaTemplate kafkaTemplate;
public FullSyncC0002() {}
@KafkaListener(topics = "C0002FullSync")
public void message(ConsumerRecord<?, ?> record) {
System.out.println("收到全量同步命令");
JSONObject jsonObject = JSONObject.parseObject(record.value().toString());
System.out.println(jsonObject.toJSONString());// 表单编号
System.out.println(jsonObject.getString("tableNumber"));// 表单编号
System.out.println(jsonObject.getInteger("tableDataCount"));// 整合系统数据量
/**
* 1 . 查询本地数据的数量 , 如果本地数据量 > 整合系统数据量
* 2 . 查询数据库中所有的事件列表
* 3 . 遍历事件, 封装数据
* 4 . 发送新增案件消息
*/
Map<String, Object> params = new HashMap<>();
List<CommunityDTO> communityDTOList = communityService.list(params);
System.out.println(communityDTOList.size());
if(null != communityDTOList && communityDTOList.size() > 0) {
for(CommunityDTO communityDTO: communityDTOList) {
CommunityVO communityVO = setData(communityDTO);
communityService.saveKafka(communityDTO.getCommunityId(), communityVO);
}
}
}
@KafkaListener(topics = "C0002")
public void messageMsg(ConsumerRecord<?, ?> record) {
JSONObject jsonObject = JSONObject.parseObject(record.value().toString());
Map<String, Object> params = new HashMap<>();
if(!"delete".equals(jsonObject.getString("action"))) {
CommunityDTO communityDTO = communityService.get(jsonObject.getString("uid"));
CommunityVO communityVO = setData(communityDTO);
communityService.saveKafka(communityDTO.getCommunityId(), communityVO);
}else {
communityService.deleteKafka(jsonObject.getString("uid"));
}
}
private CommunityVO setData(CommunityDTO communityDTO) {
if(null == communityDTO) {
throw new SearchException("当前对象为空,请注意查看");
}
CommunityVO communityVO = new CommunityVO();
communityVO.setCommunityName(communityDTO.getCommunityName());
communityVO.setLocationCode(communityDTO.getLocationCode());
communityVO.setAddress(communityDTO.getAddress());
communityVO.setLongitude(communityDTO.getLongitude());
communityVO.setLatitude(communityDTO.getLatitude());
communityVO.setGrid(communityDTO.getGrid());
return communityVO;
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
}

View File

@ -0,0 +1,101 @@
package cn.com.tenlion.systemhouse.utils.fullsync;
import cn.com.tenlion.systemhouse.pojo.dtos.community.CommunityDTO;
import cn.com.tenlion.systemhouse.pojo.dtos.residential.ResidentialDTO;
import cn.com.tenlion.systemhouse.pojo.vos.residential.ResidentialVO;
import cn.com.tenlion.systemhouse.service.community.ICommunityService;
import cn.com.tenlion.systemhouse.service.residential.IResidentialService;
import com.alibaba.fastjson.JSONObject;
import ink.wgink.exceptions.SearchException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 表单全量数据同步
* 接收数据整合系统发来的全量数据同步命令, 向数据整合系统发送表单的全部数据
*/
@Component
public class FullSyncC0003 implements ApplicationEventPublisherAware {
private ApplicationEventPublisher applicationEventPublisher;
@Autowired
private IResidentialService residentialService;
@Autowired
private ICommunityService communityService;
@Autowired
KafkaTemplate kafkaTemplate;
public FullSyncC0003() {}
@KafkaListener(topics = "C0003FullSync")
public void message(ConsumerRecord<?, ?> record) {
System.out.println("收到全量同步命令");
JSONObject jsonObject = JSONObject.parseObject(record.value().toString());
System.out.println(jsonObject.toJSONString());// 表单编号
System.out.println(jsonObject.getString("tableNumber"));// 表单编号
System.out.println(jsonObject.getInteger("tableDataCount"));// 整合系统数据量
/**
* 1 . 查询本地数据的数量 , 如果本地数据量 > 整合系统数据量
* 2 . 查询数据库中所有的事件列表
* 3 . 遍历事件, 封装数据
* 4 . 发送新增案件消息
*/
Map<String, Object> params = new HashMap<>();
List<ResidentialDTO> teamUserDTOList = residentialService.listForKafKa(params);
System.out.println(teamUserDTOList.size());
if(null != teamUserDTOList && teamUserDTOList.size() > 0) {
for(ResidentialDTO residentialDTO: teamUserDTOList) {
CommunityDTO communityDTO = communityService.get(residentialDTO.getCommunity());
ResidentialVO residentialVO = setData(residentialDTO, communityDTO);
residentialService.saveKafka(residentialDTO.getResidentialId(), residentialVO, communityDTO);
}
}
}
@KafkaListener(topics = "C0003")
public void messageMsg(ConsumerRecord<?, ?> record) {
JSONObject jsonObject = JSONObject.parseObject(record.value().toString());
Map<String, Object> params = new HashMap<>();
if(!"delete".equals(jsonObject.getString("action"))) {
ResidentialDTO residentialDTO = residentialService.get(jsonObject.getString("uid"));
CommunityDTO communityDTO = communityService.get(residentialDTO.getCommunity());
ResidentialVO residentialVO = setData(residentialDTO, communityDTO);
residentialService.saveKafka(residentialDTO.getResidentialId(), residentialVO, communityDTO);
}else {
residentialService.deleteKafka(jsonObject.getString("uid"));
}
}
private ResidentialVO setData(ResidentialDTO residentialDTO, CommunityDTO communityDTO) {
if(null == residentialDTO) {
throw new SearchException("当前对象为空,请注意查看");
}
if(null == communityDTO) {
throw new SearchException("当前对象为空,请注意查看");
}
ResidentialVO residentialVO = new ResidentialVO();
residentialVO.setResidentialName(residentialDTO.getResidentialName());
residentialVO.setResidentialType(residentialDTO.getResidentialType());
residentialVO.setLocationCode(residentialDTO.getLocationCode());
residentialVO.setResidentialAddress(residentialDTO.getResidentialAddress());
residentialVO.setLongitude(residentialDTO.getLongitude());
residentialVO.setLatitude(residentialDTO.getLatitude());
residentialVO.setGrid(residentialDTO.getGrid());
return residentialVO;
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
}

View File

@ -0,0 +1,105 @@
package cn.com.tenlion.systemhouse.utils.fullsync;
import cn.com.tenlion.systemhouse.pojo.dtos.building.BuildingDTO;
import cn.com.tenlion.systemhouse.pojo.dtos.residential.ResidentialDTO;
import cn.com.tenlion.systemhouse.pojo.vos.building.BuildingVO;
import cn.com.tenlion.systemhouse.service.building.IBuildingService;
import cn.com.tenlion.systemhouse.service.residential.IResidentialService;
import com.alibaba.fastjson.JSONObject;
import ink.wgink.exceptions.SearchException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 表单全量数据同步
* 接收数据整合系统发来的全量数据同步命令, 向数据整合系统发送表单的全部数据
*/
@Component
public class FullSyncC0004 implements ApplicationEventPublisherAware {
private ApplicationEventPublisher applicationEventPublisher;
@Autowired
private IBuildingService buildingService;
@Autowired
private IResidentialService residentialService;
@Autowired
KafkaTemplate kafkaTemplate;
public FullSyncC0004() {}
@KafkaListener(topics = "C0004FullSync")
public void message(ConsumerRecord<?, ?> record) {
System.out.println("收到全量同步命令");
JSONObject jsonObject = JSONObject.parseObject(record.value().toString());
System.out.println(jsonObject.toJSONString());// 表单编号
System.out.println(jsonObject.getString("tableNumber"));// 表单编号
System.out.println(jsonObject.getInteger("tableDataCount"));// 整合系统数据量
/**
* 1 . 查询本地数据的数量 , 如果本地数据量 > 整合系统数据量
* 2 . 查询数据库中所有的事件列表
* 3 . 遍历事件, 封装数据
* 4 . 发送新增案件消息
*/
Map<String, Object> params = new HashMap<>();
List<BuildingDTO> buildingDTOList = buildingService.listForKafKa(params);
System.out.println(buildingDTOList.size());
if(null != buildingDTOList && buildingDTOList.size() > 0) {
for(BuildingDTO buildingDTO: buildingDTOList) {
ResidentialDTO residentialDTO = residentialService.get(buildingDTO.getResidentialId());
BuildingVO buildingVO = setData(buildingDTO, residentialDTO);
buildingService.saveKafka(buildingDTO.getBuildingId(), buildingVO);
}
}
}
@KafkaListener(topics = "C0004")
public void messageMsg(ConsumerRecord<?, ?> record) {
JSONObject jsonObject = JSONObject.parseObject(record.value().toString());
Map<String, Object> params = new HashMap<>();
if(!"delete".equals(jsonObject.getString("action"))) {
BuildingDTO buildingDTO = buildingService.get(jsonObject.getString("uid"));
ResidentialDTO residentialDTO = residentialService.get(buildingDTO.getResidentialId());
BuildingVO buildingVO = setData(buildingDTO, residentialDTO);
buildingService.saveKafka(buildingDTO.getBuildingId(), buildingVO);
}else {
buildingService.deleteKafka(jsonObject.getString("uid"));
}
}
private BuildingVO setData(BuildingDTO buildingDTO, ResidentialDTO residentialDTO) {
if(null == buildingDTO) {
throw new SearchException("当前对象为空,请注意查看");
}
BuildingVO buildingVO = new BuildingVO();
buildingVO.setCommunityName(residentialDTO.getCommunityName());
buildingVO.setResidentialName(buildingDTO.getResidentialName());
buildingVO.setBuildingNum(buildingDTO.getBuildingNum());
buildingVO.setBuildingType(buildingDTO.getBuildingType());
buildingVO.setUnitCount(buildingDTO.getUnitCount());
buildingVO.setHouseCount(buildingDTO.getHouseCount());
buildingVO.setFloorCount(buildingDTO.getFloorCount());
buildingVO.setElevator(buildingDTO.getElevator());
buildingVO.setFloorCountForUnit(buildingDTO.getFloorCountForUnit());
buildingVO.setHouseCountForFloor(buildingDTO.getHouseCountForFloor());
buildingVO.setLocationCode(buildingDTO.getLocationCode());
buildingVO.setLongitude(buildingDTO.getLongitude());
buildingVO.setLatitude(buildingDTO.getLatitude());
buildingVO.setGrid(buildingDTO.getGrid());
return buildingVO;
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
}

View File

@ -0,0 +1,121 @@
package cn.com.tenlion.systemhouse.utils.fullsync;
import cn.com.tenlion.systemhouse.pojo.dtos.building.BuildingDTO;
import cn.com.tenlion.systemhouse.pojo.dtos.buildinghouse.BuildingHouseDTO;
import cn.com.tenlion.systemhouse.pojo.dtos.residential.ResidentialDTO;
import cn.com.tenlion.systemhouse.pojo.vos.buildinghouse.BuildingHouseVO;
import cn.com.tenlion.systemhouse.service.building.IBuildingService;
import cn.com.tenlion.systemhouse.service.buildinghouse.IBuildingHouseService;
import cn.com.tenlion.systemhouse.service.residential.IResidentialService;
import com.alibaba.fastjson.JSONObject;
import ink.wgink.exceptions.SearchException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 表单全量数据同步
* 接收数据整合系统发来的全量数据同步命令, 向数据整合系统发送表单的全部数据
*/
@Component
public class FullSyncC0005 implements ApplicationEventPublisherAware {
private ApplicationEventPublisher applicationEventPublisher;
@Autowired
private IBuildingHouseService buildingHouseService;
@Autowired
private IBuildingService buildingService;
@Autowired
private IResidentialService residentialService;
@Autowired
KafkaTemplate kafkaTemplate;
public FullSyncC0005() {}
@KafkaListener(topics = "C0005FullSync")
public void message(ConsumerRecord<?, ?> record) {
System.out.println("收到全量同步命令");
JSONObject jsonObject = JSONObject.parseObject(record.value().toString());
System.out.println(jsonObject.toJSONString());// 表单编号
System.out.println(jsonObject.getString("tableNumber"));// 表单编号
System.out.println(jsonObject.getInteger("tableDataCount"));// 整合系统数据量
/**
* 1 . 查询本地数据的数量 , 如果本地数据量 > 整合系统数据量
* 2 . 查询数据库中所有的事件列表
* 3 . 遍历事件, 封装数据
* 4 . 发送新增案件消息
*/
Map<String, Object> params = new HashMap<>();
List<BuildingHouseDTO> buildingHouseDTOList = buildingHouseService.listForKafKa(params);
System.out.println(buildingHouseDTOList.size());
if(null != buildingHouseDTOList && buildingHouseDTOList.size() > 0) {
for(BuildingHouseDTO buildingHouseDTO: buildingHouseDTOList) {
BuildingDTO buildingDTO = buildingService.get(buildingHouseDTO.getBuildingId());
if(null != buildingDTO) {
ResidentialDTO residentialDTO = residentialService.get(buildingDTO.getResidentialId());
if(null != residentialDTO) {
BuildingHouseVO buildingHouseVO = setData(buildingHouseDTO, buildingDTO);
buildingHouseService.saveKafka(buildingHouseDTO.getBuildingHouseId(), buildingHouseVO, buildingDTO, residentialDTO);
}else {
throw new SearchException("当前小区数据已不存在");
}
}else {
throw new SearchException("当前楼宇数据已不存在");
}
}
}
}
@KafkaListener(topics = "C0005")
public void messageMsg(ConsumerRecord<?, ?> record) {
JSONObject jsonObject = JSONObject.parseObject(record.value().toString());
Map<String, Object> params = new HashMap<>();
if(!"delete".equals(jsonObject.getString("action"))) {
BuildingHouseDTO buildingHouseDTO = buildingHouseService.get(jsonObject.getString("uid"));
BuildingDTO buildingDTO = buildingService.get(buildingHouseDTO.getBuildingId());
if(null != buildingDTO) {
ResidentialDTO residentialDTO = residentialService.get(buildingDTO.getResidentialId());
if(null != residentialDTO) {
BuildingHouseVO buildingHouseVO = setData(buildingHouseDTO, buildingDTO);
buildingHouseService.saveKafka(buildingHouseDTO.getBuildingHouseId(), buildingHouseVO, buildingDTO, residentialDTO);
}else {
throw new SearchException("当前小区数据已不存在");
}
}else {
throw new SearchException("当前楼宇数据已不存在");
}
}else {
buildingHouseService.deleteKafka(jsonObject.getString("uid"));
}
}
private BuildingHouseVO setData(BuildingHouseDTO buildingHouseDTO, BuildingDTO buildingDTO) {
if(null == buildingHouseDTO) {
throw new SearchException("当前对象为空,请注意查看");
}
if(null == buildingDTO) {
throw new SearchException("当前对象为空,请注意查看");
}
BuildingHouseVO buildingHouseVO = new BuildingHouseVO();
buildingHouseVO.setHouseNumber(buildingHouseDTO.getHouseNumber());
buildingHouseVO.setHouseStatus(buildingHouseDTO.getHouseStatus());
buildingHouseVO.setLocationCode(buildingHouseDTO.getLocationCode());
buildingHouseVO.setLongitude(buildingDTO.getLongitude());
buildingHouseVO.setLatitude(buildingDTO.getLatitude());
return buildingHouseVO;
}
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}
}