异步导入知识库的实现

This commit is contained in:
wangliwen 2022-06-13 14:00:27 +08:00
parent d3149c3ca9
commit 53653ffec3
2 changed files with 64 additions and 111 deletions

View File

@ -38,7 +38,7 @@
<javacv.version>1.5.7</javacv.version> <javacv.version>1.5.7</javacv.version>
<system.linux-x86_64>linux-x86_64</system.linux-x86_64> <system.linux-x86_64>linux-x86_64</system.linux-x86_64>
<ffmpeg.version>5.0</ffmpeg.version> <ffmpeg.version>5.0</ffmpeg.version>
<okhttp.version>4.9.3</okhttp.version> <okhttp.version>3.14.9</okhttp.version>
<java.version>1.8</java.version> <java.version>1.8</java.version>
</properties> </properties>
@ -143,6 +143,12 @@
<groupId>io.minio</groupId> <groupId>io.minio</groupId>
<artifactId>minio</artifactId> <artifactId>minio</artifactId>
<version>${minio.version}</version> <version>${minio.version}</version>
<exclusions>
<exclusion>
<artifactId>okio</artifactId>
<groupId>com.squareup.okio</groupId>
</exclusion>
</exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.springframework.boot</groupId> <groupId>org.springframework.boot</groupId>

View File

