Merge branch 'master' into docker_package

This commit is contained in:
wangliwen 2022-08-24 15:57:49 +08:00
commit be82f34ad1
2 changed files with 102 additions and 78 deletions

View File

@ -1,8 +1,11 @@
package io.renren.modules.gateway.controller;
import cn.hutool.core.io.FileUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import io.renren.modules.gateway.dao.ApiCountHistoryDao;
import io.renren.modules.gateway.entity.ApiCountHistoryEntity;
import io.renren.modules.gateway.service.MonitorServiceV2;
import io.renren.modules.monitor.entity.Result;
import io.renren.modules.resource.dao.AttrDao;
@ -30,6 +33,7 @@ import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
@ -37,6 +41,7 @@ import java.net.URLConnection;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@RestController
@Api(tags = "网关统计")
@ -67,16 +72,35 @@ public class MonitorControllerV2 {
@Autowired
private MonitorServiceV2 monitorServiceV2;
@GetMapping("/queryGroupByAbility")
@ApiOperation("统计数据按能力归集")
public Result queryGroupByAbility(String query, String time) throws InterruptedException, ExecutionException, TimeoutException {
@Autowired
private ApiCountHistoryDao apiCountHistoryDao;
public List queryMetricCount(String queryFormat, Long start, Long end, String metricElement, int limit) throws InterruptedException, ExecutionException, TimeoutException {
String url = gatewayDomain + "/juapi/metrics/api/v1/query?query={query}&time={time}";
List<String> querys = new ArrayList<>();
List<CompletableFuture<List<?>>> futures = querys.stream().map(item -> {
return CompletableFuture.supplyAsync(() -> {
ResponseEntity<HashMap> entity = restTemplate.getForEntity(url, HashMap.class, query, time);
//查询重启记录时间节点
LambdaQueryWrapper<ApiCountHistoryEntity> historyEntityLambdaQueryWrapper = new QueryWrapper<ApiCountHistoryEntity>().lambda()
.select(ApiCountHistoryEntity::getUpdateTime)
.ge(ApiCountHistoryEntity::getUpdateTime, new Date(start * 1000))
.le(ApiCountHistoryEntity::getUpdateTime, new Date(end * 1000))
.orderByAsc(ApiCountHistoryEntity::getUpdateTime);
List<ApiCountHistoryEntity> apiCountHistoryEntities = apiCountHistoryDao.selectList(historyEntityLambdaQueryWrapper);
List<Long> timePoint = apiCountHistoryEntities.stream()
.map(item -> item.getUpdateTime().getTime()/1000)
.distinct()
.collect(Collectors.toList());
timePoint.add(0, start);
timePoint.add(end);
//根据时间拆分按照时间段异步请求
List<CompletableFuture<List<?>>> futures = new ArrayList<>();
for (int i = 0; i < timePoint.size()-1; i++) {
HashMap<String, Long> map = new HashMap<String, Long>();
Long startTime = timePoint.get(i);
Long endTime = timePoint.get(i+1);
String query = String.format(queryFormat, endTime - startTime);
futures.add(CompletableFuture.supplyAsync(() -> {
ResponseEntity<HashMap> entity = restTemplate.getForEntity(url, HashMap.class, query, endTime);
/** 接口数据示例
* {
* "status": "success",
@ -104,12 +128,13 @@ public class MonitorControllerV2 {
for (HashMap hashMap : result) {
Map metric = (Map) hashMap.get("metric");
List value = (List) hashMap.get("value");
if (metric == null || metric.get("groupInfo") == null || value == null || value.size() != 2) {
if (metric == null || metric.get(metricElement) == null || value == null || value.size() != 2) {
continue;
}
try {
Long groupInfo = Long.valueOf((String) metric.get("groupInfo"));
metric.put("groupInfo", groupInfo);
metric.put("count", Double.valueOf((String) value.get(1)).intValue());
Long groupInfo = Long.valueOf((String) metric.get(metricElement));
metric.put(metricElement, groupInfo);
results.add(metric);
} catch (Exception e) {
//忽略
@ -119,85 +144,85 @@ public class MonitorControllerV2 {
return results;
}
return Collections.emptyList();
}, executor);
}).collect(Collectors.toList());
HashMap<String, Map<String, Object>> filterMap = new HashMap<String, Map<String, Object>>();
}, executor));
}
//不同时间段的数据相加
HashMap<Long, Map<String, Object>> filterMap = new HashMap<Long, Map<String, Object>>();
for (CompletableFuture<List<?>> future : futures) {
List<Map<String, Object>> list = (List<Map<String, Object>>) future.get(30, TimeUnit.SECONDS);
list.forEach(item -> {
// filterMap.containsKey(item.)
Long groupInfo = (Long) item.get(metricElement);
if (filterMap.containsKey(groupInfo)) {
Map<String, Object> objectMap = filterMap.get(groupInfo);
Integer count = (Integer) objectMap.get("count");
Integer itemCount = (Integer) item.get("count");
objectMap.put("count", count + itemCount);
}else {
filterMap.put(groupInfo, item);
}
});
}
ResponseEntity<HashMap> entity = restTemplate.getForEntity(url, HashMap.class, query, time);
HashMap body = entity.getBody();
HashMap data = (HashMap) body.get("data");
if (data != null){
List<HashMap> result = (List) data.get("result");
ArrayList<Map> results = new ArrayList<>();
ArrayList<Long> abilityIds = new ArrayList<>();
for (HashMap hashMap : result) {
Map metric = (Map) hashMap.get("metric");
if (metric != null && metric.get("groupInfo") != null){
List value = (List) hashMap.get("value");
if (value.size() == 2){
metric.put("count", value.get(1));
try{
Long groupInfo = Long.valueOf((String) metric.get("groupInfo"));
abilityIds.add(groupInfo);
metric.put("groupInfo", groupInfo);
results.add(metric);
}catch (Exception e) {
//忽略
}
//重新排序
List<Map<String, Object>> results = filterMap.values().stream()
.sorted(Comparator.comparingInt(item -> (int) ((Map) item).get("count"))
.reversed())
.limit(limit)
.collect(Collectors.toList());
return results;
}
@GetMapping("/queryGroupByAbility")
@ApiOperation("统计数据按能力归集")
public Result queryGroupByAbility(Long start, Long end) throws InterruptedException, ExecutionException, TimeoutException {
String queryFormat = "topk(10, sum(label_replace(increase(apigateway_http_status[%ds:1s]), \"groupInfo\", \"$2\", \"matched_uri\", \"/juapi/(.*?)/(.*?)/.*\")) by (groupInfo))";
List<Map<String, Object>> results = queryMetricCount(queryFormat, start, end, "groupInfo", 10);
//数据聚合
if (!results.isEmpty()) {
List<Object> ids = results.stream().map(item -> item.get("groupInfo")).collect(Collectors.toList());
LambdaQueryWrapper<ResourceEntity> queryWrapper = new QueryWrapper<ResourceEntity>().lambda();
queryWrapper
.select(ResourceEntity::getId,
ResourceEntity::getName,
ResourceEntity::getApiMethodType,
ResourceEntity::getType,
ResourceEntity::getApiUrl)
.in(ResourceEntity::getId, ids);
List<ResourceEntity> entities = resourceDao.selectList(queryWrapper);
for (Map map : results) {
Long groupInfo = (Long) map.get("groupInfo");
for (ResourceEntity resourceEntity : entities) {
if (groupInfo != null && groupInfo.equals(resourceEntity.getId())) {
map.put("name", resourceEntity.getName());
map.put("ApiMethodType", resourceEntity.getApiMethodType());
map.put("type", resourceEntity.getType());
map.put("apiUrl", resourceEntity.getApiUrl());
LambdaQueryWrapper<AttrEntity> attrQueryWrapper = new LambdaQueryWrapper<>();
attrQueryWrapper.select(AttrEntity::getAttrType,AttrEntity::getAttrValue)
.eq(AttrEntity::getDataResourceId, groupInfo)
.eq(AttrEntity::getAttrType,"服务商名")
.eq(AttrEntity::getDelFlag, 0);
AttrEntity attrEntity = attrDao.selectOne(attrQueryWrapper);
if (attrEntity.getAttrValue() != null) {
map.put("privider", attrEntity.getAttrValue());
}
entities.remove(resourceEntity);
break;
}
}
}
if (!results.isEmpty()) {
LambdaQueryWrapper<ResourceEntity> queryWrapper = new QueryWrapper<ResourceEntity>().lambda();
queryWrapper
.select(ResourceEntity::getId,
ResourceEntity::getName,
ResourceEntity::getApiMethodType,
ResourceEntity::getType,
ResourceEntity::getApiUrl)
.in(ResourceEntity::getId, abilityIds);
List<ResourceEntity> entities = resourceDao.selectList(queryWrapper);
for (Map map : results) {
Long groupInfo = (Long) map.get("groupInfo");
for (ResourceEntity resourceEntity : entities) {
if (groupInfo != null && groupInfo.equals(resourceEntity.getId())) {
map.put("name", resourceEntity.getName());
map.put("ApiMethodType", resourceEntity.getApiMethodType());
map.put("type", resourceEntity.getType());
map.put("apiUrl", resourceEntity.getApiUrl());
LambdaQueryWrapper<AttrEntity> attrQueryWrapper = new LambdaQueryWrapper<>();
attrQueryWrapper.select(AttrEntity::getAttrType,AttrEntity::getAttrValue)
.eq(AttrEntity::getDataResourceId,groupInfo)
.eq(AttrEntity::getAttrType,"服务商名")
.eq(AttrEntity::getDelFlag, 0);
AttrEntity attrEntity = attrDao.selectOne(attrQueryWrapper);
if (attrEntity.getAttrValue() != null) {
map.put("privider", attrEntity.getAttrValue());
}
entities.remove(resourceEntity);
break;
}
}
}
}
return Result.success(results);
}
return Result.success(Collections.emptyList());
return Result.success(results);
}
@GetMapping("/queryGroupByDepartment")
@ApiOperation("统计数据按部门归集")
public Result queryGroupByDepartment(String query, String time){
/** ================*/
String url = gatewayDomain + "/juapi/metrics/api/v1/query?query={query}" + "&time=" + time;
ResponseEntity<HashMap> entity = restTemplate.getForEntity(url, HashMap.class, query);
HashMap body = entity.getBody();

View File

@ -56,8 +56,8 @@ import java.util.concurrent.atomic.AtomicInteger;
@Service
@Log4j2
public class MonitorService {
private static Integer cpuNUm = Runtime.getRuntime().availableProcessors();
private static final ExecutorService executor = Executors.newFixedThreadPool(cpuNUm);
// private static Integer cpuNUm = Runtime.getRuntime().availableProcessors();
private static final ExecutorService executor = Executors.newFixedThreadPool(1);
@Autowired
private ApplicationEventPublisher publisher;
@ -205,12 +205,11 @@ public class MonitorService {
}
} catch (Exception e) {
log.info("[monitor-capture]: keepalive faild, restart.....");
e.printStackTrace();
log.error("[monitor-capture]: keepalive faild, restart.....", e);
//出错后重新登录
timer.cancel();
thatMonitorService.start();
thatMonitorService.init();
}