Skip to content

Commit

Permalink
Squash commit
Browse files Browse the repository at this point in the history
  • Loading branch information
David García Vives committed Mar 3, 2021
1 parent 0d13c41 commit f0336bf
Show file tree
Hide file tree
Showing 12 changed files with 1,367 additions and 286 deletions.
1 change: 1 addition & 0 deletions src/Docker.DotNet/Docker.DotNet.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@
<ItemGroup>
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="System.Buffers" Version="4.5.1" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" />
</ItemGroup>
</Project>
20 changes: 10 additions & 10 deletions src/Docker.DotNet/DockerClient.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections.Generic;
using System.IO;
using System.IO.Pipes;
Expand Down Expand Up @@ -346,22 +346,22 @@ private async Task<HttpResponseMessage> PrivateMakeRequestAsync(
IRequestContent data,
CancellationToken cancellationToken)
{
// If there is a timeout, we turn it into a cancellation token. At the same time, we need to link to the caller's
// cancellation token. To avoid leaking objects, we must then also dispose of the CancellationTokenSource. To keep
// code flow simple, we treat it as re-entering the same method with a different CancellationToken and no timeout.
var request = PrepareRequest(method, path, queryString, headers, data);

if (timeout != s_InfiniteTimeout)
{
using (var timeoutTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
{
timeoutTokenSource.CancelAfter(timeout);

// We must await here because we need to dispose of the CTS only after the work has been completed.
return await PrivateMakeRequestAsync(s_InfiniteTimeout, completionOption, method, path, queryString, headers, data, timeoutTokenSource.Token).ConfigureAwait(false);
return await _client.SendAsync(request, completionOption, timeoutTokenSource.Token).ConfigureAwait(false);
}
}

var request = PrepareRequest(method, path, queryString, headers, data);
return await _client.SendAsync(request, completionOption, cancellationToken).ConfigureAwait(false);
var tcs = new TaskCompletionSource<HttpResponseMessage>();
using (cancellationToken.Register(() => tcs.SetCanceled()))
{
return await await Task.WhenAny(tcs.Task, _client.SendAsync(request, completionOption, cancellationToken)).ConfigureAwait(false);
}
}

private async Task HandleIfErrorResponseAsync(HttpStatusCode statusCode, HttpResponseMessage response, IEnumerable<ApiResponseErrorHandlingDelegate> handlers)
Expand Down Expand Up @@ -452,4 +452,4 @@ public void Dispose()
}

internal delegate void ApiResponseErrorHandlingDelegate(HttpStatusCode statusCode, string responseBody);
}
}
152 changes: 55 additions & 97 deletions src/Docker.DotNet/Endpoints/StreamUtil.cs
Original file line number Diff line number Diff line change
@@ -1,97 +1,55 @@
using System;
using System.IO;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;

namespace Docker.DotNet.Models
{
internal static class StreamUtil
{
private static Newtonsoft.Json.JsonSerializer _serializer = new Newtonsoft.Json.JsonSerializer();

internal static async Task MonitorStreamAsync(Task<Stream> streamTask, DockerClient client, CancellationToken cancel, IProgress<string> progress)
{
using (var stream = await streamTask)
{
// ReadLineAsync must be cancelled by closing the whole stream.
using (cancel.Register(() => stream.Dispose()))
{
using (var reader = new StreamReader(stream, new UTF8Encoding(false)))
{
string line;
while ((line = await reader.ReadLineAsync()) != null)
{
progress.Report(line);
}
}
}
}
}

internal static async Task MonitorStreamForMessagesAsync<T>(Task<Stream> streamTask, DockerClient client, CancellationToken cancel, IProgress<T> progress)
{
using (var stream = await streamTask)
using (var reader = new StreamReader(stream, new UTF8Encoding(false)))
using (var jsonReader = new JsonTextReader(reader) { SupportMultipleContent = true })
{
while (await jsonReader.ReadAsync().WithCancellation(cancel))
{
var ev = _serializer.Deserialize<T>(jsonReader);
progress?.Report(ev);
}
}
}

internal static async Task MonitorResponseForMessagesAsync<T>(Task<HttpResponseMessage> responseTask, DockerClient client, CancellationToken cancel, IProgress<T> progress)
{
using (var response = await responseTask)
{
await client.HandleIfErrorResponseAsync(response.StatusCode, response);

using (var stream = await response.Content.ReadAsStreamAsync())
{
// ReadLineAsync must be cancelled by closing the whole stream.
using (cancel.Register(() => stream.Dispose()))
{
using (var reader = new StreamReader(stream, new UTF8Encoding(false)))
{
string line;
try
{
while ((line = await reader.ReadLineAsync()) != null)
{
var prog = client.JsonSerializer.DeserializeObject<T>(line);
if (prog == null) continue;

progress.Report(prog);
}
}
catch (ObjectDisposedException)
{
// The subsequent call to reader.ReadLineAsync() after cancellation
// will fail because we disposed the stream. Just ignore here.
}
}
}
}
}
}

private static async Task<T> WithCancellation<T>(this Task<T> task, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<bool>();
using (cancellationToken.Register(s => ((TaskCompletionSource<bool>)s).TrySetResult(true), tcs))
{
if (task != await Task.WhenAny(task, tcs.Task))
{
throw new OperationCanceledException(cancellationToken);
}
}

return await task;
}
}
}
using System;
using System.Diagnostics;
using System.IO;
using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;

