1、根据新的流程重新梳理获取视频资源(地区和通道信息)

This commit is contained in:
yitonglei 2022-07-29 09:31:47 +08:00
parent 7586127226
commit 0ccdf069f8
8 changed files with 332 additions and 38 deletions

View File

@ -0,0 +1,58 @@
package io.renren.modules.monitor.eventListen;
import io.renren.modules.monitor.eventListen.saveCameraChannelEndEvent.SaveCameraChannelEndEvent;
import io.renren.modules.monitor.eventListen.saveOrgenizationEndEvent.SaveOrgenizationEndEvent;
import io.renren.modules.monitor.mapper.CameraOrgenizationMapper;
import io.renren.modules.monitor.service.MonitorService;
import lombok.extern.log4j.Log4j2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* 关于获取视频通道相关信息的监听器
* @author ytl
* @Date 2022/7/27 15:17
**/
@Component
@Log4j2
public class GetAboutCameraChannelEventListener {
@Autowired
private MonitorService monitorService;
@Autowired
private CameraOrgenizationMapper cameraOrgenMapper;
@Async
@Order(value = 1)//值越小越优先执行
@EventListener(SaveOrgenizationEndEvent.class)
public void listenOrgenizationEvent(SaveOrgenizationEndEvent event) throws Exception{
boolean msg = event.getMsg();
log.info("开始处理SaveOrgenizationEndEvent 消息事件。。。。"+msg);
//......处理流程代码
if(msg){
monitorService.saveChannelInfoAsync();
}
}
@Async
@Order(value = 2)//值越小越优先执行
@EventListener(SaveCameraChannelEndEvent.class)
public void listenChannelEvent(SaveCameraChannelEndEvent event) throws Exception{
boolean msg = event.getMsg();
log.info("开始处理SaveCameraChannelEndEvent 消息事件。。。。"+msg);
if(msg){
//保存cache表信息到正式表
monitorService.insertChannelCacheToCameraChannel();
//跟新t_region的channelcount
cameraOrgenMapper.updateRegionChannelCount();
//更新武伟达的标签表
monitorService.synchronizeMtmLabel();
}
}
}

View File

@ -0,0 +1,24 @@
package io.renren.modules.monitor.eventListen.saveCameraChannelEndEvent;
import org.springframework.context.ApplicationEvent;
/**
* @author ytl
* @Date 2022/7/28 10:24
**/
public class SaveCameraChannelEndEvent extends ApplicationEvent {
private boolean msg;
public SaveCameraChannelEndEvent(Object source,boolean msg) {
super(source);
this.msg = msg;
}
public boolean getMsg(){
return msg;
}
public void setMsg(boolean msg){
this.msg = msg;
}
}

View File

@ -0,0 +1,24 @@
package io.renren.modules.monitor.eventListen.saveOrgenizationEndEvent;
import lombok.extern.log4j.Log4j2;
import org.springframework.context.ApplicationEvent;
/**
* @author ytl
* @Date 2022/7/27 15:13
**/
public class SaveOrgenizationEndEvent extends ApplicationEvent {
private boolean msg;
public SaveOrgenizationEndEvent(Object source,boolean msg) {
super(source);
this.msg = msg;
}
public boolean getMsg(){
return msg;
}
public void setMsg(boolean msg){
this.msg = msg;
}
}

View File

@ -11,6 +11,7 @@ import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.springframework.security.core.parameters.P;
import java.util.List;
import java.util.Map;
@ -73,8 +74,16 @@ public interface CameraChannelMapper extends BaseDao<CameraChannel> {
void batchSaveMtmLabel(@Param("list") List<Map<String,Object>> list);
@Delete("delete from t_camera_channel where gps_x is null or gps_y is null or gps_x = '' or gps_y = '' ")
@Delete("delete from t_camera_channel_cache where gps_x is null or gps_y is null or gps_x = '' or gps_y = '' ")
void deleteByNonPlace();
Map selectCameraChannelById(@Param("channelId") String channelId);
List<Map> selectCameraChannelByPid(@Param("parentId") String parentId);
void batchSaveCameraChannel(@Param("list") List<Map> list);
void insertChannelCacheToCameraChannel();
void batchSaveLabel(@Param("list") List<Map<String,Object>> list);
}

