diff --git a/ProtoActor.sln b/ProtoActor.sln index ce1be916fe..705002dff4 100644 --- a/ProtoActor.sln +++ b/ProtoActor.sln @@ -156,6 +156,10 @@ EndProjectSection EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "HostedService", "benchmarks\HostedService\HostedService.csproj", "{7840C651-9352-4CB2-9152-4793EC219DE9}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DurableFunctions", "examples\DurableFunctions\DurableFunctions.csproj", "{181B6946-85C5-4484-B14A-E001A4F9D5E6}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Proto.Cluster.Durable", "src\Proto.Cluster.Durable\Proto.Cluster.Durable.csproj", "{8D11A769-19F5-4458-9A7A-A4B25A5EF92F}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -728,6 +732,30 @@ Global {7840C651-9352-4CB2-9152-4793EC219DE9}.Release|x64.Build.0 = Release|Any CPU {7840C651-9352-4CB2-9152-4793EC219DE9}.Release|x86.ActiveCfg = Release|Any CPU {7840C651-9352-4CB2-9152-4793EC219DE9}.Release|x86.Build.0 = Release|Any CPU + {181B6946-85C5-4484-B14A-E001A4F9D5E6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {181B6946-85C5-4484-B14A-E001A4F9D5E6}.Debug|Any CPU.Build.0 = Debug|Any CPU + {181B6946-85C5-4484-B14A-E001A4F9D5E6}.Debug|x64.ActiveCfg = Debug|Any CPU + {181B6946-85C5-4484-B14A-E001A4F9D5E6}.Debug|x64.Build.0 = Debug|Any CPU + {181B6946-85C5-4484-B14A-E001A4F9D5E6}.Debug|x86.ActiveCfg = Debug|Any CPU + {181B6946-85C5-4484-B14A-E001A4F9D5E6}.Debug|x86.Build.0 = Debug|Any CPU + {181B6946-85C5-4484-B14A-E001A4F9D5E6}.Release|Any CPU.ActiveCfg = Release|Any CPU + {181B6946-85C5-4484-B14A-E001A4F9D5E6}.Release|Any CPU.Build.0 = Release|Any CPU + {181B6946-85C5-4484-B14A-E001A4F9D5E6}.Release|x64.ActiveCfg = Release|Any CPU + {181B6946-85C5-4484-B14A-E001A4F9D5E6}.Release|x64.Build.0 = Release|Any CPU + {181B6946-85C5-4484-B14A-E001A4F9D5E6}.Release|x86.ActiveCfg = Release|Any CPU + {181B6946-85C5-4484-B14A-E001A4F9D5E6}.Release|x86.Build.0 = Release|Any CPU + {8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Debug|x64.ActiveCfg = Debug|Any CPU + {8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Debug|x64.Build.0 = Debug|Any CPU + {8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Debug|x86.ActiveCfg = Debug|Any CPU + {8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Debug|x86.Build.0 = Debug|Any CPU + {8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Release|Any CPU.Build.0 = Release|Any CPU + {8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Release|x64.ActiveCfg = Release|Any CPU + {8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Release|x64.Build.0 = Release|Any CPU + {8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Release|x86.ActiveCfg = Release|Any CPU + {8D11A769-19F5-4458-9A7A-A4B25A5EF92F}.Release|x86.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -795,6 +823,8 @@ Global {C8C3C2EE-083A-46B4-8F5A-0C0F442C14BE} = {222D5932-627D-406B-9FA5-B60B38FD3019} {8B241043-F933-4C1F-AB57-BAE8B624F133} = {222D5932-627D-406B-9FA5-B60B38FD3019} {7840C651-9352-4CB2-9152-4793EC219DE9} = {0F3AB331-C042-4371-A2F0-0AFDFA13DC9F} + {181B6946-85C5-4484-B14A-E001A4F9D5E6} = {59DCCC96-DDAF-469F-9E8E-9BC733285082} + {8D11A769-19F5-4458-9A7A-A4B25A5EF92F} = {771514F1-12AE-4A26-89CB-2646D3EF7034} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {CD0D1E44-8118-4682-8793-6B20ABFA824C} diff --git a/examples/DurableFunctions/DurableFunctions.csproj b/examples/DurableFunctions/DurableFunctions.csproj new file mode 100644 index 0000000000..1dff79c3ea --- /dev/null +++ b/examples/DurableFunctions/DurableFunctions.csproj @@ -0,0 +1,26 @@ + + + + Exe + net5.0 + + + + + + + + + + + + + + + + + + + + + diff --git a/examples/DurableFunctions/Program.cs b/examples/DurableFunctions/Program.cs new file mode 100644 index 0000000000..85c876f067 --- /dev/null +++ b/examples/DurableFunctions/Program.cs @@ -0,0 +1,99 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using MongoDB.Driver; +using Proto; +using Proto.Cluster; +using Proto.Cluster.Consul; +using Proto.Cluster.Durable; +using Proto.Cluster.Identity; +using Proto.Cluster.Identity.MongoDb; +using Proto.Remote.GrpcCore; +using Serilog; +using Serilog.Events; +using Log = Serilog.Log; + +namespace DurableFunctions +{ + class Program + { + static async Task Main(string[] args) + { + SetupLogger(); + + var db = GetMongo(); + var pids = db.GetCollection("pids"); + + var identity = new IdentityStorageLookup(new MongoIdentityStorage("mycluster", pids)); + var provider = new ConsulProvider(new ConsulProviderConfig()); + var system = new ActorSystem() + .WithRemote( + GrpcCoreRemoteConfig + .BindToLocalhost() + ) + .WithCluster( + ClusterConfig + .Setup("mycluster",provider,identity) + .WithClusterKind("MyFunc", Props.FromProducer(() => new MyFunction())) + .WithClusterKind("SomeActor",Props.FromProducer(() => new SomeActor())) + ) + .WithDurableFunctions(); + + await system + .Cluster() + .StartMemberAsync(); + + await system.Cluster().RequestAsync("foo", "MyFunc", 123, CancellationToken.None); + + Console.ReadLine(); + } + + private static void SetupLogger() + { + Log.Logger = new LoggerConfiguration() + .WriteTo.Console(LogEventLevel.Information, "[{Timestamp:HH:mm:ss} {Level:u3}] {Message:lj}{NewLine}") + .CreateLogger(); + + var l = LoggerFactory.Create(l => + l.AddSerilog() + ); + + Proto.Log.SetLoggerFactory(l); + } + + private static IMongoDatabase GetMongo() + { + var connectionString = "mongodb://127.0.0.1:27017/ProtoMongo"; + var url = MongoUrl.Create(connectionString); + var settings = MongoClientSettings.FromUrl(url); + var client = new MongoClient(settings); + var database = client.GetDatabase("DurableFunctions"); + return database; + } + } + + public class MyFunction : DurableFunction + { + protected override async Task Run(DurableContext context) + { + var x = await context.RequestAsync("foo", "SomeActor", 222); + var y = await context.RequestAsync("foo", "SomeActor", 333); + Console.WriteLine($"result {x * y}"); + } + } + + public class SomeActor : IActor + { + public Task ReceiveAsync(IContext context) + { + if (context.Message is int i) + { + Console.WriteLine($"got call for {i}"); + context.Respond(i*2); + } + + return Task.CompletedTask; + } + } +} \ No newline at end of file diff --git a/src/Proto.Cluster.Durable/DurableContext.cs b/src/Proto.Cluster.Durable/DurableContext.cs new file mode 100644 index 0000000000..289c08be9f --- /dev/null +++ b/src/Proto.Cluster.Durable/DurableContext.cs @@ -0,0 +1,52 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015-2020 Asynkron AB All rights reserved +// +// ----------------------------------------------------------------------- +using System; +using System.Threading.Tasks; + +namespace Proto.Cluster.Durable +{ + public class DurableContext + { + private readonly Cluster _cluster; + private readonly ClusterIdentity _identity; + public object Message { get; set; } + + public DurableContext(Cluster cluster, ClusterIdentity identity) + { + _cluster = cluster; + _identity = identity; + } + + public Task WaitForExternalEvent() + { + return null; + } + + public Task CreateTimer() + { + return null; + } + + public async Task RequestAsync(string identity, string kind, object message) + { + //send request to local orchestrator + //orchestrator saves request to DB + + //await response from orchestrator + var target = new ClusterIdentity + { + Identity = identity, + Kind = kind, + }; + + var request = new DurableRequest(_identity, target, message); + + var response = await _cluster.DurableRequestAsync(request); + var m = response.Message; + return (T) m; + } + } +} \ No newline at end of file diff --git a/src/Proto.Cluster.Durable/DurableExtensions.cs b/src/Proto.Cluster.Durable/DurableExtensions.cs new file mode 100644 index 0000000000..60248609ee --- /dev/null +++ b/src/Proto.Cluster.Durable/DurableExtensions.cs @@ -0,0 +1,26 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015-2020 Asynkron AB All rights reserved +// +// ----------------------------------------------------------------------- +using System.Threading.Tasks; + +namespace Proto.Cluster.Durable +{ + public static class DurableExtensions + { + public static async Task DurableRequestAsync(this Cluster self, DurableRequest message) + { + var d = self.System.Extensions.Get(); + var response = await d.DurableRequestAsync(message); + return response; + } + + public static ActorSystem WithDurableFunctions(this ActorSystem system) + { + var p = new DurablePlugin(system.Cluster()); + system.Extensions.Register(p); + return system; + } + } +} \ No newline at end of file diff --git a/src/Proto.Cluster.Durable/DurableFunction.cs b/src/Proto.Cluster.Durable/DurableFunction.cs new file mode 100644 index 0000000000..f5a58378f3 --- /dev/null +++ b/src/Proto.Cluster.Durable/DurableFunction.cs @@ -0,0 +1,39 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015-2020 Asynkron AB All rights reserved +// +// ----------------------------------------------------------------------- +using System; +using System.Threading.Tasks; +using JetBrains.Annotations; + +namespace Proto.Cluster.Durable +{ + [PublicAPI] + public abstract class DurableFunction : IActor + { + private ClusterIdentity? _identity; + private DurableContext? _durableContext; + + async Task IActor.ReceiveAsync(IContext context) + { + if (context.Message is ClusterInit init) + { + _identity = init.ClusterIdentity; + _durableContext = new DurableContext(init.Cluster, _identity); + } + + if (_durableContext != null && context.Sender != null) + { + //if workflow not exists, save new workflow, also save message + + context.Respond(123); //this should be a real message like "FunctionStarted" or something + + _durableContext.Message = context.Message!; //use the saved message here + await Run(_durableContext); + } + } + + protected abstract Task Run(DurableContext context); + } +} \ No newline at end of file diff --git a/src/Proto.Cluster.Durable/DurablePlugin.cs b/src/Proto.Cluster.Durable/DurablePlugin.cs new file mode 100644 index 0000000000..ad1f39df1f --- /dev/null +++ b/src/Proto.Cluster.Durable/DurablePlugin.cs @@ -0,0 +1,34 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015-2020 Asynkron AB All rights reserved +// +// ----------------------------------------------------------------------- +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Proto.Extensions; + +namespace Proto.Cluster.Durable +{ + public class DurablePlugin : IActorSystemExtension + { + private readonly Dictionary _cache = new(); + private readonly Cluster _cluster; + + public DurablePlugin(Cluster cluster) + { + _cluster = cluster; + } + + public async Task DurableRequestAsync(DurableRequest request) + { + if (_cache.TryGetValue(request, out var response)) return response; + + var responseMessage = await _cluster.RequestAsync(request.Target.Identity, request.Target.Kind, request.Message, CancellationToken.None); + response = new DurableResponse(responseMessage); + _cache.TryAdd(request, response); + + return response; + } + } +} \ No newline at end of file diff --git a/src/Proto.Cluster.Durable/DurableRequest.cs b/src/Proto.Cluster.Durable/DurableRequest.cs new file mode 100644 index 0000000000..76c8934c45 --- /dev/null +++ b/src/Proto.Cluster.Durable/DurableRequest.cs @@ -0,0 +1,9 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015-2020 Asynkron AB All rights reserved +// +// ----------------------------------------------------------------------- +namespace Proto.Cluster.Durable +{ + public record DurableRequest(ClusterIdentity Sender, ClusterIdentity Target, object Message); +} \ No newline at end of file diff --git a/src/Proto.Cluster.Durable/DurableResponse.cs b/src/Proto.Cluster.Durable/DurableResponse.cs new file mode 100644 index 0000000000..92cbac02fa --- /dev/null +++ b/src/Proto.Cluster.Durable/DurableResponse.cs @@ -0,0 +1,9 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2015-2020 Asynkron AB All rights reserved +// +// ----------------------------------------------------------------------- +namespace Proto.Cluster.Durable +{ + public record DurableResponse(object Message); +} \ No newline at end of file diff --git a/src/Proto.Cluster.Durable/Proto.Cluster.Durable.csproj b/src/Proto.Cluster.Durable/Proto.Cluster.Durable.csproj new file mode 100644 index 0000000000..9cbf119692 --- /dev/null +++ b/src/Proto.Cluster.Durable/Proto.Cluster.Durable.csproj @@ -0,0 +1,20 @@ + + + + net5.0;netstandard2.1 + 9 + + + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + diff --git a/src/Proto.Cluster.Durable/protos.proto b/src/Proto.Cluster.Durable/protos.proto new file mode 100644 index 0000000000..683f1fdbe3 --- /dev/null +++ b/src/Proto.Cluster.Durable/protos.proto @@ -0,0 +1,5 @@ +syntax = "proto3"; +package cluster; +option csharp_namespace = "Proto.Cluster"; + +import "Proto.Actor/Protos.proto"; diff --git a/src/Proto.Remote/Endpoints/EndpointActor.cs b/src/Proto.Remote/Endpoints/EndpointActor.cs index 07ec536932..77579cb5fa 100644 --- a/src/Proto.Remote/Endpoints/EndpointActor.cs +++ b/src/Proto.Remote/Endpoints/EndpointActor.cs @@ -10,6 +10,7 @@ using Grpc.Core; using Microsoft.Extensions.Logging; using System.Linq; +using Google.Protobuf; namespace Proto.Remote { @@ -253,7 +254,7 @@ private Task RemoteDeliver(IEnumerable m, IContext context) var envelope = new MessageEnvelope { - MessageData = bytes, + MessageData = ByteString.CopyFrom(bytes), Sender = rd.Sender, Target = targetId, TypeId = typeId, diff --git a/src/Proto.Remote/Endpoints/EndpointReader.cs b/src/Proto.Remote/Endpoints/EndpointReader.cs index 89feca436f..478ffd4b62 100644 --- a/src/Proto.Remote/Endpoints/EndpointReader.cs +++ b/src/Proto.Remote/Endpoints/EndpointReader.cs @@ -99,7 +99,7 @@ public override async Task Receive( var target = targets[envelope.Target]; var typeName = typeNames[envelope.TypeId]; var message = - _serialization.Deserialize(typeName, envelope.MessageData, envelope.SerializerId); + _serialization.Deserialize(typeName, envelope.MessageData.Span, envelope.SerializerId); switch (message) { diff --git a/src/Proto.Remote/Serialization/ISerializer.cs b/src/Proto.Remote/Serialization/ISerializer.cs index c5a7f952c9..8046a6dca6 100644 --- a/src/Proto.Remote/Serialization/ISerializer.cs +++ b/src/Proto.Remote/Serialization/ISerializer.cs @@ -3,14 +3,15 @@ // Copyright (C) 2015-2020 Asynkron AB All rights reserved // // ----------------------------------------------------------------------- +using System; using Google.Protobuf; namespace Proto.Remote { public interface ISerializer { - ByteString Serialize(object obj); - object Deserialize(ByteString bytes, string typeName); + ReadOnlySpan Serialize(object obj); + object Deserialize(ReadOnlySpan bytes, string typeName); string GetTypeName(object message); } } \ No newline at end of file diff --git a/src/Proto.Remote/Serialization/JsonSerializer.cs b/src/Proto.Remote/Serialization/JsonSerializer.cs index c0b2bc4537..658221a875 100644 --- a/src/Proto.Remote/Serialization/JsonSerializer.cs +++ b/src/Proto.Remote/Serialization/JsonSerializer.cs @@ -4,6 +4,7 @@ // // ----------------------------------------------------------------------- using System; +using System.Text; using Google.Protobuf; namespace Proto.Remote @@ -17,21 +18,21 @@ public JsonSerializer(Serialization serialization) _serialization = serialization; } - public ByteString Serialize(object obj) + public ReadOnlySpan Serialize(object obj) { if (obj is JsonMessage jsonMessage) { - return ByteString.CopyFromUtf8(jsonMessage.Json); + return Encoding.UTF8.GetBytes(jsonMessage.Json); } var message = obj as IMessage; var json = JsonFormatter.Default.Format(message); - return ByteString.CopyFromUtf8(json); + return Encoding.UTF8.GetBytes(json); } - public object Deserialize(ByteString bytes, string typeName) + public object Deserialize(ReadOnlySpan bytes, string typeName) { - var json = bytes.ToStringUtf8(); + var json = Encoding.UTF8.GetString(bytes); var parser = _serialization.TypeLookup[typeName]; var o = parser.ParseJson(json); diff --git a/src/Proto.Remote/Serialization/ProtobufSerializer.cs b/src/Proto.Remote/Serialization/ProtobufSerializer.cs index d721741fff..54dde8252b 100644 --- a/src/Proto.Remote/Serialization/ProtobufSerializer.cs +++ b/src/Proto.Remote/Serialization/ProtobufSerializer.cs @@ -17,16 +17,16 @@ public ProtobufSerializer(Serialization serialization) _serialization = serialization; } - public ByteString Serialize(object obj) + public ReadOnlySpan Serialize(object obj) { var message = obj as IMessage; - return message.ToByteString(); + return message.ToByteArray(); } - public object Deserialize(ByteString bytes, string typeName) + public object Deserialize(ReadOnlySpan bytes, string typeName) { var parser = _serialization.TypeLookup[typeName]; - var o = parser.ParseFrom(bytes); + var o = parser.ParseFrom(bytes.ToArray()); return o; } diff --git a/src/Proto.Remote/Serialization/Serialization.cs b/src/Proto.Remote/Serialization/Serialization.cs index a0fbae2539..309938abce 100644 --- a/src/Proto.Remote/Serialization/Serialization.cs +++ b/src/Proto.Remote/Serialization/Serialization.cs @@ -4,6 +4,7 @@ // // ----------------------------------------------------------------------- +using System; using System.Collections.Generic; using Google.Protobuf; using Google.Protobuf.Reflection; @@ -44,11 +45,11 @@ public void RegisterFileDescriptor(FileDescriptor fd) } } - public ByteString Serialize(object message, int serializerId) => _serializers[serializerId].Serialize(message); + public ReadOnlySpan Serialize(object message, int serializerId) => _serializers[serializerId].Serialize(message); public string GetTypeName(object message, int serializerId) => _serializers[serializerId].GetTypeName(message); - public object Deserialize(string typeName, ByteString bytes, int serializerId) => + public object Deserialize(string typeName, ReadOnlySpan bytes, int serializerId) => _serializers[serializerId].Deserialize(bytes, typeName); } } \ No newline at end of file diff --git a/tests/Proto.Cluster.Identity.Tests/Proto.Cluster.Identity.Tests.csproj b/tests/Proto.Cluster.Identity.Tests/Proto.Cluster.Identity.Tests.csproj index fa8377dab5..39a117ee9d 100644 --- a/tests/Proto.Cluster.Identity.Tests/Proto.Cluster.Identity.Tests.csproj +++ b/tests/Proto.Cluster.Identity.Tests/Proto.Cluster.Identity.Tests.csproj @@ -8,7 +8,6 @@ - diff --git a/tests/Proto.Remote.Tests/RemoteTests/HostedGrpcNetWithCustomeSerializerTests.cs b/tests/Proto.Remote.Tests/RemoteTests/HostedGrpcNetWithCustomeSerializerTests.cs index 6b44160767..56382667bb 100644 --- a/tests/Proto.Remote.Tests/RemoteTests/HostedGrpcNetWithCustomeSerializerTests.cs +++ b/tests/Proto.Remote.Tests/RemoteTests/HostedGrpcNetWithCustomeSerializerTests.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Concurrent; +using System.Text; using System.Threading.Tasks; using Google.Protobuf; using Microsoft.Extensions.Hosting; @@ -19,16 +20,16 @@ public class CustomSerializer : ISerializer { private readonly ConcurrentDictionary _types = new(); - public object Deserialize(ByteString bytes, string typeName) + public object Deserialize(ReadOnlySpan bytes, string typeName) { var type = _types.GetOrAdd(typeName, name => Type.GetType(name)); - return System.Text.Json.JsonSerializer.Deserialize(bytes.ToStringUtf8(), type); + return System.Text.Json.JsonSerializer.Deserialize(bytes, type); } public string GetTypeName(object message) => message.GetType().AssemblyQualifiedName; - public ByteString Serialize(object obj) => - ByteString.CopyFromUtf8(System.Text.Json.JsonSerializer.Serialize(obj)); + public ReadOnlySpan Serialize(object obj) => + Encoding.UTF8.GetBytes(System.Text.Json.JsonSerializer.Serialize(obj)); } public class Fixture : RemoteFixture