From 14846a3e84b4bbc6b019f67d7e28ebc83c47a986 Mon Sep 17 00:00:00 2001 From: Ryan Sweet Date: Mon, 28 Oct 2024 17:28:36 -0700 Subject: [PATCH] first draft of stateful persistence grains for each agent.... (#3954) * adds Orleans persistence for AgentState --- dotnet/AutoGen.sln | 7 + .../HelloAgentState/HelloAgentState.csproj | 21 +++ .../samples/Hello/HelloAgentState/Program.cs | 75 ++++++++++ .../samples/Hello/HelloAgentState/README.md | 138 ++++++++++++++++++ .../{AgentState.cs => ChatState.cs} | 5 +- .../Abstractions/MessageExtensions.cs | 22 ++- .../Microsoft.AutoGen.Abstractions.csproj | 1 - .../src/Microsoft.AutoGen/Agents/AgentBase.cs | 14 +- .../Microsoft.AutoGen/Agents/AgentClient.cs | 12 +- .../Microsoft.AutoGen/Agents/AgentContext.cs | 11 +- .../Agents/AgentWorkerRuntime.cs | 25 +++- .../Agents/Agents/AIAgent/InferenceAgent.cs | 3 +- .../Agents/Agents/AIAgent/SKAiAgent.cs | 1 + .../IOAgent/ConsoleAgent/ConsoleAgent.cs | 2 +- .../Agents/IOAgent/FileAgent/FileAgent.cs | 2 +- .../Agents/Agents/IOAgent/IOAgent.cs | 8 +- .../Agents/IOAgent/WebAPIAgent/WebAPIAgent.cs | 2 +- .../Microsoft.AutoGen/Agents/IAgentBase.cs | 22 +++ .../Microsoft.AutoGen/Agents/IAgentContext.cs | 2 + .../Runtime/AgentStateGrain.cs | 20 --- .../Runtime/AgentWorkerHostingExtensions.cs | 35 ++--- .../Runtime/IAgentStateGrain.cs | 7 - .../Runtime/IWorkerAgentGrain.cs | 9 ++ .../Runtime/IWorkerGateway.cs | 2 + .../Runtime/Microsoft.AutoGen.Runtime.csproj | 9 ++ .../Runtime/OrleansRuntimeHostingExtenions.cs | 85 +++++++++++ .../Runtime/WorkerAgentGrain.cs | 31 ++++ .../Runtime/WorkerGateway.cs | 12 ++ .../Runtime/WorkerGatewayService.cs | 14 ++ protos/agent_worker.proto | 23 +++ .../_worker_runtime_host_servicer.py | 14 ++ .../application/protos/agent_worker_pb2.py | 16 +- .../application/protos/agent_worker_pb2.pyi | 76 ++++++++++ .../protos/agent_worker_pb2_grpc.py | 66 +++++++++ .../protos/agent_worker_pb2_grpc.pyi | 34 +++++ 35 files changed, 749 insertions(+), 77 deletions(-) create mode 100644 dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj create mode 100644 dotnet/samples/Hello/HelloAgentState/Program.cs create mode 100644 dotnet/samples/Hello/HelloAgentState/README.md rename dotnet/src/Microsoft.AutoGen/Abstractions/{AgentState.cs => ChatState.cs} (65%) create mode 100644 dotnet/src/Microsoft.AutoGen/Agents/IAgentBase.cs delete mode 100644 dotnet/src/Microsoft.AutoGen/Runtime/AgentStateGrain.cs delete mode 100644 dotnet/src/Microsoft.AutoGen/Runtime/IAgentStateGrain.cs create mode 100644 dotnet/src/Microsoft.AutoGen/Runtime/IWorkerAgentGrain.cs create mode 100644 dotnet/src/Microsoft.AutoGen/Runtime/OrleansRuntimeHostingExtenions.cs create mode 100644 dotnet/src/Microsoft.AutoGen/Runtime/WorkerAgentGrain.cs diff --git a/dotnet/AutoGen.sln b/dotnet/AutoGen.sln index 1106ebf844f7..83147d38dc7b 100644 --- a/dotnet/AutoGen.sln +++ b/dotnet/AutoGen.sln @@ -125,6 +125,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AIModelClientHostingExtensi EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "sample", "sample", "{686480D7-8FEC-4ED3-9C5D-CEBE1057A7ED}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloAgentState", "samples\Hello\HelloAgentState\HelloAgentState.csproj", "{64EF61E7-00A6-4E5E-9808-62E10993A0E5}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -335,6 +337,10 @@ Global {97550E87-48C6-4EBF-85E1-413ABAE9DBFD}.Debug|Any CPU.Build.0 = Debug|Any CPU {97550E87-48C6-4EBF-85E1-413ABAE9DBFD}.Release|Any CPU.ActiveCfg = Release|Any CPU {97550E87-48C6-4EBF-85E1-413ABAE9DBFD}.Release|Any CPU.Build.0 = Release|Any CPU + {64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Debug|Any CPU.Build.0 = Debug|Any CPU + {64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Release|Any CPU.ActiveCfg = Release|Any CPU + {64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -395,6 +401,7 @@ Global {A20B9894-F352-4338-872A-F215A241D43D} = {7EB336C2-7C0A-4BC8-80C6-A3173AB8DC45} {8F7560CF-EEBB-4333-A69F-838CA40FD85D} = {7EB336C2-7C0A-4BC8-80C6-A3173AB8DC45} {97550E87-48C6-4EBF-85E1-413ABAE9DBFD} = {18BF8DD7-0585-48BF-8F97-AD333080CE06} + {64EF61E7-00A6-4E5E-9808-62E10993A0E5} = {7EB336C2-7C0A-4BC8-80C6-A3173AB8DC45} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {93384647-528D-46C8-922C-8DB36A382F0B} diff --git a/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj b/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj new file mode 100644 index 000000000000..eb2ba96d6644 --- /dev/null +++ b/dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + + Exe + net8.0 + enable + enable + + + diff --git a/dotnet/samples/Hello/HelloAgentState/Program.cs b/dotnet/samples/Hello/HelloAgentState/Program.cs new file mode 100644 index 000000000000..6880bdd61679 --- /dev/null +++ b/dotnet/samples/Hello/HelloAgentState/Program.cs @@ -0,0 +1,75 @@ +// Copyright (c) Microsoft. All rights reserved. + +using Microsoft.AutoGen.Abstractions; +using Microsoft.AutoGen.Agents; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; + +// send a message to the agent +var app = await AgentsApp.PublishMessageAsync("HelloAgents", new NewMessageReceived +{ + Message = "World" +}, local: false); + +await app.WaitForShutdownAsync(); + +namespace Hello +{ + [TopicSubscription("HelloAgents")] + public class HelloAgent( + IAgentContext context, + [FromKeyedServices("EventTypes")] EventTypes typeRegistry) : ConsoleAgent( + context, + typeRegistry), + ISayHello, + IHandle, + IHandle + { + private AgentState? State { get; set; } + public async Task Handle(NewMessageReceived item) + { + var response = await SayHello(item.Message).ConfigureAwait(false); + var evt = new Output + { + Message = response + }.ToCloudEvent(this.AgentId.Key); + var entry = "We said hello to " + item.Message; + await Store(new AgentState + { + AgentId = this.AgentId, + TextData = entry + }).ConfigureAwait(false); + await PublishEvent(evt).ConfigureAwait(false); + var goodbye = new ConversationClosed + { + UserId = this.AgentId.Key, + UserMessage = "Goodbye" + }.ToCloudEvent(this.AgentId.Key); + await PublishEvent(goodbye).ConfigureAwait(false); + } + public async Task Handle(ConversationClosed item) + { + State = await Read(this.AgentId).ConfigureAwait(false); + var read = State?.TextData ?? "No state data found"; + var goodbye = $"{read}\n********************* {item.UserId} said {item.UserMessage} ************************"; + var evt = new Output + { + Message = goodbye + }.ToCloudEvent(this.AgentId.Key); + await PublishEvent(evt).ConfigureAwait(false); + //sleep + await Task.Delay(10000).ConfigureAwait(false); + await AgentsApp.ShutdownAsync().ConfigureAwait(false); + + } + public async Task SayHello(string ask) + { + var response = $"\n\n\n\n***************Hello {ask}**********************\n\n\n\n"; + return response; + } + } + public interface ISayHello + { + public Task SayHello(string ask); + } +} diff --git a/dotnet/samples/Hello/HelloAgentState/README.md b/dotnet/samples/Hello/HelloAgentState/README.md new file mode 100644 index 000000000000..06c4883182c9 --- /dev/null +++ b/dotnet/samples/Hello/HelloAgentState/README.md @@ -0,0 +1,138 @@ +# AutoGen 0.4 .NET Hello World Sample + +This [sample](Program.cs) demonstrates how to create a simple .NET console application that listens for an event and then orchestrates a series of actions in response. + +## Prerequisites + +To run this sample, you'll need: [.NET 8.0](https://dotnet.microsoft.com/en-us/) or later. +Also recommended is the [GitHub CLI](https://cli.github.com/). + +## Instructions to run the sample + +```bash +# Clone the repository +gh repo clone microsoft/autogen +cd dotnet/samples/Hello +dotnet run +``` + +## Key Concepts + +This sample illustrates how to create your own agent that inherits from a base agent and listens for an event. It also shows how to use the SDK's App Runtime locally to start the agent and send messages. + +Flow Diagram: + +```mermaid +%%{init: {'theme':'forest'}}%% +graph LR; + A[Main] --> |"PublishEvent(NewMessage('World'))"| B{"Handle(NewMessageReceived item)"} + B --> |"PublishEvent(Output('***Hello, World***'))"| C[ConsoleAgent] + C --> D{"WriteConsole()"} + B --> |"PublishEvent(ConversationClosed('Goodbye'))"| E{"Handle(ConversationClosed item)"} + B --> |"PublishEvent(Output('***Goodbye***'))"| C + E --> F{"Shutdown()"} + +``` + +### Writing Event Handlers + +The heart of an autogen application are the event handlers. Agents select a ```TopicSubscription``` to listen for events on a specific topic. When an event is received, the agent's event handler is called with the event data. + +Within that event handler you may optionally *emit* new events, which are then sent to the event bus for other agents to process. The EventTypes are declared gRPC ProtoBuf messages that are used to define the schema of the event. The default protos are available via the ```Microsoft.AutoGen.Abstractions;``` namespace and are defined in [autogen/protos](/autogen/protos). The EventTypes are registered in the agent's constructor using the ```IHandle``` interface. + +```csharp +TopicSubscription("HelloAgents")] +public class HelloAgent( + IAgentContext context, + [FromKeyedServices("EventTypes")] EventTypes typeRegistry) : ConsoleAgent( + context, + typeRegistry), + ISayHello, + IHandle, + IHandle +{ + public async Task Handle(NewMessageReceived item) + { + var response = await SayHello(item.Message).ConfigureAwait(false); + var evt = new Output + { + Message = response + }.ToCloudEvent(this.AgentId.Key); + await PublishEvent(evt).ConfigureAwait(false); + var goodbye = new ConversationClosed + { + UserId = this.AgentId.Key, + UserMessage = "Goodbye" + }.ToCloudEvent(this.AgentId.Key); + await PublishEvent(goodbye).ConfigureAwait(false); + } +``` + +### Inheritance and Composition + +This sample also illustrates inheritance in AutoGen. The `HelloAgent` class inherits from `ConsoleAgent`, which is a base class that provides a `WriteConsole` method. + +### Starting the Application Runtime + +AuotoGen provides a flexible runtime ```Microsoft.AutoGen.Agents.App``` that can be started in a variety of ways. The `Program.cs` file demonstrates how to start the runtime locally and send a message to the agent all in one go using the ```App.PublishMessageAsync``` method. + +```csharp +// send a message to the agent +var app = await App.PublishMessageAsync("HelloAgents", new NewMessageReceived +{ + Message = "World" +}, local: true); + +await App.RuntimeApp!.WaitForShutdownAsync(); +await app.WaitForShutdownAsync(); +``` + +### Sending Messages + +The set of possible Messages is defined in gRPC ProtoBuf specs. These are then turned into C# classes by the gRPC tools. You can define your own Message types by creating a new .proto file in your project and including the gRPC tools in your ```.csproj``` file: + +```proto +syntax = "proto3"; +package devteam; +option csharp_namespace = "DevTeam.Shared"; +message NewAsk { + string org = 1; + string repo = 2; + string ask = 3; + int64 issue_number = 4; +} +message ReadmeRequested { + string org = 1; + string repo = 2; + int64 issue_number = 3; + string ask = 4; +} +``` + +```xml + + + + + +``` + +You can send messages using the [```Microsoft.AutoGen.Agents``` class](autogen/dotnet/src/Microsoft.AutoGen/Agents/AgentClient.cs). Messages are wrapped in [the CloudEvents specification](https://cloudevents.io) and sent to the event bus. + +### Managing State + +There is a simple API for persisting agent state. + +```csharp + await Store(new AgentState + { + AgentId = this.AgentId, + TextData = entry + }).ConfigureAwait(false); +``` + +which can be read back using Read: + +```csharp + State = await Read(this.AgentId).ConfigureAwait(false); +``` diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/AgentState.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/ChatState.cs similarity index 65% rename from dotnet/src/Microsoft.AutoGen/Abstractions/AgentState.cs rename to dotnet/src/Microsoft.AutoGen/Abstractions/ChatState.cs index 53093bc9b9d2..8185c153d9d0 100644 --- a/dotnet/src/Microsoft.AutoGen/Abstractions/AgentState.cs +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/ChatState.cs @@ -1,6 +1,9 @@ +using Google.Protobuf; + namespace Microsoft.AutoGen.Abstractions; -public class AgentState where T : class, new() +public class ChatState + where T : IMessage, new() { public List History { get; set; } = new(); public T Data { get; set; } = new(); diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/MessageExtensions.cs b/dotnet/src/Microsoft.AutoGen/Abstractions/MessageExtensions.cs index 724a706b102e..5fa09ae218b2 100644 --- a/dotnet/src/Microsoft.AutoGen/Abstractions/MessageExtensions.cs +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/MessageExtensions.cs @@ -16,9 +16,29 @@ public static CloudEvent ToCloudEvent(this T message, string source) where T }; } - public static T FromCloudEvent(this CloudEvent cloudEvent) where T : IMessage, new() { return cloudEvent.ProtoData.Unpack(); } + public static AgentState ToAgentState(this T state, AgentId agentId, string eTag) where T : IMessage + { + return new AgentState + { + ProtoData = Any.Pack(state), + AgentId = agentId, + ETag = eTag + }; + } + + public static T FromAgentState(this AgentState state) where T : IMessage, new() + { + if (state.HasTextData == true) + { + if (typeof(T) == typeof(AgentState)) + { + return (T)(IMessage)state; + } + } + return state.ProtoData.Unpack(); + } } diff --git a/dotnet/src/Microsoft.AutoGen/Abstractions/Microsoft.AutoGen.Abstractions.csproj b/dotnet/src/Microsoft.AutoGen/Abstractions/Microsoft.AutoGen.Abstractions.csproj index fe480940cbda..52f933e19595 100644 --- a/dotnet/src/Microsoft.AutoGen/Abstractions/Microsoft.AutoGen.Abstractions.csproj +++ b/dotnet/src/Microsoft.AutoGen/Abstractions/Microsoft.AutoGen.Abstractions.csproj @@ -14,7 +14,6 @@ - diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs index 6307988a46c5..62779f8366c7 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs @@ -108,7 +108,16 @@ await this.InvokeWithActivityAsync( break; } } - + protected async Task Store(AgentState state) + { + await _context.Store(state).ConfigureAwait(false); + return; + } + protected async Task Read(AgentId agentId) where T : IMessage, new() + { + var agentstate = await _context.Read(agentId).ConfigureAwait(false); + return agentstate.FromAgentState(); + } private void OnResponseCore(RpcResponse response) { var requestId = response.RequestId; @@ -186,7 +195,6 @@ static async ((AgentBase Agent, RpcRequest Request, TaskCompletionSource logger, AgentWorkerRuntime runtime, DistributedContextPropagator distributedContextPropagator, [FromKeyedServices("EventTypes")] EventTypes eventTypes) : AgentBase(new ClientContext(logger, runtime, distributedContextPropagator), eventTypes) { public async ValueTask PublishEventAsync(CloudEvent evt) => await PublishEvent(evt); public async ValueTask SendRequestAsync(AgentId target, string method, Dictionary parameters) => await RequestAsync(target, method, parameters); - public async ValueTask PublishEventAsync(string topic, IMessage evt) { await PublishEventAsync(evt.ToCloudEvent(topic)).ConfigureAwait(false); @@ -23,12 +21,10 @@ private sealed class ClientContext(ILogger logger, AgentWorkerRunti public AgentBase? AgentInstance { get; set; } public ILogger Logger { get; } = logger; public DistributedContextPropagator DistributedContextPropagator { get; } = distributedContextPropagator; - public async ValueTask PublishEventAsync(CloudEvent @event) { await runtime.PublishEvent(@event).ConfigureAwait(false); } - public async ValueTask SendRequestAsync(AgentBase agent, RpcRequest request) { await runtime.SendRequest(AgentInstance!, request).ConfigureAwait(false); @@ -38,5 +34,13 @@ public async ValueTask SendResponseAsync(RpcRequest request, RpcResponse respons { await runtime.SendResponse(response).ConfigureAwait(false); } + public ValueTask Store(AgentState value) + { + throw new NotImplementedException(); + } + public ValueTask Read(AgentId agentId) + { + throw new NotImplementedException(); + } } } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentContext.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentContext.cs index 43d1137c8615..779cc86a608a 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/AgentContext.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/AgentContext.cs @@ -12,20 +12,25 @@ internal sealed class AgentContext(AgentId agentId, AgentWorkerRuntime runtime, public ILogger Logger { get; } = logger; public AgentBase? AgentInstance { get; set; } public DistributedContextPropagator DistributedContextPropagator { get; } = distributedContextPropagator; - public async ValueTask SendResponseAsync(RpcRequest request, RpcResponse response) { response.RequestId = request.RequestId; await _runtime.SendResponse(response); } - public async ValueTask SendRequestAsync(AgentBase agent, RpcRequest request) { await _runtime.SendRequest(agent, request); } - public async ValueTask PublishEventAsync(CloudEvent @event) { await _runtime.PublishEvent(@event); } + public async ValueTask Store(AgentState value) + { + await _runtime.Store(value); + } + public async ValueTask Read(AgentId agentId) + { + return await _runtime.Read(agentId); + } } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentWorkerRuntime.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentWorkerRuntime.cs index d0df48f71bff..f335881fc09b 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/AgentWorkerRuntime.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/AgentWorkerRuntime.cs @@ -84,7 +84,6 @@ private async Task RunReadPump() request.Agent.ReceiveMessage(message); break; case Message.MessageOneofCase.CloudEvent: - // TODO: Reimplement // HACK: Send the message to an instance of each agent type // where AgentId = (namespace: event.Namespace, name: agentType) @@ -323,10 +322,32 @@ public async Task StopAsync(CancellationToken cancellationToken) _channel?.Dispose(); } } - public ValueTask SendRequest(RpcRequest request) { throw new NotImplementedException(); } + public ValueTask Store(AgentState value) + { + var agentId = value.AgentId ?? throw new InvalidOperationException("AgentId is required when saving AgentState."); + var response = _client.SaveState(value); + if (!response.Success) + { + throw new InvalidOperationException($"Error saving AgentState for AgentId {agentId}."); + } + return ValueTask.CompletedTask; + } + public async ValueTask Read(AgentId agentId) + { + var response = await _client.GetStateAsync(agentId); + // if (response.Success && response.AgentState.AgentId is not null) - why is success always false? + if (response.AgentState.AgentId is not null) + { + return response.AgentState; + } + else + { + throw new KeyNotFoundException($"Failed to read AgentState for {agentId}."); + } + } } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/InferenceAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/InferenceAgent.cs index e1f932fa6642..15c4fc095fa6 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/InferenceAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/InferenceAgent.cs @@ -1,6 +1,7 @@ +using Google.Protobuf; using Microsoft.Extensions.AI; namespace Microsoft.AutoGen.Agents.Client; -public abstract class InferenceAgent : AgentBase where T : class, new() +public abstract class InferenceAgent : AgentBase where T : IMessage, new() { protected IChatClient ChatClient { get; } public InferenceAgent( diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/SKAiAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/SKAiAgent.cs index 84bd2f821906..becd2c208fa6 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/SKAiAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/AIAgent/SKAiAgent.cs @@ -40,6 +40,7 @@ public virtual async Task CallFunction(string template, KernelArguments var function = _kernel.CreateFunctionFromPrompt(template, promptSettings); var result = (await _kernel.InvokeAsync(function, arguments).ConfigureAwait(true)).ToString(); AddToHistory(result, ChatUserType.Agent); + //await Store(_state.Data.ToAgentState(AgentId,""));//TODO add eTag return result; } diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/ConsoleAgent/ConsoleAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/ConsoleAgent/ConsoleAgent.cs index c6e9f4392da9..2df6c7965031 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/ConsoleAgent/ConsoleAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/ConsoleAgent/ConsoleAgent.cs @@ -3,7 +3,7 @@ namespace Microsoft.AutoGen.Agents; -public abstract class ConsoleAgent : IOAgent, +public abstract class ConsoleAgent : IOAgent, IUseConsole, IHandle, IHandle diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/FileAgent/FileAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/FileAgent/FileAgent.cs index f8bf4630428b..2149a32d23cc 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/FileAgent/FileAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/FileAgent/FileAgent.cs @@ -10,7 +10,7 @@ public abstract class FileAgent( [FromKeyedServices("EventTypes")] EventTypes typeRegistry, string inputPath = "input.txt", string outputPath = "output.txt" - ) : IOAgent(context, typeRegistry), + ) : IOAgent(context, typeRegistry), IUseFiles, IHandle, IHandle diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/IOAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/IOAgent.cs index 7d1438720377..fc0f49733176 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/IOAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/IOAgent.cs @@ -2,16 +2,12 @@ namespace Microsoft.AutoGen.Agents; -public abstract class IOAgent : AgentBase where T : class, new() +public abstract class IOAgent : AgentBase { - protected AgentState _state; public string _route = "base"; - - public IOAgent(IAgentContext context, EventTypes typeRegistry) : base(context, typeRegistry) + protected IOAgent(IAgentContext context, EventTypes eventTypes) : base(context, eventTypes) { - _state = new(); } - public virtual async Task Handle(Input item) { diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/WebAPIAgent/WebAPIAgent.cs b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/WebAPIAgent/WebAPIAgent.cs index 47d107d63da7..418ef8d5ab0e 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/WebAPIAgent/WebAPIAgent.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Agents/IOAgent/WebAPIAgent/WebAPIAgent.cs @@ -6,7 +6,7 @@ namespace Microsoft.AutoGen.Agents; -public abstract class WebAPIAgent : IOAgent, +public abstract class WebAPIAgent : IOAgent, IUseWebAPI, IHandle, IHandle diff --git a/dotnet/src/Microsoft.AutoGen/Agents/IAgentBase.cs b/dotnet/src/Microsoft.AutoGen/Agents/IAgentBase.cs new file mode 100644 index 000000000000..122dff2c6270 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Agents/IAgentBase.cs @@ -0,0 +1,22 @@ +using Microsoft.AutoGen.Abstractions; +using Microsoft.Extensions.Logging; + +namespace Microsoft.AutoGen.Agents +{ + public interface IAgentBase + { + // Properties + string AgentId { get; } + ILogger Logger { get; } + IAgentContext Context { get; } + + // Methods + Task CallHandler(CloudEvent item); + Task HandleRequest(RpcRequest request); + Task Start(); + Task ReceiveMessage(Message message); + Task Store(AgentState state); + Task Read(AgentId agentId); + Task PublishEvent(CloudEvent item); + } +} diff --git a/dotnet/src/Microsoft.AutoGen/Agents/IAgentContext.cs b/dotnet/src/Microsoft.AutoGen/Agents/IAgentContext.cs index a7911e37e51b..0dfa78b36e9f 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/IAgentContext.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/IAgentContext.cs @@ -10,6 +10,8 @@ public interface IAgentContext AgentBase? AgentInstance { get; set; } DistributedContextPropagator DistributedContextPropagator { get; } ILogger Logger { get; } + ValueTask Store(AgentState value); + ValueTask Read(AgentId agentId); ValueTask SendResponseAsync(RpcRequest request, RpcResponse response); ValueTask SendRequestAsync(AgentBase agent, RpcRequest request); ValueTask PublishEventAsync(CloudEvent @event); diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/AgentStateGrain.cs b/dotnet/src/Microsoft.AutoGen/Runtime/AgentStateGrain.cs deleted file mode 100644 index d717e26f46f1..000000000000 --- a/dotnet/src/Microsoft.AutoGen/Runtime/AgentStateGrain.cs +++ /dev/null @@ -1,20 +0,0 @@ -namespace Microsoft.AutoGen.Runtime; - -internal sealed class AgentStateGrain([PersistentState("state", "agent-state")] IPersistentState> state) : Grain, IAgentStateGrain -{ - public ValueTask<(Dictionary State, string ETag)> ReadStateAsync() - { - return new((state.State, state.Etag)); - } - - public async ValueTask WriteStateAsync(Dictionary value, string eTag) - { - if (string.Equals(state.Etag, eTag, StringComparison.Ordinal)) - { - state.State = value; - await state.WriteStateAsync(); - } - - return state.Etag; - } -} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/AgentWorkerHostingExtensions.cs b/dotnet/src/Microsoft.AutoGen/Runtime/AgentWorkerHostingExtensions.cs index 48e911f351ef..447b527417a5 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/AgentWorkerHostingExtensions.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime/AgentWorkerHostingExtensions.cs @@ -5,19 +5,27 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Hosting; -using Orleans.Serialization; namespace Microsoft.AutoGen.Runtime; public static class AgentWorkerHostingExtensions { - public static IHostApplicationBuilder AddAgentService(this IHostApplicationBuilder builder) + public static WebApplicationBuilder AddAgentService(this WebApplicationBuilder builder, bool local = false) { + if (local) + { + //TODO: make configuration more flexible + builder.WebHost.ConfigureKestrel(serverOptions => + { + serverOptions.ListenLocalhost(5001, listenOptions => + { + listenOptions.Protocols = HttpProtocols.Http2; + listenOptions.UseHttps(); + }); + }); + } builder.Services.AddGrpc(); - builder.Services.AddSerializer(serializer => serializer.AddProtobufSerializer()); - - // Ensure Orleans is added before the hosted service to guarantee that it starts first. - builder.UseOrleans(); + builder.AddOrleans(local); builder.Services.TryAddSingleton(DistributedContextPropagator.Current); builder.Services.AddSingleton(); builder.Services.AddSingleton(sp => sp.GetRequiredService()); @@ -27,22 +35,9 @@ public static IHostApplicationBuilder AddAgentService(this IHostApplicationBuild public static WebApplicationBuilder AddLocalAgentService(this WebApplicationBuilder builder) { - builder.WebHost.ConfigureKestrel(serverOptions => - { - serverOptions.ListenLocalhost(5001, listenOptions => - { - listenOptions.Protocols = HttpProtocols.Http2; - listenOptions.UseHttps(); - }); - }); - builder.AddAgentService(); - builder.UseOrleans(siloBuilder => - { - siloBuilder.UseLocalhostClustering(); ; - }); + builder.AddAgentService(local: true); return builder; } - public static WebApplication MapAgentService(this WebApplication app) { app.MapGrpcService(); diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/IAgentStateGrain.cs b/dotnet/src/Microsoft.AutoGen/Runtime/IAgentStateGrain.cs deleted file mode 100644 index b5ece3ad6fa5..000000000000 --- a/dotnet/src/Microsoft.AutoGen/Runtime/IAgentStateGrain.cs +++ /dev/null @@ -1,7 +0,0 @@ -namespace Microsoft.AutoGen.Runtime; - -internal interface IAgentStateGrain : IGrainWithStringKey -{ - ValueTask<(Dictionary State, string ETag)> ReadStateAsync(); - ValueTask WriteStateAsync(Dictionary state, string eTag); -} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerAgentGrain.cs b/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerAgentGrain.cs new file mode 100644 index 000000000000..ce93b9a41efd --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerAgentGrain.cs @@ -0,0 +1,9 @@ +using Microsoft.AutoGen.Abstractions; + +namespace Microsoft.AutoGen.Runtime; + +internal interface IWorkerAgentGrain : IGrainWithStringKey +{ + ValueTask ReadStateAsync(); + ValueTask WriteStateAsync(AgentState state, string eTag); +} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerGateway.cs b/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerGateway.cs index c48c0fa8a6ca..ec63cdcc8874 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerGateway.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime/IWorkerGateway.cs @@ -7,4 +7,6 @@ public interface IWorkerGateway : IGrainObserver { ValueTask InvokeRequest(RpcRequest request); ValueTask BroadcastEvent(CloudEvent evt); + ValueTask Store(AgentState value); + ValueTask Read(AgentId agentId); } diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/Microsoft.AutoGen.Runtime.csproj b/dotnet/src/Microsoft.AutoGen/Runtime/Microsoft.AutoGen.Runtime.csproj index 37e1bd292681..40a240c2f699 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/Microsoft.AutoGen.Runtime.csproj +++ b/dotnet/src/Microsoft.AutoGen/Runtime/Microsoft.AutoGen.Runtime.csproj @@ -21,6 +21,15 @@ + + + + + + + + + diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/OrleansRuntimeHostingExtenions.cs b/dotnet/src/Microsoft.AutoGen/Runtime/OrleansRuntimeHostingExtenions.cs new file mode 100644 index 000000000000..3f980cf85d36 --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Runtime/OrleansRuntimeHostingExtenions.cs @@ -0,0 +1,85 @@ +using System.Configuration; +using Microsoft.AspNetCore.Builder; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.Hosting; +using Orleans.Configuration; +using Orleans.Serialization; + +namespace Microsoft.AutoGen.Runtime; + +public static class OrleansRuntimeHostingExtenions +{ + public static WebApplicationBuilder AddOrleans(this WebApplicationBuilder builder, bool local = false) + { + + builder.Services.AddSerializer(serializer => serializer.AddProtobufSerializer()); + // Ensure Orleans is added before the hosted service to guarantee that it starts first. + //TODO: make all of this configurable + builder.Host.UseOrleans(siloBuilder => + { + // Development mode or local mode uses in-memory storage and streams + if (builder.Environment.IsDevelopment() || local) + { + siloBuilder.UseLocalhostClustering() + .AddMemoryStreams("StreamProvider") + .AddMemoryGrainStorage("PubSubStore") + .AddMemoryGrainStorage("AgentStateStore"); + + siloBuilder.UseInMemoryReminderService(); + siloBuilder.UseDashboard(x => x.HostSelf = true); + + siloBuilder.UseInMemoryReminderService(); + } + else + { + var cosmosDbconnectionString = builder.Configuration.GetValue("Orleans:CosmosDBConnectionString") ?? + throw new ConfigurationErrorsException( + "Orleans:CosmosDBConnectionString is missing from configuration. This is required for persistence in production environments."); + siloBuilder.Configure(options => + { + //TODO: make this configurable + options.ClusterId = "AutoGen-cluster"; + options.ServiceId = "AutoGen-cluster"; + }); + siloBuilder.Configure(options => + { + options.ResponseTimeout = TimeSpan.FromMinutes(3); + options.SystemResponseTimeout = TimeSpan.FromMinutes(3); + }); + siloBuilder.Configure(options => + { + options.ResponseTimeout = TimeSpan.FromMinutes(3); + }); + siloBuilder.UseCosmosClustering(o => + { + o.ConfigureCosmosClient(cosmosDbconnectionString); + o.ContainerName = "AutoGen"; + o.DatabaseName = "clustering"; + o.IsResourceCreationEnabled = true; + }); + + siloBuilder.UseCosmosReminderService(o => + { + o.ConfigureCosmosClient(cosmosDbconnectionString); + o.ContainerName = "AutoGen"; + o.DatabaseName = "reminders"; + o.IsResourceCreationEnabled = true; + }); + siloBuilder.AddCosmosGrainStorage( + name: "AgentStateStore", + configureOptions: o => + { + o.ConfigureCosmosClient(cosmosDbconnectionString); + o.ContainerName = "AutoGen"; + o.DatabaseName = "persistence"; + o.IsResourceCreationEnabled = true; + }); + //TODO: replace with EventHub + siloBuilder + .AddMemoryStreams("StreamProvider") + .AddMemoryGrainStorage("PubSubStore"); + } + }); + return builder; + } +} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerAgentGrain.cs b/dotnet/src/Microsoft.AutoGen/Runtime/WorkerAgentGrain.cs new file mode 100644 index 000000000000..3bbe7d78cd5b --- /dev/null +++ b/dotnet/src/Microsoft.AutoGen/Runtime/WorkerAgentGrain.cs @@ -0,0 +1,31 @@ +using Microsoft.AutoGen.Abstractions; + +namespace Microsoft.AutoGen.Runtime; + +internal sealed class WorkerAgentGrain([PersistentState("state", "AgentStateStore")] IPersistentState state) : Grain, IWorkerAgentGrain +{ + public async ValueTask WriteStateAsync(AgentState newState, string eTag) + { + // etags for optimistic concurrency control + // if the Etag is null, its a new state + // if the passed etag is null or empty, we should not check the current state's Etag - caller doesnt care + // if both etags are set, they should match or it means that the state has changed since the last read. + if ((string.IsNullOrEmpty(state.Etag)) || (string.IsNullOrEmpty(eTag)) || (string.Equals(state.Etag, eTag, StringComparison.Ordinal))) + { + state.State = newState; + await state.WriteStateAsync(); + } + else + { + //TODO - this is probably not the correct behavior to just throw - I presume we want to somehow let the caller know that the state has changed and they need to re-read it + throw new ArgumentException( + "The provided ETag does not match the current ETag. The state has been modified by another request."); + } + return state.Etag; + } + + public ValueTask ReadStateAsync() + { + return ValueTask.FromResult(state.State); + } +} diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGateway.cs b/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGateway.cs index 6cb26bc1c710..6d549ef7270f 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGateway.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGateway.cs @@ -37,6 +37,7 @@ public WorkerGateway(IClusterClient clusterClient, ILogger logger public async ValueTask BroadcastEvent(CloudEvent evt) { + // TODO: filter the workers that receive the event var tasks = new List(_workers.Count); foreach (var (_, connection) in _workers) { @@ -211,7 +212,18 @@ private static async Task InvokeRequestDelegate(WorkerProcessConnection connecti await connection.ResponseStream.WriteAsync(new Message { Response = new RpcResponse { RequestId = request.RequestId, Error = ex.Message } }); } } + public async ValueTask Store(AgentState value) + { + var agentId = value.AgentId ?? throw new ArgumentNullException(nameof(value.AgentId)); + var agentState = _clusterClient.GetGrain($"{agentId.Type}:{agentId.Key}"); + await agentState.WriteStateAsync(value, value.ETag); + } + public async ValueTask Read(AgentId agentId) + { + var agentState = _clusterClient.GetGrain($"{agentId.Type}:{agentId.Key}"); + return await agentState.ReadStateAsync(); + } /* private async ValueTask SubscribeToTopic(WorkerProcessConnection connection, RpcRequest request) { diff --git a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGatewayService.cs b/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGatewayService.cs index b817bc04925b..8600aa5fd233 100644 --- a/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGatewayService.cs +++ b/dotnet/src/Microsoft.AutoGen/Runtime/WorkerGatewayService.cs @@ -21,4 +21,18 @@ public override async Task OpenChannel(IAsyncStreamReader requestStream throw; } } + public override async Task GetState(AgentId request, ServerCallContext context) + { + var state = await agentWorker.Read(request); + return new GetStateResponse { AgentState = state }; + } + + public override async Task SaveState(AgentState request, ServerCallContext context) + { + await agentWorker.Store(request); + return new SaveStateResponse + { + Success = true // TODO: Implement error handling + }; + } } diff --git a/protos/agent_worker.proto b/protos/agent_worker.proto index ec472923be32..7b0b5245dd3e 100644 --- a/protos/agent_worker.proto +++ b/protos/agent_worker.proto @@ -82,6 +82,29 @@ message AddSubscriptionResponse { service AgentRpc { rpc OpenChannel (stream Message) returns (stream Message); + rpc GetState(AgentId) returns (GetStateResponse); + rpc SaveState(AgentState) returns (SaveStateResponse); +} + +message AgentState { + AgentId agent_id = 1; + string eTag = 2; + oneof data { + bytes binary_data = 3; + string text_data = 4; + google.protobuf.Any proto_data = 5; + } +} + +message GetStateResponse { + AgentState agent_state = 1; + bool success = 2; + optional string error = 3; +} + +message SaveStateResponse { + bool success = 1; + optional string error = 2; } message Message { diff --git a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py index 9308edbcd8db..1ed794c35f29 100644 --- a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py +++ b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py @@ -243,3 +243,17 @@ async def _process_add_subscription_request( ) case None: logger.warning("Received empty subscription message") + + async def GetState( # type: ignore + self, + request: agent_worker_pb2.AgentId, + context: grpc.aio.ServicerContext[agent_worker_pb2.AgentId, agent_worker_pb2.GetStateResponse], + ) -> agent_worker_pb2.GetStateResponse: # type: ignore + raise NotImplementedError("Method not implemented!") + + async def SaveState( # type: ignore + self, + request: agent_worker_pb2.AgentState, + context: grpc.aio.ServicerContext[agent_worker_pb2.AgentId, agent_worker_pb2.SaveStateResponse], + ) -> agent_worker_pb2.SaveStateResponse: # type: ignore + raise NotImplementedError("Method not implemented!") diff --git a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.py b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.py index cfbc0522b856..0637e866c4de 100644 --- a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.py +++ b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.py @@ -16,7 +16,7 @@ from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x61gent_worker.proto\x12\x06\x61gents\x1a\x10\x63loudevent.proto\x1a\x19google/protobuf/any.proto\"\'\n\x07TopicId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\"$\n\x07\x41gentId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\"E\n\x07Payload\x12\x11\n\tdata_type\x18\x01 \x01(\t\x12\x19\n\x11\x64\x61ta_content_type\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\x89\x02\n\nRpcRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12$\n\x06source\x18\x02 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12\x1f\n\x06target\x18\x03 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0e\n\x06method\x18\x04 \x01(\t\x12 \n\x07payload\x18\x05 \x01(\x0b\x32\x0f.agents.Payload\x12\x32\n\x08metadata\x18\x06 \x03(\x0b\x32 .agents.RpcRequest.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"\xb8\x01\n\x0bRpcResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12 \n\x07payload\x18\x02 \x01(\x0b\x32\x0f.agents.Payload\x12\r\n\x05\x65rror\x18\x03 \x01(\t\x12\x33\n\x08metadata\x18\x04 \x03(\x0b\x32!.agents.RpcResponse.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe4\x01\n\x05\x45vent\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x14\n\x0ctopic_source\x18\x02 \x01(\t\x12$\n\x06source\x18\x03 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12 \n\x07payload\x18\x04 \x01(\x0b\x32\x0f.agents.Payload\x12-\n\x08metadata\x18\x05 \x03(\x0b\x32\x1b.agents.Event.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"<\n\x18RegisterAgentTypeRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\t\"^\n\x19RegisterAgentTypeResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\":\n\x10TypeSubscription\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x12\n\nagent_type\x18\x02 \x01(\t\"T\n\x0cSubscription\x12\x34\n\x10typeSubscription\x18\x01 \x01(\x0b\x32\x18.agents.TypeSubscriptionH\x00\x42\x0e\n\x0csubscription\"X\n\x16\x41\x64\x64SubscriptionRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12*\n\x0csubscription\x18\x02 \x01(\x0b\x32\x14.agents.Subscription\"\\\n\x17\x41\x64\x64SubscriptionResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\xc6\x03\n\x07Message\x12%\n\x07request\x18\x01 \x01(\x0b\x32\x12.agents.RpcRequestH\x00\x12\'\n\x08response\x18\x02 \x01(\x0b\x32\x13.agents.RpcResponseH\x00\x12\x1e\n\x05\x65vent\x18\x03 \x01(\x0b\x32\r.agents.EventH\x00\x12\x44\n\x18registerAgentTypeRequest\x18\x04 \x01(\x0b\x32 .agents.RegisterAgentTypeRequestH\x00\x12\x46\n\x19registerAgentTypeResponse\x18\x05 \x01(\x0b\x32!.agents.RegisterAgentTypeResponseH\x00\x12@\n\x16\x61\x64\x64SubscriptionRequest\x18\x06 \x01(\x0b\x32\x1e.agents.AddSubscriptionRequestH\x00\x12\x42\n\x17\x61\x64\x64SubscriptionResponse\x18\x07 \x01(\x0b\x32\x1f.agents.AddSubscriptionResponseH\x00\x12,\n\ncloudEvent\x18\x08 \x01(\x0b\x32\x16.cloudevent.CloudEventH\x00\x42\t\n\x07message2?\n\x08\x41gentRpc\x12\x33\n\x0bOpenChannel\x12\x0f.agents.Message\x1a\x0f.agents.Message(\x01\x30\x01\x42!\xaa\x02\x1eMicrosoft.AutoGen.Abstractionsb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x61gent_worker.proto\x12\x06\x61gents\x1a\x10\x63loudevent.proto\x1a\x19google/protobuf/any.proto\"\'\n\x07TopicId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\"$\n\x07\x41gentId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\"E\n\x07Payload\x12\x11\n\tdata_type\x18\x01 \x01(\t\x12\x19\n\x11\x64\x61ta_content_type\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\x89\x02\n\nRpcRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12$\n\x06source\x18\x02 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12\x1f\n\x06target\x18\x03 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0e\n\x06method\x18\x04 \x01(\t\x12 \n\x07payload\x18\x05 \x01(\x0b\x32\x0f.agents.Payload\x12\x32\n\x08metadata\x18\x06 \x03(\x0b\x32 .agents.RpcRequest.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"\xb8\x01\n\x0bRpcResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12 \n\x07payload\x18\x02 \x01(\x0b\x32\x0f.agents.Payload\x12\r\n\x05\x65rror\x18\x03 \x01(\t\x12\x33\n\x08metadata\x18\x04 \x03(\x0b\x32!.agents.RpcResponse.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe4\x01\n\x05\x45vent\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x14\n\x0ctopic_source\x18\x02 \x01(\t\x12$\n\x06source\x18\x03 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12 \n\x07payload\x18\x04 \x01(\x0b\x32\x0f.agents.Payload\x12-\n\x08metadata\x18\x05 \x03(\x0b\x32\x1b.agents.Event.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"<\n\x18RegisterAgentTypeRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\t\"^\n\x19RegisterAgentTypeResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\":\n\x10TypeSubscription\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x12\n\nagent_type\x18\x02 \x01(\t\"T\n\x0cSubscription\x12\x34\n\x10typeSubscription\x18\x01 \x01(\x0b\x32\x18.agents.TypeSubscriptionH\x00\x42\x0e\n\x0csubscription\"X\n\x16\x41\x64\x64SubscriptionRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12*\n\x0csubscription\x18\x02 \x01(\x0b\x32\x14.agents.Subscription\"\\\n\x17\x41\x64\x64SubscriptionResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\x9d\x01\n\nAgentState\x12!\n\x08\x61gent_id\x18\x01 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0c\n\x04\x65Tag\x18\x02 \x01(\t\x12\x15\n\x0b\x62inary_data\x18\x03 \x01(\x0cH\x00\x12\x13\n\ttext_data\x18\x04 \x01(\tH\x00\x12*\n\nproto_data\x18\x05 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00\x42\x06\n\x04\x64\x61ta\"j\n\x10GetStateResponse\x12\'\n\x0b\x61gent_state\x18\x01 \x01(\x0b\x32\x12.agents.AgentState\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"B\n\x11SaveStateResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x12\n\x05\x65rror\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\xc6\x03\n\x07Message\x12%\n\x07request\x18\x01 \x01(\x0b\x32\x12.agents.RpcRequestH\x00\x12\'\n\x08response\x18\x02 \x01(\x0b\x32\x13.agents.RpcResponseH\x00\x12\x1e\n\x05\x65vent\x18\x03 \x01(\x0b\x32\r.agents.EventH\x00\x12\x44\n\x18registerAgentTypeRequest\x18\x04 \x01(\x0b\x32 .agents.RegisterAgentTypeRequestH\x00\x12\x46\n\x19registerAgentTypeResponse\x18\x05 \x01(\x0b\x32!.agents.RegisterAgentTypeResponseH\x00\x12@\n\x16\x61\x64\x64SubscriptionRequest\x18\x06 \x01(\x0b\x32\x1e.agents.AddSubscriptionRequestH\x00\x12\x42\n\x17\x61\x64\x64SubscriptionResponse\x18\x07 \x01(\x0b\x32\x1f.agents.AddSubscriptionResponseH\x00\x12,\n\ncloudEvent\x18\x08 \x01(\x0b\x32\x16.cloudevent.CloudEventH\x00\x42\t\n\x07message2\xb2\x01\n\x08\x41gentRpc\x12\x33\n\x0bOpenChannel\x12\x0f.agents.Message\x1a\x0f.agents.Message(\x01\x30\x01\x12\x35\n\x08GetState\x12\x0f.agents.AgentId\x1a\x18.agents.GetStateResponse\x12:\n\tSaveState\x12\x12.agents.AgentState\x1a\x19.agents.SaveStateResponseB!\xaa\x02\x1eMicrosoft.AutoGen.Abstractionsb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -60,8 +60,14 @@ _globals['_ADDSUBSCRIPTIONREQUEST']._serialized_end=1303 _globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_start=1305 _globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_end=1397 - _globals['_MESSAGE']._serialized_start=1400 - _globals['_MESSAGE']._serialized_end=1854 - _globals['_AGENTRPC']._serialized_start=1856 - _globals['_AGENTRPC']._serialized_end=1919 + _globals['_AGENTSTATE']._serialized_start=1400 + _globals['_AGENTSTATE']._serialized_end=1557 + _globals['_GETSTATERESPONSE']._serialized_start=1559 + _globals['_GETSTATERESPONSE']._serialized_end=1665 + _globals['_SAVESTATERESPONSE']._serialized_start=1667 + _globals['_SAVESTATERESPONSE']._serialized_end=1733 + _globals['_MESSAGE']._serialized_start=1736 + _globals['_MESSAGE']._serialized_end=2190 + _globals['_AGENTRPC']._serialized_start=2193 + _globals['_AGENTRPC']._serialized_end=2371 # @@protoc_insertion_point(module_scope) diff --git a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.pyi b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.pyi index 6c57fa8a9fcd..522124ab8891 100644 --- a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.pyi +++ b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.pyi @@ -6,6 +6,7 @@ isort:skip_file import builtins import cloudevent_pb2 import collections.abc +import google.protobuf.any_pb2 import google.protobuf.descriptor import google.protobuf.internal.containers import google.protobuf.message @@ -333,6 +334,81 @@ class AddSubscriptionResponse(google.protobuf.message.Message): global___AddSubscriptionResponse = AddSubscriptionResponse +@typing.final +class AgentState(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + AGENT_ID_FIELD_NUMBER: builtins.int + ETAG_FIELD_NUMBER: builtins.int + BINARY_DATA_FIELD_NUMBER: builtins.int + TEXT_DATA_FIELD_NUMBER: builtins.int + PROTO_DATA_FIELD_NUMBER: builtins.int + eTag: builtins.str + binary_data: builtins.bytes + text_data: builtins.str + @property + def agent_id(self) -> global___AgentId: ... + @property + def proto_data(self) -> google.protobuf.any_pb2.Any: ... + def __init__( + self, + *, + agent_id: global___AgentId | None = ..., + eTag: builtins.str = ..., + binary_data: builtins.bytes = ..., + text_data: builtins.str = ..., + proto_data: google.protobuf.any_pb2.Any | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["agent_id", b"agent_id", "binary_data", b"binary_data", "data", b"data", "proto_data", b"proto_data", "text_data", b"text_data"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["agent_id", b"agent_id", "binary_data", b"binary_data", "data", b"data", "eTag", b"eTag", "proto_data", b"proto_data", "text_data", b"text_data"]) -> None: ... + def WhichOneof(self, oneof_group: typing.Literal["data", b"data"]) -> typing.Literal["binary_data", "text_data", "proto_data"] | None: ... + +global___AgentState = AgentState + +@typing.final +class GetStateResponse(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + AGENT_STATE_FIELD_NUMBER: builtins.int + SUCCESS_FIELD_NUMBER: builtins.int + ERROR_FIELD_NUMBER: builtins.int + success: builtins.bool + error: builtins.str + @property + def agent_state(self) -> global___AgentState: ... + def __init__( + self, + *, + agent_state: global___AgentState | None = ..., + success: builtins.bool = ..., + error: builtins.str | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["_error", b"_error", "agent_state", b"agent_state", "error", b"error"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["_error", b"_error", "agent_state", b"agent_state", "error", b"error", "success", b"success"]) -> None: ... + def WhichOneof(self, oneof_group: typing.Literal["_error", b"_error"]) -> typing.Literal["error"] | None: ... + +global___GetStateResponse = GetStateResponse + +@typing.final +class SaveStateResponse(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + SUCCESS_FIELD_NUMBER: builtins.int + ERROR_FIELD_NUMBER: builtins.int + success: builtins.bool + error: builtins.str + def __init__( + self, + *, + success: builtins.bool = ..., + error: builtins.str | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["_error", b"_error", "error", b"error"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["_error", b"_error", "error", b"error", "success", b"success"]) -> None: ... + def WhichOneof(self, oneof_group: typing.Literal["_error", b"_error"]) -> typing.Literal["error"] | None: ... + +global___SaveStateResponse = SaveStateResponse + @typing.final class Message(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor diff --git a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2_grpc.py b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2_grpc.py index d561618a2cec..fc27021587f6 100644 --- a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2_grpc.py +++ b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2_grpc.py @@ -19,6 +19,16 @@ def __init__(self, channel): request_serializer=agent__worker__pb2.Message.SerializeToString, response_deserializer=agent__worker__pb2.Message.FromString, ) + self.GetState = channel.unary_unary( + '/agents.AgentRpc/GetState', + request_serializer=agent__worker__pb2.AgentId.SerializeToString, + response_deserializer=agent__worker__pb2.GetStateResponse.FromString, + ) + self.SaveState = channel.unary_unary( + '/agents.AgentRpc/SaveState', + request_serializer=agent__worker__pb2.AgentState.SerializeToString, + response_deserializer=agent__worker__pb2.SaveStateResponse.FromString, + ) class AgentRpcServicer(object): @@ -30,6 +40,18 @@ def OpenChannel(self, request_iterator, context): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') + def GetState(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def SaveState(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + def add_AgentRpcServicer_to_server(servicer, server): rpc_method_handlers = { @@ -38,6 +60,16 @@ def add_AgentRpcServicer_to_server(servicer, server): request_deserializer=agent__worker__pb2.Message.FromString, response_serializer=agent__worker__pb2.Message.SerializeToString, ), + 'GetState': grpc.unary_unary_rpc_method_handler( + servicer.GetState, + request_deserializer=agent__worker__pb2.AgentId.FromString, + response_serializer=agent__worker__pb2.GetStateResponse.SerializeToString, + ), + 'SaveState': grpc.unary_unary_rpc_method_handler( + servicer.SaveState, + request_deserializer=agent__worker__pb2.AgentState.FromString, + response_serializer=agent__worker__pb2.SaveStateResponse.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( 'agents.AgentRpc', rpc_method_handlers) @@ -64,3 +96,37 @@ def OpenChannel(request_iterator, agent__worker__pb2.Message.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def GetState(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/agents.AgentRpc/GetState', + agent__worker__pb2.AgentId.SerializeToString, + agent__worker__pb2.GetStateResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def SaveState(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/agents.AgentRpc/SaveState', + agent__worker__pb2.AgentState.SerializeToString, + agent__worker__pb2.SaveStateResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2_grpc.pyi b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2_grpc.pyi index 1642ca2af10f..bf6bc1ba2d64 100644 --- a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2_grpc.pyi +++ b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2_grpc.pyi @@ -24,12 +24,32 @@ class AgentRpcStub: agent_worker_pb2.Message, ] + GetState: grpc.UnaryUnaryMultiCallable[ + agent_worker_pb2.AgentId, + agent_worker_pb2.GetStateResponse, + ] + + SaveState: grpc.UnaryUnaryMultiCallable[ + agent_worker_pb2.AgentState, + agent_worker_pb2.SaveStateResponse, + ] + class AgentRpcAsyncStub: OpenChannel: grpc.aio.StreamStreamMultiCallable[ agent_worker_pb2.Message, agent_worker_pb2.Message, ] + GetState: grpc.aio.UnaryUnaryMultiCallable[ + agent_worker_pb2.AgentId, + agent_worker_pb2.GetStateResponse, + ] + + SaveState: grpc.aio.UnaryUnaryMultiCallable[ + agent_worker_pb2.AgentState, + agent_worker_pb2.SaveStateResponse, + ] + class AgentRpcServicer(metaclass=abc.ABCMeta): @abc.abstractmethod def OpenChannel( @@ -38,4 +58,18 @@ class AgentRpcServicer(metaclass=abc.ABCMeta): context: _ServicerContext, ) -> typing.Union[collections.abc.Iterator[agent_worker_pb2.Message], collections.abc.AsyncIterator[agent_worker_pb2.Message]]: ... + @abc.abstractmethod + def GetState( + self, + request: agent_worker_pb2.AgentId, + context: _ServicerContext, + ) -> typing.Union[agent_worker_pb2.GetStateResponse, collections.abc.Awaitable[agent_worker_pb2.GetStateResponse]]: ... + + @abc.abstractmethod + def SaveState( + self, + request: agent_worker_pb2.AgentState, + context: _ServicerContext, + ) -> typing.Union[agent_worker_pb2.SaveStateResponse, collections.abc.Awaitable[agent_worker_pb2.SaveStateResponse]]: ... + def add_AgentRpcServicer_to_server(servicer: AgentRpcServicer, server: typing.Union[grpc.Server, grpc.aio.Server]) -> None: ...