知识库同步定时任务化

This commit is contained in:
wangliwen 2022-06-13 15:11:18 +08:00
parent 53653ffec3
commit cc21e26ab6
4 changed files with 230 additions and 148 deletions

View File

@ -1,7 +1,6 @@
package io.renren.modules.resource.controller;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import io.renren.common.annotation.LogOperation;
import io.renren.common.constant.Constant;
@ -11,13 +10,11 @@ import io.renren.common.validator.ValidatorUtils;
import io.renren.common.validator.group.AddGroup;
import io.renren.common.validator.group.DefaultGroup;
import io.renren.modules.resource.dto.ResourceDTO;
import io.renren.modules.resource.entity.AttrEntity;
import io.renren.modules.resource.service.ResourceService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import okhttp3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@ -27,19 +24,12 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.client.RestTemplate;
import springfox.documentation.annotations.ApiIgnore;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.HashMap;
import java.util.Map;
/**
* 资源表
@ -279,141 +269,7 @@ public class ResourceController {
@GetMapping("knowledgeBase")
@ApiOperation("对接知识库数据")
public Result<String> knowledgeBase() {
final int pageSize = 100;
final long timestamp = LocalDateTime.now().toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
final OkHttpClient client = new OkHttpClient();
Arrays.stream(catalogIds).map(index -> {
logger.info("处理:" + index);
CopyOnWriteArrayList<CompletableFuture> task = new CopyOnWriteArrayList<>();
AtomicBoolean end = new AtomicBoolean(true);
AtomicInteger pageIndex = new AtomicInteger(1);
AtomicInteger maxPage = new AtomicInteger(100); // 防止死循环
do {
int page = pageIndex.getAndIncrement();
logger.info("处理:" + index + " 分页{}", page);
task.add(CompletableFuture.supplyAsync(() -> {
try {
logger.info("分页任务处理:" + index + " 分页{} 时间 -->" + timestamp, page);
JSONObject bizContentParam = new JSONObject();
bizContentParam.put("appkey", appKey);
bizContentParam.put("catalogId", index);
bizContentParam.put("pageIndex", page);
bizContentParam.put("pageSize", pageSize);
String bizContent_ = bizContentParam.toJSONString();
logger.info("biz_content参数{}", bizContent_);
// 通过FormBody对象构建Builder来添加表单参数
FormBody.Builder signFormBody = new FormBody.Builder()
.add("app_id", appId)
.add("interface_id", methodId)
.add("version", version)
.add("timestamp", String.valueOf(timestamp))
.add("origin", origin)
.add("charset", charset)
.add("biz_content", bizContent_);
logger.info(index + "分页{}对接知识库数据请求参数:" + signFormBody.build().contentType().toString(), page);
Request signRequest = new Request.Builder().url(sign).post(signFormBody.build()).build();
Response signResponse =
client.newCall(signRequest).execute();
String signResult = signResponse.body().string();
logger.info("{}分页signResult数据" + signResult, page);
JSONObject signJsonObject = JSON.parseObject(signResult);
if (!signJsonObject.containsKey("data")) {
logger.info("获取sign异常" + signResult);
end.set(false);
throw new RuntimeException("获取sign异常");
}
if (signJsonObject.get("data") == null) {
logger.info("获取sign异常" + signResult);
end.set(false);
throw new RuntimeException("获取sign异常");
}
Map<String, Object> signData = (Map<String, Object>) signJsonObject.get("data");
String signString = signData.get("sign").toString();
signFormBody.add("sign", signString);
Request gatewayRequest = new Request.Builder().url(gateway).post(signFormBody.build()).build();
Response gatewayResponse = client.newCall(gatewayRequest).execute();
String gatewayResult = gatewayResponse.body().string();
logger.info("{}分页数据:" + gatewayResult, page);
JSONObject gatewayJsonObject = JSON.parseObject(gatewayResult);
JSONObject gatewayData = JSON.parseObject(gatewayJsonObject.get("data").toString());
JSONArray infos = gatewayData.getJSONObject("data").getJSONArray("infos");
int total = gatewayData.getJSONObject("data").getIntValue("total");
if (maxPage.get() != (total / pageSize)) {
maxPage.set(total / pageSize);
}
if (!infos.isEmpty()) { // 不为空则数据分页还没传输结束
logger.info("--继续分页--");
end.set(true); // TODO
} else {
end.set(false);
}
return infos;
} catch (Exception e) {
logger.info("获取异常:", e);
end.set(false);
return new JSONArray();
}
}).thenAcceptAsync(list -> {
logger.info("知识库数据量:{}", list.size());
if (list.size() < 1) {
end.set(false);
}
list.parallelStream().forEach(resource -> {
Map<String, Object> map = (Map<String, Object>) resource;
ResourceDTO dto = new ResourceDTO();
dto.setName(map.get("title").toString());
dto.setType("知识库");
dto.setVisits(0L);
//所属部门暂时设为青岛市政府办公厅
dto.setDeptId(1517116100113850370L);
dto.setNote1(map.get("uuid").toString());
dto.setDelFlag(0);
ArrayList<AttrEntity> infoList = new ArrayList<>();
map.forEach((key, value) -> {
switch (key) {
case "title":
dto.setName(value.toString());
break;
case "url":
dto.setLink(value.toString());
break;
case "createtime":
Date createDate = new Date(Long.parseLong(value.toString()));
dto.setCreateDate(createDate);
break;
default:
AttrEntity attrEntity = new AttrEntity();
attrEntity.setDelFlag(0);
attrEntity.setAttrType(key);
attrEntity.setAttrValue(value.toString());
infoList.add(attrEntity);
break;
}
});
AttrEntity attrEntity = new AttrEntity();
attrEntity.setDelFlag(0);
attrEntity.setAttrType("文件类型");
if ("f49561afc7204f008c4bb3cd821eb6ba".equals(index)) {
attrEntity.setAttrValue("政府公报");
} else {
attrEntity.setAttrValue("政策解读");
}
infoList.add(attrEntity);
dto.setInfoList(infoList);
resourceService.insertWithAttrs(dto);
logger.info("插入:" + dto.getName());
});
}));
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} while (end.get() || pageIndex.get() < maxPage.get());
return task;
}).collect(Collectors.toList());
resourceService.KnowledgeBase();
return new Result().ok("任务开启成功!");
}

View File

@ -74,4 +74,9 @@ public interface ResourceService extends CrudService<ResourceEntity, ResourceDTO
Object selectResourceListByType(String type);
Object selectResourceListByAppArea(String appArea);
/**
* 同步知识库
*/
void KnowledgeBase();
}

