sse optimize

This commit is contained in:
moyangzhan 2024-03-21 00:11:12 +08:00
parent 41822d9898
commit 219132961d
4 changed files with 28 additions and 9 deletions

View File

@ -33,6 +33,8 @@ public enum ErrorEnum {
B_LLM_SECRET_KEY_NOT_SET("B0009", "LLM的secret key没设置"),
B_MESSAGE_NOT_FOUND("B0008", "消息不存在"),
B_LLM_SERVICE_DISABLED("B0009", "LLM服务不可用"),
B_KNOWLEDGE_BASE_IS_EMPTY("B0010", "知识库内容为空"),
B_KNOWLEDGE_BASE_NO_ANSWER("B0011", "[无答案]")
;
private String code;

View File

@ -47,7 +47,7 @@ public class SSEEmitterHelper {
}
stringRedisTemplate.opsForValue().set(askingKey, "1", 15, TimeUnit.SECONDS);
try {
sseEmitter.send(SseEmitter.event().name("start"));
sseEmitter.send(SseEmitter.event().name("[START]"));
} catch (IOException e) {
log.error("error", e);
sseEmitter.completeWithError(e);
@ -65,7 +65,7 @@ public class SSEEmitterHelper {
throwable -> {
try {
log.error("sseEmitter error,uid:{},on error:{}", user.getId(), throwable);
sseEmitter.send(SseEmitter.event().name("error").data(throwable.getMessage()));
sseEmitter.send(SseEmitter.event().name("[ERROR]").data(throwable.getMessage()));
} catch (IOException e) {
log.error("error", e);
} finally {
@ -84,9 +84,19 @@ public class SSEEmitterHelper {
});
}
public void sendAndComplete(SseEmitter sseEmitter, String msg){
try {
sseEmitter.send(SseEmitter.event().name("[START]"));
sseEmitter.send(SseEmitter.event().name("[DONE]").data(msg));
} catch (IOException e) {
throw new RuntimeException(e);
}
sseEmitter.complete();
}
public void sendErrorMsg(SseEmitter sseEmitter, String errorMsg) {
try {
sseEmitter.send(SseEmitter.event().name("error").data(errorMsg));
sseEmitter.send(SseEmitter.event().name("[ERROR]").data(errorMsg));
} catch (IOException e) {
throw new RuntimeException(e);
}

View File

@ -99,7 +99,8 @@ public abstract class AbstractLLMService<T> {
} else {
tokenStream = chatAssistant.chat(params.getUserMessage());
}
tokenStream.onNext((content) -> {
tokenStream
.onNext((content) -> {
log.info("get content:{}", content);
//加空格配合前端的fetchEventSource进行解析见https://github.com/Azure/fetch-event-source/blob/45ac3cfffd30b05b79fbf95c21e67d4ef59aa56a/src/parse.ts#L129-L133
try {
@ -117,7 +118,7 @@ public abstract class AbstractLLMService<T> {
String meta = JsonUtil.toJson(chatMeta).replaceAll("\r\n", "");
log.info("meta:" + meta);
try {
params.getSseEmitter().send(" [META]" + meta);
params.getSseEmitter().send(SseEmitter.event().name("[DONE]").data(" [META]" + meta));
} catch (IOException e) {
log.error("stream onComplete error", e);
throw new RuntimeException(e);
@ -133,7 +134,7 @@ public abstract class AbstractLLMService<T> {
if(StringUtils.isBlank(errorMsg)){
errorMsg = error.getMessage();
}
params.getSseEmitter().send(SseEmitter.event().name("error").data(errorMsg));
params.getSseEmitter().send(SseEmitter.event().name("[ERROR]").data(errorMsg));
} catch (IOException e) {
log.error("sse error", e);
}

View File

@ -20,6 +20,7 @@ import dev.langchain4j.data.document.parser.TextDocumentParser;
import dev.langchain4j.data.document.parser.apache.pdfbox.ApachePdfBoxDocumentParser;
import dev.langchain4j.data.document.parser.apache.poi.ApachePoiDocumentParser;
import dev.langchain4j.data.message.AiMessage;
import dev.langchain4j.model.input.Prompt;
import dev.langchain4j.model.output.Response;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
@ -265,14 +266,19 @@ public class KnowledgeBaseService extends ServiceImpl<KnowledgeBaseMapper, Knowl
log.info("retrieveAndPushToLLM,kbUuid:{},userId:{}", kbUuid, user.getId());
KnowledgeBase knowledgeBase = getOrThrow(kbUuid);
String prompt = ragService.retrieveAndCreatePrompt(kbUuid, req.getQuestion()).text();
Prompt prompt = ragService.retrieveAndCreatePrompt(kbUuid, req.getQuestion());
if(null == prompt){
sseEmitterHelper.sendAndComplete(sseEmitter, B_KNOWLEDGE_BASE_NO_ANSWER.getInfo());
return;
}
String promptText = prompt.text();
SseAskParams sseAskParams = new SseAskParams();
sseAskParams.setSystemMessage(StringUtils.EMPTY);
sseAskParams.setSseEmitter(sseEmitter);
sseAskParams.setUserMessage(prompt);
sseAskParams.setUserMessage(promptText);
sseAskParams.setModelName(req.getModelName());
sseEmitterHelper.process(user, sseAskParams, (response, promptMeta, answerMeta) -> {
knowledgeBaseQaRecordService.createNewRecord(user, knowledgeBase, req.getQuestion(), prompt, promptMeta.getTokens(), response, answerMeta.getTokens());
knowledgeBaseQaRecordService.createNewRecord(user, knowledgeBase, req.getQuestion(), promptText, promptMeta.getTokens(), response, answerMeta.getTokens());
userDayCostService.appendCostToUser(user, promptMeta.getTokens() + answerMeta.getTokens());
});
}