View File

@ -25,12 +25,14 @@ public interface CameraOrgenizationMapper extends BaseDao<CameraOrganization> {
void batchSaveOrgenization(List<Map> list);
@Select("SELECT name,parent_id,id,path FROM t_camera_organization WHERE id = #{id}")
@Select("SELECT name,parent_id,id,path FROM t_camera_organization_cache WHERE id = #{id}")
Map selectOrgenizationById(@Param("id") String id);
@Update("UPDATE t_camera_organization SET path = TRIM( TRAILING '->' FROM #{path}) where id = #{id}")
void updateOrganizationPaht(@Param("path") String path,@Param("id") String id);
void updateOrganizationPath(@Param("path") String path,@Param("id") String id);
@Select(" SELECT id FROM t_camera_organization")
List<Map> listOrgenization();
@ -45,13 +47,15 @@ public interface CameraOrgenizationMapper extends BaseDao<CameraOrganization> {
@Select("select * from t_camera_organization")
List<Map> testAll();
List<Map> selectAll();
@Update("truncate table ${tableName}")
void truncate(@Param("tableName") String tableName) ;
@Select("select orgaid,id,name,path from t_camera_organization order by orgaid")
@Select("select orgaid,id,name,path from t_camera_organization_cache order by orgaid")
List<Map> selectAllSubOrganizationMap();
@Update("UPDATE t_camera_organization SET channelCount = #{channelCount} where orgaid = #{orgaId}")
@Update("UPDATE t_camera_organization_cache SET channelCount = #{channelCount} where orgaid = #{orgaId}")
void editChannelCount(@Param("channelCount") Integer channelCount,@Param("orgaId") Integer orgaId);
void updateRegionChannelCount();
@ -62,4 +66,6 @@ public interface CameraOrgenizationMapper extends BaseDao<CameraOrganization> {
// "\t) b ON a.id = b.parent_id \n" +
// ") ORDER BY orgaid ASC ")
// List<Map> listOrgenization2();
void insertOrganizationCacheToCameraOrganization();
}

View File

@ -8,6 +8,8 @@ import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.common.collect.Lists;
import io.renren.modules.monitor.dto.ChannelLabelDto;
import io.renren.modules.monitor.entity.*;
import io.renren.modules.monitor.eventListen.saveCameraChannelEndEvent.SaveCameraChannelEndEvent;
import io.renren.modules.monitor.eventListen.saveOrgenizationEndEvent.SaveOrgenizationEndEvent;
import io.renren.modules.monitor.mapper.*;
import lombok.extern.log4j.Log4j2;
import org.bytedeco.javacv.FFmpegFrameGrabber;
@ -15,12 +17,14 @@ import org.bytedeco.javacv.Frame;
import org.bytedeco.javacv.Java2DFrameConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Lazy;
import org.springframework.http.*;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.Base64Utils;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
@ -44,6 +48,7 @@ import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
@Service
@Log4j2
@ -51,6 +56,9 @@ public class MonitorService {
private static Integer cpuNUm = Runtime.getRuntime().availableProcessors();
private static final ExecutorService executor = Executors.newFixedThreadPool(cpuNUm);
@Autowired
private ApplicationEventPublisher publisher;
@Autowired
private RestTemplate restTemplate;
@ -1129,27 +1137,76 @@ public class MonitorService {
return Result.success(maps);
}
public List<JSONObject> getOrgenization(List<JSONObject> list){
try {
List<JSONObject> list1 = this.getOrgenizationRoot();
list.addAll(list1);
list1.forEach(a->{
if(a.getBooleanValue("isParent")){
getOrgenizationByParent(list,a.getString("id"));
}
});
if(list != null && list.size() > 0){
cameraOrgenMapper.truncate("t_camera_organization");
//以此作为获取视频资源
public void getAndSaveOrgenization(){
List<JSONObject> orgenizationByPage = this.getOrgenization(new ArrayList<JSONObject>(10000));
if(orgenizationByPage != null && orgenizationByPage.size() > 0){
cameraOrgenMapper.truncate("t_camera_organization_cache");
List<Map> maps = JSONObject.parseArray(JSONObject.toJSONString(orgenizationByPage), Map.class);
if(maps.size() > 0){
List<List<Map>> lists = Lists.partition(maps,100);
lists.forEach(list->{
cameraOrgenMapper.batchSaveOrgenization(list);
});
//修改path 信息
List<Map> maps2 = cameraOrgenMapper.selectAll();
maps2.forEach(map->{
this.setOrganizationPath(map,map.get("parent_id").toString());
});
maps2.forEach(m->{
cameraOrgenMapper.updateOrganizationPath(m.get("path").toString(),m.get("id").toString());
});
//发布事件
publisher.publishEvent(new SaveOrgenizationEndEvent(this,true));
}
return list;
}catch (Exception e){
log.info(e.getMessage());
return null;
}
}
public List<JSONObject> getOrgenization(List<JSONObject> list) {
int count = 0;
boolean flag = true;
while(flag){
if(count >= 9){
flag = false;
}
count ++;
try {
List<JSONObject> list1 = this.getOrgenizationRoot();
list.addAll(list1);
list1.forEach(a->{
if(a.getBooleanValue("isParent")){
getOrgenizationByParent(list,a.getString("id"));
}
});
// if(list != null && list.size() > 0){
// cameraOrgenMapper.truncate("t_camera_organization");
// }
flag = false;
return list;
}catch (Exception e){
System.out.println(""+String.valueOf(count)+"次获取,错误是:");
System.out.println(e.getMessage());
if(count >= 10){
log.error("获取视频通道组织信息失败,失败原因:{}",e.getMessage());
return new ArrayList<JSONObject>();
}
list.clear();
try {
Thread.sleep(1000);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
continue;
}
}
return list;
}
//获取根组织
public List<JSONObject> getOrgenizationRoot() throws Exception{
public List<JSONObject> getOrgenizationRoot(){
List<JSONObject> list = new ArrayList<>();
String url = monitorDomain + "/videoService/devicesManager/deviceTree?id=&nodeType=1&typeCode=01&page=1&pageSize=3000";
HttpHeaders headers = new HttpHeaders();
@ -1253,7 +1310,12 @@ public class MonitorService {
HttpEntity<Map> httpEntity = new HttpEntity<>(null, headers);
responseEntity = restTemplate.exchange(url, HttpMethod.GET,httpEntity,JSONObject.class);
JSONObject re = responseEntity.getBody();
List<Map> results = re.getJSONArray("results").toJavaList(Map.class);
List<Map> results = new ArrayList<>();
if(re.getJSONArray("results") != null){
results = re.getJSONArray("results").toJavaList(Map.class);
}else{
log.info("根据地区id:{}获取摄像头信息失败,失败原因:{}",orgenId,re.toJSONString());
}
return results;
}
@ -1269,16 +1331,18 @@ public class MonitorService {
}
//2获取视频通道信息并保存,多线程版
@Async
//@Async
public void saveChannelInfoAsync() throws Exception {
AtomicInteger faulseCount = new AtomicInteger();//失败的次数
//1-清空t_camera_channel
cameraOrgenMapper.truncate("t_camera_channel");
cameraOrgenMapper.truncate("t_camera_channel_cache");
//2-创建线程池
ExecutorService executorService = Executors.newFixedThreadPool(20);//20个线程足够
ExecutorService executorService = Executors.newFixedThreadPool(16);
//3-查询全部地区
List<Map> maps = cameraOrgenMapper.testAll();
//List<Map> maps = cameraOrgenMapper.testAll();
List<Map> maps = cameraOrgenMapper.selectAll();
List<List<Map>> lists = Lists.partition(maps, 1000);
List<CompletableFuture> completableFutureLis = new ArrayList<>();
@ -1300,12 +1364,24 @@ public class MonitorService {
}catch (Exception e){
log.info("根据地区id:{}查询视频通道失败,这是第{}次重试",m.get("id").toString(),tryCount);
if(tryCount >= 10){
faulseCount.incrementAndGet();
log.error("根据地区id:{},查询视频通道失败,超出重试次数",m.get("id").toString());
//去t_camera_channel查询相关信息并保存
cameraChannels = cameraChannelMapper.selectCameraChannelByPid(m.get("id").toString());
if(cameraChannels.size() > 0){
List<List<Map>> channelList = Lists.partition(cameraChannels,100);
for(List<Map> ll:channelList){
cameraChannelMapper.batchSaveCameraChannel(ll);
}
}
}
continue;
}
//5-保存视频通道信息
batchSaveChannelInfos(cameraChannels,m.get("id").toString());
//6-更新地区表的count
cameraOrgenMapper.updateOrganizationCount(m.get("id").toString());
//cameraOrgenMapper.updateOrganizationCount(m.get("id").toString());
flag = false;
}
});
@ -1318,14 +1394,21 @@ public class MonitorService {
//6-更新完通道信息后,删除经度或纬度为空的视频通道信息
cameraChannelMapper.deleteByNonPlace();
//7-查询地区下通道的数量并更新到地区表和市区表中
//7-查询地区下通道的数量并更新到地区cache表中
editChannelCount();
//发布事件
publisher.publishEvent(new SaveCameraChannelEndEvent(this,true));
//以下方法将在事件后处理
//将cache表数据保存到相应的主表中
//insertChannelCacheToCameraChannel);
//8-同步武伟达的t_channel_mtm_label数据
synchronizeMtmLabel();
//synchronizeMtmLabel();
}
//同步武伟达的t_channel_mtm_label数据
@Transactional(rollbackFor = Exception.class)
public void synchronizeMtmLabel(){
DruidDataSource druidDataSource = new DruidDataSource();
druidDataSource.setUrl(jdbcUrl);
@ -1335,7 +1418,7 @@ public class MonitorService {
jdbcTemplate.setDataSource(druidDataSource);
List<Map<String, Object>> maps = jdbcTemplate.queryForList("select * from t_channel_mtm_label");
if(maps.size() > 0){
if (maps.size() > 0){
//清空t_channel_mtm_label
cameraOrgenMapper.truncate("t_channel_mtm_label");
List<List<Map<String, Object>>> partition = Lists.partition(maps, 200);
@ -1344,8 +1427,26 @@ public class MonitorService {
});
}
List<Map<String, Object>> labelMaps = jdbcTemplate.queryForList("select * from t_label");
if (labelMaps.size() > 0){
//清空t_label
cameraOrgenMapper.truncate("t_label");
List<List<Map<String, Object>>> partition = Lists.partition(maps, 200);
partition.forEach(list->{
cameraChannelMapper.batchSaveLabel(list);
});
}
}
//将t_camera_organization_cache和camera_channel_cache表数据保存到相应的主表中
@Transactional(rollbackFor = Exception.class)
public void insertChannelCacheToCameraChannel(){
cameraOrgenMapper.truncate("t_camera_organization");
cameraOrgenMapper.insertOrganizationCacheToCameraOrganization();
cameraOrgenMapper.truncate("t_camera_channel");
cameraChannelMapper.insertChannelCacheToCameraChannel();
}
//单独保存视频通道信息
public void batchSaveChannelInfos(List<Map> list,String parentId){
@ -1366,23 +1467,24 @@ public class MonitorService {
needSave.add(j);
}
}
}else{//更新count字段
cameraOrgenMapper.updateOrganizationCount(parentId);
}
// else{//更新count字段
// cameraOrgenMapper.updateOrganizationCount(parentId);
// }
//修改nodeName
needSave.forEach(map->setNodeName(map,map.get("parentId").toString()));
//保存并更新count字段
//保存
if(needSave.size() > 0){
List<List<Map>> partition = Lists.partition(needSave, 100);
partition.forEach(l->{
cameraOrgenMapper.batchSaveCameraChannel(l);
});
cameraOrgenMapper.updateOrganizationCount(parentId);
//cameraOrgenMapper.updateOrganizationCount(parentId);
}
}
//更新完通道信息后查询地区下通道的数量并更新到地区表中和市区表中
//更新完通道信息后查询地区下通道的数量并更新到地区表中
public void editChannelCount() throws Exception{
//1-更新地区表中的每个地区下channelCount
List<Map> maps = cameraOrgenMapper.selectAllSubOrganizationMap();
@ -1395,7 +1497,7 @@ public class MonitorService {
};
}
//2-更新市区表t_region表中的channelCount
cameraOrgenMapper.updateRegionChannelCount();
//cameraOrgenMapper.updateRegionChannelCount();
}
public List<Map> listChildOrgenIds(String id){
@ -1408,6 +1510,18 @@ public class MonitorService {
return childs;
}
public void test(){
//保存cache表信息到正式表
insertChannelCacheToCameraChannel();
cameraOrgenMapper.updateRegionChannelCount();
//更新武伟达的标签表
//synchronizeMtmLabel();
}
}

View File

@ -117,7 +117,7 @@
</select>
<select id="selectChannelCounts" parameterType="java.lang.String" resultType="integer">
SELECT COUNT(a.idt_camera_channel) FROM t_camera_channel a inner join t_camera_organization b on a.parent_id = b.id
SELECT COUNT(a.idt_camera_channel) FROM t_camera_channel_cache a inner join t_camera_organization_cache b on a.parent_id = b.id
WHERE b.path like concat(#{path},'%')
</select>
@ -242,4 +242,48 @@
SELECT channel_id AS channelId,channel_name AS channelName FROM t_camera_channel
WHERE channel_id = #{channelId} LIMIT 1
</select>
<select id="selectCameraChannelByPid" parameterType="java.lang.String" resultType="java.util.Map">
SELECT channel_code AS channelCode,channel_id AS channelId,channel_name AS channelName,gps_x AS gpsX,
gps_y AS gpsY,status,parent_id AS parentId,region_code AS regionCode,region_name AS regionName,
node_name AS nodeName
FROM t_camera_channel
WHERE parent_id = #{parentId}
</select>
<insert id="batchSaveCameraChannel" parameterType="java.util.List">
INSERT INTO t_camera_channel_cache
(
channel_code,channel_id,channel_name,gps_x,gps_y,status,
parent_id,region_code,region_name,node_name
)
VALUES
<foreach collection="list" item="item" separator="," >
(
#{item.channelCode},#{item.channelId},#{item.channelName},#{item.gpsX},#{item.gpsY},#{item.status},
#{item.parentId},#{item.regionCode},#{item.regionName},#{item.nodeName}
)
</foreach>
</insert>
<insert id="insertChannelCacheToCameraChannel">
INSERT INTO t_camera_channel
(
channel_code,channel_id,channel_name,gps_x,gps_y,status,
parent_id,region_code,region_name,node_name,check_status
)
SELECT channel_code,channel_id,channel_name,gps_x,gps_y,status,
parent_id,region_code,region_name,node_name,check_status
FROM t_camera_channel_cache
</insert>
<insert id="batchSaveLabel" parameterType="java.util.List">
INSERT INTO t_label (
label_code,label_name,capture_patternn,city
)
VALUES
<foreach collection="list" item="item" separator=",">
(#{item.label_code},#{item.label_name},#{item.capture_patternn},#{item.city})
</foreach>
</insert>
</mapper>

View File

@ -3,7 +3,7 @@
<mapper namespace="io.renren.modules.monitor.mapper.CameraOrgenizationMapper">
<insert id="batchSaveOrgenization" parameterType="java.util.List">
INSERT INTO t_camera_organization (
INSERT INTO t_camera_organization_cache (
id,name,parent_id,sort,subCount,path,is_parent,is_root
) VALUES
<foreach collection="list" item="item" separator="," >
@ -15,7 +15,7 @@
<insert id="batchSaveCameraChannel" parameterType="java.util.List">
INSERT INTO t_camera_channel
INSERT INTO t_camera_channel_cache
(
channel_code,channel_id,channel_name,gps_x,gps_y,status,
parent_id,region_code,region_name,node_name
@ -50,4 +50,19 @@
<!-- ON a.id = b.parent_id-->
<!-- ) ORDER BY orgaid ASC-->
<!-- </select>-->
<select id="selectAll" resultType="java.util.Map">
select * from t_camera_organization_cache
</select>
<update id="updateOrganizationPath" parameterType="java.lang.String">
UPDATE t_camera_organization_cache SET path = TRIM( TRAILING '->' FROM #{path}) where id = #{id}
</update>
<insert id="insertOrganizationCacheToCameraOrganization">
INSERT INTO t_camera_organization
(
id,name,parent_id,sort,subCount,is_parent,path,is_root,channelCount,count
)
SELECT id,name,parent_id,sort,subCount,is_parent,path,is_root,channelCount,count FROM t_camera_organization_cache
</insert>
</mapper>