Skip to content

Commit

Permalink
Merge pull request #109 from dennisdoomen/EnsureExceptionPropagation
Browse files Browse the repository at this point in the history
Ensures that the memory event store doesn't shallow projection exceptions
  • Loading branch information
dennisdoomen authored Feb 8, 2018
2 parents d20605b + f366df5 commit d5f8366
Show file tree
Hide file tree
Showing 6 changed files with 302 additions and 225 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -181,4 +181,5 @@ Build/**
Tools/**

# Visual Studio
.vs/
**/.vs/**
.vs/**
231 changes: 11 additions & 220 deletions Src/LiquidProjections.Testing/MemoryEventSource.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Globalization;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using LiquidProjections.Abstractions;

Expand All @@ -15,7 +13,7 @@ namespace LiquidProjections.Testing
public class MemoryEventSource
{
private readonly int batchSize;
private readonly List<Subscription> subscriptions = new List<Subscription>();
private readonly List<MemorySubscription> subscriptions = new List<MemorySubscription>();
private readonly List<Transaction> history = new List<Transaction>();
private long lastHistoryCheckpoint;
private TaskCompletionSource<bool> historyGrowthTaskCompletionSource = new TaskCompletionSource<bool>();
Expand Down Expand Up @@ -80,7 +78,7 @@ public IDisposable Subscribe(long? lastProcessedCheckpoint, Subscriber subscribe
/// </returns>
public async Task<IDisposable> SubscribeAsync(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId)
{
Subscription subscription = SubscribeWithoutWaitingInternal(lastProcessedCheckpoint, subscriber, subscriptionId);
MemorySubscription subscription = SubscribeWithoutWaitingInternal(lastProcessedCheckpoint, subscriber, subscriptionId);

try
{
Expand Down Expand Up @@ -131,9 +129,9 @@ public IDisposable SubscribeWithoutWaiting(long? lastProcessedCheckpoint, Subscr
return SubscribeWithoutWaitingInternal(lastProcessedCheckpoint, subscriber, subscriptionId);
}

private Subscription SubscribeWithoutWaitingInternal(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId)
private MemorySubscription SubscribeWithoutWaitingInternal(long? lastProcessedCheckpoint, Subscriber subscriber, string subscriptionId)
{
var subscription = new Subscription(lastProcessedCheckpoint ?? 0, batchSize, subscriber, subscriptionId, this);
var subscription = new MemorySubscription(lastProcessedCheckpoint ?? 0, batchSize, subscriber, subscriptionId, this);

lock (syncRoot)
{
Expand Down Expand Up @@ -287,7 +285,7 @@ public Transaction WriteWithHeadersWithoutWaiting(object anEvent, IDictionary<st
/// <returns>A task that completes after all the subscriptions have processed the transactions.</returns>
public async Task WaitForAllSubscriptions()
{
List<Subscription> subscriptionsAtStart;
List<MemorySubscription> subscriptionsAtStart;
long checkpointAtStart;

lock (syncRoot)
Expand All @@ -296,7 +294,7 @@ public async Task WaitForAllSubscriptions()
checkpointAtStart = lastHistoryCheckpoint;
}

foreach (Subscription subscription in subscriptionsAtStart)
foreach (MemorySubscription subscription in subscriptionsAtStart)
{
try
{
Expand All @@ -319,20 +317,20 @@ public bool HasSubscriptionForId(string subscriptionId)
{
lock (syncRoot)
{
Subscription subscription = subscriptions.SingleOrDefault(aSubscription => aSubscription.Id == subscriptionId);
MemorySubscription subscription = subscriptions.SingleOrDefault(aSubscription => aSubscription.Id == subscriptionId);
return (subscription != null) && !subscription.IsDisposed;
}
}

private bool IsFutureCheckpoint(long checkpoint)
internal bool IsFutureCheckpoint(long checkpoint)
{
lock (syncRoot)
{
return checkpoint > lastHistoryCheckpoint;
}
}

private int GetNextTransactionIndex(long checkpoint)
internal int GetNextTransactionIndex(long checkpoint)
{
lock (syncRoot)
{
Expand All @@ -352,15 +350,15 @@ private int GetNextTransactionIndex(long checkpoint)
}
}

private Task WaitForNewTransactions()
internal Task WaitForNewTransactions()
{
lock (syncRoot)
{
return historyGrowthTaskCompletionSource.Task;
}
}

private Transaction[] GetTransactionsFromIndex(int startIndex)
internal Transaction[] GetTransactionsFromIndex(int startIndex)
{
lock (syncRoot)
{
Expand All @@ -370,212 +368,5 @@ private Transaction[] GetTransactionsFromIndex(int startIndex)
return result;
}
}

private class Subscription : IDisposable
{
private long lastProcessedCheckpoint;
private readonly int batchSize;
private readonly Subscriber subscriber;
private readonly MemoryEventSource memoryEventSource;
private bool isDisposed;
private CancellationTokenSource cancellationTokenSource;
private readonly object syncRoot = new object();
private Task task;
private TaskCompletionSource<long> progressCompletionSource = new TaskCompletionSource<long>();

private readonly TaskCompletionSource<bool> waitForCheckingWhetherItIsAheadCompletionSource =
new TaskCompletionSource<bool>();

public Subscription(long lastProcessedCheckpoint, int batchSize,
Subscriber subscriber, string subscriptionId, MemoryEventSource memoryEventSource)
{
this.lastProcessedCheckpoint = lastProcessedCheckpoint;
this.batchSize = batchSize;
this.subscriber = subscriber;
Id = subscriptionId;
this.memoryEventSource = memoryEventSource;
}

public void Start()
{
if (task != null)
{
throw new InvalidOperationException("Already started.");
}

lock (syncRoot)
{
if (isDisposed)
{
throw new ObjectDisposedException(nameof(Subscription));
}

cancellationTokenSource = new CancellationTokenSource();

SubscriptionInfo info = new SubscriptionInfo
{
Id = Id,
Subscription = this,
CancellationToken = cancellationTokenSource.Token
};

task = Task.Factory.StartNew(
async () =>
{
try
{
await RunAsync(info).ConfigureAwait(false);
}
catch (Exception)
{
Dispose();
}
},
CancellationToken.None,
TaskCreationOptions.DenyChildAttach | TaskCreationOptions.LongRunning,
TaskScheduler.Default)
.Unwrap();
}
}

private async Task RunAsync(SubscriptionInfo info)
{
if (IsDisposed)
{
return;
}

long oldLastProcessedCheckpoint;

lock (syncRoot)
{
oldLastProcessedCheckpoint = lastProcessedCheckpoint;
}

if (memoryEventSource.IsFutureCheckpoint(oldLastProcessedCheckpoint))
{
await subscriber.NoSuchCheckpoint(info).ConfigureAwait(false);
}

#pragma warning disable 4014
// Run continuations asynchronously.
Task.Run(() => waitForCheckingWhetherItIsAheadCompletionSource.TrySetResult(false));
#pragma warning restore 4014

int nextTransactionIndex = memoryEventSource.GetNextTransactionIndex(oldLastProcessedCheckpoint);

while (!IsDisposed)
{
Task waitForNewTransactions = memoryEventSource.WaitForNewTransactions();
Transaction[] transactions = memoryEventSource.GetTransactionsFromIndex(nextTransactionIndex);

Transaction[] requestedTransactions = transactions
.Where(transaction => transaction.Checkpoint > oldLastProcessedCheckpoint)
.ToArray();

foreach (IList<Transaction> batch in requestedTransactions.InBatchesOf(batchSize))
{
await subscriber.HandleTransactions(new ReadOnlyCollection<Transaction>(batch.ToList()), info)
.ConfigureAwait(false);
}

if (requestedTransactions.Any())
{
lock (syncRoot)
{
lastProcessedCheckpoint = requestedTransactions[requestedTransactions.Length - 1].Checkpoint;

if (!isDisposed)
{
TaskCompletionSource<long> oldProgressCompletionSource = progressCompletionSource;
progressCompletionSource = new TaskCompletionSource<long>();

#pragma warning disable 4014
// Run continuations asynchronously.
Task.Run(() => oldProgressCompletionSource.SetResult(lastProcessedCheckpoint));
#pragma warning restore 4014
}
}
}

nextTransactionIndex += transactions.Length;

await waitForNewTransactions
.WithWaitCancellation(cancellationTokenSource.Token)
.ConfigureAwait(false);
}
}

public void Dispose()
{
lock (syncRoot)
{
if (!isDisposed)
{
isDisposed = true;

progressCompletionSource.SetCanceled();
waitForCheckingWhetherItIsAheadCompletionSource.TrySetCanceled();

if (cancellationTokenSource != null)
{
try
{
cancellationTokenSource.Cancel();
}
catch (AggregateException)
{
// Ignore.
}

if (task == null)
{
cancellationTokenSource.Dispose();
}
else
{
// Run continuations and wait for the subscription task asynchronously.
task.ContinueWith(_ => cancellationTokenSource.Dispose());
}
}
}
}
}

public bool IsDisposed
{
get
{
lock (syncRoot)
{
return isDisposed;
}
}
}

public string Id { get; }

public async Task WaitUntilCheckpoint(long checkpoint)
{
while (true)
{
Task progressTask;

lock (syncRoot)
{
progressTask = progressCompletionSource.Task;

if (lastProcessedCheckpoint >= checkpoint)
{
return;
}
}

await progressTask.ConfigureAwait(false);
}
}

public Task WaitForCheckingWhetherItIsAhead() => waitForCheckingWhetherItIsAheadCompletionSource.Task;
}
}
}
Loading

0 comments on commit d5f8366

Please sign in to comment.