Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persistence.cs use callback with index #2137

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions benchmarks/GossipBenchmark/Node1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,9 @@
using Proto.Cluster;
using Proto.Cluster.Consul;
using Proto.Cluster.Partition;
using Proto.Cluster.Seed;
using Proto.Cluster.SeedNode.Redis;
using Proto.Context;
using Proto.Remote;
using Proto.Remote.GrpcNet;
using StackExchange.Redis;
using static Proto.CancellationTokens;
using ProtosReflection = ClusterHelloWorld.Messages.ProtosReflection;

Expand Down
3 changes: 0 additions & 3 deletions benchmarks/GossipBenchmark/Node2/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,9 @@
using Proto.Cluster;
using Proto.Cluster.Consul;
using Proto.Cluster.Partition;
using Proto.Cluster.Seed;
using Proto.Cluster.SeedNode.Redis;
using Proto.Context;
using Proto.Remote;
using Proto.Remote.GrpcNet;
using StackExchange.Redis;
using static System.Threading.Tasks.Task;
using ProtosReflection = ClusterHelloWorld.Messages.ProtosReflection;

Expand Down
2 changes: 0 additions & 2 deletions benchmarks/GossipDecoder/Program.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// See https://aka.ms/new-console-template for more information

using System.Runtime.InteropServices.JavaScript;
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using Proto.Cluster;

Console.WriteLine("Hello, World!");
Expand Down
3 changes: 0 additions & 3 deletions benchmarks/SkyriseMini/Client/ProtoActorExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
using System.IO.Compression;
using Grpc.Net.Client;
using Grpc.Net.Compression;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
Expand Down
6 changes: 1 addition & 5 deletions benchmarks/SkyriseMini/Server/ProtoActorExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
using System.IO.Compression;
using Grpc.Net.Client;
using Grpc.Net.Compression;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Proto;
using Proto.Cluster;
using Proto.Cluster.Consul;
using Proto.Cluster.Partition;
using Proto.Cluster.PartitionActivator;
using Proto.DependencyInjection;
using Proto.Remote;
using Proto.Remote.GrpcNet;
Expand Down
2 changes: 0 additions & 2 deletions examples/ClusterK8sGrains/Node1/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
using static Proto.CancellationTokens;
using ProtosReflection = ClusterHelloWorld.Messages.ProtosReflection;
using System.Runtime.Loader;
using Microsoft.Extensions.Configuration;
using Extensions = Proto.Remote.GrpcNet.Extensions;

// Hook SIGTERM to a cancel token to know when k8s is shutting us down
// hostBuilder should be used in production
Expand Down
1 change: 0 additions & 1 deletion examples/ClusterK8sGrains/Node2/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using System.Threading.Tasks;
using System.Threading;
using ClusterHelloWorld.Messages;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
using Proto;
using Proto.Cluster;
Expand Down
4 changes: 2 additions & 2 deletions examples/Patterns/Saga/InMemoryProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@ public class InMemoryProvider : IProvider
public Task<(object Snapshot, long Index)> GetSnapshotAsync(string actorName) =>
Task.FromResult(((object)default(Snapshot), 0L));

public Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object> callback)
public Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object, long> callback)
{
if (Events.TryGetValue(actorName, out var events))
{
foreach (var e in events.Where(e => e.Key >= indexStart && e.Key <= indexEnd))
{
callback(e.Value);
callback(e.Value, e.Key);
}
}

Expand Down
5 changes: 0 additions & 5 deletions examples/Proto.Cluster.Dashboard.Host/Program.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
using Google.Protobuf.WellKnownTypes;
using MudBlazor.Services;
using Proto;
using Proto.Cluster;
using Proto.Cluster.Dashboard;
using Proto.Cluster.Partition;
using Proto.Cluster.Seed;
using Proto.Remote;
using Proto.Remote.GrpcNet;
using Proto.Remote.HealthChecks;

var builder = WebApplication.CreateBuilder(args);
Expand Down
1 change: 0 additions & 1 deletion examples/remotechannels/Client/Program.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Threading.Tasks;
using Common;
using Proto;
using Proto.Remote;
Expand Down
4 changes: 0 additions & 4 deletions src/Proto.Actor/Configuration/ActorSystemConfig.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,9 @@
// -----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Diagnostics;
using JetBrains.Annotations;
using Microsoft.Extensions.Logging;
using Proto.Context;
using Proto.Extensions;
using System.Text.Json;
using System.Text.Json.Serialization;

// ReSharper disable once CheckNamespace
Expand Down
2 changes: 0 additions & 2 deletions src/Proto.Actor/Extensions/IActorSystemExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Threading;
using System.Threading.Tasks;
using Proto.Diagnostics;

namespace Proto.Extensions;
Expand Down
2 changes: 0 additions & 2 deletions src/Proto.Actor/Process/Process.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

// ReSharper disable once CheckNamespace

using System;
using System.Threading.Tasks;
using Proto.Mailbox;

namespace Proto;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand Down
1 change: 0 additions & 1 deletion src/Proto.Cluster/Partition/PartitionPlacementActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down
1 change: 0 additions & 1 deletion src/Proto.Cluster/Seed/FixedServerSeedNodeDiscovery.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Threading.Tasks;
using Proto.Cluster.SingleNode;

namespace Proto.Cluster.Seed;

Expand Down
2 changes: 0 additions & 2 deletions src/Proto.Cluster/Seed/SeedNodeClusterProviderOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
// </copyright>
// -----------------------------------------------------------------------

using System.Collections.Immutable;

namespace Proto.Cluster.Seed;

public record SeedNodeClusterProviderOptions(ISeedNodeDiscovery Discovery);
2 changes: 0 additions & 2 deletions src/Proto.Cluster/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Proto.Cluster;
using Proto.Cluster.Identity;
using Proto.Cluster.Partition;
using Proto.Cluster.Seed;
using Proto.DependencyInjection;
using Proto.Remote.GrpcNet;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

Expand Down
6 changes: 3 additions & 3 deletions src/Proto.Persistence.Couchbase/CouchbaseProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public CouchbaseProvider(IBucket bucket)
_bucket = bucket;
}

public Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object> callback)
public Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object, long> callback)
{
var query = GenerateGetEventsQuery(actorName, indexStart, indexEnd);

Expand Down Expand Up @@ -105,7 +105,7 @@ private string GenerateGetEventsQuery(string actorName, long indexStart, long in
$"AND b.eventIndex <= {indexEnd} " +
"ORDER BY b.eventIndex ASC";

private async Task<long> ExecuteGetEventsQueryAsync(string query, Action<object> callback)
private async Task<long> ExecuteGetEventsQueryAsync(string query, Action<object, long> callback)
{
var req = QueryRequest.Create(query);

Expand All @@ -119,7 +119,7 @@ private async Task<long> ExecuteGetEventsQueryAsync(string query, Action<object>

foreach (var @event in events)
{
callback(@event.Data);
callback(@event.Data, @event.EventIndex);
}

return events.LastOrDefault()?.EventIndex ?? -1;
Expand Down
11 changes: 6 additions & 5 deletions src/Proto.Persistence.DynamoDB/DynamoDBProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public DynamoDBProvider(IAmazonDynamoDB dynamoDBClient, DynamoDBProviderOptions
_snapshotsTable = Table.LoadTable(dynamoDBClient, options.SnapshotsTableName, DynamoDBEntryConversion.V2);
}

public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object> callback)
public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object, long> callback)
{
var config = new QueryOperationConfig { ConsistentRead = true };
config.Filter.AddCondition(_options.EventsTableHashKey, QueryOperator.Equal, actorName);
Expand All @@ -52,8 +52,9 @@ public async Task<long> GetEventsAsync(string actorName, long indexStart, long i

foreach (var doc in results)
{
callback(GetData(doc));
lastIndex++;
var (@event, index) = GetData(doc);
callback(@event, index);
lastIndex=index;
}

if (query.IsDone)
Expand All @@ -64,14 +65,14 @@ public async Task<long> GetEventsAsync(string actorName, long indexStart, long i

return lastIndex;

object GetData(Document doc)
(object @event, long index) GetData(Document doc)
{
var dataTypeE = doc.GetValueOrThrow(_options.EventsTableDataTypeKey);
var dataE = doc.GetValueOrThrow(_options.EventsTableDataKey);

var dataType = Type.GetType(dataTypeE.AsString());

return _dynamoDBContext.FromDocumentDynamic(dataE.AsDocument(), dataType);
return (_dynamoDBContext.FromDocumentDynamic(dataE.AsDocument(), dataType), dataE.AsLong());
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Persistence.Marten/MartenProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public MartenProvider(IDocumentStore store)
_store = store;
}

public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object> callback)
public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object, long> callback)
{
var session = _store.IdentitySession();
await using var _ = session.ConfigureAwait(false);
Expand All @@ -27,7 +27,7 @@ public async Task<long> GetEventsAsync(string actorName, long indexStart, long i

foreach (var @event in events)
{
callback(@event.Data);
callback(@event.Data, @event.Index);
}

return events.LastOrDefault()?.Index ?? -1;
Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Persistence.MongoDB/MongoDBProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public MongoDBProvider(IMongoDatabase mongoDB)

private IMongoCollection<Snapshot> SnapshotCollection => _mongoDB.GetCollection<Snapshot>("snapshots");

public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object> callback)
public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object, long> callback)
{
var sort = Builders<Event>.Sort.Ascending("EventIndex");

Expand All @@ -36,7 +36,7 @@ public async Task<long> GetEventsAsync(string actorName, long indexStart, long i

foreach (var @event in events)
{
callback(@event.Data);
callback(@event.Data, @event.EventIndex);
}

return events.Any() ? events.Last().EventIndex : -1;
Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Persistence.RavenDB/RavenDBProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public RavenDBProvider(IDocumentStore store)
SetupIndexes();
}

public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object> callback)
public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object, long> callback)
{
using var session = _store.OpenAsyncSession();

Expand All @@ -30,7 +30,7 @@ public async Task<long> GetEventsAsync(string actorName, long indexStart, long i

foreach (var @event in events)
{
callback(@event.Data);
callback(@event.Data, @event.Index);
}

return events.LastOrDefault()?.Index ?? -1;
Expand Down
4 changes: 2 additions & 2 deletions src/Proto.Persistence.SqlServer/SqlServerProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ await ExecuteNonQueryAsync(
CreateParameter("SnapshotIndex", BigInt, inclusiveToIndex)
);

public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object> callback)
public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object, long> callback)
{
await using var connection = new SqlConnection(_connectionString);

Expand All @@ -107,7 +107,7 @@ public async Task<long> GetEventsAsync(string actorName, long indexStart, long i
{
lastIndex = (long)eventReader["EventIndex"];

callback(JsonConvert.DeserializeObject<object>(eventReader["EventData"].ToString(), AutoTypeSettings));
callback(JsonConvert.DeserializeObject<object>(eventReader["EventData"].ToString(), AutoTypeSettings), lastIndex);
}

return lastIndex;
Expand Down
7 changes: 4 additions & 3 deletions src/Proto.Persistence.Sqlite/SqliteProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public async Task DeleteSnapshotsAsync(string actorName, long inclusiveToIndex)
await deleteCommand.ExecuteNonQueryAsync().ConfigureAwait(false);
}

public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object> callback)
public async Task<long> GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object, long> callback)
{
using var connection = new SqliteConnection(ConnectionString);

Expand All @@ -92,9 +92,10 @@ public async Task<long> GetEventsAsync(string actorName, long indexStart, long i

while (await reader.ReadAsync().ConfigureAwait(false))
{
indexes.Add(Convert.ToInt64(reader["EventIndex"]));
var index = Convert.ToInt64(reader["EventIndex"]);
indexes.Add(index);

callback(JsonConvert.DeserializeObject<object>(reader["EventData"].ToString(), AutoTypeSettings));
callback(JsonConvert.DeserializeObject<object>(reader["EventData"].ToString(), AutoTypeSettings), index);
}

return indexes.Any() ? indexes.LastOrDefault() : -1;
Expand Down
2 changes: 1 addition & 1 deletion src/Proto.Persistence/IProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public interface IEventStore
/// <param name="indexEnd">Index of the last event to get (inclusive)</param>
/// <param name="callback">A callback which should be called for each read event, in the order the events are stored</param>
/// <returns>Index of the last read event or -1 if none</returns>
Task<long> GetEventsAsync(string actorId, long indexStart, long indexEnd, Action<object> callback);
Task<long> GetEventsAsync(string actorId, long indexStart, long indexEnd, Action<object, long> callback);

/// <summary>
/// Writes an event to event stream of particular actor
Expand Down
14 changes: 7 additions & 7 deletions src/Proto.Persistence/Persistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -230,10 +230,10 @@ await _eventStore.GetEventsAsync(
_actorId,
fromEventIndex,
long.MaxValue,
@event =>
(@event, index) =>
{
Index++;
_applyEvent?.Invoke(new RecoverEvent(@event, Index));
Index = index;
_applyEvent?.Invoke(new RecoverEvent(@event, index));
}
).ConfigureAwait(false);
}
Expand All @@ -260,10 +260,10 @@ await _eventStore.GetEventsAsync(
_actorId,
fromIndex,
toIndex,
@event =>
(@event, index) =>
{
_applyEvent?.Invoke(new ReplayEvent(@event, Index));
Index++;
_applyEvent?.Invoke(new ReplayEvent(@event, index));
Index=index;
}
).ConfigureAwait(false);
}
Expand Down Expand Up @@ -339,7 +339,7 @@ private class ManualSnapshots : ISnapshotStrategy
private class NoEventStore : IEventStore
{
public Task<long>
GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object> callback) =>
GetEventsAsync(string actorName, long indexStart, long indexEnd, Action<object, long> callback) =>
Task.FromResult(-1L);

public Task<long> PersistEventAsync(string actorName, long index, object @event) => Task.FromResult(0L);
Expand Down
Loading