Skip to content

Commit

Permalink
.Net Processes - Map Step Feature (#9339)
Browse files Browse the repository at this point in the history
### Motivation and Context
<!-- Thank you for your contribution to the semantic-kernel repo!
Please help reviewers and future users, providing the following
information:
  1. Why is this change required?
  2. What problem does it solve?
  3. What scenario does it contribute to?
  4. If it fixes an open issue, please link to the issue here.
-->

Fixes: #9193

### Description
<!-- Describe your changes, the overall approach, the underlying design.
These notes will help understanding how your code works. Thanks! -->

Map each value from a set to a map-operation and present the results as
a set for potential reduction.

**Includes:**
- `ProcessMapBuilder` (Core)
- `KernelProcessMap` / `KernelProcessMapState` (Abstractions)
- `LocalMap` (LocalRuntime)
- `MapActor` (DaprRuntime)
- Serialization
- Unit Tests (exhaustive)
- Integration Tests
- Sample

**Features:**
- Executes map operations in parallel
- Handles when output type has been transformed from input type
- Accepts either step or subprocess for map operation
- Proxies events for edges defined directly on the map-step
- Allows additional (non proxied) edges for the map-operation
- Participates in state serialization

**Follow-up:**
- Documentation -
#9616

### Contribution Checklist
<!-- Before submitting this PR, please make sure: -->

- [X] The code builds clean without any errors or warnings
- [X] The PR follows the [SK Contribution
Guidelines](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md)
and the [pre-submission formatting
script](https://github.com/microsoft/semantic-kernel/blob/main/CONTRIBUTING.md#development-scripts)
raises no violations
- [X] All unit tests pass, and I have added new tests where possible
- [X] I didn't break anyone 😄
  • Loading branch information
crickman authored Nov 19, 2024
1 parent 93bf06e commit ad4d35b
Show file tree
Hide file tree
Showing 53 changed files with 2,627 additions and 216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
<RootNamespace></RootNamespace>
<!-- Suppress: "Declare types in namespaces", "Require ConfigureAwait", "Experimental" -->
<NoWarn>
$(NoWarn);CS8618,IDE0009,CA1051,CA1050,CA1707,CA1054,CA2007,VSTHRD111,CS1591,RCS1110,RCS1243,CA5394,SKEXP0001,SKEXP0010,SKEXP0020,SKEXP0040,SKEXP0050,SKEXP0060,SKEXP0070,SKEXP0080,SKEXP0101,SKEXP0110,OPENAI001</NoWarn>
$(NoWarn);CS8618,IDE0009,CA1051,CA1050,CA1707,CA1054,CA2007,VSTHRD111,CS1591,RCS1110,RCS1243,CA5394,SKEXP0001,SKEXP0010,SKEXP0020,SKEXP0040,SKEXP0050,SKEXP0060,SKEXP0070,SKEXP0080,SKEXP0101,SKEXP0110,OPENAI001
</NoWarn>
<OutputType>Library</OutputType>
<UserSecretsId>5ee045b0-aea3-4f08-8d31-32d1a6f8fed0</UserSecretsId>
</PropertyGroup>
Expand Down Expand Up @@ -40,7 +41,7 @@
<ItemGroup>
<Compile Include="$(RepoRoot)/dotnet/src/InternalUtilities/src/Schema/*.cs" Link="%(RecursiveDir)/InternalUtilities/Schema/%(Filename)%(Extension)" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Agents\Core\Agents.Core.csproj" />
<ProjectReference Include="..\..\src\Connectors\Connectors.AzureOpenAI\Connectors.AzureOpenAI.csproj" />
Expand All @@ -57,9 +58,9 @@
</ItemGroup>

<ItemGroup>
<EmbeddedResource Include="Resources\*">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</EmbeddedResource>
<EmbeddedResource Include="$(RepoRoot)/dotnet/samples/LearnResources/Resources/Grimms-The-King-of-the-Golden-Mountain.txt" Link="%(RecursiveDir)Resources/%(Filename)%(Extension)"/>
<EmbeddedResource Include="$(RepoRoot)/dotnet/samples/LearnResources/Resources/Grimms-The-Water-of-Life.txt" Link="%(RecursiveDir)Resources/%(Filename)%(Extension)"/>
<EmbeddedResource Include="$(RepoRoot)/dotnet/samples/LearnResources/Resources/Grimms-The-White-Snake.txt" Link="%(RecursiveDir)Resources/%(Filename)%(Extension)"/>
</ItemGroup>

</Project>
236 changes: 236 additions & 0 deletions dotnet/samples/GettingStartedWithProcesses/Step05/Step05_MapReduce.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Text;
using Microsoft.SemanticKernel;
using Resources;

namespace Step05;

/// <summary>
/// Demonstrate usage of <see cref="KernelProcessMap"/> for a map-reduce operation.
/// </summary>
public class Step05_MapReduce : BaseTest
{
// Target Open AI Services
protected override bool ForceOpenAI => true;

/// <summary>
/// Factor to increase the scale of the content processed.
/// </summary>
private const int ScaleFactor = 100;

private readonly string _sourceContent;

public Step05_MapReduce(ITestOutputHelper output)
: base(output, redirectSystemConsoleOutput: true)
{
// Initialize the test content
StringBuilder content = new();

for (int count = 0; count < ScaleFactor; ++count)
{
content.AppendLine(EmbeddedResource.Read("Grimms-The-King-of-the-Golden-Mountain.txt"));
content.AppendLine(EmbeddedResource.Read("Grimms-The-Water-of-Life.txt"));
content.AppendLine(EmbeddedResource.Read("Grimms-The-White-Snake.txt"));
}

this._sourceContent = content.ToString().ToUpperInvariant();
}

[Fact]
public async Task RunMapReduceAsync()
{
// Define the process
KernelProcess process = SetupMapReduceProcess(nameof(RunMapReduceAsync), "Start");

// Execute the process
Kernel kernel = new();
using LocalKernelProcessContext localProcess =
await process.StartAsync(
kernel,
new KernelProcessEvent
{
Id = "Start",
Data = this._sourceContent,
});

// Display the results
Dictionary<string, int> results = (Dictionary<string, int>?)kernel.Data[ResultStep.ResultKey] ?? [];
foreach (var result in results)
{
Console.WriteLine($"{result.Key}: {result.Value}");
}
}

private KernelProcess SetupMapReduceProcess(string processName, string inputEventId)
{
ProcessBuilder process = new(processName);

ProcessStepBuilder chunkStep = process.AddStepFromType<ChunkStep>();
process
.OnInputEvent(inputEventId)
.SendEventTo(new ProcessFunctionTargetBuilder(chunkStep));

ProcessMapBuilder mapStep = process.AddMapStepFromType<CountStep>();
chunkStep
.OnEvent(ChunkStep.EventId)
.SendEventTo(new ProcessFunctionTargetBuilder(mapStep));

ProcessStepBuilder resultStep = process.AddStepFromType<ResultStep>();
mapStep
.OnEvent(CountStep.EventId)
.SendEventTo(new ProcessFunctionTargetBuilder(resultStep));

return process.Build();
}

// Step for breaking the content into chunks
private sealed class ChunkStep : KernelProcessStep
{
public const string EventId = "ChunkComplete";

[KernelFunction]
public async ValueTask ChunkAsync(KernelProcessStepContext context, string content)
{
int chunkSize = content.Length / Environment.ProcessorCount;
string[] chunks = ChunkContent(content, chunkSize).ToArray();

await context.EmitEventAsync(new() { Id = EventId, Data = chunks });
}

private IEnumerable<string> ChunkContent(string content, int chunkSize)
{
for (int index = 0; index < content.Length; index += chunkSize)
{
yield return content.Substring(index, Math.Min(chunkSize, content.Length - index));
}
}
}

// Step for counting the words in a chunk
private sealed class CountStep : KernelProcessStep
{
public const string EventId = "CountComplete";

[KernelFunction]
public async ValueTask ComputeAsync(KernelProcessStepContext context, string chunk)
{
Dictionary<string, int> counts = [];

string[] words = chunk.Split([' ', '\n', '\r', '.', ',', '’'], StringSplitOptions.RemoveEmptyEntries);
foreach (string word in words)
{
if (s_notInteresting.Contains(word))
{
continue;
}

counts.TryGetValue(word.Trim(), out int count);
counts[word] = ++count;
}

await context.EmitEventAsync(new() { Id = EventId, Data = counts });
}
}

// Step for combining the results
private sealed class ResultStep : KernelProcessStep
{
public const string ResultKey = "WordCount";

[KernelFunction]
public async ValueTask ComputeAsync(KernelProcessStepContext context, IList<Dictionary<string, int>> results, Kernel kernel)
{
Dictionary<string, int> totals = [];

foreach (Dictionary<string, int> result in results)
{
foreach (KeyValuePair<string, int> pair in result)
{
totals.TryGetValue(pair.Key, out int count);
totals[pair.Key] = count + pair.Value;
}
}

var sorted =
from kvp in totals
orderby kvp.Value descending
select kvp;

kernel.Data[ResultKey] = sorted.Take(10).ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
}
}

// Uninteresting words to remove from content
private static readonly HashSet<string> s_notInteresting =
[
"A",
"ALL",
"AN",
"AND",
"AS",
"AT",
"BE",
"BEFORE",
"BUT",
"BY",
"CAME",
"COULD",
"FOR",
"GO",
"HAD",
"HAVE",
"HE",
"HER",
"HIM",
"HIMSELF",
"HIS",
"HOW",
"I",
"IF",
"IN",
"INTO",
"IS",
"IT",
"ME",
"MUST",
"MY",
"NO",
"NOT",
"NOW",
"OF",
"ON",
"ONCE",
"ONE",
"ONLY",
"OUT",
"S",
"SAID",
"SAW",
"SET",
"SHE",
"SHOULD",
"SO",
"THAT",
"THE",
"THEM",
"THEN",
"THEIR",
"THERE",
"THEY",
"THIS",
"TO",
"VERY",
"WAS",
"WENT",
"WERE",
"WHAT",
"WHEN",
"WHO",
"WILL",
"WITH",
"WOULD",
"UP",
"UPON",
"YOU",
];
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private static void StoreProcessStateLocally(KernelProcessStateMetadata processS
throw new KernelException($"Filepath for process {processStateInfo.Name} does not have .json extension");
}

var content = JsonSerializer.Serialize<KernelProcessStepStateMetadata>(processStateInfo, s_jsonOptions);
string content = JsonSerializer.Serialize(processStateInfo, s_jsonOptions);
Console.WriteLine($"Process State: \n{content}");
Console.WriteLine($"Saving Process State Locally: \n{Path.GetFullPath(fullFilepath)}");
File.WriteAllText(fullFilepath, content);
Expand Down
31 changes: 31 additions & 0 deletions dotnet/src/Experimental/Process.Abstractions/KernelProcessMap.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Collections.Generic;

namespace Microsoft.SemanticKernel;

/// <summary>
/// A serializable representation of a ProcessMap.
/// </summary>
public sealed record KernelProcessMap : KernelProcessStepInfo
{
/// <summary>
/// The map operation.
/// </summary>
public KernelProcessStepInfo Operation { get; }

/// <summary>
/// Creates a new instance of the <see cref="KernelProcess"/> class.
/// </summary>
/// <param name="state">The process state.</param>
/// <param name="operation">The map operation.</param>
/// <param name="edges">The edges for the map.</param>
public KernelProcessMap(KernelProcessMapState state, KernelProcessStepInfo operation, Dictionary<string, List<KernelProcessEdge>> edges)
: base(typeof(KernelProcessMap), state, edges)
{
Verify.NotNull(operation, nameof(operation));
Verify.NotNullOrWhiteSpace(state.Name, $"{nameof(state)}.{nameof(KernelProcessMapState.Name)}");
Verify.NotNullOrWhiteSpace(state.Id, $"{nameof(state)}.{nameof(KernelProcessMapState.Id)}");

this.Operation = operation;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Runtime.Serialization;

namespace Microsoft.SemanticKernel;

/// <summary>
/// Represents the state of a <see cref="KernelProcessMap"/>.
/// </summary>
[DataContract]
public sealed record KernelProcessMapState : KernelProcessStepState
{
/// <summary>
/// Initializes a new instance of the <see cref="KernelProcessMapState"/> class.
/// </summary>
/// <param name="name">The name of the associated <see cref="KernelProcessMap"/></param>
/// <param name="version">version id of the process step state</param>
/// <param name="id">The Id of the associated <see cref="KernelProcessMap"/></param>
public KernelProcessMapState(string name, string version, string id)
: base(name, version, id)
{
Verify.NotNullOrWhiteSpace(id, nameof(id));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Microsoft.SemanticKernel;

/// <summary>
/// Represents the state of a process.
/// Represents the state of a <see cref="KernelProcess"/>.
/// </summary>
[DataContract]
public sealed record KernelProcessState : KernelProcessStepState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public ValueTask EmitEventAsync(
object? data = null,
KernelProcessEventVisibility visibility = KernelProcessEventVisibility.Internal)
{
Verify.NotNullOrWhiteSpace(eventId, nameof(eventId));

return this._stepMessageChannel.EmitEventAsync(
new KernelProcessEvent
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ internal static void RegisterDerivedType(Type derivedType)
/// <param name="id">The Id of the associated <see cref="KernelProcessStep"/></param>
public KernelProcessStepState(string name, string version, string? id = null)
{
Verify.NotNullOrWhiteSpace(name);
Verify.NotNullOrWhiteSpace(version);
Verify.NotNullOrWhiteSpace(name, nameof(name));
Verify.NotNullOrWhiteSpace(version, nameof(version));

this.Id = id;
this.Name = name;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Runtime.Serialization;
using System.Text.Json.Serialization;

namespace Microsoft.SemanticKernel.Process.Models;

/// <summary>
/// Process state used for State Persistence serialization
/// </summary>
public sealed record class KernelProcessMapStateMetadata : KernelProcessStepStateMetadata
{
/// <summary>
/// Process State of Steps if provided
/// </summary>
[DataMember]
[JsonPropertyName("operationState")]
public KernelProcessStepStateMetadata? OperationState { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Microsoft.SemanticKernel.Process.Models;
/// </summary>
[JsonPolymorphic(TypeDiscriminatorPropertyName = "$type", UnknownDerivedTypeHandling = JsonUnknownDerivedTypeHandling.FallBackToNearestAncestor)]
[JsonDerivedType(typeof(KernelProcessStepStateMetadata), typeDiscriminator: nameof(ProcessConstants.SupportedComponents.Step))]
[JsonDerivedType(typeof(KernelProcessMapStateMetadata), typeDiscriminator: nameof(ProcessConstants.SupportedComponents.Map))]
[JsonDerivedType(typeof(KernelProcessStateMetadata), typeDiscriminator: nameof(ProcessConstants.SupportedComponents.Process))]
public record class KernelProcessStepStateMetadata
{
Expand Down
Loading

0 comments on commit ad4d35b

Please sign in to comment.