@ -17,10 +17,7 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiOperation;
import okhttp3.FormBody; import okhttp3.*;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@ -30,12 +27,16 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType; import org.springframework.http.MediaType;
import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap; import org.springframework.util.MultiValueMap;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import org.springframework.web.client.RestTemplate; import org.springframework.web.client.RestTemplate;
import springfox.documentation.annotations.ApiIgnore; import springfox.documentation.annotations.ApiIgnore;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.*; import java.util.*;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -278,54 +279,52 @@ public class ResourceController {
@GetMapping("knowledgeBase") @GetMapping("knowledgeBase")
@ApiOperation("对接知识库数据") @ApiOperation("对接知识库数据")
public Result<String> knowledgeBase() { public Result<String> knowledgeBase() {
final int pageSize = 100;
// final MultiValueMap<String, Object> paramMap = new LinkedMultiValueMap<>(); final long timestamp = LocalDateTime.now().toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
// paramMap.add("app_id", appId); final OkHttpClient client = new OkHttpClient();
// paramMap.add("interface_id", methodId);
// paramMap.add("version", version);
// paramMap.add("charset", charset);
// paramMap.add("timestamp", String.valueOf(timestamp));
// paramMap.add("origin", origin);
int pageSize = 100;
AtomicInteger pageIndex = new AtomicInteger(1);
Arrays.stream(catalogIds).map(index -> { Arrays.stream(catalogIds).map(index -> {
List<CompletableFuture> task = new ArrayList<>(); logger.info("处理:" + index);
AtomicBoolean end = new AtomicBoolean(false); CopyOnWriteArrayList<CompletableFuture> task = new CopyOnWriteArrayList<>();
AtomicBoolean end = new AtomicBoolean(true);
AtomicInteger pageIndex = new AtomicInteger(1);
AtomicInteger maxPage = new AtomicInteger(100); // 防止死循环
do { do {
int page = pageIndex.getAndIncrement(); int page = pageIndex.getAndIncrement();
logger.info("处理:" + index + " 分页{}", page);
task.add(CompletableFuture.supplyAsync(() -> { task.add(CompletableFuture.supplyAsync(() -> {
long timestamp = new Date().getTime(); try {
final JSONObject bizContentParam = new JSONObject(); logger.info("分页任务处理:" + index + " 分页{} 时间 -->" + timestamp, page);
OkHttpClient client = new OkHttpClient(); JSONObject bizContentParam = new JSONObject();
bizContentParam.put("appkey", appKey); bizContentParam.put("appkey", appKey);
bizContentParam.put("catalogId", index); bizContentParam.put("catalogId", index);
bizContentParam.put("pageIndex", page); bizContentParam.put("pageIndex", page);
bizContentParam.put("pageSize", pageSize); bizContentParam.put("pageSize", pageSize);
String bizContent_ = bizContentParam.toJSONString(); String bizContent_ = bizContentParam.toJSONString();
logger.info("biz_content参数{}", bizContent_);
// 通过FormBody对象构建Builder来添加表单参数 // 通过FormBody对象构建Builder来添加表单参数
final FormBody.Builder signFormBody = new FormBody.Builder() FormBody.Builder signFormBody = new FormBody.Builder()
.add("app_id", appId) .add("app_id", appId)
.add("interface_id", methodId) .add("interface_id", methodId)
.add("version", version) .add("version", version)
.add("timestamp", String.valueOf(timestamp)) .add("timestamp", String.valueOf(timestamp))
.add("origin", origin) .add("origin", origin)
.add("charset", charset)
.add("biz_content", bizContent_); .add("biz_content", bizContent_);
logger.error(index + "分页{}对接知识库数据请求参数:" + signFormBody.build().contentType().toString(), page); logger.info(index + "分页{}对接知识库数据请求参数:" + signFormBody.build().contentType().toString(), page);
Request signRequest = new Request.Builder().url(sign).post(signFormBody.build()).build(); Request signRequest = new Request.Builder().url(sign).post(signFormBody.build()).build();
try {
Response signResponse = Response signResponse =
client.newCall(signRequest).execute(); client.newCall(signRequest).execute();
String signResult = signResponse.body().string(); String signResult = signResponse.body().string();
logger.error("{}分页signResult数据" + signResult, page); logger.info("{}分页signResult数据" + signResult, page);
JSONObject signJsonObject = JSON.parseObject(signResult); JSONObject signJsonObject = JSON.parseObject(signResult);
if (!signJsonObject.containsKey("data")) { if (!signJsonObject.containsKey("data")) {
logger.error("获取sign异常" + signResult); logger.info("获取sign异常" + signResult);
end.set(false); end.set(false);
throw new RuntimeException("获取sign异常"); throw new RuntimeException("获取sign异常");
} }
if (signJsonObject.get("data") == null) { if (signJsonObject.get("data") == null) {
logger.error("获取sign异常" + signResult); logger.info("获取sign异常" + signResult);
end.set(false); end.set(false);
throw new RuntimeException("获取sign异常"); throw new RuntimeException("获取sign异常");
} }
@ -333,26 +332,34 @@ public class ResourceController {
String signString = signData.get("sign").toString(); String signString = signData.get("sign").toString();
signFormBody.add("sign", signString); signFormBody.add("sign", signString);
Request gatewayRequest = new Request.Builder().url(sign).post(signFormBody.build()).build(); Request gatewayRequest = new Request.Builder().url(gateway).post(signFormBody.build()).build();
Response gatewayResponse = client.newCall(gatewayRequest).execute(); Response gatewayResponse = client.newCall(gatewayRequest).execute();
String gatewayResult = gatewayResponse.body().string(); String gatewayResult = gatewayResponse.body().string();
logger.error("{}分页数据:" + gatewayResult, page); logger.info("{}分页数据:" + gatewayResult, page);
JSONObject gatewayJsonObject = JSON.parseObject(gatewayResult); JSONObject gatewayJsonObject = JSON.parseObject(gatewayResult);
JSONObject gatewayData = JSON.parseObject(gatewayJsonObject.get("data").toString()); JSONObject gatewayData = JSON.parseObject(gatewayJsonObject.get("data").toString());
JSONArray infos = gatewayData.getJSONObject("data").getJSONArray("infos"); JSONArray infos = gatewayData.getJSONObject("data").getJSONArray("infos");
int total = gatewayData.getJSONObject("data").getIntValue("total");
if (maxPage.get() != (total / pageSize)) {
maxPage.set(total / pageSize);
}
if (!infos.isEmpty()) { // 不为空则数据分页还没传输结束 if (!infos.isEmpty()) { // 不为空则数据分页还没传输结束
end.set(true); logger.info("--继续分页--");
end.set(true); // TODO
} else { } else {
end.set(false); end.set(false);
} }
return infos; return infos;
} catch (Exception e) { } catch (Exception e) {
logger.error("获取异常:", e); logger.info("获取异常:", e);
end.set(false); end.set(false);
return new JSONArray(); return new JSONArray();
} }
}).thenAcceptAsync(list -> { }).thenAcceptAsync(list -> {
logger.info("知识库数据量:{}", list.size()); logger.info("知识库数据量:{}", list.size());
if (list.size() < 1) {
end.set(false);
}
list.parallelStream().forEach(resource -> { list.parallelStream().forEach(resource -> {
Map<String, Object> map = (Map<String, Object>) resource; Map<String, Object> map = (Map<String, Object>) resource;
ResourceDTO dto = new ResourceDTO(); ResourceDTO dto = new ResourceDTO();
@ -361,6 +368,7 @@ public class ResourceController {
dto.setVisits(0L); dto.setVisits(0L);
//所属部门暂时设为青岛市政府办公厅 //所属部门暂时设为青岛市政府办公厅
dto.setDeptId(1517116100113850370L); dto.setDeptId(1517116100113850370L);
dto.setNote1(map.get("uuid").toString());
dto.setDelFlag(0); dto.setDelFlag(0);
ArrayList<AttrEntity> infoList = new ArrayList<>(); ArrayList<AttrEntity> infoList = new ArrayList<>();
map.forEach((key, value) -> { map.forEach((key, value) -> {
@ -394,80 +402,19 @@ public class ResourceController {
} }
infoList.add(attrEntity); infoList.add(attrEntity);
dto.setInfoList(infoList); dto.setInfoList(infoList);
logger.info("插入:" + dto.getName());
resourceService.insertWithAttrs(dto); resourceService.insertWithAttrs(dto);
logger.info("插入:" + dto.getName());
}); });
})); }));
} while (end.get()); try {
Thread.sleep(500L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} while (end.get() || pageIndex.get() < maxPage.get());
return task; return task;
}).collect(Collectors.toList()); }).collect(Collectors.toList());
return new Result().ok("任务开启成功!"); return new Result().ok("任务开启成功!");
// CompletableFuture.allOf(completableFutures.stream().flatMap(i -> i.stream()).toArray(new CompletableFuture[completableFutures.size()])).join();
// for (String catalogId : catalogIds) {
// bizContent = "{\"appkey\":\"" + appKey + "\",\n" +
// "\"catalogId\":\"" + catalogId + "\",\n" +
// "\"pageIndex\":1,\n" +
// "\"pageSize\":1000}";
// paramMap.add("biz_content", bizContent);
// try {
// String signResult = restTemplate.postForObject(sign, paramMap, String.class);
// JSONObject signJsonObject = JSON.parseObject(signResult);
// Map<String, Object> signData = (Map<String, Object>) signJsonObject.get("data");
// String signString = signData.get("sign").toString();
// paramMap.add("sign", signString);
// String gatewayResult = restTemplate.postForObject(gateway, paramMap, String.class);
// JSONObject gatewayJsonObject = JSON.parseObject(gatewayResult);
// JSONObject gatewayData = JSON.parseObject(gatewayJsonObject.get("data").toString());
// JSONArray infos = gatewayData.getJSONObject("data").getJSONArray("infos");
// infos.forEach(item -> {
// Map<String, Object> map = (Map<String, Object>) item;
// ResourceDTO dto = new ResourceDTO();
// dto.setName(map.get("title").toString());
// dto.setType("知识库");
// dto.setVisits(0L);
// //所属部门暂时设为青岛市政府办公厅
// dto.setDeptId(1517116100113850370L);
// dto.setDelFlag(0);
// ArrayList<AttrEntity> infoList = new ArrayList<>();
// map.forEach((key, value) -> {
// switch (key) {
// case "title":
// dto.setName(value.toString());
// break;
// case "url":
// dto.setLink(value.toString());
// break;
// case "createtime":
// Date createDate = new Date(Long.parseLong(value.toString()));
// dto.setCreateDate(createDate);
// break;
// default:
// AttrEntity attrEntity = new AttrEntity();
// attrEntity.setDelFlag(0);
// attrEntity.setAttrType(key);
// attrEntity.setAttrValue(value.toString());
// infoList.add(attrEntity);
// break;
// }
// });
// AttrEntity attrEntity = new AttrEntity();
// attrEntity.setDelFlag(0);
// attrEntity.setAttrType("文件类型");
// if ("f49561afc7204f008c4bb3cd821eb6ba".equals(catalogId)) {
// attrEntity.setAttrValue("政府公报");
// } else {
// attrEntity.setAttrValue("政策解读");
// }
// infoList.add(attrEntity);
// dto.setInfoList(infoList);
// resourceService.insertWithAttrs(dto);
// });
// } catch (Exception e) {
// e.printStackTrace();
// }
// paramMap.remove("sign");
// paramMap.remove("biz_content");
// }
} }