Skip to content

Commit

Permalink
Fixed deadlock issues with the non-Async methods of AsyncLock, AsyncW…
Browse files Browse the repository at this point in the history
…aitHandle, AsyncManualResetEvent, and AsyncAutoResetEvent.

Improved unit tests of threading-related types.
Upgraded xunit to v2.1.0 and disabled test parallelization.
  • Loading branch information
csdahlberg committed Oct 8, 2015
1 parent 3fa6d6c commit a2e23cf
Show file tree
Hide file tree
Showing 17 changed files with 1,936 additions and 856 deletions.
1 change: 0 additions & 1 deletion CodeTiger.Core/CodeTiger.Core.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@
<Link>CodeTigerLib.snk</Link>
</None>
<None Include="CodeTiger.Core.nuspec" />
<None Include="packages.config" />
</ItemGroup>
<ItemGroup>
<Reference Include="System" />
Expand Down
32 changes: 32 additions & 0 deletions CodeTiger.Core/Threading/AsyncAutoResetEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,38 @@ public void Reset()
Interlocked.Exchange(ref _isSignaled, 0);
}

/// <summary>
/// Gets a <see cref="TaskCompletionSource{Boolean}"/> to use for a new wait operation.
/// </summary>
/// <returns>A <see cref="TaskCompletionSource{Boolean}"/> to use for a new wait operation.</returns>
protected override TaskCompletionSource<bool> GetWaitTaskSource(
CancellationToken cancellationToken)
{
if (Interlocked.CompareExchange(ref _isSignaled, 0, 1) == 1)
{
return CompletedWaitTaskSource;
}

TaskCompletionSource<bool> waitTaskSource;

using (_pendingWaitTaskSourcesLock.Acquire(cancellationToken))
{
// If this event is already signaled, return the already-completed wait task.
if (Interlocked.CompareExchange(ref _isSignaled, 0, 1) == 1)
{
waitTaskSource = CompletedWaitTaskSource;
}
else
{
waitTaskSource = new TaskCompletionSource<bool>();
cancellationToken.Register(() => waitTaskSource.TrySetCanceled());
_pendingWaitTaskSources.Enqueue(waitTaskSource);
}
}

return waitTaskSource;
}

/// <summary>
/// Gets a <see cref="TaskCompletionSource{Boolean}"/> to use for a new wait operation.
/// </summary>
Expand Down
79 changes: 48 additions & 31 deletions CodeTiger.Core/Threading/AsyncLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public AsyncLock()
/// </returns>
public IDisposable Acquire()
{
return GetWaitTask(CancellationToken.None).ConfigureAwait(false).GetAwaiter().GetResult();
return Acquire(CancellationToken.None);
}

/// <summary>
Expand All @@ -48,7 +48,30 @@ public IDisposable Acquire()
/// </returns>
public IDisposable Acquire(CancellationToken cancellationToken)
{
return GetWaitTask(cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult();
if (Interlocked.CompareExchange(ref _acquiredCount, 1, 0) == 0)
{
return _releaser;
}

TaskCompletionSource<IDisposable> waitTaskSource;

lock (_pendingWaitTaskSources)
{
if (Interlocked.CompareExchange(ref _acquiredCount, 1, 0) == 0)
{
return _releaser;
}

waitTaskSource = new TaskCompletionSource<IDisposable>();
_pendingWaitTaskSources.Enqueue(waitTaskSource);
}

if (cancellationToken.CanBeCanceled)
{
var cancellationRegistration = cancellationToken.Register(() => waitTaskSource.TrySetCanceled());
}

return Task.Run(() => waitTaskSource.Task).Result;
}

/// <summary>
Expand All @@ -58,7 +81,7 @@ public IDisposable Acquire(CancellationToken cancellationToken)
/// </returns>
public Task<IDisposable> AcquireAsync()
{
return GetWaitTask(CancellationToken.None);
return AcquireAsync(CancellationToken.None);
}

/// <summary>
Expand All @@ -68,50 +91,44 @@ public Task<IDisposable> AcquireAsync()
/// <returns>An <see cref="IDisposable"/> object that must be disposed to release the acquired lock.
/// </returns>
public Task<IDisposable> AcquireAsync(CancellationToken cancellationToken)
{
return GetWaitTask(cancellationToken);
}

private Task<IDisposable> GetWaitTask(CancellationToken cancellationToken)
{
if (Interlocked.CompareExchange(ref _acquiredCount, 1, 0) == 0)
{
return _completedWaitTask;
}

TaskCompletionSource<IDisposable> waitTaskSource;

lock (_pendingWaitTaskSources)
{
if (Interlocked.CompareExchange(ref _acquiredCount, 1, 0) == 0)
{
return _completedWaitTask;
}

var waitTaskSource = new TaskCompletionSource<IDisposable>();
var waitTask = waitTaskSource.Task;

if (cancellationToken != CancellationToken.None)
{
var cancellationRegistration = cancellationToken
.Register(() => waitTaskSource.TrySetCanceled());

waitTask = waitTask
.ContinueWith(task =>
{
if (waitTask.IsCanceled)
{
cancellationRegistration.Dispose();
}

return task;
},
TaskContinuationOptions.ExecuteSynchronously)
.Unwrap();
}

waitTaskSource = new TaskCompletionSource<IDisposable>();
_pendingWaitTaskSources.Enqueue(waitTaskSource);
}

return waitTask;
if (!cancellationToken.CanBeCanceled)
{
return waitTaskSource.Task;
}

var cancellationRegistration = cancellationToken.Register(() => waitTaskSource.TrySetCanceled());

return waitTaskSource.Task
.ContinueWith(task =>
{
if (waitTaskSource.Task.IsCanceled)
{
cancellationRegistration.Dispose();
}

return task;
},
TaskContinuationOptions.ExecuteSynchronously)
.Unwrap();
}

private void ReleaseLock()
Expand Down
11 changes: 11 additions & 0 deletions CodeTiger.Core/Threading/AsyncManualResetEvent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,17 @@ public void Reset()
#pragma warning restore 0420
}

