diff --git a/df-llm-agent/llm_agent_app/app.py b/df-llm-agent/llm_agent_app/app.py
index 3a1c8ec..c1ad56b 100644
--- a/df-llm-agent/llm_agent_app/app.py
+++ b/df-llm-agent/llm_agent_app/app.py
@@ -53,6 +53,8 @@ async def llm_agent_stream_system(request, platform=""):
     return res
 
 # Component
+
+
 @llm_agent_app.route("/ai/azure/deepflow/modules", methods=["POST"])
 @wrap_resp
 async def llm_agent_module(request):
diff --git a/df-llm-agent/llm_agent_app/llm_agent.py b/df-llm-agent/llm_agent_app/llm_agent.py
index 50252b0..c0e67fa 100644
--- a/df-llm-agent/llm_agent_app/llm_agent.py
+++ b/df-llm-agent/llm_agent_app/llm_agent.py
@@ -77,42 +77,42 @@ class llmAgentWorker(object):
-    request = None
+    def __init__(self, request=None, user_info='', user_question={}, system_content="", messages="", query="", res_chat_id=0, engine_name="", output=[], output_all=[], custom_llm={}, azure_client=None, langchain_azure_client=None):
+        self.request = request
 
-    user_info = ""
+        self.user_info = user_info
 
-    # Original question
-    user_question = {}
+        # Original question
+        self.user_question = user_question
 
-    #
-    system_content = ""
-    messages = ""
+        #
+        self.system_content = system_content
+        self.messages = messages
 
-    # Assembled question
-    query = ""
+        # Assembled question
+        self.query = query
 
-    # Chat session ID
-    res_chat_id = 0
+        # Chat session ID
+        self.res_chat_id = res_chat_id
 
-    # Name of the engine in use
-    engine_name = ""
+        # Name of the engine in use
+        self.engine_name = engine_name
 
-    # Content extracted from the LLM response for display
-    output = []
-    # Raw LLM response data, or request error details
-    output_all = []
+        # Content extracted from the LLM response for display
+        self.output = output
+        # Raw LLM response data, or request error details
+        self.output_all = output_all
 
-    custom_llm = {}
+        self.custom_llm = custom_llm
 
-    # agent
-    azure_client = None
+        # agent
+        self.azure_client = azure_client
 
-    # Df component
-    langchain_azure_client = None
+        # Df component
+        self.langchain_azure_client = langchain_azure_client
 
     # OpenAI token counting
-    @classmethod
-    async def num_tokens_from_messages(cls, messages, model="gpt-3.5-turbo-0613"):
+    async def num_tokens_from_messages(self, messages, model="gpt-3.5-turbo-0613"):
         """Return the number of tokens used by a list of messages."""
         try:
             encoding = tiktoken.encoding_for_model(model)
@@ -138,14 +138,14 @@ async def num_tokens_from_messages(cls, messages, model="gpt-3.5-turbo-0613"):
             print(
                 "Warning: gpt-3.5-turbo may update over time. Returning num tokens assuming gpt-3.5-turbo-0613."
             )
-            return await cls.num_tokens_from_messages(
+            return await self.num_tokens_from_messages(
                 messages, model="gpt-3.5-turbo-0613"
             )
         elif "gpt-4" in model:
             print(
                 "Warning: gpt-4 may update over time. Returning num tokens assuming gpt-4-0613."
             )
-            return await cls.num_tokens_from_messages(messages, model="gpt-4-0613")
+            return await self.num_tokens_from_messages(messages, model="gpt-4-0613")
         else:
             raise NotImplementedError(
                 f"""num_tokens_from_messages() is not implemented for model {model}.
 See https://github.com/openai/openai-python/blob/main/chatml.md for information on how messages are converted to tokens."""
@@ -161,39 +161,36 @@ async def num_tokens_from_messages(cls, messages, model="gpt-3.5-turbo-0613"):
         return num_tokens
 
     # Record the chat; once GPT responds, update this record with the result
-    @classmethod
-    async def chat_add(cls):
+    async def chat_add(self):
         chat_data = {}
-        chat_data["engine"] = cls.engine_name
-        chat_data["chat_topic_id"] = cls.request.ctx.chat_topic_id
-        chat_data["input"] = cls.user_question
+        chat_data["engine"] = self.engine_name
+        chat_data["chat_topic_id"] = self.request.ctx.chat_topic_id
+        chat_data["input"] = self.user_question
         # chat_data["output"] = ""
 
         res = await chat_record_worker.chat_record_add(
-            user_info=cls.user_info, args={}, data=chat_data
+            user_info=self.user_info, args={}, data=chat_data
         )
-        cls.res_chat_id = res.get("res_chat_id", 0)
+        self.res_chat_id = res.get("res_chat_id", 0)
 
-        if cls.request.ctx.chat_topic_id <= 0:
-            cls.request.ctx.chat_topic_id = res.get("res_chat_topic_id", 0)
+        if self.request.ctx.chat_topic_id <= 0:
+            self.request.ctx.chat_topic_id = res.get("res_chat_topic_id", 0)
 
     # Update the chat record
-    @classmethod
-    async def chat_up(cls):
+    async def chat_up(self):
         chat_data = {}
-        chat_data["output"] = "".join(cls.output)
-        chat_data["output_all"] = cls.output_all
+        chat_data["output"] = "".join(self.output)
+        chat_data["output_all"] = self.output_all
 
         await chat_record_worker.chat_record_update(
-            user_info=cls.user_info, res_chat_id=cls.res_chat_id, data=chat_data
+            user_info=self.user_info, res_chat_id=self.res_chat_id, data=chat_data
         )
 
     # Base parameter configuration
-    @classmethod
     async def assistant_base(
-        cls, request, user_info, platform, engine_name, prompt_type, args, data
+        self, user_info, platform, engine_name, prompt_type, args, data
     ):
 
         # user_id = user_info.get("ID", 0)
@@ -208,13 +205,13 @@ async def assistant_base(
             )
 
         #
-        cls.query = [{"role": "user", "content": data["user_content"]}]
+        self.query = [{"role": "user", "content": data["user_content"]}]
 
-        cls.system_content = data["system_content"]
+        self.system_content = data["system_content"]
 
         if platform == "baidu" or platform == "zhipu":
-            cls.query = [
-                {"role": "user", "content": cls.system_content},
+            self.query = [
+                {"role": "user", "content": self.system_content},
                 {
                     "role": "assistant",
                     "content": "好的,后面的回复将按照你给我的角色和要求来解答",
                 },
@@ -296,7 +293,7 @@ async def assistant_base(
 
             # Component
             if prompt_type == "langchain":
-                cls.langchain_azure_client = AzureChatOpenAI(
+                self.langchain_azure_client = AzureChatOpenAI(
                     azure_endpoint=engine_config.get("api_base"),
                     deployment_name=engine_config.get("engine_name"),
                     openai_api_version=engine_config.get("api_version"),
@@ -304,13 +301,13 @@ async def assistant_base(
                     openai_api_key=engine_config.get("api_key"),
                 )
             else:
-                cls.azure_client = AsyncAzureOpenAI(
+                self.azure_client = AsyncAzureOpenAI(
                     api_key=engine_config.get("api_key"),
                     api_version=engine_config.get("api_version"),
                     azure_endpoint=engine_config.get("api_base"),
                 )
 
-            cls.engine_name = engine_config.get("engine_name")
+            self.engine_name = engine_config.get("engine_name")
 
         elif platform == "openai":
             for key in ("api_key", "engine_name"):
@@ -321,7 +318,7 @@ async def assistant_base(
                 )
 
             openai.api_key = engine_config.get("api_key")
-            cls.engine_name = engine_config.get("engine_name")
+            self.engine_name = engine_config.get("engine_name")
 
         elif platform == "aliyun":
             for key in ("api_key", "engine_name"):
@@ -332,7 +329,7 @@ async def assistant_base(
                 )
 
             dashscope.api_key = engine_config.get("api_key")
-            cls.engine_name = engine_config.get("engine_name")
engine_config.get("engine_name") + self.engine_name = engine_config.get("engine_name") elif platform == "baidu": for key in ("api_key", "api_secre", "engine_name"): @@ -344,7 +341,7 @@ async def assistant_base( qianfan.AK(engine_config.get("api_key")) qianfan.SK(engine_config.get("api_secre")) - cls.engine_name = engine_config.get("engine_name") + self.engine_name = engine_config.get("engine_name") elif platform == "zhipu": for key in ("api_key", "engine_name"): @@ -355,7 +352,7 @@ async def assistant_base( ) zhipuai.api_key = engine_config.get("api_key") - cls.engine_name = engine_config.get("engine_name") + self.engine_name = engine_config.get("engine_name") elif platform == "custom_llm": for key in ( @@ -378,34 +375,33 @@ async def assistant_base( custom_llm_config["api_version"] = engine_config.get("api_version") custom_llm_config["api_url"] = engine_config.get("api_url") - cls.custom_llm = custom_llm_config - cls.engine_name = engine_config.get("engine_name") + self.custom_llm = custom_llm_config + self.engine_name = engine_config.get("engine_name") else: raise BadRequestException( "INVALID_PARAMETERS", f"{const.INVALID_PARAMETERS}, 模型所在平台名称错误", ) # - cls.user_info = user_info - cls.request = request - cls.user_question = json.dumps(data) + self.user_info = user_info + self.user_question = json.dumps(data) # 消息 - # cls.messages = [{'role': 'user', 'content': 'Count to 1000, with a comma between each number and no newlines. E.g., 1, 2, 3, ...'}] + # self.messages = [{'role': 'user', 'content': 'Count to 1000, with a comma between each number and no newlines. E.g., 1, 2, 3, ...'}] - cls.messages = [{"role": "system", "content": cls.system_content}, *cls.query] + self.messages = [{"role": "system", "content": self.system_content}, *self.query] if platform == "baidu" or platform == "zhipu": - cls.messages = [*cls.query] + self.messages = [*self.query] if platform == "custom_llm": - cls.messages = {"messages": cls.messages} + self.messages = {"messages": self.messages} conv_history_tokens = 0 if platform == "azure" or platform == "openai": try: - conv_history_tokens = await cls.num_tokens_from_messages(cls.messages) + conv_history_tokens = await self.num_tokens_from_messages(self.messages) except Exception as e: raise BadRequestException( "FAIL", f"{const.FAIL}: 计算token数量错误: {e}" @@ -414,7 +410,7 @@ async def assistant_base( elif platform == "aliyun": response_token = dashscope.Tokenization.call( - model=cls.engine_name, messages=cls.messages + model=self.engine_name, messages=self.messages ) if response_token.status_code != HTTPStatus.OK: raise BadRequestException( @@ -427,19 +423,16 @@ async def assistant_base( print(conv_history_tokens) # 记录会话 - await cls.chat_add() + await self.chat_add() # 流处理 - @classmethod async def assistant_stream( - cls, request, user_info, platform, engine_name, prompt_type, args, data + self, user_info, platform, engine_name, prompt_type, args, data ): # 校验 - await cls.assistant_base( - request, user_info, platform, engine_name, prompt_type, args, data - ) + await self.assistant_base(user_info, platform, engine_name, prompt_type, args, data) # 开始时间 working_start_time = datetime.datetime.now() @@ -447,12 +440,12 @@ async def assistant_stream( if platform == "azure" or platform == "openai": try: if platform == "azure": - response = await cls.azure_client.chat.completions.create( - model=cls.engine_name, messages=cls.messages, stream=True + response = await self.azure_client.chat.completions.create( + model=self.engine_name, messages=self.messages, stream=True ) 
                 else:
                     response = await openai.ChatCompletion.acreate(
-                        engine=cls.engine_name, messages=cls.messages, stream=True
+                        engine=self.engine_name, messages=self.messages, stream=True
                     )
 
             except Exception as e:
@@ -473,9 +466,9 @@ async def generate_data(output, output_all):
                         working_end_time.timestamp() - working_start_time.timestamp()
                     )
-                    # msg = f"User: {cls.user_info.get('ID', 0)}, GPT request start time: {working_start_time}, end time: {working_end_time}, elapsed: {all_time} s, response: {item}"
+                    # msg = f"User: {self.user_info.get('ID', 0)}, GPT request start time: {working_start_time}, end time: {working_end_time}, elapsed: {all_time} s, response: {item}"
                     msg = {}
-                    msg["user_id"] = cls.user_info.get("ID", 0)
+                    msg["user_id"] = self.user_info.get("ID", 0)
                     msg["start_time"] = f"{working_start_time}"
                     msg["end_time"] = f"{working_end_time}"
                     msg["all_time"] = all_time
@@ -498,9 +491,9 @@ async def generate_data(output, output_all):
                 yield content
 
                 # Persist
-                cls.output = output
-                cls.output_all = output_all
-                await cls.chat_up()
+                self.output = output
+                self.output_all = output_all
+                await self.chat_up()
 
             return generate_data(output, output_all)
 
@@ -508,8 +501,8 @@ async def generate_data(output, output_all):
             try:
                 responses = dashscope.Generation.call(
-                    model=cls.engine_name,  # Generation.Models.qwen_turbo,
-                    messages=cls.messages,
+                    model=self.engine_name,  # Generation.Models.qwen_turbo,
+                    messages=self.messages,
                     result_format="message",
                     stream=True,
                     incremental_output=True,
@@ -531,9 +524,9 @@ async def generate_data(output, output_all):
                         working_end_time.timestamp() - working_start_time.timestamp()
                     )
-                    # msg = f"User: {cls.user_info.get('ID', 0)}, GPT request start time: {working_start_time}, end time: {working_end_time}, elapsed: {all_time} s, response: {response}"
+                    # msg = f"User: {self.user_info.get('ID', 0)}, GPT request start time: {working_start_time}, end time: {working_end_time}, elapsed: {all_time} s, response: {response}"
                     msg = {}
-                    msg["user_id"] = cls.user_info.get("ID", 0)
+                    msg["user_id"] = self.user_info.get("ID", 0)
                     msg["start_time"] = f"{working_start_time}"
                     msg["end_time"] = f"{working_end_time}"
                     msg["all_time"] = all_time
@@ -557,9 +550,9 @@ async def generate_data(output, output_all):
                 yield content
 
                 # Persist
-                cls.output = output
-                cls.output_all = output_all
-                await cls.chat_up()
+                self.output = output
+                self.output_all = output_all
+                await self.chat_up()
 
             return generate_data(output, output_all)
 
@@ -569,7 +562,7 @@ async def generate_data(output, output_all):
             try:
                 chat_comp = qianfan.ChatCompletion()
                 # Specify the model
                 response = await chat_comp.ado(
-                    model=cls.engine_name, messages=cls.messages, stream=True
+                    model=self.engine_name, messages=self.messages, stream=True
                 )
 
             except Exception as e:
@@ -588,10 +581,10 @@ async def generate_data(output, output_all):
                         working_end_time.timestamp() - working_start_time.timestamp()
                     )
-                    # msg = f"User: {cls.user_info.get('ID', 0)}, GPT request start time: {working_start_time}, end time: {working_end_time}, elapsed: {all_time} s, response: {item}"
+                    # msg = f"User: {self.user_info.get('ID', 0)}, GPT request start time: {working_start_time}, end time: {working_end_time}, elapsed: {all_time} s, response: {item}"
                     msg = {}
-                    msg["user_id"] = cls.user_info.get("ID", 0)
+                    msg["user_id"] = self.user_info.get("ID", 0)
                     msg["start_time"] = f"{working_start_time}"
                     msg["end_time"] = f"{working_end_time}"
                     msg["all_time"] = all_time
@@ -612,9 +605,9 @@ async def generate_data(output, output_all):
                 yield content
 
                 # Persist
-                cls.output = output
-                cls.output_all = output_all
-                await cls.chat_up()
+                self.output = output
+                self.output_all = output_all
+                await self.chat_up()
 
             return generate_data(output, output_all)
 
@@ -622,7 +615,7 @@ async def generate_data(output, output_all):
             try:
                 response = zhipuai.model_api.sse_invoke(
-                    model=cls.engine_name, prompt=cls.messages
+                    model=self.engine_name, prompt=self.messages
                 )
 
             except Exception as e:
@@ -643,9 +636,9 @@ async def generate_data(output, output_all):
                         working_end_time.timestamp() - working_start_time.timestamp()
                     )
-                    # msg = f"User: {cls.user_info.get('ID', 0)}, GPT request start time: {working_start_time}, end time: {working_end_time}, elapsed: {all_time} s, response: "
+                    # msg = f"User: {self.user_info.get('ID', 0)}, GPT request start time: {working_start_time}, end time: {working_end_time}, elapsed: {all_time} s, response: "
                     msg = {}
-                    msg["user_id"] = cls.user_info.get("ID", 0)
+                    msg["user_id"] = self.user_info.get("ID", 0)
                     msg["start_time"] = f"{working_start_time}"
                     msg["end_time"] = f"{working_end_time}"
                     msg["all_time"] = all_time
@@ -675,9 +668,9 @@ async def generate_data(output, output_all):
                 yield content
 
                 # Persist
-                cls.output = output
-                cls.output_all = output_all
-                await cls.chat_up()
+                self.output = output
+                self.output_all = output_all
+                await self.chat_up()
 
             return generate_data(output, output_all)
 
@@ -686,12 +679,12 @@ async def generate_data(output, output_all):
             lcuuid = generate_uuid()
             headers = {
                 "BCS-APIHub-RequestId": lcuuid,
-                "X-CHJ-GWToken": cls.custom_llm["api_key"],
+                "X-CHJ-GWToken": self.custom_llm["api_key"],
             }
-            url = cls.custom_llm["api_url"]
+            url = self.custom_llm["api_url"]
             try:
                 response = curl_tools.curl_app_stream(
-                    "post", url, headers, json.dumps(cls.messages)
+                    "post", url, headers, json.dumps(self.messages)
                 )
             except BadRequestException as e:
                 raise BadRequestException(
@@ -724,9 +717,9 @@ async def generate_data(output, output_all):
                         working_end_time.timestamp() - working_start_time.timestamp()
                     )
-                    # msg = f"User: {cls.user_info.get('ID', 0)}, GPT request start time: {working_start_time}, end time: {working_end_time}, elapsed: {all_time} s, response: {item}"
+                    # msg = f"User: {self.user_info.get('ID', 0)}, GPT request start time: {working_start_time}, end time: {working_end_time}, elapsed: {all_time} s, response: {item}"
                     msg = {}
-                    msg["user_id"] = cls.user_info.get("ID", 0)
+                    msg["user_id"] = self.user_info.get("ID", 0)
                     msg["start_time"] = f"{working_start_time}"
                     msg["end_time"] = f"{working_end_time}"
                     msg["all_time"] = all_time
@@ -751,25 +744,23 @@ async def generate_data(output, output_all):
                 yield content
 
                 # Persist
-                cls.output = output
-                cls.output_all = output_all
-                await cls.chat_up()
+                self.output = output
+                self.output_all = output_all
+                await self.chat_up()
 
             return generate_data(output, output_all)
 
     # Component
-    @classmethod
-    async def module(cls, request, user_info, platform, engine_name, args, data):
+    async def module(self, user_info, platform, engine_name, args, data):
 
         # Validation
-        await cls.assistant_base(
-            request, user_info, platform, engine_name, "langchain", args, data
-        )
+        await self.assistant_base(user_info, platform, engine_name, "langchain", args, data
+        )
 
         # Start time
        working_start_time = datetime.datetime.now()
 
         # Azure model
-        llm = cls.langchain_azure_client
+        llm = self.langchain_azure_client
 
         # String output
         output_parser = StrOutputParser()
@@ -843,7 +834,7 @@ async def module(cls, request, user_info, platform, engine_name, args, data):
         full_chain = {"topic": chain, "question": lambda x: x["question"]} | branch
 
         # The question
-        question = cls.query[0]["content"]
+        question = self.query[0]["content"]
 
         try:
             # Async one-shot response
@@ -853,22 +844,22 @@ async def module(cls, request, user_info, platform, engine_name, args, data):
 
             working_end_time = datetime.datetime.now()
             all_time = working_end_time.timestamp() - working_start_time.timestamp()
             msg = {}
-            msg["user_id"] = cls.user_info.get("ID", 0)
0) + msg["user_id"] = self.user_info.get("ID", 0) msg["start_time"] = f"{working_start_time}" msg["end_time"] = f"{working_end_time}" msg["all_time"] = all_time msg["return"] = res # 记录并返回 - cls.output.append(f"{res}") - cls.output_all.append(msg) + self.output.append(f"{res}") + self.output_all.append(msg) return res except Exception as e: - cls.output_all.append(e) + self.output_all.append(e) raise BadRequestException("APP_ERROR", const.APP_ERROR, f"{e}") finally: # 更新会话记录,包括所有返回可记录数据 - await cls.chat_up() + await self.chat_up() llm_agent_worker = llmAgentWorker() diff --git a/df-llm-agent/llm_agent_app/worker.py b/df-llm-agent/llm_agent_app/worker.py index 382fae5..0b3715c 100644 --- a/df-llm-agent/llm_agent_app/worker.py +++ b/df-llm-agent/llm_agent_app/worker.py @@ -1,6 +1,6 @@ from exception import BadRequestException import const -from llm_agent_app.llm_agent import llm_agent_worker +from llm_agent_app.llm_agent import llmAgentWorker from config import config from utils.curl_tools import curl_tools from utils import logger @@ -14,51 +14,45 @@ class app_worker(object): - args = data = user_info = None - def __init__(self, request): - app_worker.request = request - app_worker.args = request.args - if app_worker.args: + self.request = request + self.args = request.args + if self.args: for k, v in self.args.items(): - app_worker.args[k] = [i for i in v] + self.args[k] = [i for i in v] - app_worker.data = request.json - app_worker.user_info = request.ctx.user + self.data = request.json + self.user_info = request.ctx.user - @classmethod - async def llm_agent_config_add(cls): + async def llm_agent_config_add(self): # 校验todoing - return await llm_agent_config_worker.llm_agent_config_add(cls.user_info, cls.args, cls.data) + return await llm_agent_config_worker.llm_agent_config_add(self.user_info, self.args, self.data) - @classmethod - async def llm_agent_config_list(cls, platform=""): + async def llm_agent_config_list(self, platform=""): - return await llm_agent_config_worker.llm_agent_config_list(cls.user_info, platform) + return await llm_agent_config_worker.llm_agent_config_list(self.user_info, platform) - @classmethod - async def llm_agent_config_update(cls, platform="", key_name=""): + async def llm_agent_config_update(self, platform="", key_name=""): # 校验todoing - return await llm_agent_config_worker.llm_agent_config_update(cls.user_info, platform, key_name, cls.args, cls.data) + return await llm_agent_config_worker.llm_agent_config_update(self.user_info, platform, key_name, self.args, self.data) - @classmethod - async def llm_agent_config_delete(cls, platform="", engine_name=""): + async def llm_agent_config_delete(self, platform="", engine_name=""): # 校验todoing - return await llm_agent_config_worker.llm_agent_config_delete(cls.user_info, platform, engine_name, cls.args, cls.data) + return await llm_agent_config_worker.llm_agent_config_delete(self.user_info, platform, engine_name, self.args, self.data) # 流处理 - @classmethod - async def llm_agent_stream(cls, platform, prompt_type=''): + async def llm_agent_stream(self, platform, prompt_type=''): # 校验todoing - engine_name = cls.args.get("engine", "") + engine_name = self.args.get("engine", "") if not engine_name: raise BadRequestException("INVALID_PARAMETERS", f"{const.INVALID_PARAMETERS}, 缺失使用的引擎名称") - return await llm_agent_worker.assistant_stream(cls.request, cls.user_info, platform, engine_name, prompt_type, cls.args, cls.data) + llm_agent_worker = llmAgentWorker(self.request) + return await 
+        return await llm_agent_worker.assistant_stream(self.user_info, platform, engine_name, prompt_type, self.args, self.data)
 
     # Component
-    @classmethod
-    async def llm_agent_module(cls, platform):
-        engine_name = cls.args.get("engine", "")
+    async def llm_agent_module(self, platform):
+        engine_name = self.args.get("engine", "")
         if not engine_name:
             raise BadRequestException("INVALID_PARAMETERS", f"{const.INVALID_PARAMETERS}, 缺失使用的引擎名称")
-        return await llm_agent_worker.module(cls.request, cls.user_info, platform, engine_name, cls.args, cls.data)
+        llm_agent_worker = llmAgentWorker(self.request)
+        return await llm_agent_worker.module(self.user_info, platform, engine_name, self.args, self.data)
diff --git a/df-llm-agent/resource_app/worker.py b/df-llm-agent/resource_app/worker.py
index c3529f3..9625090 100644
--- a/df-llm-agent/resource_app/worker.py
+++ b/df-llm-agent/resource_app/worker.py
@@ -12,31 +12,24 @@ class app_worker(object):
-    args = data = user_info = None
-
     def __init__(self, request):
-        app_worker.request = request
-        app_worker.args = request.args
-        if app_worker.args:
+        self.request = request
+        self.args = request.args
+        if self.args:
             for k, v in self.args.items():
-                app_worker.args[k] = [i for i in v]
-        # app_worker.files = request.files
-        # app_worker.data = request.json
-        app_worker.user_info = request.ctx.user
+                self.args[k] = [i for i in v]
+        self.user_info = request.ctx.user
 
-    @classmethod
-    async def img_add(cls):
+    async def img_add(self):
         # TODO: validation
-        files = cls.request.files
-        return await resource_worker.img_add(cls.user_info, cls.args, files)
+        files = self.request.files
+        return await resource_worker.img_add(self.user_info, self.args, files)
 
-    @classmethod
-    async def img_add_b64(cls):
+    async def img_add_b64(self):
         # TODO: validation
-        data = cls.request.json
-        return await resource_worker.img_add_b64(cls.user_info, cls.args, data)
+        data = self.request.json
+        return await resource_worker.img_add_b64(self.user_info, self.args, data)
 
-    @classmethod
-    async def img_get(cls, hash_name=""):
+    async def img_get(self, hash_name=""):
         # TODO: validation
-        return await resource_worker.img_get(cls.user_info, cls.args, hash_name)
+        return await resource_worker.img_get(self.user_info, self.args, hash_name)
diff --git a/df-llm-agent/server.py b/df-llm-agent/server.py
index b9cc9fe..5a94eff 100644
--- a/df-llm-agent/server.py
+++ b/df-llm-agent/server.py
@@ -34,7 +34,7 @@
 @app.middleware("request")
 async def run_before_handler(request):
     req_headers = dict(request.headers)
-    req_path = request.path
+    # req_path = request.path
 
     if request.method.lower() == 'options':
         headers = {
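
Review note: the new llmAgentWorker.__init__ declares mutable default arguments (user_question={}, output=[], output_all=[], custom_llm={}). Python evaluates defaults once at function definition, so every instance constructed without these arguments shares the same dict and list objects; in module(), self.output.append(...) and self.output_all.append(...) then mutate lists shared across instances, reintroducing exactly the cross-request state leakage this refactor removes. A minimal sketch of the usual None-default idiom, assuming callers keep passing only the request as worker.py now does (illustration only, not part of the patch):

    # Sketch: default to None and build fresh containers inside __init__,
    # so each instance gets its own list/dict instead of a shared default.
    class llmAgentWorker(object):
        def __init__(self, request=None, output=None, output_all=None, custom_llm=None):
            self.request = request
            self.output = output if output is not None else []
            self.output_all = output_all if output_all is not None else []
            self.custom_llm = custom_llm if custom_llm is not None else {}

Only the appended-to defaults (output, output_all) are an immediate hazard; the others are reassigned wholesale in assistant_base before use, so they stay inert in practice.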