diff --git a/.gitignore b/.gitignore index 8226c8f..7c373b6 100644 --- a/.gitignore +++ b/.gitignore @@ -181,4 +181,5 @@ Build/** Tools/** # Visual Studio -.vs/ +**/.vs/** +.vs/** diff --git a/Src/LiquidProjections.Testing/MemoryEventSource.cs b/Src/LiquidProjections.Testing/MemoryEventSource.cs index 9eb6aa8..295528a 100644 --- a/Src/LiquidProjections.Testing/MemoryEventSource.cs +++ b/Src/LiquidProjections.Testing/MemoryEventSource.cs @@ -1,9 +1,7 @@ using System; using System.Collections.Generic; -using System.Collections.ObjectModel; using System.Globalization; using System.Linq; -using System.Threading; using System.Threading.Tasks; using LiquidProjections.Abstractions; @@ -15,7 +13,7 @@ namespace LiquidProjections.Testing public class MemoryEventSource { private readonly int batchSize; - private readonly List subscriptions = new List(); + private readonly List subscriptions = new List(); private readonly List history = new List(); private long lastHistoryCheckpoint; private TaskCompletionSource historyGrowthTaskCompletionSource = new TaskCompletionSource(); @@ -80,7 +78,7 @@ public IDisposable Subscribe(long? lastProcessedCheckpoint, Subscriber subscribe /// public async Task SubscribeAsync(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId) { - Subscription subscription = SubscribeWithoutWaitingInternal(lastProcessedCheckpoint, subscriber, subscriptionId); + MemorySubscription subscription = SubscribeWithoutWaitingInternal(lastProcessedCheckpoint, subscriber, subscriptionId); try { @@ -131,9 +129,9 @@ public IDisposable SubscribeWithoutWaiting(long? lastProcessedCheckpoint, Subscr return SubscribeWithoutWaitingInternal(lastProcessedCheckpoint, subscriber, subscriptionId); } - private Subscription SubscribeWithoutWaitingInternal(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId) + private MemorySubscription SubscribeWithoutWaitingInternal(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId) { - var subscription = new Subscription(lastProcessedCheckpoint ?? 0, batchSize, subscriber, subscriptionId, this); + var subscription = new MemorySubscription(lastProcessedCheckpoint ?? 0, batchSize, subscriber, subscriptionId, this); lock (syncRoot) { @@ -287,7 +285,7 @@ public Transaction WriteWithHeadersWithoutWaiting(object anEvent, IDictionaryA task that completes after all the subscriptions have processed the transactions. public async Task WaitForAllSubscriptions() { - List subscriptionsAtStart; + List subscriptionsAtStart; long checkpointAtStart; lock (syncRoot) @@ -296,7 +294,7 @@ public async Task WaitForAllSubscriptions() checkpointAtStart = lastHistoryCheckpoint; } - foreach (Subscription subscription in subscriptionsAtStart) + foreach (MemorySubscription subscription in subscriptionsAtStart) { try { @@ -319,12 +317,12 @@ public bool HasSubscriptionForId(string subscriptionId) { lock (syncRoot) { - Subscription subscription = subscriptions.SingleOrDefault(aSubscription => aSubscription.Id == subscriptionId); + MemorySubscription subscription = subscriptions.SingleOrDefault(aSubscription => aSubscription.Id == subscriptionId); return (subscription != null) && !subscription.IsDisposed; } } - private bool IsFutureCheckpoint(long checkpoint) + internal bool IsFutureCheckpoint(long checkpoint) { lock (syncRoot) { @@ -332,7 +330,7 @@ private bool IsFutureCheckpoint(long checkpoint) } } - private int GetNextTransactionIndex(long checkpoint) + internal int GetNextTransactionIndex(long checkpoint) { lock (syncRoot) { @@ -352,7 +350,7 @@ private int GetNextTransactionIndex(long checkpoint) } } - private Task WaitForNewTransactions() + internal Task WaitForNewTransactions() { lock (syncRoot) { @@ -360,7 +358,7 @@ private Task WaitForNewTransactions() } } - private Transaction[] GetTransactionsFromIndex(int startIndex) + internal Transaction[] GetTransactionsFromIndex(int startIndex) { lock (syncRoot) { @@ -370,212 +368,5 @@ private Transaction[] GetTransactionsFromIndex(int startIndex) return result; } } - - private class Subscription : IDisposable - { - private long lastProcessedCheckpoint; - private readonly int batchSize; - private readonly Subscriber subscriber; - private readonly MemoryEventSource memoryEventSource; - private bool isDisposed; - private CancellationTokenSource cancellationTokenSource; - private readonly object syncRoot = new object(); - private Task task; - private TaskCompletionSource progressCompletionSource = new TaskCompletionSource(); - - private readonly TaskCompletionSource waitForCheckingWhetherItIsAheadCompletionSource = - new TaskCompletionSource(); - - public Subscription(long lastProcessedCheckpoint, int batchSize, - Subscriber subscriber, string subscriptionId, MemoryEventSource memoryEventSource) - { - this.lastProcessedCheckpoint = lastProcessedCheckpoint; - this.batchSize = batchSize; - this.subscriber = subscriber; - Id = subscriptionId; - this.memoryEventSource = memoryEventSource; - } - - public void Start() - { - if (task != null) - { - throw new InvalidOperationException("Already started."); - } - - lock (syncRoot) - { - if (isDisposed) - { - throw new ObjectDisposedException(nameof(Subscription)); - } - - cancellationTokenSource = new CancellationTokenSource(); - - SubscriptionInfo info = new SubscriptionInfo - { - Id = Id, - Subscription = this, - CancellationToken = cancellationTokenSource.Token - }; - - task = Task.Factory.StartNew( - async () => - { - try - { - await RunAsync(info).ConfigureAwait(false); - } - catch (Exception) - { - Dispose(); - } - }, - CancellationToken.None, - TaskCreationOptions.DenyChildAttach | TaskCreationOptions.LongRunning, - TaskScheduler.Default) - .Unwrap(); - } - } - - private async Task RunAsync(SubscriptionInfo info) - { - if (IsDisposed) - { - return; - } - - long oldLastProcessedCheckpoint; - - lock (syncRoot) - { - oldLastProcessedCheckpoint = lastProcessedCheckpoint; - } - - if (memoryEventSource.IsFutureCheckpoint(oldLastProcessedCheckpoint)) - { - await subscriber.NoSuchCheckpoint(info).ConfigureAwait(false); - } - -#pragma warning disable 4014 - // Run continuations asynchronously. - Task.Run(() => waitForCheckingWhetherItIsAheadCompletionSource.TrySetResult(false)); -#pragma warning restore 4014 - - int nextTransactionIndex = memoryEventSource.GetNextTransactionIndex(oldLastProcessedCheckpoint); - - while (!IsDisposed) - { - Task waitForNewTransactions = memoryEventSource.WaitForNewTransactions(); - Transaction[] transactions = memoryEventSource.GetTransactionsFromIndex(nextTransactionIndex); - - Transaction[] requestedTransactions = transactions - .Where(transaction => transaction.Checkpoint > oldLastProcessedCheckpoint) - .ToArray(); - - foreach (IList batch in requestedTransactions.InBatchesOf(batchSize)) - { - await subscriber.HandleTransactions(new ReadOnlyCollection(batch.ToList()), info) - .ConfigureAwait(false); - } - - if (requestedTransactions.Any()) - { - lock (syncRoot) - { - lastProcessedCheckpoint = requestedTransactions[requestedTransactions.Length - 1].Checkpoint; - - if (!isDisposed) - { - TaskCompletionSource oldProgressCompletionSource = progressCompletionSource; - progressCompletionSource = new TaskCompletionSource(); - -#pragma warning disable 4014 - // Run continuations asynchronously. - Task.Run(() => oldProgressCompletionSource.SetResult(lastProcessedCheckpoint)); -#pragma warning restore 4014 - } - } - } - - nextTransactionIndex += transactions.Length; - - await waitForNewTransactions - .WithWaitCancellation(cancellationTokenSource.Token) - .ConfigureAwait(false); - } - } - - public void Dispose() - { - lock (syncRoot) - { - if (!isDisposed) - { - isDisposed = true; - - progressCompletionSource.SetCanceled(); - waitForCheckingWhetherItIsAheadCompletionSource.TrySetCanceled(); - - if (cancellationTokenSource != null) - { - try - { - cancellationTokenSource.Cancel(); - } - catch (AggregateException) - { - // Ignore. - } - - if (task == null) - { - cancellationTokenSource.Dispose(); - } - else - { - // Run continuations and wait for the subscription task asynchronously. - task.ContinueWith(_ => cancellationTokenSource.Dispose()); - } - } - } - } - } - - public bool IsDisposed - { - get - { - lock (syncRoot) - { - return isDisposed; - } - } - } - - public string Id { get; } - - public async Task WaitUntilCheckpoint(long checkpoint) - { - while (true) - { - Task progressTask; - - lock (syncRoot) - { - progressTask = progressCompletionSource.Task; - - if (lastProcessedCheckpoint >= checkpoint) - { - return; - } - } - - await progressTask.ConfigureAwait(false); - } - } - - public Task WaitForCheckingWhetherItIsAhead() => waitForCheckingWhetherItIsAheadCompletionSource.Task; - } } } diff --git a/Src/LiquidProjections.Testing/MemorySubscription.cs b/Src/LiquidProjections.Testing/MemorySubscription.cs new file mode 100644 index 0000000..9248b21 --- /dev/null +++ b/Src/LiquidProjections.Testing/MemorySubscription.cs @@ -0,0 +1,237 @@ +using System; +using System.Collections.Generic; +using System.Collections.ObjectModel; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using LiquidProjections.Abstractions; + +namespace LiquidProjections.Testing +{ + public class MemorySubscription : IDisposable + { + private long lastProcessedCheckpoint; + private readonly int batchSize; + private readonly Subscriber subscriber; + private readonly MemoryEventSource memoryEventSource; + private bool isDisposed; + private CancellationTokenSource cancellationTokenSource; + private readonly object syncRoot = new object(); + private Task task; + + // Returns the last exception that caused the subscription to abort or null. + private Exception exception; + private TaskCompletionSource progressCompletionSource = new TaskCompletionSource(); + + private readonly TaskCompletionSource waitForCheckingWhetherItIsAheadCompletionSource = + new TaskCompletionSource(); + + public MemorySubscription(long lastProcessedCheckpoint, int batchSize, + Subscriber subscriber, string subscriptionId, MemoryEventSource memoryEventSource) + { + this.lastProcessedCheckpoint = lastProcessedCheckpoint; + this.batchSize = batchSize; + this.subscriber = subscriber; + Id = subscriptionId; + this.memoryEventSource = memoryEventSource; + } + + public void Start() + { + if (task != null) + { + throw new InvalidOperationException("Already started."); + } + + lock (syncRoot) + { + if (isDisposed) + { + throw new ObjectDisposedException(nameof(MemorySubscription)); + } + + cancellationTokenSource = new CancellationTokenSource(); + + SubscriptionInfo info = new SubscriptionInfo + { + Id = Id, + Subscription = this, + CancellationToken = cancellationTokenSource.Token + }; + + task = Task.Factory.StartNew( + async () => + { + try + { + await RunAsync(info).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + Dispose(); + } + catch (Exception exc) + { + exception = exc; + Dispose(); + } + }, + CancellationToken.None, + TaskCreationOptions.DenyChildAttach | TaskCreationOptions.LongRunning, + TaskScheduler.Default) + .Unwrap(); + } + } + + private async Task RunAsync(SubscriptionInfo info) + { + if (IsDisposed) + { + return; + } + + long oldLastProcessedCheckpoint; + + lock (syncRoot) + { + oldLastProcessedCheckpoint = lastProcessedCheckpoint; + } + + if (memoryEventSource.IsFutureCheckpoint(oldLastProcessedCheckpoint)) + { + await subscriber.NoSuchCheckpoint(info).ConfigureAwait(false); + } + +#pragma warning disable 4014 + // Run continuations asynchronously. + Task.Run(() => waitForCheckingWhetherItIsAheadCompletionSource.TrySetResult(false)); +#pragma warning restore 4014 + + int nextTransactionIndex = memoryEventSource.GetNextTransactionIndex(oldLastProcessedCheckpoint); + + while (!IsDisposed) + { + Task waitForNewTransactions = memoryEventSource.WaitForNewTransactions(); + Transaction[] transactions = memoryEventSource.GetTransactionsFromIndex(nextTransactionIndex); + + Transaction[] requestedTransactions = transactions + .Where(transaction => transaction.Checkpoint > oldLastProcessedCheckpoint) + .ToArray(); + + foreach (IList batch in requestedTransactions.InBatchesOf(batchSize)) + { + await subscriber.HandleTransactions(new ReadOnlyCollection(batch.ToList()), info) + .ConfigureAwait(false); + } + + if (requestedTransactions.Any()) + { + lock (syncRoot) + { + lastProcessedCheckpoint = requestedTransactions[requestedTransactions.Length - 1].Checkpoint; + + if (!isDisposed) + { + TaskCompletionSource oldProgressCompletionSource = progressCompletionSource; + progressCompletionSource = new TaskCompletionSource(); + +#pragma warning disable 4014 + // Run continuations asynchronously. + Task.Run(() => oldProgressCompletionSource.SetResult(lastProcessedCheckpoint)); +#pragma warning restore 4014 + } + } + } + + nextTransactionIndex += transactions.Length; + + await waitForNewTransactions + .WithWaitCancellation(cancellationTokenSource.Token) + .ConfigureAwait(false); + } + } + + public void Dispose() + { + lock (syncRoot) + { + if (!isDisposed) + { + isDisposed = true; + + progressCompletionSource.SetCanceled(); + waitForCheckingWhetherItIsAheadCompletionSource.TrySetCanceled(); + + if (cancellationTokenSource != null) + { + try + { + cancellationTokenSource.Cancel(); + } + catch (AggregateException) + { + // Ignore. + } + + if (task == null) + { + cancellationTokenSource.Dispose(); + } + else + { + // Run continuations and wait for the subscription task asynchronously. + task.ContinueWith(_ => cancellationTokenSource.Dispose()); + } + } + } + } + } + + public bool IsDisposed + { + get + { + lock (syncRoot) + { + return isDisposed; + } + } + } + + public string Id { get; } + + public async Task WaitUntilCheckpoint(long checkpoint) + { + while (true) + { + Task progressTask; + + lock (syncRoot) + { + progressTask = progressCompletionSource.Task; + + if (lastProcessedCheckpoint >= checkpoint) + { + return; + } + } + + try + { + await progressTask.ConfigureAwait(false); + } + catch (OperationCanceledException) + { + break; + } + } + + if (exception != null) + { + throw new AggregateException(exception); + } + } + + public Task WaitForCheckingWhetherItIsAhead() => waitForCheckingWhetherItIsAheadCompletionSource.Task; + } +} \ No newline at end of file diff --git a/Tests/LiquidProjections.Specs/DispatcherSpecs.cs b/Tests/LiquidProjections.Specs/DispatcherSpecs.cs index 12a0267..7009efd 100644 --- a/Tests/LiquidProjections.Specs/DispatcherSpecs.cs +++ b/Tests/LiquidProjections.Specs/DispatcherSpecs.cs @@ -25,11 +25,11 @@ public When_a_projector_throws_an_exception() { Given(() => { + LogProvider.SetCurrentLogProvider(UseThe(new FakeLogProvider())); + UseThe(new MemoryEventSource()); WithSubject(_ => new Dispatcher(The().Subscribe)); - LogProvider.SetCurrentLogProvider(UseThe(new FakeLogProvider())); - UseThe(new ProjectionException("Some message.")); Subject.Subscribe(null, (transaction, info) => diff --git a/Tests/LiquidProjections.Specs/LiquidProjections.Specs.csproj b/Tests/LiquidProjections.Specs/LiquidProjections.Specs.csproj index 5dbc9cb..53fe6b9 100644 --- a/Tests/LiquidProjections.Specs/LiquidProjections.Specs.csproj +++ b/Tests/LiquidProjections.Specs/LiquidProjections.Specs.csproj @@ -21,7 +21,7 @@ DEBUG;TRACE prompt 4 - 6 + 7 true @@ -31,7 +31,7 @@ TRACE prompt 4 - 6 + 7 true @@ -83,6 +83,7 @@ + diff --git a/Tests/LiquidProjections.Specs/MemoryEventSourceSpecs.cs b/Tests/LiquidProjections.Specs/MemoryEventSourceSpecs.cs new file mode 100644 index 0000000..5a80eb4 --- /dev/null +++ b/Tests/LiquidProjections.Specs/MemoryEventSourceSpecs.cs @@ -0,0 +1,47 @@ +using System; +using System.Collections.Generic; +using Chill; +using FluentAssertions; +using LiquidProjections.Testing; +using Xunit; + +namespace LiquidProjections.Specs +{ + namespace MemoryEventSourceSpecs + { + public class When_a_subscriber_throws_an_exception_that_the_dispatcher_rethrows : GivenSubject + { + public When_a_subscriber_throws_an_exception_that_the_dispatcher_rethrows() + { + Given(() => + { + UseThe(new MemoryEventSource()); + WithSubject(_ => new Dispatcher(The().Subscribe) + { + ExceptionHandler = (exception, attempts, info) => throw exception + }); + + Subject.Subscribe(null, (transaction, info) => + { + throw new ArgumentException(); + }); + }); + + WhenLater(() => + { + return The().Write(new List + { + new Transaction() + }); + }); + } + + [Fact] + public void Then_the_exception_should_bubble_up_through_the_memory_event_source() + { + WhenAction.Should().Throw(); + } + } + + } +} \ No newline at end of file