Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

.Net Processes - Map Step Feature #9339

Open
wants to merge 94 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
94 commits
Select commit Hold shift + click to select a range
b263e0a
Checkpoint
crickman Oct 16, 2024
3638c4d
Merge from main
crickman Oct 16, 2024
230c456
Checkpoint
crickman Oct 17, 2024
ccf99af
Merge branch 'main' into processes-map-step
crickman Oct 17, 2024
ecfa86d
Merge branch 'main' into processes-map-step
crickman Oct 19, 2024
d5dea83
Checkpoint
crickman Oct 20, 2024
991c5d1
Clean-up
crickman Oct 20, 2024
04f6ee7
Cleanup
crickman Oct 20, 2024
ccd6418
Cleanup
crickman Oct 20, 2024
59aad71
Namespace
crickman Oct 20, 2024
3a74058
namespace
crickman Oct 20, 2024
47fa435
Clean-up
crickman Oct 21, 2024
13fd520
Cleanup
crickman Oct 21, 2024
b8b676f
Subprocess validation
crickman Oct 21, 2024
d327f0a
Clean
crickman Oct 21, 2024
fbe57c3
Merge branch 'main' into processes-map-step
crickman Oct 21, 2024
ff4889e
Merge branch 'main' into processes-map-step
crickman Oct 23, 2024
6db5a51
Checkpoint
crickman Oct 23, 2024
e8f54fa
Resolve merge from main
crickman Oct 23, 2024
34e663c
Merge branch 'processes-map-step' of https://github.com/microsoft/sem…
crickman Oct 23, 2024
60f6b5b
TYPO
crickman Oct 23, 2024
851f6d4
Build
crickman Oct 23, 2024
52aea0c
Namespace
crickman Oct 23, 2024
a756299
Whitespace
crickman Oct 23, 2024
4927344
Checkpoint
crickman Oct 23, 2024
e573bba
Resolve merge from main
crickman Oct 23, 2024
f79ae9d
Checkpoint
crickman Oct 24, 2024
29d5762
More samples
crickman Oct 24, 2024
a716452
Clean
crickman Oct 24, 2024
397ae7d
Refine
crickman Oct 24, 2024
f021ba5
Resolve merge from main
crickman Oct 24, 2024
d4d2cf9
Fix merge
crickman Oct 24, 2024
95a86d0
Namespace
crickman Oct 24, 2024
09bec2d
Merge branch 'main' into processes-map-step
crickman Oct 24, 2024
c24fcb1
Core test coverage
crickman Oct 24, 2024
ed963a8
Merge branch 'main' into processes-map-step
crickman Oct 24, 2024
48c3c97
Cleanup
crickman Oct 24, 2024
fd27d39
LocalRuntime test coverage
crickman Oct 24, 2024
c4c9dc2
Merge branch 'main' into processes-map-step
crickman Oct 25, 2024
f2805f4
Merge branch 'main' into processes-map-step
crickman Oct 25, 2024
e0a94c7
Checkpoint
crickman Oct 26, 2024
ef71fee
Merge branch 'processes-map-step' of https://github.com/microsoft/sem…
crickman Oct 26, 2024
3096391
Merge branch 'main' into processes-map-step
crickman Oct 26, 2024
f74714d
Sync to shared folder structure
crickman Oct 26, 2024
cd65b85
Heading
crickman Oct 26, 2024
255251c
Better
crickman Oct 26, 2024
2748cd2
Schwing!
crickman Oct 26, 2024
48533a2
Sample
crickman Oct 26, 2024
220c172
Spell
crickman Oct 26, 2024
f4911d0
Clean-up
crickman Oct 28, 2024
488edb9
Dapr
crickman Oct 29, 2024
01e0c9e
Merge to main
crickman Nov 5, 2024
5b27db5
Resolve merge from main
crickman Nov 5, 2024
bb9ad81
Complete merge + Serialization stubs for "map"
crickman Nov 6, 2024
f420f15
Typo
crickman Nov 6, 2024
695a5b8
Whitespace
crickman Nov 6, 2024
c59cfe0
Merge branch 'main' into processes-map-step
crickman Nov 6, 2024
6c2ad07
Checkpoint
crickman Nov 6, 2024
0c5891a
Resolve merge from branch
crickman Nov 6, 2024
2419e94
Update sync
crickman Nov 6, 2024
8d3288b
Merge branch 'main' into processes-map-step
crickman Nov 6, 2024
36d7069
Merge branch 'processes-map-step' of https://github.com/microsoft/sem…
crickman Nov 6, 2024
4b972e0
Rename: EventProxy
crickman Nov 6, 2024
c12cb4c
Dapr Checkpoint
crickman Nov 6, 2024
676b3c8
Merge branch 'main' into processes-map-step
crickman Nov 7, 2024
5ced0fb
Merge branch 'main' into processes-map-step
crickman Nov 7, 2024
ccf3b35
DAPR Checkpoint
crickman Nov 7, 2024
9dc104b
Resolve merge from main
crickman Nov 7, 2024
3c0e7d8
Merge branch 'main' into processes-map-step
crickman Nov 7, 2024
4ad6f2d
Namespace
crickman Nov 7, 2024
ac19f2c
Refine `ToDaprStepInfoAsync`
crickman Nov 7, 2024
b060373
Resolve merge from main
crickman Nov 7, 2024
2c2956d
Fix map metadata
crickman Nov 7, 2024
dab0151
Resolve merge from main
crickman Nov 7, 2024
779ecb6
Build
crickman Nov 7, 2024
8d9eeb1
Update
crickman Nov 7, 2024
a5c4f59
whitespace
crickman Nov 7, 2024
12d914c
Map Serialization
crickman Nov 8, 2024
3572b34
Merge branch 'main' into processes-map-step
crickman Nov 8, 2024
53f306d
Fix unit-test race
crickman Nov 8, 2024
dbf381b
Stronger
crickman Nov 8, 2024
2336aef
Integration tests
crickman Nov 8, 2024
8b7b485
REvert demo
crickman Nov 8, 2024
3b5781e
Namespace
crickman Nov 8, 2024
091f5ad
Stablize unit-tests
crickman Nov 8, 2024
813a842
Namespace
crickman Nov 8, 2024
349c41d
Merge branch 'main' into processes-map-step
crickman Nov 8, 2024
b370a6f
Serialization tweaks
crickman Nov 8, 2024
03cf6ba
Merge branch 'main' into processes-map-step
crickman Nov 8, 2024
3110acf
`else`
crickman Nov 8, 2024
85f44a4
Merge branch 'processes-map-step' of https://github.com/microsoft/sem…
crickman Nov 8, 2024
560c427
Update based on PR comments
crickman Nov 8, 2024
c99bd2e
Merge branch 'main' into processes-map-step
crickman Nov 8, 2024
4ab9dd1
Merge branch 'main' into processes-map-step
crickman Nov 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
</ItemGroup>

