diff --git a/renren-admin/src/main/java/io/renren/modules/monitor/eventListen/GetAboutCameraChannelEventListener.java b/renren-admin/src/main/java/io/renren/modules/monitor/eventListen/GetAboutCameraChannelEventListener.java new file mode 100644 index 00000000..780f8c72 --- /dev/null +++ b/renren-admin/src/main/java/io/renren/modules/monitor/eventListen/GetAboutCameraChannelEventListener.java @@ -0,0 +1,58 @@ +package io.renren.modules.monitor.eventListen; + +import io.renren.modules.monitor.eventListen.saveCameraChannelEndEvent.SaveCameraChannelEndEvent; +import io.renren.modules.monitor.eventListen.saveOrgenizationEndEvent.SaveOrgenizationEndEvent; +import io.renren.modules.monitor.mapper.CameraOrgenizationMapper; +import io.renren.modules.monitor.service.MonitorService; +import lombok.extern.log4j.Log4j2; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.event.EventListener; +import org.springframework.core.annotation.Order; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +/** + * 关于获取视频通道相关信息的监听器 + * @author ytl + * @Date 2022/7/27 15:17 + **/ +@Component +@Log4j2 +public class GetAboutCameraChannelEventListener { + @Autowired + private MonitorService monitorService; + + @Autowired + private CameraOrgenizationMapper cameraOrgenMapper; + + @Async + @Order(value = 1)//值越小,越优先执行 + @EventListener(SaveOrgenizationEndEvent.class) + public void listenOrgenizationEvent(SaveOrgenizationEndEvent event) throws Exception{ + boolean msg = event.getMsg(); + log.info("开始处理SaveOrgenizationEndEvent 消息事件。。。。"+msg); + //......处理流程代码 + if(msg){ + monitorService.saveChannelInfoAsync(); + } + } + + @Async + @Order(value = 2)//值越小,越优先执行 + @EventListener(SaveCameraChannelEndEvent.class) + public void listenChannelEvent(SaveCameraChannelEndEvent event) throws Exception{ + boolean msg = event.getMsg(); + log.info("开始处理SaveCameraChannelEndEvent 消息事件。。。。"+msg); + + if(msg){ + //保存cache表信息到正式表 + monitorService.insertChannelCacheToCameraChannel(); + + //跟新t_region的channelcount + cameraOrgenMapper.updateRegionChannelCount(); + + //更新武伟达的标签表 + monitorService.synchronizeMtmLabel(); + } + } +} diff --git a/renren-admin/src/main/java/io/renren/modules/monitor/eventListen/saveCameraChannelEndEvent/SaveCameraChannelEndEvent.java b/renren-admin/src/main/java/io/renren/modules/monitor/eventListen/saveCameraChannelEndEvent/SaveCameraChannelEndEvent.java new file mode 100644 index 00000000..a38123c4 --- /dev/null +++ b/renren-admin/src/main/java/io/renren/modules/monitor/eventListen/saveCameraChannelEndEvent/SaveCameraChannelEndEvent.java @@ -0,0 +1,24 @@ +package io.renren.modules.monitor.eventListen.saveCameraChannelEndEvent; + +import org.springframework.context.ApplicationEvent; + +/** + * @author ytl + * @Date 2022/7/28 10:24 + **/ +public class SaveCameraChannelEndEvent extends ApplicationEvent { + private boolean msg; + + public SaveCameraChannelEndEvent(Object source,boolean msg) { + super(source); + this.msg = msg; + } + + public boolean getMsg(){ + return msg; + } + + public void setMsg(boolean msg){ + this.msg = msg; + } +} diff --git a/renren-admin/src/main/java/io/renren/modules/monitor/eventListen/saveOrgenizationEndEvent/SaveOrgenizationEndEvent.java b/renren-admin/src/main/java/io/renren/modules/monitor/eventListen/saveOrgenizationEndEvent/SaveOrgenizationEndEvent.java new file mode 100644 index 00000000..ed169aeb --- /dev/null +++ b/renren-admin/src/main/java/io/renren/modules/monitor/eventListen/saveOrgenizationEndEvent/SaveOrgenizationEndEvent.java @@ -0,0 +1,24 @@ +package io.renren.modules.monitor.eventListen.saveOrgenizationEndEvent; + +import lombok.extern.log4j.Log4j2; +import org.springframework.context.ApplicationEvent; + +/** + * @author ytl + * @Date 2022/7/27 15:13 + **/ +public class SaveOrgenizationEndEvent extends ApplicationEvent { + private boolean msg; + public SaveOrgenizationEndEvent(Object source,boolean msg) { + super(source); + this.msg = msg; + } + + public boolean getMsg(){ + return msg; + } + + public void setMsg(boolean msg){ + this.msg = msg; + } +} diff --git a/renren-admin/src/main/java/io/renren/modules/monitor/mapper/CameraChannelMapper.java b/renren-admin/src/main/java/io/renren/modules/monitor/mapper/CameraChannelMapper.java index 85246e42..34d087a0 100644 --- a/renren-admin/src/main/java/io/renren/modules/monitor/mapper/CameraChannelMapper.java +++ b/renren-admin/src/main/java/io/renren/modules/monitor/mapper/CameraChannelMapper.java @@ -11,6 +11,7 @@ import org.apache.ibatis.annotations.Delete; import org.apache.ibatis.annotations.Mapper; import org.apache.ibatis.annotations.Param; import org.apache.ibatis.annotations.Select; +import org.springframework.security.core.parameters.P; import java.util.List; import java.util.Map; @@ -73,8 +74,16 @@ public interface CameraChannelMapper extends BaseDao { void batchSaveMtmLabel(@Param("list") List> list); - @Delete("delete from t_camera_channel where gps_x is null or gps_y is null or gps_x = '' or gps_y = '' ") + @Delete("delete from t_camera_channel_cache where gps_x is null or gps_y is null or gps_x = '' or gps_y = '' ") void deleteByNonPlace(); Map selectCameraChannelById(@Param("channelId") String channelId); + + List selectCameraChannelByPid(@Param("parentId") String parentId); + + void batchSaveCameraChannel(@Param("list") List list); + + void insertChannelCacheToCameraChannel(); + + void batchSaveLabel(@Param("list") List> list); } diff --git a/renren-admin/src/main/java/io/renren/modules/monitor/mapper/CameraOrgenizationMapper.java b/renren-admin/src/main/java/io/renren/modules/monitor/mapper/CameraOrgenizationMapper.java index 4562507e..b22ae395 100644 --- a/renren-admin/src/main/java/io/renren/modules/monitor/mapper/CameraOrgenizationMapper.java +++ b/renren-admin/src/main/java/io/renren/modules/monitor/mapper/CameraOrgenizationMapper.java @@ -25,12 +25,14 @@ public interface CameraOrgenizationMapper extends BaseDao { void batchSaveOrgenization(List list); - @Select("SELECT name,parent_id,id,path FROM t_camera_organization WHERE id = #{id}") + @Select("SELECT name,parent_id,id,path FROM t_camera_organization_cache WHERE id = #{id}") Map selectOrgenizationById(@Param("id") String id); @Update("UPDATE t_camera_organization SET path = TRIM( TRAILING '->' FROM #{path}) where id = #{id}") void updateOrganizationPaht(@Param("path") String path,@Param("id") String id); + void updateOrganizationPath(@Param("path") String path,@Param("id") String id); + @Select(" SELECT id FROM t_camera_organization") List listOrgenization(); @@ -45,13 +47,15 @@ public interface CameraOrgenizationMapper extends BaseDao { @Select("select * from t_camera_organization") List testAll(); + List selectAll(); + @Update("truncate table ${tableName}") void truncate(@Param("tableName") String tableName) ; - @Select("select orgaid,id,name,path from t_camera_organization order by orgaid") + @Select("select orgaid,id,name,path from t_camera_organization_cache order by orgaid") List selectAllSubOrganizationMap(); - @Update("UPDATE t_camera_organization SET channelCount = #{channelCount} where orgaid = #{orgaId}") + @Update("UPDATE t_camera_organization_cache SET channelCount = #{channelCount} where orgaid = #{orgaId}") void editChannelCount(@Param("channelCount") Integer channelCount,@Param("orgaId") Integer orgaId); void updateRegionChannelCount(); @@ -62,4 +66,6 @@ public interface CameraOrgenizationMapper extends BaseDao { // "\t) b ON a.id = b.parent_id \n" + // ") ORDER BY orgaid ASC ") // List listOrgenization2(); + + void insertOrganizationCacheToCameraOrganization(); } diff --git a/renren-admin/src/main/java/io/renren/modules/monitor/service/MonitorService.java b/renren-admin/src/main/java/io/renren/modules/monitor/service/MonitorService.java index 6efd0b85..cb590328 100644 --- a/renren-admin/src/main/java/io/renren/modules/monitor/service/MonitorService.java +++ b/renren-admin/src/main/java/io/renren/modules/monitor/service/MonitorService.java @@ -8,6 +8,8 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.google.common.collect.Lists; import io.renren.modules.monitor.dto.ChannelLabelDto; import io.renren.modules.monitor.entity.*; +import io.renren.modules.monitor.eventListen.saveCameraChannelEndEvent.SaveCameraChannelEndEvent; +import io.renren.modules.monitor.eventListen.saveOrgenizationEndEvent.SaveOrgenizationEndEvent; import io.renren.modules.monitor.mapper.*; import lombok.extern.log4j.Log4j2; import org.bytedeco.javacv.FFmpegFrameGrabber; @@ -15,12 +17,14 @@ import org.bytedeco.javacv.Frame; import org.bytedeco.javacv.Java2DFrameConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Lazy; import org.springframework.http.*; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.Base64Utils; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; @@ -44,6 +48,7 @@ import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; @Service @Log4j2 @@ -51,6 +56,9 @@ public class MonitorService { private static Integer cpuNUm = Runtime.getRuntime().availableProcessors(); private static final ExecutorService executor = Executors.newFixedThreadPool(cpuNUm); + @Autowired + private ApplicationEventPublisher publisher; + @Autowired private RestTemplate restTemplate; @@ -1129,27 +1137,76 @@ public class MonitorService { return Result.success(maps); } - public List getOrgenization(List list){ - try { - List list1 = this.getOrgenizationRoot(); - list.addAll(list1); - list1.forEach(a->{ - if(a.getBooleanValue("isParent")){ - getOrgenizationByParent(list,a.getString("id")); - } - }); - if(list != null && list.size() > 0){ - cameraOrgenMapper.truncate("t_camera_organization"); + //以此作为获取视频资源 + public void getAndSaveOrgenization(){ + List orgenizationByPage = this.getOrgenization(new ArrayList(10000)); + if(orgenizationByPage != null && orgenizationByPage.size() > 0){ + cameraOrgenMapper.truncate("t_camera_organization_cache"); + List maps = JSONObject.parseArray(JSONObject.toJSONString(orgenizationByPage), Map.class); + if(maps.size() > 0){ + List> lists = Lists.partition(maps,100); + lists.forEach(list->{ + cameraOrgenMapper.batchSaveOrgenization(list); + }); + + //修改path 信息 + List maps2 = cameraOrgenMapper.selectAll(); + maps2.forEach(map->{ + this.setOrganizationPath(map,map.get("parent_id").toString()); + }); + + maps2.forEach(m->{ + cameraOrgenMapper.updateOrganizationPath(m.get("path").toString(),m.get("id").toString()); + }); + + //发布事件 + publisher.publishEvent(new SaveOrgenizationEndEvent(this,true)); } - return list; - }catch (Exception e){ - log.info(e.getMessage()); - return null; } } + public List getOrgenization(List list) { + int count = 0; + boolean flag = true; + while(flag){ + if(count >= 9){ + flag = false; + } + count ++; + try { + List list1 = this.getOrgenizationRoot(); + list.addAll(list1); + list1.forEach(a->{ + if(a.getBooleanValue("isParent")){ + getOrgenizationByParent(list,a.getString("id")); + } + }); +// if(list != null && list.size() > 0){ +// cameraOrgenMapper.truncate("t_camera_organization"); +// } + flag = false; + return list; + }catch (Exception e){ + System.out.println("第"+String.valueOf(count)+"次获取,错误是:"); + System.out.println(e.getMessage()); + if(count >= 10){ + log.error("获取视频通道组织信息失败,失败原因:{}",e.getMessage()); + return new ArrayList(); + } + list.clear(); + try { + Thread.sleep(1000); + } catch (InterruptedException interruptedException) { + interruptedException.printStackTrace(); + } + continue; + } + } + return list; + } + //获取根组织 - public List getOrgenizationRoot() throws Exception{ + public List getOrgenizationRoot(){ List list = new ArrayList<>(); String url = monitorDomain + "/videoService/devicesManager/deviceTree?id=&nodeType=1&typeCode=01&page=1&pageSize=3000"; HttpHeaders headers = new HttpHeaders(); @@ -1253,7 +1310,12 @@ public class MonitorService { HttpEntity httpEntity = new HttpEntity<>(null, headers); responseEntity = restTemplate.exchange(url, HttpMethod.GET,httpEntity,JSONObject.class); JSONObject re = responseEntity.getBody(); - List results = re.getJSONArray("results").toJavaList(Map.class); + List results = new ArrayList<>(); + if(re.getJSONArray("results") != null){ + results = re.getJSONArray("results").toJavaList(Map.class); + }else{ + log.info("根据地区id:{}获取摄像头信息失败,失败原因:{}",orgenId,re.toJSONString()); + } return results; } @@ -1269,16 +1331,18 @@ public class MonitorService { } //2、获取视频通道信息并保存,多线程版 - @Async + //@Async public void saveChannelInfoAsync() throws Exception { + AtomicInteger faulseCount = new AtomicInteger();//失败的次数 //1-清空t_camera_channel - cameraOrgenMapper.truncate("t_camera_channel"); + cameraOrgenMapper.truncate("t_camera_channel_cache"); //2-创建线程池 - ExecutorService executorService = Executors.newFixedThreadPool(20);//20个线程足够 + ExecutorService executorService = Executors.newFixedThreadPool(16); //3-查询全部地区 - List maps = cameraOrgenMapper.testAll(); + //List maps = cameraOrgenMapper.testAll(); + List maps = cameraOrgenMapper.selectAll(); List> lists = Lists.partition(maps, 1000); List completableFutureLis = new ArrayList<>(); @@ -1300,12 +1364,24 @@ public class MonitorService { }catch (Exception e){ log.info("根据地区id:{}查询视频通道失败,这是第{}次重试",m.get("id").toString(),tryCount); + if(tryCount >= 10){ + faulseCount.incrementAndGet(); + log.error("根据地区id:{},查询视频通道失败,超出重试次数",m.get("id").toString()); + //去t_camera_channel查询相关信息并保存 + cameraChannels = cameraChannelMapper.selectCameraChannelByPid(m.get("id").toString()); + if(cameraChannels.size() > 0){ + List> channelList = Lists.partition(cameraChannels,100); + for(List ll:channelList){ + cameraChannelMapper.batchSaveCameraChannel(ll); + } + } + } continue; } //5-保存视频通道信息 batchSaveChannelInfos(cameraChannels,m.get("id").toString()); //6-更新地区表的count - cameraOrgenMapper.updateOrganizationCount(m.get("id").toString()); + //cameraOrgenMapper.updateOrganizationCount(m.get("id").toString()); flag = false; } }); @@ -1318,14 +1394,21 @@ public class MonitorService { //6-更新完通道信息后,删除经度或纬度为空的视频通道信息 cameraChannelMapper.deleteByNonPlace(); - //7-查询地区下通道的数量并更新到地区表和市区表中 + //7-查询地区下通道的数量并更新到地区cache表中 editChannelCount(); + //发布事件 + publisher.publishEvent(new SaveCameraChannelEndEvent(this,true)); + //以下方法将在事件后处理 + //将cache表数据保存到相应的主表中 + //insertChannelCacheToCameraChannel); //8-同步武伟达的t_channel_mtm_label数据 - synchronizeMtmLabel(); + //synchronizeMtmLabel(); + } //同步武伟达的t_channel_mtm_label数据 + @Transactional(rollbackFor = Exception.class) public void synchronizeMtmLabel(){ DruidDataSource druidDataSource = new DruidDataSource(); druidDataSource.setUrl(jdbcUrl); @@ -1335,7 +1418,7 @@ public class MonitorService { jdbcTemplate.setDataSource(druidDataSource); List> maps = jdbcTemplate.queryForList("select * from t_channel_mtm_label"); - if(maps.size() > 0){ + if (maps.size() > 0){ //清空t_channel_mtm_label cameraOrgenMapper.truncate("t_channel_mtm_label"); List>> partition = Lists.partition(maps, 200); @@ -1344,8 +1427,26 @@ public class MonitorService { }); } + List> labelMaps = jdbcTemplate.queryForList("select * from t_label"); + if (labelMaps.size() > 0){ + //清空t_label + cameraOrgenMapper.truncate("t_label"); + List>> partition = Lists.partition(maps, 200); + partition.forEach(list->{ + cameraChannelMapper.batchSaveLabel(list); + }); + } + } + //将t_camera_organization_cache和camera_channel_cache表数据保存到相应的主表中 + @Transactional(rollbackFor = Exception.class) + public void insertChannelCacheToCameraChannel(){ + cameraOrgenMapper.truncate("t_camera_organization"); + cameraOrgenMapper.insertOrganizationCacheToCameraOrganization(); + cameraOrgenMapper.truncate("t_camera_channel"); + cameraChannelMapper.insertChannelCacheToCameraChannel(); + } //单独保存视频通道信息 public void batchSaveChannelInfos(List list,String parentId){ @@ -1366,23 +1467,24 @@ public class MonitorService { needSave.add(j); } } - }else{//更新count字段 - cameraOrgenMapper.updateOrganizationCount(parentId); } +// else{//更新count字段 +// cameraOrgenMapper.updateOrganizationCount(parentId); +// } //修改nodeName needSave.forEach(map->setNodeName(map,map.get("parentId").toString())); - //保存并更新count字段 + //保存 if(needSave.size() > 0){ List> partition = Lists.partition(needSave, 100); partition.forEach(l->{ cameraOrgenMapper.batchSaveCameraChannel(l); }); - cameraOrgenMapper.updateOrganizationCount(parentId); + //cameraOrgenMapper.updateOrganizationCount(parentId); } } - //更新完通道信息后,查询地区下通道的数量并更新到地区表中和市区表中 + //更新完通道信息后,查询地区下通道的数量并更新到地区表中 public void editChannelCount() throws Exception{ //1-更新地区表中的每个地区下channelCount List maps = cameraOrgenMapper.selectAllSubOrganizationMap(); @@ -1395,7 +1497,7 @@ public class MonitorService { }; } //2-更新市区表t_region表中的channelCount - cameraOrgenMapper.updateRegionChannelCount(); + //cameraOrgenMapper.updateRegionChannelCount(); } public List listChildOrgenIds(String id){ @@ -1408,6 +1510,18 @@ public class MonitorService { return childs; } + + public void test(){ + //保存cache表信息到正式表 + insertChannelCacheToCameraChannel(); + + cameraOrgenMapper.updateRegionChannelCount(); + + //更新武伟达的标签表 + //synchronizeMtmLabel(); + + + } } diff --git a/renren-admin/src/main/resources/mapper/monitor/CameraChannelMapper.xml b/renren-admin/src/main/resources/mapper/monitor/CameraChannelMapper.xml index 53d264b3..6b88ae10 100644 --- a/renren-admin/src/main/resources/mapper/monitor/CameraChannelMapper.xml +++ b/renren-admin/src/main/resources/mapper/monitor/CameraChannelMapper.xml @@ -117,7 +117,7 @@ @@ -242,4 +242,48 @@ SELECT channel_id AS channelId,channel_name AS channelName FROM t_camera_channel WHERE channel_id = #{channelId} LIMIT 1 + + + + + INSERT INTO t_camera_channel_cache + ( + channel_code,channel_id,channel_name,gps_x,gps_y,status, + parent_id,region_code,region_name,node_name + ) + VALUES + + ( + #{item.channelCode},#{item.channelId},#{item.channelName},#{item.gpsX},#{item.gpsY},#{item.status}, + #{item.parentId},#{item.regionCode},#{item.regionName},#{item.nodeName} + ) + + + + + INSERT INTO t_camera_channel + ( + channel_code,channel_id,channel_name,gps_x,gps_y,status, + parent_id,region_code,region_name,node_name,check_status + ) + SELECT channel_code,channel_id,channel_name,gps_x,gps_y,status, + parent_id,region_code,region_name,node_name,check_status + FROM t_camera_channel_cache + + + + INSERT INTO t_label ( + label_code,label_name,capture_patternn,city + ) + VALUES + + (#{item.label_code},#{item.label_name},#{item.capture_patternn},#{item.city}) + + \ No newline at end of file diff --git a/renren-admin/src/main/resources/mapper/monitor/CameraOrganizationMapper.xml b/renren-admin/src/main/resources/mapper/monitor/CameraOrganizationMapper.xml index 8e52a2fc..c2635911 100644 --- a/renren-admin/src/main/resources/mapper/monitor/CameraOrganizationMapper.xml +++ b/renren-admin/src/main/resources/mapper/monitor/CameraOrganizationMapper.xml @@ -3,7 +3,7 @@ - INSERT INTO t_camera_organization ( + INSERT INTO t_camera_organization_cache ( id,name,parent_id,sort,subCount,path,is_parent,is_root ) VALUES @@ -15,7 +15,7 @@ - INSERT INTO t_camera_channel + INSERT INTO t_camera_channel_cache ( channel_code,channel_id,channel_name,gps_x,gps_y,status, parent_id,region_code,region_name,node_name @@ -50,4 +50,19 @@ + + + + UPDATE t_camera_organization_cache SET path = TRIM( TRAILING '->' FROM #{path}) where id = #{id} + + + + INSERT INTO t_camera_organization + ( + id,name,parent_id,sort,subCount,is_parent,path,is_root,channelCount,count + ) + SELECT id,name,parent_id,sort,subCount,is_parent,path,is_root,channelCount,count FROM t_camera_organization_cache + \ No newline at end of file