/// <summary>
/// Gets a <see cref="TaskCompletionSource{Boolean}"/> to use for a new wait operation.
/// </summary>
/// <param name="cancellationToken">A cancellation token to observe.</param>
/// <returns>A <see cref="TaskCompletionSource{Boolean}"/> to use for a new wait operation.</returns>
protected override TaskCompletionSource<bool> GetWaitTaskSource(
CancellationToken cancellationToken)
{
return _waitTaskSource;
}

/// <summary>
/// Gets a <see cref="TaskCompletionSource{Boolean}"/> to use for a new wait operation.
/// </summary>
Expand Down
79 changes: 31 additions & 48 deletions CodeTiger.Core/Threading/AsyncWaitHandle.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Diagnostics.CodeAnalysis;
using System.Threading;
using System.Threading.Tasks;
using CodeTiger.Threading.Tasks;

namespace CodeTiger.Threading
{
Expand Down Expand Up @@ -89,45 +90,24 @@ public bool WaitOne(int timeoutMilliseconds, CancellationToken cancellationToken
/// </returns>
public bool WaitOne(TimeSpan timeout, CancellationToken cancellationToken)
{
var waitTaskSource = GetWaitTaskSourceAsync(cancellationToken).GetAwaiter().GetResult();
var waitTaskSource = GetWaitTaskSource(cancellationToken);

if (cancellationToken != CancellationToken.None)
if (cancellationToken.CanBeCanceled)
{
// Have the cancellation token attempt to cancel the wait task.
cancellationToken.Register(() => waitTaskSource.TrySetCanceled());
}

if (timeout != Timeout.InfiniteTimeSpan)
{
using (var compositeCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(
cancellationToken))
{
var timeoutTask = Task.Delay(timeout, compositeCancellationSource.Token);
var completedTask = Task.WhenAny(waitTaskSource.Task, timeoutTask).GetAwaiter().GetResult();

if (completedTask == timeoutTask)
{
waitTaskSource.TrySetResult(false);
}
else
{
// If the timeout task has not yet completed, use the composite cancellation token to
// cancel it so it will not continue running in the background.
compositeCancellationSource.Cancel();
}
}
}

return waitTaskSource.Task.GetAwaiter().GetResult();
return Task.Run(() => waitTaskSource.Task.Wait(timeout, cancellationToken)).Result;
}

/// <summary>
/// Asynchronously waits until this event is signaled.
/// </summary>
/// <returns>A <see cref="Task"/> that will complete when the event is signaled.</returns>
public async Task WaitOneAsync()
public Task WaitOneAsync()
{
await WaitOneAsync(Timeout.InfiniteTimeSpan, CancellationToken.None).ConfigureAwait(false);
return WaitOneAsync(Timeout.InfiniteTimeSpan, CancellationToken.None);
}

/// <summary>
Expand All @@ -137,10 +117,9 @@ public async Task WaitOneAsync()
/// <see cref="Timeout.Infinite"/> (negative 1) to wait indefinitely.</param>
/// <returns>A <see cref="Task{Boolean}"/> that will complete with a result of <c>true</c> if this wait
/// handle was signaled before the timeout elapsed; otherwise, with a result of <c>false</c>.</returns>
public async Task<bool> WaitOneAsync(int timeoutMilliseconds)
public Task<bool> WaitOneAsync(int timeoutMilliseconds)
{
return await WaitOneAsync(TimeSpan.FromMilliseconds(timeoutMilliseconds), CancellationToken.None)
.ConfigureAwait(false);
return WaitOneAsync(TimeSpan.FromMilliseconds(timeoutMilliseconds), CancellationToken.None);
}

/// <summary>
Expand All @@ -150,9 +129,9 @@ public async Task<bool> WaitOneAsync(int timeoutMilliseconds)
/// milliseconds) to wait indefinitely.</param>
/// <returns>A <see cref="Task{Boolean}"/> that will complete with a result of <c>true</c> if this wait
/// handle was signaled before the timeout elapsed; otherwise, with a result of <c>false</c>.</returns>
public async Task<bool> WaitOneAsync(TimeSpan timeout)
public Task<bool> WaitOneAsync(TimeSpan timeout)
{
return await WaitOneAsync(timeout, CancellationToken.None).ConfigureAwait(false);
return WaitOneAsync(timeout, CancellationToken.None);
}

/// <summary>
Expand All @@ -161,9 +140,9 @@ public async Task<bool> WaitOneAsync(TimeSpan timeout)
/// <param name="cancellationToken">A cancellation token to observe.</param>
/// <returns>A <see cref="Task{Boolean}"/> that will complete with a result of <c>true</c> if this wait
/// handle was signaled before the timeout elapsed; otherwise, with a result of <c>false</c>.</returns>
public async Task<bool> WaitOneAsync(CancellationToken cancellationToken)
public Task<bool> WaitOneAsync(CancellationToken cancellationToken)
{
return await WaitOneAsync(Timeout.InfiniteTimeSpan, cancellationToken).ConfigureAwait(false);
return WaitOneAsync(Timeout.InfiniteTimeSpan, cancellationToken);
}

/// <summary>
Expand All @@ -175,10 +154,9 @@ public async Task<bool> WaitOneAsync(CancellationToken cancellationToken)
/// <param name="cancellationToken">A cancellation token to observe.</param>
/// <returns>A <see cref="Task{Boolean}"/> that will complete with a result of <c>true</c> if this wait
/// handle was signaled before the timeout elapsed; otherwise, with a result of <c>false</c>.</returns>
public async Task<bool> WaitOneAsync(int timeoutMilliseconds, CancellationToken cancellationToken)
public Task<bool> WaitOneAsync(int timeoutMilliseconds, CancellationToken cancellationToken)
{
return await WaitOneAsync(TimeSpan.FromMilliseconds(timeoutMilliseconds), cancellationToken)
.ConfigureAwait(false);
return WaitOneAsync(TimeSpan.FromMilliseconds(timeoutMilliseconds), cancellationToken);
}

/// <summary>
Expand All @@ -194,26 +172,22 @@ public async Task<bool> WaitOneAsync(TimeSpan timeout, CancellationToken cancell
{
var waitTaskSource = await GetWaitTaskSourceAsync(cancellationToken).ConfigureAwait(false);

if (cancellationToken != CancellationToken.None)
if (cancellationToken.CanBeCanceled)
{
// Have the cancellation token attempt to cancel the wait task.
cancellationToken.Register(() => waitTaskSource.TrySetCanceled());
}

Task completedTask;
Task timeoutTask = null;
if (timeout == Timeout.InfiniteTimeSpan)
{
return await waitTaskSource.Task.ConfigureAwait(false);
}

using (var compositeCancellationSource = CancellationTokenSource.CreateLinkedTokenSource(
cancellationToken))
{
if (timeout != Timeout.InfiniteTimeSpan)
{
timeoutTask = Task.Delay(timeout, compositeCancellationSource.Token);
completedTask = await Task.WhenAny(waitTaskSource.Task, timeoutTask).ConfigureAwait(false);
}
else
{
completedTask = waitTaskSource.Task;
}
var timeoutTask = Task.Delay(timeout, compositeCancellationSource.Token);
var completedTask = await Task.WhenAny(waitTaskSource.Task, timeoutTask).ConfigureAwait(false);

if (completedTask == timeoutTask)
{
Expand All @@ -230,6 +204,15 @@ public async Task<bool> WaitOneAsync(TimeSpan timeout, CancellationToken cancell
return await waitTaskSource.Task.ConfigureAwait(false);
}

/// <summary>
/// Gets a <see cref="TaskCompletionSource{Boolean}"/> object to use for a new wait operation.
/// </summary>
/// <param name="cancellationToken">A cancellation token to observe.</param>
/// <returns>A <see cref="TaskCompletionSource{Boolean}"/> object to use for a new wait operation.
/// </returns>
protected abstract TaskCompletionSource<bool> GetWaitTaskSource(
CancellationToken cancellationToken);

/// <summary>
/// Gets a <see cref="TaskCompletionSource{Boolean}"/> object to use for a new wait operation.
/// </summary>
Expand Down
Loading

0 comments on commit a2e23cf

Please sign in to comment.