<ItemGroup>
<EmbeddedResource Include="Resources\*">
<EmbeddedResource Include="..\LearnResources\Resources\Grimms-*">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</EmbeddedResource>
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Text;
using Microsoft.SemanticKernel;

namespace Step05;

/// <summary>
/// DEV HARNESS
/// </summary>
public class Step05_MapReduce : BaseTest
{
// Target Open AI Services
protected override bool ForceOpenAI => true;

/// <summary>
/// Factor to increase the scale of the content processed.
/// </summary>
private const int ScaleFactor = 100;

private readonly string _sourceContent;

public Step05_MapReduce(ITestOutputHelper output)
: base(output, redirectSystemConsoleOutput: true)
{
StringBuilder content = new();

for (int count = 0; count < ScaleFactor; ++count)
{
content.AppendLine(File.ReadAllText("Grimms-The-King-of-the-Golden-Mountain.txt"));
content.AppendLine(File.ReadAllText("Grimms-The-Water-of-Life.txt"));
content.AppendLine(File.ReadAllText("Grimms-The-White-Snake.txt"));
}

this._sourceContent = content.ToString().ToUpperInvariant();
}

[Fact]
public async Task RunMapReduceAsync()
{
KernelProcess process = SetupMapReduceProcess(nameof(RunMapReduceAsync), "Start");
Kernel kernel = new();
using LocalKernelProcessContext localProcess =
await process.StartAsync(
kernel,
new KernelProcessEvent
{
Id = "Start",
Data = this._sourceContent,
});

Dictionary<string, int> results = (Dictionary<string, int>?)kernel.Data[ResultStep.ResultKey] ?? [];
foreach (var result in results)
{
Console.WriteLine($"{result.Key}: {result.Value}");
}
}

private KernelProcess SetupMapReduceProcess(string processName, string inputEventId)
{
ProcessBuilder process = new(processName);

ProcessStepBuilder chunkStep = process.AddStepFromType<ChunkStep>();
process
.OnInputEvent(inputEventId)
.SendEventTo(new ProcessFunctionTargetBuilder(chunkStep));

ProcessStepBuilder countStep = process.AddStepFromType<CountStep>();
ProcessMapBuilder mapStep = process.AddMapForTarget(new ProcessFunctionTargetBuilder(countStep));
chunkStep
.OnEvent(ChunkStep.EventId)
.SendEventTo(mapStep);

ProcessStepBuilder resultStep = process.AddStepFromType<ResultStep>();
mapStep
.OnEvent(CountStep.EventId)
.SendEventTo(new ProcessFunctionTargetBuilder(resultStep));

return process.Build();
}

private sealed class ChunkStep : KernelProcessStep
{
public const string EventId = "ChunkComplete";

[KernelFunction]
public async ValueTask ChunkAsync(KernelProcessStepContext context, string content)
{
int chunkSize = content.Length / Environment.ProcessorCount;
string[] chunks = ChunkContent(content, chunkSize).ToArray();

await context.EmitEventAsync(new() { Id = EventId, Data = chunks });
}

private IEnumerable<string> ChunkContent(string content, int chunkSize)
{
for (int index = 0; index < content.Length; index += chunkSize)
{
yield return content.Substring(index, Math.Min(chunkSize, content.Length - index));
}
}
}

private sealed class CountStep : KernelProcessStep
{
public const string EventId = "CountComplete";

[KernelFunction]
public async ValueTask ComputeAsync(KernelProcessStepContext context, string chunk)
{
Dictionary<string, int> counts = [];

string[] words = chunk.Split([' ', '\n', '\r', '.', ',', '’'], StringSplitOptions.RemoveEmptyEntries);
foreach (string word in words)
{
if (s_notInteresting.Contains(word))
{
continue;
}

counts.TryGetValue(word.Trim(), out int count);
counts[word] = ++count;
}

await context.EmitEventAsync(new() { Id = EventId, Data = counts });
}
}

private sealed class ResultStep : KernelProcessStep
{
public const string ResultKey = "WordCount";

[KernelFunction]
public async ValueTask ComputeAsync(KernelProcessStepContext context, IList<Dictionary<string, int>> results, Kernel kernel)
{
Dictionary<string, int> totals = [];

foreach (Dictionary<string, int> result in results)
{
foreach (KeyValuePair<string, int> pair in result)
{
totals.TryGetValue(pair.Key, out int count);
totals[pair.Key] = count + pair.Value;
}
}

var sorted =
from kvp in totals
orderby kvp.Value descending
select kvp;

kernel.Data[ResultKey] = sorted.Take(10).ToDictionary(kvp => kvp.Key, kvp => kvp.Value);
}
}

private static readonly HashSet<string> s_notInteresting =
[
"A",
"ALL",
"AN",
"AND",
"AS",
"AT",
"BE",
"BEFORE",
"BUT",
"BY",
"CAME",
"COULD",
"FOR",
"GO",
"HAD",
"HAVE",
"HE",
"HER",
"HIM",
"HIMSELF",
"HIS",
"HOW",
"I",
"IF",
"IN",
"INTO",
"IS",
"IT",
"ME",
"MUST",
"MY",
"NO",
"NOT",
"NOW",
"OF",
"ON",
"ONCE",
"ONE",
"ONLY",
"OUT",
"S",
"SAID",
"SAW",
"SET",
"SHE",
"SHOULD",
"SO",
"THAT",
"THE",
"THEM",
"THEN",
"THEIR",
"THERE",
"THEY",
"THIS",
"TO",
"VERY",
"WAS",
"WENT",
"WERE",
"WHAT",
"WHEN",
"WHO",
"WILL",
"WITH",
"WOULD",
"UP",
"UPON",
"YOU",
];
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private static void StoreProcessStateLocally(KernelProcessStateMetadata processS
throw new KernelException($"Filepath for process {processStateInfo.Name} does not have .json extension");
}

var content = JsonSerializer.Serialize<KernelProcessStepStateMetadata>(processStateInfo, s_jsonOptions);
string content = JsonSerializer.Serialize(processStateInfo, s_jsonOptions);
Console.WriteLine($"Process State: \n{content}");
Console.WriteLine($"Saving Process State Locally: \n{Path.GetFullPath(fullFilepath)}");
File.WriteAllText(fullFilepath, content);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Collections.Generic;

namespace Microsoft.SemanticKernel;

/// <summary>
/// A serializable representation of a ProcessMap.
/// </summary>
public sealed record KernelProcessMap : KernelProcessStepInfo
{
/// <summary>
/// The map operation.
esttenorio marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
public KernelProcess Operation { get; }

/// <summary>
/// Creates a new instance of the <see cref="KernelProcess"/> class.
/// </summary>
/// <param name="state">The process state.</param>
/// <param name="operation">The map operation.</param>
/// <param name="edges">The edges for the map.</param>
public KernelProcessMap(KernelProcessMapState state, KernelProcess operation, Dictionary<string, List<KernelProcessEdge>> edges)
: base(typeof(KernelProcessMap), state, edges)
{
Verify.NotNull(operation, nameof(operation));
Verify.NotNullOrWhiteSpace(state.Name, $"{nameof(state)}.{nameof(KernelProcessMapState.Name)}");
Verify.NotNullOrWhiteSpace(state.Id, $"{nameof(state)}.{nameof(KernelProcessMapState.Id)}");

this.Operation = operation;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Runtime.Serialization;

namespace Microsoft.SemanticKernel;

/// <summary>
/// Represents the state of a <see cref="KernelProcessMap"/>.
/// </summary>
[DataContract]
public sealed record KernelProcessMapState : KernelProcessStepState
{
/// <summary>
/// Initializes a new instance of the <see cref="KernelProcessMapState"/> class.
/// </summary>
/// <param name="name">The name of the associated <see cref="KernelProcessMap"/></param>
/// <param name="version">version id of the process step state</param>
/// <param name="id">The Id of the associated <see cref="KernelProcessMap"/></param>
public KernelProcessMapState(string name, string version, string id)
: base(name, version, id)
{
Verify.NotNullOrWhiteSpace(id, nameof(id));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
namespace Microsoft.SemanticKernel;

/// <summary>
/// Represents the state of a process.
/// Represents the state of a <see cref="KernelProcess"/>.
/// </summary>
[DataContract]
public sealed record KernelProcessState : KernelProcessStepState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,6 @@ public virtual ValueTask ActivateAsync(KernelProcessStepState state)
/// <inheritdoc/>
public virtual ValueTask ActivateAsync(KernelProcessStepState<TState> state)
{
return default;
return base.ActivateAsync(state);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public ValueTask EmitEventAsync(
object? data = null,
KernelProcessEventVisibility visibility = KernelProcessEventVisibility.Internal)
{
Verify.NotNullOrWhiteSpace(eventId, nameof(eventId));

return this._stepMessageChannel.EmitEventAsync(
new KernelProcessEvent
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ internal static void RegisterDerivedType(Type derivedType)
/// <param name="id">The Id of the associated <see cref="KernelProcessStep"/></param>
public KernelProcessStepState(string name, string version, string? id = null)
{
Verify.NotNullOrWhiteSpace(name);
Verify.NotNullOrWhiteSpace(version);
Verify.NotNullOrWhiteSpace(name, nameof(name));
Verify.NotNullOrWhiteSpace(version, nameof(version));

this.Id = id;
this.Name = name;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Runtime.Serialization;
using System.Text.Json.Serialization;

namespace Microsoft.SemanticKernel.Process.Models;

/// <summary>
/// Process state used for State Persistence serialization
/// </summary>
public sealed record class KernelProcessMapStateMetadata : KernelProcessStepStateMetadata
{
/// <summary>
/// Process State of Steps if provided
/// </summary>
[DataMember]
[JsonPropertyName("mapState")]
public KernelProcessStateMetadata? OperationState { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Microsoft.SemanticKernel.Process.Models;
/// </summary>
[JsonPolymorphic(TypeDiscriminatorPropertyName = "$type", UnknownDerivedTypeHandling = JsonUnknownDerivedTypeHandling.FallBackToNearestAncestor)]
[JsonDerivedType(typeof(KernelProcessStepStateMetadata), typeDiscriminator: nameof(ProcessConstants.SupportedComponents.Step))]
[JsonDerivedType(typeof(KernelProcessMapStateMetadata), typeDiscriminator: nameof(ProcessConstants.SupportedComponents.Map))]
[JsonDerivedType(typeof(KernelProcessStateMetadata), typeDiscriminator: nameof(ProcessConstants.SupportedComponents.Process))]
public record class KernelProcessStepStateMetadata
{
Expand Down
Loading
Loading