From b27a041c51164650740c8f7fd60f8f6e4b106ceb Mon Sep 17 00:00:00 2001
From: Joe Landers
Date: Fri, 13 Sep 2024 12:30:46 -0700
Subject: [PATCH 1/5] Don't try to send messages to the UI when the underlying
 AutoGen framework is just populating history.

---
 samples/apps/autogen-studio/autogenstudio/workflowmanager.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/samples/apps/autogen-studio/autogenstudio/workflowmanager.py b/samples/apps/autogen-studio/autogenstudio/workflowmanager.py
index 8c15cdccc2a..6745b103df0 100644
--- a/samples/apps/autogen-studio/autogenstudio/workflowmanager.py
+++ b/samples/apps/autogen-studio/autogenstudio/workflowmanager.py
@@ -930,7 +930,7 @@ def receive(
         request_reply: Optional[bool] = None,
         silent: Optional[bool] = False,
     ):
-        if self.message_processor:
+        if self.message_processor and not self.a_human_input_function:
             self.message_processor(sender, self, message, request_reply, silent, sender_type="agent")
         super().receive(message, sender, request_reply, silent)
 
@@ -1015,7 +1015,7 @@ def receive(
         request_reply: Optional[bool] = None,
         silent: Optional[bool] = False,
     ):
-        if self.message_processor:
+        if self.message_processor and not self.a_human_input_function:
             self.message_processor(sender, self, message, request_reply, silent, sender_type="groupchat")
         super().receive(message, sender, request_reply, silent)
 

From 0c700c2f330c584bfc8341072af7936c21d8a224 Mon Sep 17 00:00:00 2001
From: Joe Landers
Date: Wed, 4 Sep 2024 18:47:26 -0700
Subject: [PATCH 2/5] Enable Tool Calls

- Add constants to toggle tool calls on or off
- Bug fix so agents defined as dicts work when running from a local JSON file
- Improve the display of tool call arguments. (+1 squashed commit)
---
 .../autogenstudio/chatmanager.py              |   9 +-
 .../autogen-studio/autogenstudio/web/app.py   |   9 +-
 .../autogenstudio/workflowmanager.py          | 189 ++++++++++++++++--
 3 files changed, 191 insertions(+), 16 deletions(-)

diff --git a/samples/apps/autogen-studio/autogenstudio/chatmanager.py b/samples/apps/autogen-studio/autogenstudio/chatmanager.py
index e8ed3abfd62..e1345d04377 100644
--- a/samples/apps/autogen-studio/autogenstudio/chatmanager.py
+++ b/samples/apps/autogen-studio/autogenstudio/chatmanager.py
@@ -17,7 +17,11 @@ class AutoGenChatManager:
     """
 
     def __init__(
-        self, message_queue: Queue, websocket_manager: WebSocketConnectionManager = None, human_input_timeout: int = 180
+        self,
+        message_queue: Queue,
+        websocket_manager: WebSocketConnectionManager = None,
+        human_input_timeout: int = 180,
+        use_tool_calls: bool = False,
     ) -> None:
         """
         Initializes the AutoGenChatManager with a message queue.
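
For orientation, here is a minimal sketch (not part of the patch) of how the new
use_tool_calls flag is threaded through at startup. The queue, timeout, and
websocket values are illustrative; only the keyword arguments mirror the diff:

    from queue import Queue
    from autogenstudio.chatmanager import AutoGenChatManager

    USE_TOOL_CALLS = True  # module-level constant, added to web/app.py below

    chat_manager = AutoGenChatManager(
        message_queue=Queue(),
        websocket_manager=None,  # a real WebSocketConnectionManager in app.py
        human_input_timeout=300,
        use_tool_calls=USE_TOOL_CALLS,
    )
    # chat() / a_chat() then forward use_tool_calls=self.use_tool_calls into each
    # WorkflowManager, which decides whether skills become proper tool calls.
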
@@ -27,6 +31,7 @@ def __init__( self.message_queue = message_queue self.websocket_manager = websocket_manager self.a_human_input_timeout = human_input_timeout + self.use_tool_calls = use_tool_calls def send(self, message: dict) -> None: """ @@ -116,6 +121,7 @@ def chat( send_message_function=self.send, a_send_message_function=self.a_send, connection_id=connection_id, + use_tool_calls=self.use_tool_calls, ) message_text = message.content.strip() @@ -167,6 +173,7 @@ async def a_chat( a_human_input_function=self.a_prompt_for_input, a_human_input_timeout=self.a_human_input_timeout, connection_id=connection_id, + use_tool_calls=self.use_tool_calls, ) message_text = message.content.strip() diff --git a/samples/apps/autogen-studio/autogenstudio/web/app.py b/samples/apps/autogen-studio/autogenstudio/web/app.py index bbd087f52ea..7f2fc428cba 100644 --- a/samples/apps/autogen-studio/autogenstudio/web/app.py +++ b/samples/apps/autogen-studio/autogenstudio/web/app.py @@ -21,6 +21,11 @@ from ..version import VERSION from ..websocket_connection_manager import WebSocketConnectionManager +# These should both probably be handled by some "settings" UI page. +HUMAN_INPUT_TIMEOUT_SECONDS = 300 +USE_TOOL_CALLS = True + + profiler = Profiler() managers = {"chat": None} # manage calls to autogen # Create thread-safe queue for messages between api thread and autogen threads @@ -65,9 +70,6 @@ def message_handler(): database_engine_uri = folders["database_engine_uri"] dbmanager = DBManager(engine_uri=database_engine_uri) -HUMAN_INPUT_TIMEOUT_SECONDS = 180 - - @asynccontextmanager async def lifespan(app: FastAPI): print("***** App started *****") @@ -75,6 +77,7 @@ async def lifespan(app: FastAPI): message_queue=message_queue, websocket_manager=websocket_manager, human_input_timeout=HUMAN_INPUT_TIMEOUT_SECONDS, + use_tool_calls=USE_TOOL_CALLS, ) dbmanager.create_db_and_tables() diff --git a/samples/apps/autogen-studio/autogenstudio/workflowmanager.py b/samples/apps/autogen-studio/autogenstudio/workflowmanager.py index 6745b103df0..411f4bdfa2f 100644 --- a/samples/apps/autogen-studio/autogenstudio/workflowmanager.py +++ b/samples/apps/autogen-studio/autogenstudio/workflowmanager.py @@ -1,6 +1,7 @@ import json import os import time +import copy from datetime import datetime from typing import Any, Coroutine, Dict, List, Optional, Union @@ -11,6 +12,7 @@ AgentType, CodeExecutionConfigTypes, Message, + Skill, SocketMessage, Workflow, WorkFlowSummaryMethod, @@ -44,6 +46,7 @@ def __init__( a_human_input_function: Optional[callable] = None, a_human_input_timeout: Optional[int] = 60, connection_id: Optional[str] = None, + use_tool_calls: bool = False, ) -> None: """ Initializes the WorkflowManager with agents specified in the config and optional message history. 
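
The helpers added in the next hunk rely on AutoGen's two-sided tool
registration. Below is a minimal standalone sketch of that pattern, with an
illustrative skill function and a placeholder LLM config (not part of the
patch):

    import autogen

    def get_weather(city: str) -> str:
        """Illustrative skill, as it might appear in the generated skills.py."""
        return f"It is always sunny in {city}."

    llm_config = {"config_list": [{"model": "gpt-4o-mini", "api_key": "<key>"}]}  # placeholder
    caller = autogen.AssistantAgent("assistant", llm_config=llm_config)
    executor = autogen.UserProxyAgent("user_proxy", code_execution_config=False)

    # The caller's LLM is told the tool exists; the executor actually runs it.
    f = caller.register_for_llm(name="get_weather", description="Look up the weather")(get_weather)
    executor.register_for_execution(name="get_weather")(f)
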
@@ -92,6 +95,78 @@ def __init__(
         self.history = history or []
         self.sender = None
         self.receiver = None
+        self.use_tool_calls = use_tool_calls
+
+
+    def _connect_agent_skills(self, skills_module, caller_configs, caller, executor):
+        if caller.llm_config is not False:
+            for skill in caller_configs.get("skills", []):
+                if not isinstance(skill, Skill):
+                    skill = Skill.model_validate(skill)
+                func_name = skill.name
+                func = getattr(skills_module, func_name)
+                # f = caller.register_for_llm(name=func_name, description=skill.description, api_style="function")(func)
+                f = caller.register_for_llm(name=func_name, description=skill.description)(func)
+                executor.register_for_execution(name=func_name)(f)
+
+    def _connect_group_chat_skills(self, skills_module, group_configs, group_chat_manager):
+        agent_configs = group_configs.get("agents", [])
+        group = group_chat_manager.groupchat
+        admin_name = group.admin_name
+        admin_agent = group.agent_by_name(admin_name)
+        count_agents = len(agent_configs)
+        for i in range(count_agents):
+            agent_config = agent_configs[i]
+            agent_name = agent_config["config"].get("name")
+            agent = group.agent_by_name(agent_name)
+            # For some reason I don't yet understand, the group chat manager isn't given the correct name, so
+            # group.agent_by_name(agent_name) fails.
+            if agent is None:
+                # Ideally, every nested group within the group will have its own unique name, but just in case...
+                groups = [agent for agent in group.agents
+                          if isinstance(agent, ExtendedGroupChatManager) and agent.manager_name == agent_name]
+                group_configs_list = [agent_config for agent_config in agent_configs if
+                                      agent_config.get("config", {}).get("name") == agent_name]
+                for j in range(len(groups)):
+                    self._connect_group_chat_skills(skills_module, group_configs_list[j], groups[j])
+            elif isinstance(agent, autogen.GroupChatManager):
+                self._connect_group_chat_skills(skills_module, agent_config, agent)
+            else:
+                if admin_agent:
+                    self._connect_agent_skills(skills_module, agent_config, agent, admin_agent)
+                else:
+                    # When setting up tool calls, you just need any other agent
+                    # besides the current one.
+                    next_agent = i + 1
+                    if next_agent >= count_agents:
+                        next_agent = 0
+                    executor_name = agent_configs[next_agent].get("config").get("name")
+                    executor = group.agent_by_name(executor_name)
+                    self._connect_agent_skills(skills_module, agent_config, agent, executor)
+
+    def _connect_tools(self, sender_configs, receiver_configs):
+        import importlib.util
+        import sys
+
+        # import the skills as proper tools
+        # TODO: This may all be a bit too "wild west".
+        if self.work_dir not in sys.path:
+            sys.path.append(self.work_dir)  # TODO: Is this necessary?
+
+        spec = importlib.util.spec_from_file_location("skills", os.path.join(self.work_dir, "skills.py"))
+        skills_module = importlib.util.module_from_spec(spec)
+        spec.loader.exec_module(skills_module)
+        globals().update(vars(skills_module))  # TODO: Is this necessary?
+ + if isinstance(self.sender, autogen.GroupChatManager): + self._connect_group_chat_skills(skills_module, sender_configs, self.sender) + else: + self._connect_agent_skills(skills_module, sender_configs, self.sender, self.receiver) + + if isinstance(self.receiver, autogen.GroupChatManager): + self._connect_group_chat_skills(skills_module, receiver_configs, self.receiver) + else: + self._connect_agent_skills(skills_module, receiver_configs, self.receiver, self.sender) def _run_workflow(self, message: str, history: Optional[List[Message]] = None, clear_history: bool = False) -> None: """ @@ -105,12 +180,22 @@ def _run_workflow(self, message: str, history: Optional[List[Message]] = None, c """ for agent in self.workflow.get("agents", []): if agent.get("link").get("agent_type") == "sender": - self.sender = self.load(agent.get("agent")) + sender_configs = agent.get("agent") + self.sender = self.load(sender_configs) elif agent.get("link").get("agent_type") == "receiver": - self.receiver = self.load(agent.get("agent")) + receiver_configs = agent.get("agent") + self.receiver = self.load(receiver_configs) + if self.sender and self.receiver: # save all agent skills to skills.py save_skills_to_file(self.workflow_skills, self.work_dir) + + if self.use_tool_calls: + if sender_configs and receiver_configs: + self._connect_tools(sender_configs, receiver_configs) + else: + raise ValueError("parameter 'use_tool_calls' invalid with hard-coded Agents") + if history: self._populate_history(history) self.sender.initiate_chat( @@ -135,12 +220,17 @@ async def _a_run_workflow( """ for agent in self.workflow.get("agents", []): if agent.get("link").get("agent_type") == "sender": - self.sender = self.load(agent.get("agent")) + sender_configs = agent.get("agent") + self.sender = self.load(sender_configs) elif agent.get("link").get("agent_type") == "receiver": - self.receiver = self.load(agent.get("agent")) + receiver_configs = agent.get("agent") + self.receiver = self.load(receiver_configs) if self.sender and self.receiver: # save all agent skills to skills.py save_skills_to_file(self.workflow_skills, self.work_dir) + + self._connect_tools(sender_configs, receiver_configs) + if history: self._populate_history(history) await self.sender.a_initiate_chat( @@ -317,9 +407,16 @@ def sanitize_agent(self, agent: Dict) -> Agent: agent["config"]["llm_config"] = False agent = Agent.model_validate(agent) - agent.config.is_termination_msg = agent.config.is_termination_msg or ( - lambda x: "TERMINATE" in x.get("content", "").rstrip()[-20:] - ) + + # When using tool calls, the "content" of a message + # can sometimes be null. Not sure why. Handle it here. + def check_for_termination_message(x): + content = x.get("content", "") + if content is None: + x["content"] = "Tool is executing..." 
+ return content is not None and "TERMINATE" in content.rstrip()[-20:] + + agent.config.is_termination_msg = agent.config.is_termination_msg or check_for_termination_message def get_default_system_message(agent_type: str) -> str: if agent_type == "assistant": @@ -346,7 +443,8 @@ def get_default_system_message(agent_type: str) -> str: for skill in skills: self.workflow_skills.append(skill) skills_prompt = "" - skills_prompt = get_skills_prompt(skills, self.work_dir) + if not self.use_tool_calls: + skills_prompt = get_skills_prompt(skills, self.work_dir) if agent.config.system_message: agent.config.system_message = agent.config.system_message + "\n\n" + skills_prompt else: @@ -383,6 +481,7 @@ def load(self, agent: Any) -> autogen.Agent: a_human_input_timeout=self.a_human_input_timeout, connection_id=self.connection_id, llm_config=agent.config.llm_config.model_dump(), + manager_name=agent.config.name ) return agent @@ -566,6 +665,7 @@ def __init__( a_human_input_function: Optional[callable] = None, a_human_input_timeout: Optional[int] = 60, connection_id: Optional[str] = None, + use_tool_calls: bool = False, ) -> None: """ Initializes the WorkflowManager with agents specified in the config and optional message history. @@ -606,6 +706,7 @@ def __init__( self.sender = None self.receiver = None self.model_client = None + self.use_tool_calls = use_tool_calls def _run_workflow(self, message: str, history: Optional[List[Message]] = None, clear_history: bool = False) -> None: """ @@ -620,7 +721,7 @@ def _run_workflow(self, message: str, history: Optional[List[Message]] = None, c user_proxy = { "config": { "name": "user_proxy", - "human_input_mode": "NEVER", + "human_input_mode": "TERMINATE", "max_consecutive_auto_reply": 25, "code_execution_config": "local", "default_auto_reply": "TERMINATE", @@ -650,6 +751,7 @@ def _run_workflow(self, message: str, history: Optional[List[Message]] = None, c a_send_message_function=self.a_send_message_function, a_human_input_timeout=self.a_human_input_timeout, connection_id=self.connection_id, + use_tool_calls=self.use_tool_calls, ) task_prompt = ( f""" @@ -851,6 +953,7 @@ def __new__( a_human_input_function: Optional[callable] = None, a_human_input_timeout: Optional[int] = 60, connection_id: Optional[str] = None, + use_tool_calls: bool = False, ) -> None: """ Initializes the WorkflowManager with agents specified in the config and optional message history. 
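
For reference, this is the shape of a tool-call message that the overridden
receive()/a_receive() methods in the next hunks must handle: an OpenAI-style
dict whose content can be None. Field values below are illustrative:

    import json

    message = {
        "content": None,  # often null when the model only requests tool calls
        "tool_calls": [
            {
                "id": "call_abc123",
                "type": "function",
                "function": {
                    "name": "get_weather",
                    "arguments": json.dumps({"city": "Seattle"}),
                },
            }
        ],
    }
    # The overrides condense each entry into a line such as
    #   requested tool call: get_weather(city: Seattle)
    # and store it in message["content"] so the UI has text to display.
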
@@ -888,6 +991,7 @@ def __new__( a_human_input_function=a_human_input_function, a_human_input_timeout=a_human_input_timeout, connection_id=connection_id, + use_tool_calls=use_tool_calls, ) elif self.workflow.get("type") == WorkFlowType.sequential.value: return SequentialWorkflowManager( @@ -900,6 +1004,7 @@ def __new__( a_human_input_function=a_human_input_function, a_human_input_timeout=a_human_input_timeout, connection_id=connection_id, + use_tool_calls=use_tool_calls, ) @@ -931,7 +1036,23 @@ def receive( silent: Optional[bool] = False, ): if self.message_processor and not self.a_human_input_function: - self.message_processor(sender, self, message, request_reply, silent, sender_type="agent") + if isinstance(message, dict) and message.get("tool_calls"): + tool_calls = [ + { + "name": func["name"], + "arguments": ', '.join(f'{key}: {value}' for key, value in json.loads(func["arguments"]).items()) + } + for func in [tc.get("function") for tc in message["tool_calls"]] + ] + tool_call_msgs = [f"requested tool call: {func.get("name")}({func.get("arguments")})" for func in tool_calls] + if not message.get("content") == None: + tool_call_msgs.insert(0, message.get("content")) + new_message = copy.deepcopy(message) + new_message["content"] = "\n".join(tool_call_msgs) + self.message_processor(sender, self, new_message, request_reply, silent, sender_type="agent") + else: + self.message_processor(sender, self, message, request_reply, silent, sender_type="agent") + super().receive(message, sender, request_reply, silent) async def a_receive( @@ -942,9 +1063,27 @@ async def a_receive( silent: Optional[bool] = False, ) -> None: if self.a_message_processor: - await self.a_message_processor(sender, self, message, request_reply, silent, sender_type="agent") + if isinstance(message, dict) and message.get("tool_calls"): + tool_calls = [ + { + "name": func["name"], + #"arguments": ', '.join(f'{key}: {value}' for key, value in json.loads(func["arguments"]).items()) + } + for func in [tc.get("function") for tc in message["tool_calls"]] + ] + #tool_call_msgs = [f"requested tool call: {func.get("name")}({func.get("arguments")})" for func in tool_calls] + tool_call_msgs = [f"requested tool call: {func.get("name")}" for func in tool_calls] + if not message.get("content") == None: + tool_call_msgs.insert(0, message.get("content")) + new_message = copy.deepcopy(message) + new_message["content"] = "\n".join(tool_call_msgs) + await self.a_message_processor(sender, self, new_message, request_reply, silent, sender_type="agent") + else: + await self.a_message_processor(sender, self, message, request_reply, silent, sender_type="agent") + elif self.message_processor: self.message_processor(sender, self, message, request_reply, silent, sender_type="agent") + await super().a_receive(message, sender, request_reply, silent) # Strangely, when the response from a_get_human_input == "" (empty string) the libs call into the @@ -991,7 +1130,8 @@ async def a_get_human_input(self, prompt: str) -> str: class ExtendedGroupChatManager(autogen.GroupChatManager): def __init__( - self, + self, + manager_name="group_chat_manager", message_processor=None, a_message_processor=None, a_human_input_function=None, @@ -1007,6 +1147,7 @@ def __init__( self.a_human_input_response = None self.a_human_input_timeout = a_human_input_timeout self.connection_id = connection_id + self.manager_name = manager_name def receive( self, @@ -1019,6 +1160,12 @@ def receive( self.message_processor(sender, self, message, request_reply, silent, 
sender_type="groupchat") super().receive(message, sender, request_reply, silent) + def shorten_text(text: str, n: int) -> str: + # If the text is longer than n, shorten it and add an ellipsis + if len(text) > n: + return text[:n] + "..." + return text + async def a_receive( self, message: Union[Dict, str], @@ -1027,9 +1174,27 @@ async def a_receive( silent: Optional[bool] = False, ) -> None: if self.a_message_processor: + if isinstance(message, dict) and message.get("tool_calls"): + tool_calls = [ + { + "name": func["name"], + "arguments": ', '.join(f'{key}: {json.dumps(self.shorten_text(value,20))}' for key, value in json.loads(func["arguments"]).items()) + } + for func in [tc.get("function") for tc in message["tool_calls"]] + ] + tool_call_msgs = [f"requested tool call: {func.get("name")}({func.get("arguments")})" for func in tool_calls] + #tool_call_msgs = [f"requested tool call: {func.get("name")}" for func in tool_calls] + if not message.get("content") == None: + tool_call_msgs.insert(0, message.get("content")) + new_message = copy.deepcopy(message) + new_message["content"] = "\n".join(tool_call_msgs) await self.a_message_processor(sender, self, message, request_reply, silent, sender_type="groupchat") + else: + await self.a_message_processor(sender, self, message, request_reply, silent, sender_type="groupchat") elif self.message_processor: self.message_processor(sender, self, message, request_reply, silent, sender_type="groupchat") + + await super().a_receive(message, sender, request_reply, silent) def get_human_input(self, prompt: str) -> str: From 644235e3ffc20959ff50c444b46d46bef1f48b56 Mon Sep 17 00:00:00 2001 From: Joe Landers Date: Wed, 11 Sep 2024 17:24:28 -0700 Subject: [PATCH 3/5] Revert experimental change --- samples/apps/autogen-studio/autogenstudio/workflowmanager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/apps/autogen-studio/autogenstudio/workflowmanager.py b/samples/apps/autogen-studio/autogenstudio/workflowmanager.py index 411f4bdfa2f..e90acb72e34 100644 --- a/samples/apps/autogen-studio/autogenstudio/workflowmanager.py +++ b/samples/apps/autogen-studio/autogenstudio/workflowmanager.py @@ -721,7 +721,7 @@ def _run_workflow(self, message: str, history: Optional[List[Message]] = None, c user_proxy = { "config": { "name": "user_proxy", - "human_input_mode": "TERMINATE", + "human_input_mode": "NEVER", "max_consecutive_auto_reply": 25, "code_execution_config": "local", "default_auto_reply": "TERMINATE", From 9fb54d3ad16aebae1a1fd6d75cfceb5fa3408374 Mon Sep 17 00:00:00 2001 From: Joe Landers Date: Wed, 11 Sep 2024 18:12:36 -0700 Subject: [PATCH 4/5] Small formatting bug --- samples/apps/autogen-studio/autogenstudio/workflowmanager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/apps/autogen-studio/autogenstudio/workflowmanager.py b/samples/apps/autogen-studio/autogenstudio/workflowmanager.py index e90acb72e34..b1ed830f7aa 100644 --- a/samples/apps/autogen-studio/autogenstudio/workflowmanager.py +++ b/samples/apps/autogen-studio/autogenstudio/workflowmanager.py @@ -1188,7 +1188,7 @@ async def a_receive( tool_call_msgs.insert(0, message.get("content")) new_message = copy.deepcopy(message) new_message["content"] = "\n".join(tool_call_msgs) - await self.a_message_processor(sender, self, message, request_reply, silent, sender_type="groupchat") + await self.a_message_processor(sender, self, message, request_reply, silent, sender_type="groupchat") else: await self.a_message_processor(sender, self, message, 
request_reply, silent, sender_type="groupchat") elif self.message_processor: From bb7db689b64ed906d9139b2b1d403b74132f34cb Mon Sep 17 00:00:00 2001 From: Joe Landers Date: Wed, 11 Sep 2024 20:10:36 -0700 Subject: [PATCH 5/5] Fix for inconsistent messaging in tool calls --- .../autogenstudio/workflowmanager.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/samples/apps/autogen-studio/autogenstudio/workflowmanager.py b/samples/apps/autogen-studio/autogenstudio/workflowmanager.py index b1ed830f7aa..0a5e7312e98 100644 --- a/samples/apps/autogen-studio/autogenstudio/workflowmanager.py +++ b/samples/apps/autogen-studio/autogenstudio/workflowmanager.py @@ -1028,6 +1028,12 @@ def __init__( self.a_human_input_timeout = a_human_input_timeout self.connection_id = connection_id + def shorten_text(self, text: str, n: int) -> str: + # If the text is longer than n, shorten it and add an ellipsis + if len(text) > n: + return text[:n] + "..." + return text + def receive( self, message: Union[Dict, str], @@ -1040,7 +1046,7 @@ def receive( tool_calls = [ { "name": func["name"], - "arguments": ', '.join(f'{key}: {value}' for key, value in json.loads(func["arguments"]).items()) + "arguments": ', '.join(f'{key}: {json.dumps(self.shorten_text(value,20))}' for key, value in json.loads(func["arguments"]).items()) } for func in [tc.get("function") for tc in message["tool_calls"]] ] @@ -1067,12 +1073,11 @@ async def a_receive( tool_calls = [ { "name": func["name"], - #"arguments": ', '.join(f'{key}: {value}' for key, value in json.loads(func["arguments"]).items()) + "arguments": ', '.join(f'{key}: {json.dumps(self.shorten_text(value,20))}' for key, value in json.loads(func["arguments"]).items()) } for func in [tc.get("function") for tc in message["tool_calls"]] ] - #tool_call_msgs = [f"requested tool call: {func.get("name")}({func.get("arguments")})" for func in tool_calls] - tool_call_msgs = [f"requested tool call: {func.get("name")}" for func in tool_calls] + tool_call_msgs = [f"requested tool call: {func.get("name")}({func.get("arguments")})" for func in tool_calls] if not message.get("content") == None: tool_call_msgs.insert(0, message.get("content")) new_message = copy.deepcopy(message) @@ -1160,7 +1165,7 @@ def receive( self.message_processor(sender, self, message, request_reply, silent, sender_type="groupchat") super().receive(message, sender, request_reply, silent) - def shorten_text(text: str, n: int) -> str: + def shorten_text(self, text: str, n: int) -> str: # If the text is longer than n, shorten it and add an ellipsis if len(text) > n: return text[:n] + "..." @@ -1188,7 +1193,7 @@ async def a_receive( tool_call_msgs.insert(0, message.get("content")) new_message = copy.deepcopy(message) new_message["content"] = "\n".join(tool_call_msgs) - await self.a_message_processor(sender, self, message, request_reply, silent, sender_type="groupchat") + await self.a_message_processor(sender, self, new_message, request_reply, silent, sender_type="groupchat") else: await self.a_message_processor(sender, self, message, request_reply, silent, sender_type="groupchat") elif self.message_processor:
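
Taken together, the rendering logic that remains after PATCH 5/5 can be
exercised on its own. A standalone sketch follows (values are coerced to str
here for robustness; the patch passes them through unchanged):

    import json

    def shorten_text(text: str, n: int) -> str:
        # If the text is longer than n, shorten it and add an ellipsis.
        return text[:n] + "..." if len(text) > n else text

    def render_tool_call(function: dict) -> str:
        args = ", ".join(
            f"{key}: {json.dumps(shorten_text(str(value), 20))}"
            for key, value in json.loads(function["arguments"]).items()
        )
        return f"requested tool call: {function['name']}({args})"

    print(render_tool_call({
        "name": "fetch_url",
        "arguments": json.dumps({"url": "https://example.com/some/very/long/path"}),
    }))
    # -> requested tool call: fetch_url(url: "https://example.com/...")
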