parallelStream 使用自定义 ForkJoinPool防止任务堆积

This commit is contained in:
wangliwen 2022-08-12 14:11:34 +08:00
parent 8980f61b1c
commit f45e3ea2f4
1 changed files with 50 additions and 61 deletions

View File

@ -474,14 +474,6 @@ public class ResourceServiceImpl extends CrudServiceImpl<ResourceDao, ResourceEn
).limit(pageSize).collect(Collectors.toList()); ).limit(pageSize).collect(Collectors.toList());
return temp; return temp;
}).get(); }).get();
// ids = selectDTOPageSpecilTotal.parallelStream().map(Map.class::cast).sorted(Comparator.comparing(x -> {
// Map index = (Map) x;
// String string = (index.get("total") == null) ? "0" : index.get("total").toString();
// return Long.valueOf(string);
// }
// ).reversed()).skip((long) (pageNum - 1) * pageSize).limit(pageSize).map(x ->
// Long.valueOf(x.get("id").toString())
// ).limit(pageSize).collect(Collectors.toList());
break; break;
case "ASC": // total 升序 case "ASC": // total 升序
ids = customThreadPool.submit(() -> { ids = customThreadPool.submit(() -> {
@ -494,13 +486,6 @@ public class ResourceServiceImpl extends CrudServiceImpl<ResourceDao, ResourceEn
).limit(pageSize).collect(Collectors.toList()); ).limit(pageSize).collect(Collectors.toList());
return temp; return temp;
}).get(); }).get();
// ids = selectDTOPageSpecilTotal.parallelStream().map(Map.class::cast).sorted(Comparator.comparing(x -> {
// String string = (x.get("total") == null) ? "0" : x.get("total").toString();
// return Long.valueOf(string);
// }
// )).skip((pageNum - 1) * pageSize).limit(pageSize).map(x ->
// Long.valueOf(x.get("id").toString())
// ).limit(pageSize).collect(Collectors.toList());
break; break;
} }
resourceDTOS = resourceDao.selectDTOPage(resourceDTO, null, null, null, null, ids); resourceDTOS = resourceDao.selectDTOPage(resourceDTO, null, null, null, null, ids);
@ -1131,6 +1116,7 @@ public class ResourceServiceImpl extends CrudServiceImpl<ResourceDao, ResourceEn
public void KnowledgeBase() { public void KnowledgeBase() {
final List<String> knowledgeUUID = jdbcTemplate.queryForList("SELECT note1 FROM tb_data_resource WHERE type ='知识库' AND note1 IS NOT NULL FOR UPDATE;", String.class).stream().distinct().collect(Collectors.toList()); final List<String> knowledgeUUID = jdbcTemplate.queryForList("SELECT note1 FROM tb_data_resource WHERE type ='知识库' AND note1 IS NOT NULL FOR UPDATE;", String.class).stream().distinct().collect(Collectors.toList());
final int pageSize = CPU_NUM * 10; final int pageSize = CPU_NUM * 10;
ForkJoinPool customThreadPool = new ForkJoinPool(CPU_NUM * 3);
Arrays.stream(catalogIds).map(index -> { Arrays.stream(catalogIds).map(index -> {
logger.info("处理:{}", index); logger.info("处理:{}", index);
CopyOnWriteArrayList<CompletableFuture> task = new CopyOnWriteArrayList<>(); CopyOnWriteArrayList<CompletableFuture> task = new CopyOnWriteArrayList<>();
@ -1208,54 +1194,57 @@ public class ResourceServiceImpl extends CrudServiceImpl<ResourceDao, ResourceEn
if (list.isEmpty()) { if (list.isEmpty()) {
end.set(false); end.set(false);
} }
list.parallelStream().filter(resource -> { customThreadPool.submit(() -> {
Map<String, Object> map = (Map<String, Object>) resource; list.parallelStream().filter(resource -> {
return !knowledgeUUID.contains(map.get("uuid").toString()); Map<String, Object> map = (Map<String, Object>) resource;
}).forEach(resource -> { return !knowledgeUUID.contains(map.get("uuid").toString());
Map<String, Object> map = (Map<String, Object>) resource; }).forEach(resource -> {
ResourceDTO dto = new ResourceDTO(); Map<String, Object> map = (Map<String, Object>) resource;
dto.setName(map.get("title").toString()); ResourceDTO dto = new ResourceDTO();
dto.setType("知识库"); dto.setName(map.get("title").toString());
dto.setVisits(0L); dto.setType("知识库");
//所属部门暂时设为青岛市政府办公厅 dto.setVisits(0L);
dto.setDeptId(1517116100113850370L); //所属部门暂时设为青岛市政府办公厅
dto.setNote1(map.get("uuid").toString()); dto.setDeptId(1517116100113850370L);
dto.setDelFlag(0); dto.setNote1(map.get("uuid").toString());
ArrayList<AttrEntity> infoList = new ArrayList<>(); dto.setDelFlag(0);
map.forEach((key, value) -> { ArrayList<AttrEntity> infoList = new ArrayList<>();
switch (key) { map.forEach((key, value) -> {
case "title": switch (key) {
dto.setName(value.toString()); case "title":
break; dto.setName(value.toString());
case "url": break;
dto.setLink(value.toString()); case "url":
break; dto.setLink(value.toString());
case "createtime": break;
Date createDate = new Date(Long.parseLong(value.toString())); case "createtime":
dto.setCreateDate(createDate); Date createDate = new Date(Long.parseLong(value.toString()));
break; dto.setCreateDate(createDate);
default: break;
AttrEntity attrEntity = new AttrEntity(); default:
attrEntity.setDelFlag(0); AttrEntity attrEntity = new AttrEntity();
attrEntity.setAttrType(key); attrEntity.setDelFlag(0);
attrEntity.setAttrValue(value.toString()); attrEntity.setAttrType(key);
infoList.add(attrEntity); attrEntity.setAttrValue(value.toString());
break; infoList.add(attrEntity);
break;
}
});
AttrEntity attrEntity = new AttrEntity();
attrEntity.setDelFlag(0);
attrEntity.setAttrType("文件类型");
if ("f49561afc7204f008c4bb3cd821eb6ba".equals(index)) {
attrEntity.setAttrValue("政府公报");
} else {
attrEntity.setAttrValue("政策解读");
} }
infoList.add(attrEntity);
dto.setInfoList(infoList);
this.insertWithAttrs(dto);
logger.info("插入:{}", dto.getName());
}); });
AttrEntity attrEntity = new AttrEntity(); }).join();
attrEntity.setDelFlag(0); customThreadPool.shutdown();
attrEntity.setAttrType("文件类型");
if ("f49561afc7204f008c4bb3cd821eb6ba".equals(index)) {
attrEntity.setAttrValue("政府公报");
} else {
attrEntity.setAttrValue("政策解读");
}
infoList.add(attrEntity);
dto.setInfoList(infoList);
this.insertWithAttrs(dto);
logger.info("插入:{}", dto.getName());
});
}, executor)); }, executor));
try { try {
Thread.sleep(300L); Thread.sleep(300L);