async def local_doc_chat( knowledge_base_id: str = Body(..., description="Knowledge Base Name", example="kb1"), question: str = Body(..., description="Question", example="工伤保险是什么?"), history: List[List[str]] = Body( [], description="History of previous questions and answers", example=[ ], ), ): vs_path = os.path.join(VS_ROOT_PATH, knowledge_base_id) if not os.path.exists(vs_path): # return BaseResponse(code=1, msg=f"Knowledge base {knowledge_base_id} not found") return ChatMessage( question=question, response=f"Knowledge base {knowledge_base_id} not found", history=history, ) else: start = time.time() for resp, history in local_doc_qa.get_knowledge_based_answer( query=question, vs_path=vs_path, chat_history=history, streaming=True ): pass end = time.time() print(f'总共用时{end-start}') return ChatMessage( question=question, response=resp["result"], history=history )
时间: 2024-02-15 10:27:42 浏览: 141
这段代码定义了一个名为 `local_doc_chat` 的异步函数,它接受三个参数:`knowledge_base_id`、`question` 和 `history`,其中 `knowledge_base_id` 和 `question` 都是必填参数,而 `history` 是一个可选参数,默认值为一个空列表。
在函数体内,首先检查指定的知识库是否存在。如果不存在,则返回一个 `ChatMessage` 实例,它的 `question` 属性为用户提出的问题,`response` 属性为提示信息,`history` 属性为之前的聊天历史记录。
如果知识库存在,则调用 `local_doc_qa.get_knowledge_based_answer` 方法来获取答案。这个方法接受四个参数:`query`、`vs_path`、`chat_history` 和 `streaming`。其中,`query` 是用户提出的问题,`vs_path` 是知识库的本地路径,`chat_history` 是之前的聊天历史记录,`streaming` 参数表示是否使用流式处理。
最后,函数返回一个 `ChatMessage` 实例,它的 `question` 属性为用户提出的问题,`response` 属性为获取到的答案,`history` 属性为更新后的聊天历史记录。
相关问题
async def list_docs( knowledge_base_id: Optional[str] = Query(default=None, description="Knowledge Base Name", example="kb1") ): if knowledge_base_id: local_doc_folder = get_folder_path(knowledge_base_id) if not os.path.exists(local_doc_folder): return {"code": 1, "msg": f"Knowledge base {knowledge_base_id} not found"} all_doc_names = [ doc for doc in os.listdir(local_doc_folder) if os.path.isfile(os.path.join(local_doc_folder, doc)) ] return ListDocsResponse(data=all_doc_names) else: if not os.path.exists(UPLOAD_ROOT_PATH): all_doc_ids = [] else: all_doc_ids = [ folder for folder in os.listdir(UPLOAD_ROOT_PATH) if os.path.isdir(os.path.join(UPLOAD_ROOT_PATH, folder)) ] return ListDocsResponse(data=all_doc_ids)
HASH_SIZE;
return index;
}
public Record findRecord(String phone) {
int index = hash(phone); // 计算散列值
for (int i = 0; i < HASH_SIZE; i++) {
int j = (index + i这是一个 Python 函数,用于列出文档列表。该函数包含一个可选的参数 knowledge_base_id,如果提 * i) % HASH_SIZE; // 二次探测
if (hashTable[j] == null || hashTable[j].status == EMPTY)
break;
if (hashTable[j].status == true && hashTable[j].phone.equals(phone))
return hash供了该参数,则会列出该知识库中的所有文档。否则,将列出所有知识库中Table[j]; // 找到记录
}
return null; // 未找到记录
}
public static void main(String的文档。函数首先检查提供的知识库 ID 是否存在,如果存在,则获取该知识库中的[] args) {
AddressBook addressBook = new AddressBook();
}
private class Record {
public String name;
public String phone;
public String address;
public boolean status; // 记录是否被删除
public Record(String name, String phone所有文档名并返回。如果没有提供知识库 ID,则获取所有知识库中的文档名并返回。, String address, boolean status) {
this.name = name;
this.phone = phone;
this.address = address;
this该函数返回一个 ListDocsResponse 对象,其中包含文档名的列表。
async def stream_chat(websocket: WebSocket, knowledge_base_id: str): await websocket.accept() turn = 1 while True: input_json = await websocket.receive_json() question, history, knowledge_base_id = input_json[""], input_json["history"], input_json["knowledge_base_id"] vs_path = os.path.join(VS_ROOT_PATH, knowledge_base_id) if not os.path.exists(vs_path): await websocket.send_json({"error": f"Knowledge base {knowledge_base_id} not found"}) await websocket.close() return await websocket.send_json({"question": question, "turn": turn, "flag": "start"}) last_print_len = 0 for resp, history in local_doc_qa.get_knowledge_based_answer( query=question, vs_path=vs_path, chat_history=history, streaming=True ): await websocket.send_text(resp["result"][last_print_len:]) last_print_len = len(resp["result"]) source_documents = [ f"""出处 [{inum + 1}] {os.path.split(doc.metadata['source'])[-1]}:\n\n{doc.page_content}\n\n""" f"""相关度:{doc.metadata['score']}\n\n""" for inum, doc in enumerate(resp["source_documents"]) ] await websocket.send_text( json.dumps( { "question": question, "turn": turn, "flag": "end", "sources_documents": source_documents, }, ensure_ascii=False, ) ) turn += 1
这段代码看起来是一个基于 WebSocket 实现的 AI 对话系统,它会接收客户端发送过来的消息,然后调用本地文档问答系统来提供回答。其中,vs_path 是知识库的路径,local_doc_qa 是本地文档问答系统的实例,chat_history 则是历史对话记录,用于上下文理解。在获取到回答后,它会将回答和相关的文档信息返回给客户端。
阅读全文