namespace Docker.DotNet.Models
{
internal static class StreamUtil
{
internal static async Task MonitorStreamAsync(Task<Stream> streamTask, DockerClient client, CancellationToken cancellationToken, IProgress<string> progress)
{
var tcs = new TaskCompletionSource<string>();

using (var stream = await streamTask)
using (var reader = new StreamReader(stream, new UTF8Encoding(false)))
using (cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken)))
{
string line;
while ((line = await await Task.WhenAny(reader.ReadLineAsync(), tcs.Task)) != null)
{
progress.Report(line);
}
}
}

internal static async Task MonitorStreamForMessagesAsync<T>(Task<Stream> streamTask, DockerClient client, CancellationToken cancellationToken, IProgress<T> progress)
{
var tcs = new TaskCompletionSource<bool>();

using (var stream = await streamTask)
using (var reader = new StreamReader(stream, new UTF8Encoding(false)))
using (var jsonReader = new JsonTextReader(reader) { SupportMultipleContent = true })
using (cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken)))
{
while (await await Task.WhenAny(jsonReader.ReadAsync(cancellationToken), tcs.Task))
{
var ev = await client.JsonSerializer.Deserialize<T>(jsonReader, cancellationToken);
progress.Report(ev);
}
}
}

internal static async Task MonitorResponseForMessagesAsync<T>(Task<HttpResponseMessage> responseTask, DockerClient client, CancellationToken cancel, IProgress<T> progress)
{
using (var response = await responseTask)
{
await MonitorStreamForMessagesAsync<T>(response.Content.ReadAsStreamAsync(), client, cancel, progress);
}
}
}
}
25 changes: 23 additions & 2 deletions src/Docker.DotNet/JsonSerializer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using Newtonsoft.Json;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;

namespace Docker.DotNet
Expand All @@ -8,6 +10,8 @@ namespace Docker.DotNet
/// </summary>
internal class JsonSerializer
{
private readonly Newtonsoft.Json.JsonSerializer _serializer;

private readonly JsonSerializerSettings _settings = new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore,
Expand All @@ -24,6 +28,23 @@ internal class JsonSerializer

public JsonSerializer()
{
_serializer = Newtonsoft.Json.JsonSerializer.CreateDefault(this._settings);
}

public Task<T> Deserialize<T>(JsonReader jsonReader, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource<T>();
using (cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken)))
{
Task.Factory.StartNew(
() => tcs.TrySetResult(_serializer.Deserialize<T>(jsonReader)),
cancellationToken,
TaskCreationOptions.LongRunning,
TaskScheduler.Default
);

return tcs.Task;
}
}

public T DeserializeObject<T>(string json)
Expand All @@ -36,4 +57,4 @@ public string SerializeObject<T>(T value)
return JsonConvert.SerializeObject(value, this._settings);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public override int WriteTimeout

public override int Read(byte[] buffer, int offset, int count)
{
return ReadAsync(buffer, offset, count, CancellationToken.None).Result;
return ReadAsync(buffer, offset, count, CancellationToken.None).GetAwaiter().GetResult();
}

public async override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
Expand Down
Loading

0 comments on commit f0336bf

Please sign in to comment.