1、修改大数据局配置文件,在最后增加了一个同步label标签表的数据库连接

2、简单修改了获取视频资源的部分代码
This commit is contained in:
yitonglei 2022-08-01 11:02:26 +08:00
parent b68daeda1c
commit 8628a34b1e
4 changed files with 139 additions and 129 deletions

View File

@ -51,8 +51,12 @@ public class GetAboutCameraChannelEventListener {
//更新t_region的channelcount
cameraOrgenMapper.updateRegionChannelCount();
log.info("获取视频资源的组织和通道新消息流程结束。。。。。。");
//更新武伟达的标签表
monitorService.synchronizeMtmLabel();
}
}
}

View File

@ -1,5 +1,6 @@
package io.renren.modules.monitor.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import io.renren.common.dao.BaseDao;
import io.renren.modules.monitor.entity.CameraOrganization;
import org.apache.ibatis.annotations.Mapper;

View File

@ -1221,7 +1221,7 @@ public class MonitorService {
}
//组织递归根据父ID查,简化了分页
public List<JSONObject> getOrgenizationByParent(List<JSONObject> list,String id,RestTemplate restTemplate){
public List<JSONObject> getOrgenizationByParent(List<JSONObject> list,String id,RestTemplate restTemplate) {
HttpHeaders headers = new HttpHeaders();
headers.add("X-Subject-Token",token);
String url;
@ -1235,17 +1235,14 @@ public class MonitorService {
responseEntity = restTemplate.exchange(url, HttpMethod.GET,httpEntity,JSONObject.class);
JSONObject re = responseEntity.getBody();
if(re.getIntValue("totalCount")>0){
JSONArray results = re.getJSONArray("results");
List<JSONObject> jsonObjects = results.toJavaList(JSONObject.class);
list.addAll(jsonObjects);
jsonObjects.forEach(js->{
if(js.getBooleanValue("isParent")){
try {
getOrgenizationByParent(list, js.getString("id"), restTemplate);
}catch (Exception e){
log.error("根据父id:{}获取下级组织信息失败",js.getString("id"));
}
}
});
}
@ -1304,6 +1301,80 @@ public class MonitorService {
// return Result.success();
// }
//2获取视频通道信息并保存,多线程版
public void saveChannelInfoAsync() throws Exception {
RestTemplate restTemplate = this.getRestTemplate();
AtomicInteger faulseCount = new AtomicInteger();//失败的次数
//1-清空t_camera_channel
cameraOrgenMapper.truncate("t_camera_channel_cache");
Thread.sleep(1500);
//2-查询全部地区
//List<Map> maps = cameraOrgenMapper.testAll();
List<Map> maps = cameraOrgenMapper.selectAll();
if (maps != null && maps.size() > 0){
List<List<Map>> lists = Lists.partition(maps, 1000);
//3-创建线程池
ExecutorService executorService;
executorService = Executors.newFixedThreadPool(lists.size() + 1);
List<CompletableFuture> completableFutureLis = new ArrayList<>();
lists.forEach(l->{
completableFutureLis.add(
CompletableFuture.runAsync(()->{
l.forEach(m->{
boolean flag = true;
int tryCount = 0;//每个地区重试10次去获取该地区下视频通道信息
while(flag){
if(tryCount >= 9){
flag = false;
}
tryCount++;
//4-根据地区id去查询视频通道信息
List<Map> cameraChannels = new ArrayList<>();
try {
cameraChannels = getChannelInfo(m.get("id").toString(),restTemplate);
}catch (Exception e){
log.info("根据组织id:{}查询视频通道失败,这是第{}次重试",m.get("id").toString(),tryCount);
if(tryCount >= 10){
faulseCount.incrementAndGet();
log.error("根据组织id:{},查询视频通道失败,超出重试次数。将去正式表中查询",m.get("id").toString());
//去t_camera_channel查询相关信息并保存
cameraChannels = cameraChannelMapper.selectCameraChannelByPid(m.get("id").toString());
if(cameraChannels.size() > 0){
List<List<Map>> channelList = Lists.partition(cameraChannels,100);
for(List<Map> ll:channelList){
cameraChannelMapper.batchSaveCameraChannel(ll);
}
} else {
log.error("根据组织id:{},从正式表中查询视频通道失败,不存在该部门数据",m.get("id").toString());
}
}
continue;
}
//5-保存视频通道信息
batchSaveChannelInfos(cameraChannels,m.get("id").toString());
flag = false;
}
});
},executorService)
);
});
executorService.shutdown();
CompletableFuture.allOf(completableFutureLis.toArray(new CompletableFuture[completableFutureLis.size()])).join();
//6-更新完通道信息后,删除经度或纬度为空的视频通道信息
cameraChannelMapper.deleteByNonPlace();
//7-查询地区下通道的数量并更新到地区cache表中
editChannelCount();
//发布事件
publisher.publishEvent(new SaveCameraChannelEndEvent(this,true));
}
}
//根据组织id获取通道信息
public List<Map> getChannelInfo(String orgenId,RestTemplate restTemplate) throws Exception{
String url = monitorDomain +"/videoService/devicesManager/deviceTree?nodeType=1&typeCode=01;0;ALL;ALL&page=1&pageSize=3000&id="+orgenId;
@ -1334,118 +1405,6 @@ public class MonitorService {
}
}
//2获取视频通道信息并保存,多线程版
public void saveChannelInfoAsync() throws Exception {
RestTemplate restTemplate = this.getRestTemplate();
AtomicInteger faulseCount = new AtomicInteger();//失败的次数
//1-清空t_camera_channel
cameraOrgenMapper.truncate("t_camera_channel_cache");
//2-创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(16);
//3-查询全部地区
//List<Map> maps = cameraOrgenMapper.testAll();
List<Map> maps = cameraOrgenMapper.selectAll();
List<List<Map>> lists = Lists.partition(maps, 1000);
List<CompletableFuture> completableFutureLis = new ArrayList<>();
lists.forEach(l->{
completableFutureLis.add(
CompletableFuture.runAsync(()->{
l.forEach(m->{
boolean flag = true;
int tryCount = 0;//每个地区重试10次去获取该地区下视频通道信息
while(flag){
if(tryCount >= 9){
flag = false;
}
tryCount++;
//4-根据地区id去查询视频通道信息
List<Map> cameraChannels = new ArrayList<>();
try {
cameraChannels = getChannelInfo(m.get("id").toString(),restTemplate);
}catch (Exception e){
log.info("根据组织id:{}查询视频通道失败,这是第{}次重试",m.get("id").toString(),tryCount);
if(tryCount >= 10){
faulseCount.incrementAndGet();
log.error("根据组织id:{},查询视频通道失败,超出重试次数,将去正式表中查询",m.get("id").toString());
//去t_camera_channel查询相关信息并保存
cameraChannels = cameraChannelMapper.selectCameraChannelByPid(m.get("id").toString());
if(cameraChannels.size() > 0){
List<List<Map>> channelList = Lists.partition(cameraChannels,100);
for(List<Map> ll:channelList){
cameraChannelMapper.batchSaveCameraChannel(ll);
}
} else {
log.error("根据组织id:{},从正式表中查询视频通道失败,不存在该部门数据",m.get("id").toString());
}
}
continue;
}
//5-保存视频通道信息
batchSaveChannelInfos(cameraChannels,m.get("id").toString());
flag = false;
}
});
},executorService)
);
});
CompletableFuture.allOf(completableFutureLis.toArray(new CompletableFuture[completableFutureLis.size()])).join();
//6-更新完通道信息后,删除经度或纬度为空的视频通道信息
cameraChannelMapper.deleteByNonPlace();
//7-查询地区下通道的数量并更新到地区cache表中
editChannelCount();
//发布事件
publisher.publishEvent(new SaveCameraChannelEndEvent(this,true));
}
//同步武伟达的t_channel_mtm_label数据
@Transactional(rollbackFor = Exception.class)
public void synchronizeMtmLabel(){
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setUrl(jdbcUrl);
druidDataSource.setDriverClassName(jdbcDriverClassName);
druidDataSource.setUsername(jdbcUserName);
druidDataSource.setPassword(jdbcPassWord);
jdbcTemplate.setDataSource(druidDataSource);
List<Map<String, Object>> maps = jdbcTemplate.queryForList("select * from t_channel_mtm_label");
if (maps.size() > 0){
//清空t_channel_mtm_label
cameraOrgenMapper.truncate("t_channel_mtm_label");
List<List<Map<String, Object>>> partition = Lists.partition(maps, 200);
partition.forEach(list->{
cameraChannelMapper.batchSaveMtmLabel(list);
});
}
List<Map<String, Object>> labelMaps = jdbcTemplate.queryForList("select * from t_label");
if (labelMaps.size() > 0){
//清空t_label
cameraOrgenMapper.truncate("t_label");
List<List<Map<String, Object>>> partition = Lists.partition(maps, 200);
partition.forEach(list->{
cameraChannelMapper.batchSaveLabel(list);
});
}
}
//将t_camera_organization_cache和camera_channel_cache表数据保存到相应的主表中
@Transactional(rollbackFor = Exception.class)
public void insertChannelCacheToCameraChannel(){
cameraOrgenMapper.truncate("t_camera_organization");
cameraOrgenMapper.insertOrganizationCacheToCameraOrganization();
cameraOrgenMapper.truncate("t_camera_channel");
cameraChannelMapper.insertChannelCacheToCameraChannel();
}
//单独保存视频通道信息
public void batchSaveChannelInfos(List<Map> list,String parentId){
List<Map> needSave = new ArrayList<>();
@ -1492,21 +1451,50 @@ public class MonitorService {
}
}
public List<Map> listChildOrgenIds(String id){
Map orgenNow = cameraOrgenMapper.selectOrgenizationById(id);
List<Map> childs = new ArrayList<>();
if(!orgenNow.isEmpty()){
String path = orgenNow.get("path").toString();
childs = cameraOrgenMapper.selectSubOrganizationMapByPath(id);
//将t_camera_organization_cache和camera_channel_cache表数据保存到相应的主表中
@Transactional(rollbackFor = Exception.class)
public void insertChannelCacheToCameraChannel(){
cameraOrgenMapper.truncate("t_camera_organization");
cameraOrgenMapper.insertOrganizationCacheToCameraOrganization();
cameraOrgenMapper.truncate("t_camera_channel");
cameraChannelMapper.insertChannelCacheToCameraChannel();
}
return childs;
//同步武伟达的t_channel_mtm_label数据
@Transactional(rollbackFor = Exception.class)
public void synchronizeMtmLabel(){
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setUrl(jdbcUrl);
druidDataSource.setDriverClassName(jdbcDriverClassName);
druidDataSource.setUsername(jdbcUserName);
druidDataSource.setPassword(jdbcPassWord);
jdbcTemplate.setDataSource(druidDataSource);
List<Map<String, Object>> maps = jdbcTemplate.queryForList("select * from t_channel_mtm_label");
if (maps.size() > 0){
//清空t_channel_mtm_label
cameraOrgenMapper.truncate("t_channel_mtm_label");
List<List<Map<String, Object>>> partition = Lists.partition(maps, 200);
partition.forEach(list->{
cameraChannelMapper.batchSaveMtmLabel(list);
});
}
List<Map<String, Object>> labelMaps = jdbcTemplate.queryForList("select * from t_label");
if (labelMaps.size() > 0){
//清空t_label
cameraOrgenMapper.truncate("t_label");
List<List<Map<String, Object>>> partition = Lists.partition(maps, 200);
partition.forEach(list->{
cameraChannelMapper.batchSaveLabel(list);
});
}
}
public RestTemplate getRestTemplate(){
SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
factory.setReadTimeout(20000);//单位为ms
factory.setConnectTimeout(3000);//单位为ms
factory.setReadTimeout(30000);//单位为ms
factory.setConnectTimeout(10000);//单位为ms
factory.setOutputStreaming(false);
RestTemplate restTemplate = new RestTemplate(factory);
@ -1514,6 +1502,16 @@ public class MonitorService {
return restTemplate;
}
//测试用的
public List<Map> listChildOrgenIds(String id){
Map orgenNow = cameraOrgenMapper.selectOrgenizationById(id);
List<Map> childs = new ArrayList<>();
if(!orgenNow.isEmpty()){
String path = orgenNow.get("path").toString();
childs = cameraOrgenMapper.selectSubOrganizationMapByPath(id);
}
return childs;
}
}

View File

@ -55,3 +55,10 @@ hisense:
qdyjj:
ipAndPort: 15.72.178.136:9494
#同步摄像头标签表t_channel_mtm_label需要链接的数据库配置
synchro:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://15.72.183.91:3306/monitor_manage?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai&nullCatalogMeansCurrent=true&useSSL=false
username: root
password: w@CmM1mBVQkPhdrc