diff --git a/python/packages/autogen-core/samples/common/adas/adas.py b/python/packages/autogen-core/samples/common/adas/adas.py
index 7bd4cc2b779..ff7c21666c3 100644
--- a/python/packages/autogen-core/samples/common/adas/adas.py
+++ b/python/packages/autogen-core/samples/common/adas/adas.py
@@ -71,7 +71,7 @@ def print_repo_contents(repo, path="", indent=""):
         if content_file.type == "dir":
             documentation.extend(print_repo_contents(repo, content_file.path, indent + "│ "))
         else:
-            if content_file.download_url.endswith('.md'):
+            if content_file.download_url.endswith(('.md', '.ipynb')):
                 print(f"Reading file from {content_file.download_url}")
                 f = read_github_file(content_file.download_url)
                 documentation.append("Title: " + content_file.name + "\nContents:\n" + f)
diff --git a/python/packages/autogen-core/samples/common/adas/adas_prompt.py b/python/packages/autogen-core/samples/common/adas/adas_prompt.py
index 0b577fbf240..11889c08888 100644
--- a/python/packages/autogen-core/samples/common/adas/adas_prompt.py
+++ b/python/packages/autogen-core/samples/common/adas/adas_prompt.py
@@ -610,39 +610,262 @@ async def output_result(_runtime: AgentRuntime, id: AgentId, message: WritingRes
 LLM_debate = {
     "thought": "By letting different LLMs debate with each other, we can leverage their diverse perspectives to find better solutions for tasks.",
     "name": "LLM Debate",
-    "code": """def forward(self, taskInfo):
-    # Instruction for initial reasoning
-    debate_initial_instruction = "Please think step by step and then solve the task."
+    "code": '''def forward(self, task, model_client_kwargs):
+    import asyncio
+    import json
+    import logging
+    import re
+    import sys
+    import uuid
+    from dataclasses import dataclass
+    from typing import Dict, List, Union
+    from autogen_core.base import MessageContext, TopicId, AgentId, AgentRuntime
+    from autogen_core.components import RoutedAgent, default_subscription, message_handler, TypeSubscription
+    from autogen_core.components.models import (
+        AssistantMessage,
+        ChatCompletionClient,
+        LLMMessage,
+        SystemMessage,
+        UserMessage,
+    )
+    from autogen_core.application import SingleThreadedAgentRuntime
+    from autogen_core.components import DefaultTopicId, RoutedAgent, message_handler, ClosureAgent
+    from autogen_ext.models import AzureOpenAIChatCompletionClient
+    from azure.identity import DefaultAzureCredential, get_bearer_token_provider
+
+    token_provider = get_bearer_token_provider(DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default")
 
-    # Instruction for debating and updating the solution based on other agents' solutions
-    debate_instruction = "Given solutions to the problem from other agents, consider their opinions as additional advice. Please think carefully and provide an updated answer."
+    # Create an AzureOpenAI model client.
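+    # NOTE: Authentication here uses Azure AD via DefaultAzureCredential rather than
+    # an API key; this assumes a usable credential is already present in the
+    # environment (e.g. from `az login` or a managed identity).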
+    model_client = AzureOpenAIChatCompletionClient(
+        model=model_client_kwargs['model'],
+        api_version=model_client_kwargs['api_version'],
+        azure_endpoint=model_client_kwargs['azure_endpoint'],
+        azure_ad_token_provider=token_provider,
+        model_capabilities={
+            "vision": True,
+            "function_calling": True,
+            "json_output": True,
+        },
+    )
 
-    # Initialize debate agents with different roles and a moderate temperature for varied reasoning
-    debate_agents = [LLMAgentBase(['thinking', 'answer'], 'Debate Agent', temperature=0.8, role=role) for role in ['Reading Comprehension Specialist', 'Logical Reasoning Strategist', 'Multidisciplinary Knowledge Integrator']]
+    @dataclass
+    class Question:
+        content: str
 
-    # Instruction for final decision-making based on all debates and solutions
-    final_decision_instruction = "Given all the above thinking and answers, reason over them carefully and provide a final answer."
-    final_decision_agent = LLMAgentBase(['thinking', 'answer'], 'Final Decision Agent', temperature=0.1)
-    max_round = 2 # Maximum number of debate rounds
-    all_thinking = [[] for _ in range(max_round)]
-    all_answer = [[] for _ in range(max_round)]
+    @dataclass
+    class Answer:
+        content: str
+
+
+    @dataclass
+    class SolverRequest:
+        content: str
+        question: str
+
+
+    @dataclass
+    class IntermediateSolverResponse:
+        content: str
+        question: str
+        answer: str
+        round: int
+
+
+    @dataclass
+    class FinalSolverResponse:
+        answer: str
+
+    @default_subscription
+    class Solver(RoutedAgent):
+        def __init__(self, model_client: ChatCompletionClient, topic_type: str, num_neighbors: int, max_round: int) -> None:
+            super().__init__("A debater.")
+            self._topic_type = topic_type
+            self._model_client = model_client
+            self._num_neighbors = num_neighbors
+            self._history: List[LLMMessage] = []
+            self._buffer: Dict[int, List[IntermediateSolverResponse]] = {}
+            self._system_messages = [
+                SystemMessage(
+                    (
+                        "You are a helpful assistant with expertise in reasoning. "
+                        "Your task is to assist in solving a reasoning problem by providing "
+                        "a clear and detailed solution. Limit your output within 100 words, "
+                        "and your final answer should be a single string."
+                    )
+                )
+            ]
+            self._round = 0
+            self._max_round = max_round
 
-    # Perform debate rounds
-    for r in range(max_round):
-        for i in range(len(debate_agents)):
-            if r == 0:
-                thinking, answer = debate_agents[i]([taskInfo], debate_initial_instruction)
+        @message_handler
+        async def handle_request(self, message: SolverRequest, ctx: MessageContext) -> None:
+            # Add the question to the memory.
+            self._history.append(UserMessage(content=message.content, source="user"))
+            # Make an inference using the model.
+            model_result = await self._model_client.create(self._system_messages + self._history)
+            assert isinstance(model_result.content, str)
+            # Add the response to the memory.
+            self._history.append(AssistantMessage(content=model_result.content, source=self.metadata["type"]))
+            print(f"{'-'*80}\\nSolver {self.id} round {self._round}:\\n{model_result.content}")
+            # Increment the counter.
+            self._round += 1
+            if self._round == self._max_round:
+                # If the counter reaches the maximum round, publish the final response.
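+                # The aggregator receives this because it also subscribes to the
+                # default topic and has a handler for FinalSolverResponse.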
+                await self.publish_message(FinalSolverResponse(answer=model_result.content), topic_id=DefaultTopicId())
             else:
-                input_infos = [taskInfo] + [all_thinking[r-1][i]] + all_thinking[r-1][:i] + all_thinking[r-1][i+1:]
-                thinking, answer = debate_agents[i](input_infos, debate_instruction)
-            all_thinking[r].append(thinking)
-            all_answer[r].append(answer)
-
-    # Make the final decision based on all debate results and solutions
-    thinking, answer = final_decision_agent([taskInfo] + all_thinking[max_round-1] + all_answer[max_round-1], final_decision_instruction)
-    return answer
-"""
+                # Publish intermediate response to the topic associated with this solver.
+                print("publish IntermediateSolverResponse")
+                await self.publish_message(
+                    IntermediateSolverResponse(
+                        content=model_result.content,
+                        question=message.question,
+                        answer=model_result.content,
+                        round=self._round,
+                    ),
+                    topic_id=DefaultTopicId(type=self._topic_type),
+                )
+
+        @message_handler
+        async def handle_response(self, message: IntermediateSolverResponse, ctx: MessageContext) -> None:
+            # Add neighbor's response to the buffer.
+            self._buffer.setdefault(message.round, []).append(message)
+            # Check if all neighbors have responded.
+            if len(self._buffer[message.round]) == self._num_neighbors:
+                print(
+                    f"{'-'*80}\\nSolver {self.id} round {message.round}:\\nReceived all responses from {self._num_neighbors} neighbors."
+                )
+                # Prepare the prompt for the next round.
+                prompt = "These are the solutions to the problem from other agents:\\n"
+                for resp in self._buffer[message.round]:
+                    prompt += f"One agent solution: {resp.content}\\n"
+                prompt += (
+                    "Using the solutions from other agents as additional information, "
+                    "can you provide your answer to the problem? "
+                    f"The original problem is {message.question}. "
+                    "Your final answer should be a single string."
+                )
+                # Send the question to the agent itself to solve.
+                await self.send_message(SolverRequest(content=prompt, question=message.question), self.id)
+                # Clear the buffer.
+                self._buffer.pop(message.round)
+
+
+    @default_subscription
+    class Aggregator(RoutedAgent):
+        def __init__(self, num_solvers: int) -> None:
+            super().__init__("Aggregator")
+            self._num_solvers = num_solvers
+            self._buffer: List[FinalSolverResponse] = []
+
+        @message_handler
+        async def handle_question(self, message: Question, ctx: MessageContext) -> None:
+            print(f"{'-'*80}\\nAggregator {self.id} received question:\\n{message.content}")
+            prompt = (
+                f"Can you solve the following problem?\\n{message.content}\\n"
+                "Explain your reasoning. Your final answer should be a single string."
+            )
+            print(f"{'-'*80}\\nAggregator {self.id} publishes initial solver request.")
+            await self.publish_message(SolverRequest(content=prompt, question=message.content), topic_id=DefaultTopicId())
+
+        @message_handler
+        async def handle_final_solver_response(self, message: FinalSolverResponse, ctx: MessageContext) -> None:
+            self._buffer.append(message)
+            if len(self._buffer) == self._num_solvers:
+                print(f"{'-'*80}\\nAggregator {self.id} received all final answers from {self._num_solvers} solvers.")
+                # Find the majority answer.
+                answers = [resp.answer for resp in self._buffer]
+                majority_answer = max(set(answers), key=answers.count)
+                # Publish the aggregated response.
+                await self.publish_message(Answer(content=majority_answer), topic_id=TopicId("result", self.id.key))
+                # Clear the responses.
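+                # (This assumes one question in flight at a time; handling concurrent
+                # questions would require per-question buffers.)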
+                self._buffer.clear()
+                print(f"{'-'*80}\\nAggregator {self.id} publishes final answer:\\n{majority_answer}")
+
+
+    # Define the main function to set up and run the agent system.
+    async def main():
+        queue = asyncio.Queue[Answer]()
+        async def output_result(_runtime: AgentRuntime, id: AgentId, message: Answer, ctx: MessageContext) -> None:
+            await queue.put(message)
+
+        runtime = SingleThreadedAgentRuntime()
+        await Solver.register(
+            runtime,
+            "SolverA",
+            lambda: Solver(
+                model_client=model_client,
+                topic_type="SolverA",
+                num_neighbors=2,
+                max_round=3,
+            ),
+        )
+        await Solver.register(
+            runtime,
+            "SolverB",
+            lambda: Solver(
+                model_client=model_client,
+                topic_type="SolverB",
+                num_neighbors=2,
+                max_round=3,
+            ),
+        )
+        await Solver.register(
+            runtime,
+            "SolverC",
+            lambda: Solver(
+                model_client=model_client,
+                topic_type="SolverC",
+                num_neighbors=2,
+                max_round=3,
+            ),
+        )
+        await Solver.register(
+            runtime,
+            "SolverD",
+            lambda: Solver(
+                model_client=model_client,
+                topic_type="SolverD",
+                num_neighbors=2,
+                max_round=3,
+            ),
+        )
+        await Aggregator.register(runtime, "Aggregator", lambda: Aggregator(num_solvers=4))
+
+        # Subscriptions for the topic published to by SolverA.
+        await runtime.add_subscription(TypeSubscription("SolverA", "SolverD"))
+        await runtime.add_subscription(TypeSubscription("SolverA", "SolverB"))
+
+        # Subscriptions for the topic published to by SolverB.
+        await runtime.add_subscription(TypeSubscription("SolverB", "SolverA"))
+        await runtime.add_subscription(TypeSubscription("SolverB", "SolverC"))
+
+        # Subscriptions for the topic published to by SolverC.
+        await runtime.add_subscription(TypeSubscription("SolverC", "SolverB"))
+        await runtime.add_subscription(TypeSubscription("SolverC", "SolverD"))
+
+        # Subscriptions for the topic published to by SolverD.
+        await runtime.add_subscription(TypeSubscription("SolverD", "SolverC"))
+        await runtime.add_subscription(TypeSubscription("SolverD", "SolverA"))
+
+        # All solvers and the aggregator already subscribe to the default topic via @default_subscription.
+
+        result_topic = TypeSubscription(topic_type="result", agent_type="output_result")
+        await ClosureAgent.register(runtime, "output_result", output_result, subscriptions=lambda: [result_topic])
+
+        runtime.start()
+        await runtime.publish_message(Question(content=task), DefaultTopicId())
+
+        # Keep processing messages until idle.
+        await runtime.stop_when_idle()
+
+        # Return the answer from the queue.
+        res = (await queue.get()).content
+        print(f"res {res}")
+        return res
+
+    return asyncio.run(main())
+'''
 }
 
 Take_a_step_back = {"thought": "Let LLM first think about the principles involved in solving this task which could be helpful. By understanding the underlying principles, the model can better reason through the problem and provide a more accurate solution.",
@@ -1032,8 +1255,57 @@ async def main():
             topic_id=TopicId(type="orchestrator_type")
         )
 ```
+Or use the `type_subscription()` class decorator on the agent.
+```
+@type_subscription(topic_type="orchestrator_type")
+class OrchestratorAgent(RoutedAgent):
+    pass
+
+async def main():
+    await OrchestratorAgent.register(runtime, "orchestrator", lambda: OrchestratorAgent())
+
+    await runtime.publish_message(
+        message=DiverseThoughtTask(task='What is the most creative art medium?'),
+        topic_id=TopicId(type="orchestrator_type")
+    )
+```
 Now, you can publish directly to a specific topic through the runtime.
 
+10. This is WRONG: ```
+class OrchestratorAgent(RoutedAgent):
+    pass
+
+async def main():
+    await OrchestratorAgent.register(runtime, "orchestrator", lambda: OrchestratorAgent())
+
+    await runtime.publish_message(
+        message=DiverseThoughtTask(task='What is the most creative art medium?'),
+        topic_id=DefaultTopicId()
+    )
+```
+The message is published to the default topic, but `OrchestratorAgent` is never subscribed to it, so the message is never delivered.
+When there is a single scope of publishing, that is, all agents publish and subscribe to all broadcasted messages, we can use the convenience classes `DefaultTopicId` and `default_subscription()` to simplify our code.
+Use the `default_subscription` class decorator on the agent:
+```
+@default_subscription
+class OrchestratorAgent(RoutedAgent):
+    pass
+
+async def main():
+    await OrchestratorAgent.register(runtime, "orchestrator", lambda: OrchestratorAgent())
+
+    await runtime.publish_message(
+        message=DiverseThoughtTask(task='What is the most creative art medium?'),
+        topic_id=DefaultTopicId()
+    )
+```
+
+11. This is WRONG: ```
+await runtime.publish_message(DiverseThoughtTask(task='Who is the most creative composer?'), AgentId("consensus_agent", "default"))
+```
+The second argument to `publish_message` must be a topic, not an `AgentId`. Use `TopicId` or `DefaultTopicId`. For example: ```
+await runtime.publish_message(DiverseThoughtTask(task='Who is the most creative composer?'), TopicId("consensus_agent", "default"))
+```
+
 ## CORRECT Implementation examples:
 Here are some correct patterns you should follow:
@@ -1137,8 +1409,8 @@ def get_init_archive():
     # return [COT]#, COT_SC, Reflexion, LLM_debate, Take_a_step_back, QD, Role_Assignment]
     # return [COT_SC]#, COT_SC, Reflexion, LLM_debate, Take_a_step_back, QD, Role_Assignment]
     # return [Reflexion]#, COT_SC, Reflexion, LLM_debate, Take_a_step_back, QD, Role_Assignment]
-    return [COT, COT_SC, Reflexion] # LLM_debate, Take_a_step_back, QD, Role_Assignment]
-
+    # return [COT, COT_SC, Reflexion] # LLM_debate, Take_a_step_back, QD, Role_Assignment]
+    return [LLM_debate]
+
 def get_prompt(current_archive, adaptive=False):