View File

@ -24,12 +24,26 @@ import io.renren.modules.resourceCollection.dao.ResourceCollectionDao;
import io.renren.modules.resourceScore.dao.ResourceScoreDao;
import io.renren.modules.security.user.SecurityUser;
import io.renren.modules.sys.dao.SysDeptDao;
import okhttp3.FormBody;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
@ -40,6 +54,33 @@ import java.util.stream.Collectors;
*/
@Service
public class ResourceServiceImpl extends CrudServiceImpl<ResourceDao, ResourceEntity, ResourceDTO> implements ResourceService {
private static Logger logger = LoggerFactory.getLogger(ResourceServiceImpl.class);
@Value("${zsk.appid}")
private String appId;
@Value("${zsk.appkey}")
private String appKey;
@Value("${zsk.url.sign}")
private String sign;
@Value("${zsk.url.gateway}")
private String gateway;
@Value("${zsk.methodId}")
private String methodId;
@Value("${zsk.param.charset}")
private String charset;
@Value("${zsk.param.origin}")
private String origin;
@Value("${zsk.param.version}")
private String version;
@Value("${zsk.catalogIds}")
private String[] catalogIds;
@Autowired
private ResourceDao resourceDao;
@ -62,6 +103,9 @@ public class ResourceServiceImpl extends CrudServiceImpl<ResourceDao, ResourceEn
@Autowired
private SysDeptDao sysDeptDao;
@Autowired
private JdbcTemplate jdbcTemplate;
@Override
public QueryWrapper<ResourceEntity> getWrapper(Map<String, Object> params) {
QueryWrapper<ResourceEntity> wrapper = new QueryWrapper<>();
@ -533,4 +577,152 @@ public class ResourceServiceImpl extends CrudServiceImpl<ResourceDao, ResourceEn
return resourceDao.selectByAppArea(appArea);
}
/**
* 同步知识库
*/
@Override
public void KnowledgeBase() {
final List<String> knowledgeUUID =
jdbcTemplate.queryForList("SELECT note1 FROM tb_data_resource WHERE type ='知识库';", String.class)
.stream().distinct().collect(Collectors.toList());
final int pageSize = 100;
final long timestamp = LocalDateTime.now().toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
final OkHttpClient client = new OkHttpClient();
Arrays.stream(catalogIds).map(index -> {
logger.info("处理:" + index);
CopyOnWriteArrayList<CompletableFuture> task = new CopyOnWriteArrayList<>();
AtomicBoolean end = new AtomicBoolean(true);
AtomicInteger pageIndex = new AtomicInteger(1);
AtomicInteger maxPage = new AtomicInteger(100); // 防止死循环
do {
int page = pageIndex.getAndIncrement();
logger.info("处理:" + index + " 分页{}", page);
task.add(CompletableFuture.supplyAsync(() -> {
try {
logger.info("分页任务处理:" + index + " 分页{} 时间 -->" + timestamp, page);
JSONObject bizContentParam = new JSONObject();
bizContentParam.put("appkey", appKey);
bizContentParam.put("catalogId", index);
bizContentParam.put("pageIndex", page);
bizContentParam.put("pageSize", pageSize);
String bizContent_ = bizContentParam.toJSONString();
logger.info("biz_content参数{}", bizContent_);
// 通过FormBody对象构建Builder来添加表单参数
FormBody.Builder signFormBody = new FormBody.Builder()
.add("app_id", appId)
.add("interface_id", methodId)
.add("version", version)
.add("timestamp", String.valueOf(timestamp))
.add("origin", origin)
.add("charset", charset)
.add("biz_content", bizContent_);
logger.info(index + "分页{}对接知识库数据请求参数:" + signFormBody.build().contentType().toString(), page);
Request signRequest = new Request.Builder().url(sign).post(signFormBody.build()).build();
Response signResponse =
client.newCall(signRequest).execute();
String signResult = signResponse.body().string();
logger.info("{}分页signResult数据" + signResult, page);
JSONObject signJsonObject = JSON.parseObject(signResult);
if (!signJsonObject.containsKey("data")) {
logger.info("获取sign异常" + signResult);
end.set(false);
throw new RuntimeException("获取sign异常");
}
if (signJsonObject.get("data") == null) {
logger.info("获取sign异常" + signResult);
end.set(false);
throw new RuntimeException("获取sign异常");
}
Map<String, Object> signData = (Map<String, Object>) signJsonObject.get("data");
String signString = signData.get("sign").toString();
signFormBody.add("sign", signString);
Request gatewayRequest = new Request.Builder().url(gateway).post(signFormBody.build()).build();
Response gatewayResponse = client.newCall(gatewayRequest).execute();
String gatewayResult = gatewayResponse.body().string();
logger.info("{}分页数据:" + gatewayResult, page);
JSONObject gatewayJsonObject = JSON.parseObject(gatewayResult);
JSONObject gatewayData = JSON.parseObject(gatewayJsonObject.get("data").toString());
JSONArray infos = gatewayData.getJSONObject("data").getJSONArray("infos");
int total = gatewayData.getJSONObject("data").getIntValue("total");
if (maxPage.get() != (total / pageSize)) {
maxPage.set(total / pageSize);
}
if (!infos.isEmpty()) { // 不为空则数据分页还没传输结束
logger.info("--继续分页--");
end.set(true); // TODO
} else {
end.set(false);
}
return infos;
} catch (Exception e) {
logger.info("获取异常:", e);
end.set(false);
return new JSONArray();
}
}).thenAcceptAsync(list -> {
logger.info("知识库数据量:{}", list.size());
if (list.size() < 1) {
end.set(false);
}
list.parallelStream().filter(resource -> {
Map<String, Object> map = (Map<String, Object>) resource;
return !knowledgeUUID.contains(map.get("uuid").toString());
}).forEach(resource -> {
Map<String, Object> map = (Map<String, Object>) resource;
ResourceDTO dto = new ResourceDTO();
dto.setName(map.get("title").toString());
dto.setType("知识库");
dto.setVisits(0L);
//所属部门暂时设为青岛市政府办公厅
dto.setDeptId(1517116100113850370L);
dto.setNote1(map.get("uuid").toString());
dto.setDelFlag(0);
ArrayList<AttrEntity> infoList = new ArrayList<>();
map.forEach((key, value) -> {
switch (key) {
case "title":
dto.setName(value.toString());
break;
case "url":
dto.setLink(value.toString());
break;
case "createtime":
Date createDate = new Date(Long.parseLong(value.toString()));
dto.setCreateDate(createDate);
break;
default:
AttrEntity attrEntity = new AttrEntity();
attrEntity.setDelFlag(0);
attrEntity.setAttrType(key);
attrEntity.setAttrValue(value.toString());
infoList.add(attrEntity);
break;
}
});
AttrEntity attrEntity = new AttrEntity();
attrEntity.setDelFlag(0);
attrEntity.setAttrType("文件类型");
if ("f49561afc7204f008c4bb3cd821eb6ba".equals(index)) {
attrEntity.setAttrValue("政府公报");
} else {
attrEntity.setAttrValue("政策解读");
}
infoList.add(attrEntity);
dto.setInfoList(infoList);
this.insertWithAttrs(dto);
logger.info("插入:" + dto.getName());
});
}));
try {
Thread.sleep(500L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
} while (end.get() || pageIndex.get() < maxPage.get());
return task;
}).collect(Collectors.toList());
}
}

View File

@ -0,0 +1,29 @@
package io.renren.modules.resource.task;
import io.renren.modules.job.task.ITask;
import io.renren.modules.resource.service.ResourceService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 知识库定时任务
*/
@Component("knowledgeBaseTask")
public class KnowledgeBaseTask implements ITask {
private static Logger logger = LoggerFactory.getLogger(KnowledgeBaseTask.class);
@Autowired
private ResourceService resourceService;
/**
* 执行定时任务接口
*
* @param params 参数多参数使用JSON数据
*/
@Override
public void run(String params) {
logger.info("执行同步知识库任务");
resourceService.KnowledgeBase();
}
}