Skip to content

Commit

Permalink
first draft of stateful persistence grains for each agent.... (micros…
Browse files Browse the repository at this point in the history
…oft#3954)

* adds Orleans persistence for AgentState
  • Loading branch information
rysweet authored Oct 29, 2024
1 parent 6925cd4 commit 14846a3
Show file tree
Hide file tree
Showing 35 changed files with 749 additions and 77 deletions.
7 changes: 7 additions & 0 deletions dotnet/AutoGen.sln
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AIModelClientHostingExtensi
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "sample", "sample", "{686480D7-8FEC-4ED3-9C5D-CEBE1057A7ED}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HelloAgentState", "samples\Hello\HelloAgentState\HelloAgentState.csproj", "{64EF61E7-00A6-4E5E-9808-62E10993A0E5}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -335,6 +337,10 @@ Global
{97550E87-48C6-4EBF-85E1-413ABAE9DBFD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{97550E87-48C6-4EBF-85E1-413ABAE9DBFD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{97550E87-48C6-4EBF-85E1-413ABAE9DBFD}.Release|Any CPU.Build.0 = Release|Any CPU
{64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{64EF61E7-00A6-4E5E-9808-62E10993A0E5}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -395,6 +401,7 @@ Global
{A20B9894-F352-4338-872A-F215A241D43D} = {7EB336C2-7C0A-4BC8-80C6-A3173AB8DC45}
{8F7560CF-EEBB-4333-A69F-838CA40FD85D} = {7EB336C2-7C0A-4BC8-80C6-A3173AB8DC45}
{97550E87-48C6-4EBF-85E1-413ABAE9DBFD} = {18BF8DD7-0585-48BF-8F97-AD333080CE06}
{64EF61E7-00A6-4E5E-9808-62E10993A0E5} = {7EB336C2-7C0A-4BC8-80C6-A3173AB8DC45}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {93384647-528D-46C8-922C-8DB36A382F0B}
Expand Down
21 changes: 21 additions & 0 deletions dotnet/samples/Hello/HelloAgentState/HelloAgentState.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">

<ItemGroup>
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Abstractions\Microsoft.AutoGen.Abstractions.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Agents\Microsoft.AutoGen.Agents.csproj" />
<ProjectReference Include="..\..\..\src\Microsoft.AutoGen\Runtime\Microsoft.AutoGen.Runtime.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" />
<PackageReference Include="Aspire.Hosting.AppHost" />
</ItemGroup>

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

</Project>
75 changes: 75 additions & 0 deletions dotnet/samples/Hello/HelloAgentState/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright (c) Microsoft. All rights reserved.

using Microsoft.AutoGen.Abstractions;
using Microsoft.AutoGen.Agents;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

// send a message to the agent
var app = await AgentsApp.PublishMessageAsync("HelloAgents", new NewMessageReceived
{
Message = "World"
}, local: false);

await app.WaitForShutdownAsync();

namespace Hello
{
[TopicSubscription("HelloAgents")]
public class HelloAgent(
IAgentContext context,
[FromKeyedServices("EventTypes")] EventTypes typeRegistry) : ConsoleAgent(
context,
typeRegistry),
ISayHello,
IHandle<NewMessageReceived>,
IHandle<ConversationClosed>
{
private AgentState? State { get; set; }
public async Task Handle(NewMessageReceived item)
{
var response = await SayHello(item.Message).ConfigureAwait(false);
var evt = new Output
{
Message = response
}.ToCloudEvent(this.AgentId.Key);
var entry = "We said hello to " + item.Message;
await Store(new AgentState
{
AgentId = this.AgentId,
TextData = entry
}).ConfigureAwait(false);
await PublishEvent(evt).ConfigureAwait(false);
var goodbye = new ConversationClosed
{
UserId = this.AgentId.Key,
UserMessage = "Goodbye"
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(goodbye).ConfigureAwait(false);
}
public async Task Handle(ConversationClosed item)
{
State = await Read<AgentState>(this.AgentId).ConfigureAwait(false);
var read = State?.TextData ?? "No state data found";
var goodbye = $"{read}\n********************* {item.UserId} said {item.UserMessage} ************************";
var evt = new Output
{
Message = goodbye
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt).ConfigureAwait(false);
//sleep
await Task.Delay(10000).ConfigureAwait(false);
await AgentsApp.ShutdownAsync().ConfigureAwait(false);

}
public async Task<string> SayHello(string ask)
{
var response = $"\n\n\n\n***************Hello {ask}**********************\n\n\n\n";
return response;
}
}
public interface ISayHello
{
public Task<string> SayHello(string ask);
}
}
138 changes: 138 additions & 0 deletions dotnet/samples/Hello/HelloAgentState/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
# AutoGen 0.4 .NET Hello World Sample

This [sample](Program.cs) demonstrates how to create a simple .NET console application that listens for an event and then orchestrates a series of actions in response.

## Prerequisites

To run this sample, you'll need: [.NET 8.0](https://dotnet.microsoft.com/en-us/) or later.
Also recommended is the [GitHub CLI](https://cli.github.com/).

## Instructions to run the sample

```bash
# Clone the repository
gh repo clone microsoft/autogen
cd dotnet/samples/Hello
dotnet run
```

## Key Concepts

This sample illustrates how to create your own agent that inherits from a base agent and listens for an event. It also shows how to use the SDK's App Runtime locally to start the agent and send messages.

Flow Diagram:

```mermaid
%%{init: {'theme':'forest'}}%%
graph LR;
A[Main] --> |"PublishEvent(NewMessage('World'))"| B{"Handle(NewMessageReceived item)"}
B --> |"PublishEvent(Output('***Hello, World***'))"| C[ConsoleAgent]
C --> D{"WriteConsole()"}
B --> |"PublishEvent(ConversationClosed('Goodbye'))"| E{"Handle(ConversationClosed item)"}
B --> |"PublishEvent(Output('***Goodbye***'))"| C
E --> F{"Shutdown()"}
```

### Writing Event Handlers

The heart of an autogen application are the event handlers. Agents select a ```TopicSubscription``` to listen for events on a specific topic. When an event is received, the agent's event handler is called with the event data.

Within that event handler you may optionally *emit* new events, which are then sent to the event bus for other agents to process. The EventTypes are declared gRPC ProtoBuf messages that are used to define the schema of the event. The default protos are available via the ```Microsoft.AutoGen.Abstractions;``` namespace and are defined in [autogen/protos](/autogen/protos). The EventTypes are registered in the agent's constructor using the ```IHandle``` interface.

```csharp
TopicSubscription("HelloAgents")]
public class HelloAgent(
IAgentContext context,
[FromKeyedServices("EventTypes")] EventTypes typeRegistry) : ConsoleAgent(
context,
typeRegistry),
ISayHello,
IHandle<NewMessageReceived>,
IHandle<ConversationClosed>
{
public async Task Handle(NewMessageReceived item)
{
var response = await SayHello(item.Message).ConfigureAwait(false);
var evt = new Output
{
Message = response
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(evt).ConfigureAwait(false);
var goodbye = new ConversationClosed
{
UserId = this.AgentId.Key,
UserMessage = "Goodbye"
}.ToCloudEvent(this.AgentId.Key);
await PublishEvent(goodbye).ConfigureAwait(false);
}
```

### Inheritance and Composition

This sample also illustrates inheritance in AutoGen. The `HelloAgent` class inherits from `ConsoleAgent`, which is a base class that provides a `WriteConsole` method.

### Starting the Application Runtime

AuotoGen provides a flexible runtime ```Microsoft.AutoGen.Agents.App``` that can be started in a variety of ways. The `Program.cs` file demonstrates how to start the runtime locally and send a message to the agent all in one go using the ```App.PublishMessageAsync``` method.

```csharp
// send a message to the agent
var app = await App.PublishMessageAsync("HelloAgents", new NewMessageReceived
{
Message = "World"
}, local: true);

await App.RuntimeApp!.WaitForShutdownAsync();
await app.WaitForShutdownAsync();
```

### Sending Messages

The set of possible Messages is defined in gRPC ProtoBuf specs. These are then turned into C# classes by the gRPC tools. You can define your own Message types by creating a new .proto file in your project and including the gRPC tools in your ```.csproj``` file:

```proto
syntax = "proto3";
package devteam;
option csharp_namespace = "DevTeam.Shared";
message NewAsk {
string org = 1;
string repo = 2;
string ask = 3;
int64 issue_number = 4;
}
message ReadmeRequested {
string org = 1;
string repo = 2;
int64 issue_number = 3;
string ask = 4;
}
```

```xml
<ItemGroup>
<PackageReference Include="Google.Protobuf" />
<PackageReference Include="Grpc.Tools" PrivateAssets="All" />
<Protobuf Include="..\Protos\messages.proto" Link="Protos\messages.proto" />
</ItemGroup>
```

You can send messages using the [```Microsoft.AutoGen.Agents``` class](autogen/dotnet/src/Microsoft.AutoGen/Agents/AgentClient.cs). Messages are wrapped in [the CloudEvents specification](https://cloudevents.io) and sent to the event bus.
### Managing State

There is a simple API for persisting agent state.

```csharp
await Store(new AgentState
{
AgentId = this.AgentId,
TextData = entry
}).ConfigureAwait(false);
```

which can be read back using Read:

```csharp
State = await Read<AgentState>(this.AgentId).ConfigureAwait(false);
```
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
using Google.Protobuf;

namespace Microsoft.AutoGen.Abstractions;

public class AgentState<T> where T : class, new()
public class ChatState
<T> where T : IMessage, new()
{
public List<ChatHistoryItem> History { get; set; } = new();
public T Data { get; set; } = new();
Expand Down
22 changes: 21 additions & 1 deletion dotnet/src/Microsoft.AutoGen/Abstractions/MessageExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,29 @@ public static CloudEvent ToCloudEvent<T>(this T message, string source) where T

};
}

public static T FromCloudEvent<T>(this CloudEvent cloudEvent) where T : IMessage, new()
{
return cloudEvent.ProtoData.Unpack<T>();
}
public static AgentState ToAgentState<T>(this T state, AgentId agentId, string eTag) where T : IMessage
{
return new AgentState
{
ProtoData = Any.Pack(state),
AgentId = agentId,
ETag = eTag
};
}

public static T FromAgentState<T>(this AgentState state) where T : IMessage, new()
{
if (state.HasTextData == true)
{
if (typeof(T) == typeof(AgentState))
{
return (T)(IMessage)state;
}
}
return state.ProtoData.Unpack<T>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
<Protobuf Include="..\..\..\..\protos\agent_worker.proto" GrpcServices="Client;Server" Link="Protos\agent_worker.proto" />
<Protobuf Include="..\..\..\..\protos\cloudevent.proto" GrpcServices="Client;Server" Link="Protos\cloudevent.proto" />
<Protobuf Include="..\..\..\..\protos\agent_events.proto" GrpcServices="Client;Server" Link="Protos\agent_events.proto" />
<Protobuf Include="..\..\..\..\protos\agent_states.proto" GrpcServices="Client;Server" Link="Protos\agent_states.proto" />
</ItemGroup>

<ItemGroup>
Expand Down
14 changes: 11 additions & 3 deletions dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,16 @@ await this.InvokeWithActivityAsync(
break;
}
}

protected async Task Store(AgentState state)
{
await _context.Store(state).ConfigureAwait(false);
return;
}
protected async Task<T> Read<T>(AgentId agentId) where T : IMessage, new()
{
var agentstate = await _context.Read(agentId).ConfigureAwait(false);
return agentstate.FromAgentState<T>();
}
private void OnResponseCore(RpcResponse response)
{
var requestId = response.RequestId;
Expand Down Expand Up @@ -186,7 +195,6 @@ static async ((AgentBase Agent, RpcRequest Request, TaskCompletionSource<RpcResp

protected async ValueTask PublishEvent(CloudEvent item)
{
//TODO: Reimplement
var activity = s_source.StartActivity($"PublishEvent '{item.Type}'", ActivityKind.Client, Activity.Current?.Context ?? default);
activity?.SetTag("peer.service", $"{item.Type}/{item.Source}");

Expand All @@ -200,7 +208,7 @@ static async ((AgentBase Agent, CloudEvent Event, TaskCompletionSource<CloudEven
},
(this, item, completion),
activity,
item.Type).ConfigureAwait(false);// TODO: It's not the descriptor's name probably
item.Type).ConfigureAwait(false);
}

public Task CallHandler(CloudEvent item)
Expand Down
12 changes: 8 additions & 4 deletions dotnet/src/Microsoft.AutoGen/Agents/AgentClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@
using Microsoft.Extensions.Logging;

namespace Microsoft.AutoGen.Agents;

public sealed class AgentClient(ILogger<AgentClient> logger, AgentWorkerRuntime runtime, DistributedContextPropagator distributedContextPropagator,
[FromKeyedServices("EventTypes")] EventTypes eventTypes)
: AgentBase(new ClientContext(logger, runtime, distributedContextPropagator), eventTypes)
{
public async ValueTask PublishEventAsync(CloudEvent evt) => await PublishEvent(evt);
public async ValueTask<RpcResponse> SendRequestAsync(AgentId target, string method, Dictionary<string, string> parameters) => await RequestAsync(target, method, parameters);

public async ValueTask PublishEventAsync(string topic, IMessage evt)
{
await PublishEventAsync(evt.ToCloudEvent(topic)).ConfigureAwait(false);
Expand All @@ -23,12 +21,10 @@ private sealed class ClientContext(ILogger<AgentClient> logger, AgentWorkerRunti
public AgentBase? AgentInstance { get; set; }
public ILogger Logger { get; } = logger;
public DistributedContextPropagator DistributedContextPropagator { get; } = distributedContextPropagator;

public async ValueTask PublishEventAsync(CloudEvent @event)
{
await runtime.PublishEvent(@event).ConfigureAwait(false);
}

public async ValueTask SendRequestAsync(AgentBase agent, RpcRequest request)
{
await runtime.SendRequest(AgentInstance!, request).ConfigureAwait(false);
Expand All @@ -38,5 +34,13 @@ public async ValueTask SendResponseAsync(RpcRequest request, RpcResponse respons
{
await runtime.SendResponse(response).ConfigureAwait(false);
}
public ValueTask Store(AgentState value)
{
throw new NotImplementedException();
}
public ValueTask<AgentState> Read(AgentId agentId)
{
throw new NotImplementedException();
}
}
}
Loading

0 comments on commit 14846a3

Please sign in to comment.