TODO 多线程处理知识库同步

This commit is contained in:
wangliwen 2022-06-10 18:00:11 +08:00
parent 4308dcec36
commit 185bfc0591
1 changed files with 187 additions and 79 deletions

View File

@ -21,17 +21,20 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.*;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.client.RestTemplate;
import springfox.documentation.annotations.ApiIgnore;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
* 资源表
@ -43,7 +46,6 @@ import java.util.Map;
@RequestMapping("/resource")
@Api(tags = "资源表")
public class ResourceController {
@Value("${qdyjj.ipAndPort}")
private String ipAndPort;
@ -214,7 +216,7 @@ public class ResourceController {
@GetMapping("ZywMessage")
@ApiOperation("资源转发")
public Result ZywMessage(){
public Result ZywMessage() {
String url = "http://15.72.158.81/zyjk/ZywMessage.asmx";
String parame = "<?xml version=\"1.0\" encoding=\"utf-8\"?>\n" +
"<soap:Envelope xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\" xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">\n" +
@ -226,13 +228,13 @@ public class ResourceController {
requestHeaders.setContentType(MediaType.TEXT_XML);
HttpEntity<String> requestEntity = new HttpEntity(parame, requestHeaders);
try {
String body = restTemplate.postForEntity(url,requestEntity,String.class).getBody();
String body = restTemplate.postForEntity(url, requestEntity, String.class).getBody();
String json = body.substring(body.indexOf("{"), body.indexOf("}") + 1);
HashMap map = JSONObject.parseObject(json, HashMap.class);
return new Result().ok(map);
} catch (Exception e) {
e.printStackTrace();
return new Result().ok(new HashMap<String, Object>(){{
return new Result().ok(new HashMap<String, Object>() {{
put("sxmlcount", 0);
put("yfbfwcount", 0);
put("yfbjk", 0);
@ -242,7 +244,7 @@ public class ResourceController {
@GetMapping("qdyjjWeather")
@ApiOperation("青岛应急局-查询青岛市地区天气信息")
public Result qdyjjWeather(String cityName){
public Result qdyjjWeather(String cityName) {
String loginUrl = "http://" + ipAndPort + "/service-oauth/login";
String weatherUrl = "http://" + ipAndPort + "/service-map/qxWeather/getTodayWeatherInfo";
HashMap<String, Object> loginParam = new HashMap<>();
@ -262,7 +264,7 @@ public class ResourceController {
return new Result().ok(weatherBody);
} catch (Exception e) {
e.printStackTrace();
return new Result().ok(new HashMap(){{
return new Result().ok(new HashMap() {{
put("message", "接口调用失败!");
put("code", "500");
}});
@ -271,81 +273,187 @@ public class ResourceController {
@GetMapping("knowledgeBase")
@ApiOperation("对接知识库数据")
public void knowledgeBase(){
public Result<String> knowledgeBase() {
long timestamp = new Date().getTime();
MultiValueMap<String, Object> paramMap = new LinkedMultiValueMap<>();
paramMap.add("app_id",appId);
final MultiValueMap<String, Object> paramMap = new LinkedMultiValueMap<>();
paramMap.add("app_id", appId);
paramMap.add("interface_id", methodId);
paramMap.add("version", version);
paramMap.add("charset", charset);
paramMap.add("timestamp", String.valueOf(timestamp));
paramMap.add("origin", origin);
String bizContent;
for (String catalogId: catalogIds) {
bizContent = "{\"appkey\":\""+ appKey +"\",\n" +
"\"catalogId\":\""+ catalogId +"\",\n" +
"\"pageIndex\":1,\n" +
"\"pageSize\":1000}";
paramMap.add("biz_content", bizContent);
try {
String signResult = restTemplate.postForObject(sign, paramMap, String.class);
JSONObject signJsonObject = JSON.parseObject(signResult);
Map<String, Object> signData = (Map<String, Object>)signJsonObject.get("data");
String signString = signData.get("sign").toString();
paramMap.add("sign", signString);
String gatewayResult = restTemplate.postForObject(gateway, paramMap, String.class);
JSONObject gatewayJsonObject = JSON.parseObject(gatewayResult);
JSONObject gatewayData = JSON.parseObject(gatewayJsonObject.get("data").toString());
JSONArray infos = gatewayData.getJSONObject("data").getJSONArray("infos");
infos.forEach(item -> {
Map<String, Object> map = (Map<String, Object>) item;
ResourceDTO dto = new ResourceDTO();
dto.setName(map.get("title").toString());
dto.setType("知识库");
dto.setVisits(0L);
//所属部门暂时设为青岛市政府办公厅
dto.setDeptId(1517116100113850370L);
dto.setDelFlag(0);
ArrayList<AttrEntity> infoList = new ArrayList<>();
map.forEach((key, value) -> {
switch (key) {
case "title":
dto.setName(value.toString());
break;
case "url":
dto.setLink(value.toString());
break;
case "createtime":
Date createDate = new Date(Long.parseLong(value.toString()));
dto.setCreateDate(createDate);
break;
default:
AttrEntity attrEntity = new AttrEntity();
attrEntity.setDelFlag(0);
attrEntity.setAttrType(key);
attrEntity.setAttrValue(value.toString());
infoList.add(attrEntity);
break;
int pageSize = 100;
AtomicInteger pageIndex = new AtomicInteger(1);
Arrays.stream(catalogIds).map(index -> {
List<CompletableFuture> task = new ArrayList<>();
AtomicBoolean end = new AtomicBoolean(true);
do {
paramMap.remove("sign");
paramMap.remove("biz_content");
RestTemplate restTemplate1 = new RestTemplate();
final MultiValueMap<String, Object> paramMapIndex = new LinkedMultiValueMap<>();
int page = pageIndex.getAndIncrement();
task.add(CompletableFuture.supplyAsync(() -> {
final JSONObject bizContentParam = new JSONObject();
bizContentParam.put("appkey", appKey);
bizContentParam.put("catalogId", index);
bizContentParam.put("pageIndex", page);
bizContentParam.put("pageSize", pageSize);
String bizContent_ = bizContentParam.toJSONString();
paramMapIndex.add("biz_content", bizContent_);
logger.error(index + "分页{}对接知识库数据请求参数:" + JSON.toJSONString(paramMapIndex), page);
try {
String signResult = restTemplate1.postForObject(sign, paramMapIndex, String.class);
logger.error("{}分页signResult数据" + signResult, page);
JSONObject signJsonObject = JSON.parseObject(signResult);
if (!signJsonObject.containsKey("data")) {
logger.error("获取sign异常" + signResult);
end.set(false);
throw new RuntimeException("获取sign异常");
}
});
AttrEntity attrEntity = new AttrEntity();
attrEntity.setDelFlag(0);
attrEntity.setAttrType("文件类型");
if ("f49561afc7204f008c4bb3cd821eb6ba".equals(catalogId)) {
attrEntity.setAttrValue("政府公报");
} else {
attrEntity.setAttrValue("政策解读");
if (signJsonObject.get("data") == null) {
logger.error("获取sign异常" + signResult);
end.set(false);
throw new RuntimeException("获取sign异常");
}
Map<String, Object> signData = (Map<String, Object>) signJsonObject.get("data");
String signString = signData.get("sign").toString();
paramMapIndex.add("sign", signString);
String gatewayResult = restTemplate1.postForObject(gateway, paramMapIndex, String.class);
logger.error("{}分页数据:" + gatewayResult, page);
JSONObject gatewayJsonObject = JSON.parseObject(gatewayResult);
JSONObject gatewayData = JSON.parseObject(gatewayJsonObject.get("data").toString());
JSONArray infos = gatewayData.getJSONObject("data").getJSONArray("infos");
if (!infos.isEmpty()) { // 不为空则数据分页还没传输结束
end.set(true);
} else {
end.set(false);
}
return infos;
} catch (Exception e) {
logger.error("获取异常:", e);
end.set(false);
return new JSONArray();
} finally {
paramMapIndex.clear();
}
infoList.add(attrEntity);
dto.setInfoList(infoList);
resourceService.insertWithAttrs(dto);
});
} catch (Exception e) {
e.printStackTrace();
}
paramMap.remove("sign");
paramMap.remove("biz_content");
}
}).thenAcceptAsync(list -> {
logger.info("知识库数据量:{}", list.size());
list.parallelStream().forEach(resource -> {
Map<String, Object> map = (Map<String, Object>) resource;
ResourceDTO dto = new ResourceDTO();
dto.setName(map.get("title").toString());
dto.setType("知识库");
dto.setVisits(0L);
//所属部门暂时设为青岛市政府办公厅
dto.setDeptId(1517116100113850370L);
dto.setDelFlag(0);
ArrayList<AttrEntity> infoList = new ArrayList<>();
map.forEach((key, value) -> {
switch (key) {
case "title":
dto.setName(value.toString());
break;
case "url":
dto.setLink(value.toString());
break;
case "createtime":
Date createDate = new Date(Long.parseLong(value.toString()));
dto.setCreateDate(createDate);
break;
default:
AttrEntity attrEntity = new AttrEntity();
attrEntity.setDelFlag(0);
attrEntity.setAttrType(key);
attrEntity.setAttrValue(value.toString());
infoList.add(attrEntity);
break;
}
});
AttrEntity attrEntity = new AttrEntity();
attrEntity.setDelFlag(0);
attrEntity.setAttrType("文件类型");
if ("f49561afc7204f008c4bb3cd821eb6ba".equals(index)) {
attrEntity.setAttrValue("政府公报");
} else {
attrEntity.setAttrValue("政策解读");
}
infoList.add(attrEntity);
dto.setInfoList(infoList);
logger.info("插入:" + dto.getName());
resourceService.insertWithAttrs(dto);
});
}));
} while (end.get());
return task;
}).collect(Collectors.toList());
return new Result().ok("任务开启成功!");
// CompletableFuture.allOf(completableFutures.stream().flatMap(i -> i.stream()).toArray(new CompletableFuture[completableFutures.size()])).join();
// for (String catalogId : catalogIds) {
// bizContent = "{\"appkey\":\"" + appKey + "\",\n" +
// "\"catalogId\":\"" + catalogId + "\",\n" +
// "\"pageIndex\":1,\n" +
// "\"pageSize\":1000}";
// paramMap.add("biz_content", bizContent);
// try {
// String signResult = restTemplate.postForObject(sign, paramMap, String.class);
// JSONObject signJsonObject = JSON.parseObject(signResult);
// Map<String, Object> signData = (Map<String, Object>) signJsonObject.get("data");
// String signString = signData.get("sign").toString();
// paramMap.add("sign", signString);
// String gatewayResult = restTemplate.postForObject(gateway, paramMap, String.class);
// JSONObject gatewayJsonObject = JSON.parseObject(gatewayResult);
// JSONObject gatewayData = JSON.parseObject(gatewayJsonObject.get("data").toString());
// JSONArray infos = gatewayData.getJSONObject("data").getJSONArray("infos");
// infos.forEach(item -> {
// Map<String, Object> map = (Map<String, Object>) item;
// ResourceDTO dto = new ResourceDTO();
// dto.setName(map.get("title").toString());
// dto.setType("知识库");
// dto.setVisits(0L);
// //所属部门暂时设为青岛市政府办公厅
// dto.setDeptId(1517116100113850370L);
// dto.setDelFlag(0);
// ArrayList<AttrEntity> infoList = new ArrayList<>();
// map.forEach((key, value) -> {
// switch (key) {
// case "title":
// dto.setName(value.toString());
// break;
// case "url":
// dto.setLink(value.toString());
// break;
// case "createtime":
// Date createDate = new Date(Long.parseLong(value.toString()));
// dto.setCreateDate(createDate);
// break;
// default:
// AttrEntity attrEntity = new AttrEntity();
// attrEntity.setDelFlag(0);
// attrEntity.setAttrType(key);
// attrEntity.setAttrValue(value.toString());
// infoList.add(attrEntity);
// break;
// }
// });
// AttrEntity attrEntity = new AttrEntity();
// attrEntity.setDelFlag(0);
// attrEntity.setAttrType("文件类型");
// if ("f49561afc7204f008c4bb3cd821eb6ba".equals(catalogId)) {
// attrEntity.setAttrValue("政府公报");
// } else {
// attrEntity.setAttrValue("政策解读");
// }
// infoList.add(attrEntity);
// dto.setInfoList(infoList);
// resourceService.insertWithAttrs(dto);
// });
// } catch (Exception e) {
// e.printStackTrace();
// }
// paramMap.remove("sign");
// paramMap.remove("biz_content");
// }
}
@ -359,7 +467,7 @@ public class ResourceController {
@ApiImplicitParam(name = Constant.ORDER_FIELD, value = "排序字段", paramType = "query", dataType = "String"),
@ApiImplicitParam(name = Constant.ORDER, value = "排序方式,可选值(asc、desc)", paramType = "query", dataType = "String"),
})
public Result algorithmPage(@ApiIgnore@RequestParam Map<String, Object> params) {
public Result algorithmPage(@ApiIgnore @RequestParam Map<String, Object> params) {
return new Result().ok(resourceService.algorithmPage(params));
}