From 9f8b5251798dd313625d4b96c6ded20c75357e5a Mon Sep 17 00:00:00 2001 From: Saverio Cisternino Date: Wed, 13 Dec 2017 17:57:08 +0100 Subject: [PATCH 1/5] Safe ClearInvocationCallbacks --- src/v2.x/HubConnection.cs | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/src/v2.x/HubConnection.cs b/src/v2.x/HubConnection.cs index aea54cb..6048432 100644 --- a/src/v2.x/HubConnection.cs +++ b/src/v2.x/HubConnection.cs @@ -207,18 +207,26 @@ private static string GetUrl(string url, bool useDefaultUrl) private void ClearInvocationCallbacks(string error) { - var result = new HubResult(); - result.Error = error; - - lock (_callbacks) - { - foreach (var callback in _callbacks.Values) - { - callback(result); - } - - _callbacks.Clear(); - } - } + Action[] callbacks; + + lock (_callbacks) + { + callbacks = _callbacks.Values.ToArray(); + _callbacks.Clear(); + } + + foreach (var callback in callbacks) + { + // Create a new HubResult each time as it's mutable and we don't want callbacks + // changing it during their parallel invocation + Task.Factory.StartNew(() => callback(new HubResult { Error = error })) + .Catch(); + } + } + + public void Force() + { + ClearInvocationCallbacks("Force"); + } } } From 441db5d1cc7c72cbf89f6ae818c49124d2409611 Mon Sep 17 00:00:00 2001 From: Saverio Cisternino Date: Wed, 13 Dec 2017 18:02:40 +0100 Subject: [PATCH 2/5] Fix Escape query string --- src/v2.x/Infrastructure/TransportAbortHandler.cs | 2 +- src/v2.x/Transports/HttpBasedTransport.cs | 2 +- src/v2.x/Transports/TransportHelper.cs | 4 ++-- src/v2.x/Transports/WebSocketTransport.cs | 9 +++++---- 4 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/v2.x/Infrastructure/TransportAbortHandler.cs b/src/v2.x/Infrastructure/TransportAbortHandler.cs index 17d2b9f..c9aaf72 100644 --- a/src/v2.x/Infrastructure/TransportAbortHandler.cs +++ b/src/v2.x/Infrastructure/TransportAbortHandler.cs @@ -64,7 +64,7 @@ public void Abort(IConnection connection, TimeSpan timeout, string connectionDat string url = connection.Url + "abort" + String.Format(CultureInfo.InvariantCulture, _abortQueryString, _transportName, - connectionData, + Uri.EscapeDataString(connectionData), Uri.EscapeDataString(connection.ConnectionToken), null); diff --git a/src/v2.x/Transports/HttpBasedTransport.cs b/src/v2.x/Transports/HttpBasedTransport.cs index 07e9e87..bdbca38 100644 --- a/src/v2.x/Transports/HttpBasedTransport.cs +++ b/src/v2.x/Transports/HttpBasedTransport.cs @@ -91,7 +91,7 @@ public Task Send(IConnection connection, string data, string connectionData) url += String.Format(CultureInfo.InvariantCulture, _sendQueryString, _transport, - connectionData, + Uri.EscapeDataString(connectionData), Uri.EscapeDataString(connection.ConnectionToken), customQueryString); diff --git a/src/v2.x/Transports/TransportHelper.cs b/src/v2.x/Transports/TransportHelper.cs index 4a41156..9211fca 100644 --- a/src/v2.x/Transports/TransportHelper.cs +++ b/src/v2.x/Transports/TransportHelper.cs @@ -42,7 +42,7 @@ public static Task GetNegotiationResponse(this IHttpClient if (!String.IsNullOrEmpty(connectionData)) { - negotiateUrl += "&connectionData=" + connectionData; + negotiateUrl += "&connectionData=" + Uri.EscapeDataString(connectionData); } httpClient.Initialize(connection); @@ -85,7 +85,7 @@ public static string GetReceiveQueryString(IConnection connection, string connec if (connectionData != null) { - qsBuilder.Append("&connectionData=" + connectionData); + qsBuilder.Append("&connectionData=" + Uri.EscapeDataString(connectionData)); } qsBuilder.Append("&clientProtocol=" + connection.Protocol); diff --git a/src/v2.x/Transports/WebSocketTransport.cs b/src/v2.x/Transports/WebSocketTransport.cs index 5998a9f..d2daa4f 100644 --- a/src/v2.x/Transports/WebSocketTransport.cs +++ b/src/v2.x/Transports/WebSocketTransport.cs @@ -97,11 +97,12 @@ public virtual Task PerformConnect() { private async Task PerformConnect(bool reconnecting) { var url = _connectionInfo.Connection.Url + (reconnecting ? "reconnect" : "connect"); - url += TransportHelper.GetReceiveQueryString(_connectionInfo.Connection, _connectionInfo.Data, "webSockets"); - var builder = new UriBuilder(url); + var qs = TransportHelper.GetReceiveQueryString(_connectionInfo.Connection, _connectionInfo.Data, "webSockets"); + qs += "&tid=" + new Random((int)DateTime.Now.Ticks).Next(); + var builder = new UriBuilder(url); builder.Scheme = builder.Scheme == "https" ? "wss" : "ws"; - _connectionInfo.Connection.Trace(TraceLevels.Events, "WS Connecting to: {0}", builder.Uri); + _connectionInfo.Connection.Trace(TraceLevels.Events, "WS Connecting to: {0}", builder.Uri+qs); // TODO: Revisit thread safety of this assignment _webSocketTokenSource = new CancellationTokenSource(); @@ -109,7 +110,7 @@ private async Task PerformConnect(bool reconnecting) { CancellationTokenSource linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_webSocketTokenSource.Token, _disconnectToken); CancellationToken token = linkedCts.Token; - _webSocket = new ClientWebSocket(builder.Uri.ToString(), token, null); + _webSocket = new ClientWebSocket(builder.Uri.ToString()+qs, token, null); _webSocket.Log = new Logger(LogLevel.Trace, null, ((data, s) => _connectionInfo.Connection.Trace(TraceLevels.All, "WebSocket - {0}", data.Message))); _webSocket.OnOpen += (sender, args) => _connectionInfo.Connection.ChangeState(_connectionInfo.Connection.State, ConnectionState.Connected); From fbb9ef5c67e01ff666aa2c911d902b2d6933311f Mon Sep 17 00:00:00 2001 From: Saverio Cisternino Date: Wed, 13 Dec 2017 18:03:29 +0100 Subject: [PATCH 3/5] Fix Null Reference Exception --- src/v2.x/Transports/WebSockets/WebSocketMessageReader.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/v2.x/Transports/WebSockets/WebSocketMessageReader.cs b/src/v2.x/Transports/WebSockets/WebSocketMessageReader.cs index cfd0021..4df131c 100644 --- a/src/v2.x/Transports/WebSockets/WebSocketMessageReader.cs +++ b/src/v2.x/Transports/WebSockets/WebSocketMessageReader.cs @@ -41,7 +41,7 @@ await TaskEx.Run(() => { public static async Task ReadMessageAsync(WebSocket webSocket, int bufferSize, int? maxMessageSize, CancellationToken disconnectToken) { WebSocketReceiveResult receiveResult = await WSRecieveAsync(webSocket, disconnectToken).ConfigureAwait(false); - if(receiveResult.MessageType == WebSocketMessageType.Close) + if(receiveResult == null || receiveResult.MessageType == WebSocketMessageType.Close) return WebSocketMessage.CloseMessage; return new WebSocketMessage(receiveResult.Message, receiveResult.MessageType); } From 453b752a4ba9b6b88b1a732cee4134ad1e6b9e13 Mon Sep 17 00:00:00 2001 From: Saverio Cisternino Date: Wed, 13 Dec 2017 18:05:06 +0100 Subject: [PATCH 4/5] Fix ServerSentEventTrasport Connection State --- src/v2.x/Transports/ServerSentEventsTransport.cs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/v2.x/Transports/ServerSentEventsTransport.cs b/src/v2.x/Transports/ServerSentEventsTransport.cs index e5ff6de..4a45839 100644 --- a/src/v2.x/Transports/ServerSentEventsTransport.cs +++ b/src/v2.x/Transports/ServerSentEventsTransport.cs @@ -168,6 +168,10 @@ private void OpenConnection(IConnection connection, // Raise the reconnect event if the connection comes back up connection.OnReconnected(); } + else + { + connection.ChangeState(ConnectionState.Disconnected, ConnectionState.Connected); + } }; eventSource.Message = sseEvent => From fe603c242cf077681cdcb577944afb76df96ba41 Mon Sep 17 00:00:00 2001 From: Saverio Cisternino Date: Wed, 13 Dec 2017 18:06:37 +0100 Subject: [PATCH 5/5] Fix WebSocketTransport locking after send --- src/v2.x/Hubs/HubRegistrationData.cs | 3 +- src/v2.x/Nivot.SignalR.Client.Net35.csproj | 3 + src/v2.x/Transports/WebSocketTransport.cs | 2 +- .../WebSockets/DefaultWebSocketHandler.cs | 8 +- src/v2.x/WebSockets/IWebSocket.cs | 3 +- src/v2.x/WebSockets/WebSocketHandler.cs | 604 ++++++++++-------- src/v2.x/packages.config | 1 + 7 files changed, 357 insertions(+), 267 deletions(-) diff --git a/src/v2.x/Hubs/HubRegistrationData.cs b/src/v2.x/Hubs/HubRegistrationData.cs index 36fc153..29c7e89 100644 --- a/src/v2.x/Hubs/HubRegistrationData.cs +++ b/src/v2.x/Hubs/HubRegistrationData.cs @@ -4,6 +4,7 @@ namespace Microsoft.AspNet.SignalR.Client.Hubs { public class HubRegistrationData { - public string Name { get; set; } + [Newtonsoft.Json.JsonProperty(PropertyName = "name")] + public string Name { get; set; } } } diff --git a/src/v2.x/Nivot.SignalR.Client.Net35.csproj b/src/v2.x/Nivot.SignalR.Client.Net35.csproj index 4818776..975ab78 100644 --- a/src/v2.x/Nivot.SignalR.Client.Net35.csproj +++ b/src/v2.x/Nivot.SignalR.Client.Net35.csproj @@ -42,6 +42,9 @@ ..\packages\Newtonsoft.Json.5.0.8\lib\net35\Newtonsoft.Json.dll + + ..\packages\TunnelVisionLabs.Threading.2.0.0\lib\net35-client\Rackspace.Threading.dll + diff --git a/src/v2.x/Transports/WebSocketTransport.cs b/src/v2.x/Transports/WebSocketTransport.cs index d2daa4f..ab44064 100644 --- a/src/v2.x/Transports/WebSocketTransport.cs +++ b/src/v2.x/Transports/WebSocketTransport.cs @@ -140,7 +140,7 @@ public Task Send(IConnection connection, string data, string connectionData) { return TaskAsyncHelper.FromError(ex); } - return SendAsync(data); + return SendAsync(connection,data); } public override void OnMessage(string message) { diff --git a/src/v2.x/WebSockets/DefaultWebSocketHandler.cs b/src/v2.x/WebSockets/DefaultWebSocketHandler.cs index d1e3c09..ad4e63d 100644 --- a/src/v2.x/WebSockets/DefaultWebSocketHandler.cs +++ b/src/v2.x/WebSockets/DefaultWebSocketHandler.cs @@ -52,16 +52,16 @@ Action IWebSocket.OnError { set; } - Task IWebSocket.Send(string value) { - return Send(value); + Task IWebSocket.Send(IConnection connection, string value) { + return Send(connection, value); } - public override Task Send(string message) { + public override Task Send(IConnection connection ,string message) { if (_closed) { return TaskAsyncHelper.Empty; } - return base.Send(message); + return base.Send(connection,message); } public override Task CloseAsync() { diff --git a/src/v2.x/WebSockets/IWebSocket.cs b/src/v2.x/WebSockets/IWebSocket.cs index 6ab75dc..f225dd1 100644 --- a/src/v2.x/WebSockets/IWebSocket.cs +++ b/src/v2.x/WebSockets/IWebSocket.cs @@ -23,9 +23,10 @@ internal interface IWebSocket /// /// Sends data over the websocket. /// + /// /// The value to send. /// A that represents the send is complete. - Task Send(string value); + Task Send(IConnection connection, string value); /// /// Sends a chunk of data over the websocket ("endOfMessage" flag set to false.) diff --git a/src/v2.x/WebSockets/WebSocketHandler.cs b/src/v2.x/WebSockets/WebSocketHandler.cs index 8ca8c24..1ad993b 100644 --- a/src/v2.x/WebSockets/WebSocketHandler.cs +++ b/src/v2.x/WebSockets/WebSocketHandler.cs @@ -7,269 +7,353 @@ using Microsoft.AspNet.SignalR.Client.Infrastructure; using Microsoft.AspNet.SignalR.Client.Transports.WebSockets; using WebSocketSharp; +using Rackspace.Threading; namespace Microsoft.AspNet.SignalR.Client.WebSockets { - public class WebSocketHandler - { - // Wait 250 ms before giving up on a Close - private static readonly TimeSpan _closeTimeout = TimeSpan.FromMilliseconds(250); - - // 4KB default fragment size (we expect most messages to be very short) - private const int _receiveLoopBufferSize = 4 * 1024; - private readonly int? _maxIncomingMessageSize; - - // Queue for sending messages - private readonly TaskQueue _sendQueue = new TaskQueue(); - - public WebSocketHandler(int? maxIncomingMessageSize) { - _maxIncomingMessageSize = maxIncomingMessageSize; - } - - public virtual void OnOpen() { } - - public virtual void OnMessage(string message) { throw new NotImplementedException(); } - - public virtual void OnMessage(byte[] message) { throw new NotImplementedException(); } - - public virtual void OnError() { } - - public virtual void OnClose() { } - - // Sends a text message to the client - public virtual Task Send(string message) { - if (message == null) { - throw new ArgumentNullException("message"); - } - - return SendAsync(message); - } - - internal Task SendAsync(string message) { - return _sendQueue.Enqueue(async state => { - bool? completed = null; - var cts = new CancellationTokenSource(); - try { - var context = (string)state; - await TaskEx.Run(() => WebSocket.SendAsync(context, (x) => { - completed = x; - cts.Cancel(); - })).ConfigureAwait(false); - - await TaskEx.Delay(1000 * 60, cts.Token); - if (completed == false) - throw new Exception(context); - } catch (Exception ex) { - // Swallow exceptions on send - Trace.TraceError("Error while sending: " + ex); - } - }, message); - } - - [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1026:DefaultParametersShouldNotBeUsed"), Obsolete("Currently Not Supported", true)] - public virtual Task SendAsync(ArraySegment message, WebSocketMessageType messageType, bool endOfMessage = true) { - throw new NotSupportedException(); - //if (WebSocket.ReadyState != WebSocketState.Open) { - // return TaskAsyncHelper.Empty; - //} - - //var sendContext = new SendContext(this, message, messageType, endOfMessage); - - //return _sendQueue.Enqueue(async state => { - // var context = (SendContext)state; - - // if (context.Handler.WebSocket.ReadyState != WebSocketState.Open) { - // return; - // } - - // try { - // await context.Handler.WebSocket.SendAsync(context.Message, context.MessageType, context.EndOfMessage, CancellationToken.None); - // } catch (Exception ex) { - // // Swallow exceptions on send - // Trace.TraceError("Error while sending: " + ex); - // } - //}, - //sendContext); - } - - public virtual Task CloseAsync() { - if (IsClosedOrClosedSent(WebSocket)) { - return TaskAsyncHelper.Empty; - } - - var closeContext = new CloseContext(this); - - return _sendQueue.Enqueue(async state => { - var context = (CloseContext)state; - - if (IsClosedOrClosedSent(context.Handler.WebSocket)) { - return; - } - - try { - WebSocket.CloseAsync(CloseStatusCode.Normal, ""); - - await TaskEx.Run(() => { - while (WebSocket.ReadyState != WebSocketState.Closed) { } - }, CancellationToken.None); - } catch (Exception ex) { - // Swallow exceptions on close - Trace.TraceError("Error while closing the websocket: " + ex); - } - }, - closeContext); - } - - public int? MaxIncomingMessageSize { - get { - return _maxIncomingMessageSize; - } - } - - internal WebSocket WebSocket { get; set; } - - public Exception Error { get; set; } - - public Task ProcessWebSocketRequestAsync(WebSocket webSocket, CancellationToken disconnectToken) { - if (webSocket == null) { - throw new ArgumentNullException("webSocket"); - } - - var receiveContext = new ReceiveContext(webSocket, disconnectToken, MaxIncomingMessageSize, _receiveLoopBufferSize); - - return ProcessWebSocketRequestAsync(webSocket, disconnectToken, state => { - var context = (ReceiveContext)state; - - return WebSocketMessageReader.ReadMessageAsync(context.WebSocket, context.BufferSize, context.MaxIncomingMessageSize, context.DisconnectToken); - }, - receiveContext); - } - - internal async Task ProcessWebSocketRequestAsync(WebSocket webSocket, CancellationToken disconnectToken, Func> messageRetriever, object state) { - bool closedReceived = false; - - try { - // first, set primitives and initialize the object - WebSocket = webSocket; - OnOpen(); - - // dispatch incoming messages - while (!disconnectToken.IsCancellationRequested && !closedReceived) { - WebSocketMessage incomingMessage = await messageRetriever(state); - switch (incomingMessage.MessageType) { - case WebSocketMessageType.Binary: - OnMessage((byte[])incomingMessage.Data); - break; - - case WebSocketMessageType.Text: - OnMessage((string)incomingMessage.Data); - break; - - default: - closedReceived = true; - - // If we received an incoming CLOSE message, we'll queue a CLOSE frame to be sent. - // We'll give the queued frame some amount of time to go out on the wire, and if a - // timeout occurs we'll give up and abort the connection. - await TaskEx.WhenAny(CloseAsync(), TaskEx.Delay(_closeTimeout)); - break; - } - } - - } catch (OperationCanceledException ex) { - // ex.CancellationToken never has the token that was actually cancelled - if (!disconnectToken.IsCancellationRequested) { - Error = ex; - OnError(); - } - } catch (ObjectDisposedException) { - // If the websocket was disposed while we were reading then noop - } catch (Exception ex) { - if (IsFatalException(ex)) { - Error = ex; - OnError(); - } - } - - try { - if (WebSocket.ReadyState == WebSocketState.Closed/* || - WebSocket.ReadyState == WebSocketState.Aborted*/) { - // No-op if the socket is already closed or aborted - } else { - // Close the socket - WebSocket.CloseAsync(CloseStatusCode.Normal, ""); - - await TaskEx.Run(() => { - while (WebSocket.ReadyState != WebSocketState.Closed) {} - }, CancellationToken.None); - } - } finally { - OnClose(); - } - } - - // returns true if this is a fatal exception (e.g. OnError should be called) - private static bool IsFatalException(Exception ex) { - // If this exception is due to the underlying TCP connection going away, treat as a normal close - // rather than a fatal exception. - COMException ce = ex as COMException; - if (ce != null) { - switch ((uint)ce.ErrorCode) { - // These are the three error codes we've seen in testing which can be caused by the TCP connection going away unexpectedly. - case 0x800703e3: - case 0x800704cd: - case 0x80070026: - return false; - } - } - - // unknown exception; treat as fatal - return true; - } - - private static bool IsClosedOrClosedSent(WebSocket webSocket) { - return webSocket.ReadyState == WebSocketState.Closed/* || + public class WebSocketHandler + { + // Wait 250 ms before giving up on a Close + private static readonly TimeSpan _closeTimeout = TimeSpan.FromMilliseconds(250); + + // 4KB default fragment size (we expect most messages to be very short) + private const int _receiveLoopBufferSize = 4 * 1024; + private readonly int? _maxIncomingMessageSize; + + // Queue for sending messages + private readonly TaskQueue _sendQueue = new TaskQueue(); + + public WebSocketHandler(int? maxIncomingMessageSize) + { + _maxIncomingMessageSize = maxIncomingMessageSize; + } + + public virtual void OnOpen() { } + + public virtual void OnMessage(string message) { throw new NotImplementedException(); } + + public virtual void OnMessage(byte[] message) { throw new NotImplementedException(); } + + public virtual void OnError() { } + + public virtual void OnClose() { } + + // Sends a text message to the client + public virtual Task Send(IConnection connection, string message) + { + if (message == null) + { + throw new ArgumentNullException("message"); + } + + return SendAsync(connection, message); + } + + internal Task SendAsync(IConnection connection, string message) + { + var buffer = Encoding.UTF8.GetBytes(message); + + return _sendQueue.Enqueue(state => + { + //bool? completed = null; + //var cts = new CancellationTokenSource(); + //try { + // var context = (string)state; + // await TaskEx.Run(() => WebSocket.SendAsync(context, (x) => { + // completed = x; + // cts.Cancel(); + // })).ConfigureAwait(false); + + // await TaskEx.Delay(1000 * 60, cts.Token); + // if (completed == false) + // throw new Exception(context); + //} catch (Exception ex) { + // // Swallow exceptions on send + // Trace.TraceError("Error while sending: " + ex); + //} + var contex = state as SendContext; + var tcs = new TaskCompletionSource(); + EventHandler eventHandler = null; + var cts = new CancellationTokenSource(); + var websocket = contex.Handler.WebSocket; + cts.CancelAfter(TimeSpan.FromSeconds(60)); + eventHandler = (s, arg) => + { + + connection.Trace(TraceLevels.Messages, "OnMessage({0})", arg.Data); + Newtonsoft.Json.Linq.JToken token = null; + try + { + token = Newtonsoft.Json.Linq.JObject.Parse(arg.Data); + + } + catch (Exception ex2) + { + connection.Trace(TraceLevels.Events, "Error ({0})", ex2.Message); + } + if (token != null && token["I"] != null) + { + websocket.OnMessage -= eventHandler; + connection.OnReceived(token); + tcs.SetResult(arg); + } + }; + websocket.OnMessage += eventHandler; + + TaskEx.Run(() => websocket.SendAsync(message, completed => + { + if (completed == false) + tcs.SetCanceled(); + })); + return tcs.Task; + + }, new SendContext(this, buffer, WebSocketMessageType.Text, true)); + } + + [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1026:DefaultParametersShouldNotBeUsed"), Obsolete("Currently Not Supported", true)] + public virtual Task SendAsync(ArraySegment message, WebSocketMessageType messageType, bool endOfMessage = true) + { + throw new NotSupportedException(); + //if (WebSocket.ReadyState != WebSocketState.Open) { + // return TaskAsyncHelper.Empty; + //} + + //var sendContext = new SendContext(this, message, messageType, endOfMessage); + + //return _sendQueue.Enqueue(async state => { + // var context = (SendContext)state; + + // if (context.Handler.WebSocket.ReadyState != WebSocketState.Open) { + // return; + // } + + // try { + // await context.Handler.WebSocket.SendAsync(context.Message, context.MessageType, context.EndOfMessage, CancellationToken.None); + // } catch (Exception ex) { + // // Swallow exceptions on send + // Trace.TraceError("Error while sending: " + ex); + // } + //}, + //sendContext); + } + + public virtual Task CloseAsync() + { + if (IsClosedOrClosedSent(WebSocket)) + { + return TaskAsyncHelper.Empty; + } + + var closeContext = new CloseContext(this); + + return _sendQueue.Enqueue(async state => + { + var context = (CloseContext)state; + + if (IsClosedOrClosedSent(context.Handler.WebSocket)) + { + return; + } + + try + { + WebSocket.CloseAsync(CloseStatusCode.Normal, ""); + + await TaskEx.Run(() => + { + while (WebSocket.ReadyState != WebSocketState.Closed) { } + }, CancellationToken.None); + } + catch (Exception ex) + { + // Swallow exceptions on close + Trace.TraceError("Error while closing the websocket: " + ex); + } + }, + closeContext); + } + + public int? MaxIncomingMessageSize + { + get + { + return _maxIncomingMessageSize; + } + } + + internal WebSocket WebSocket { get; set; } + + public Exception Error { get; set; } + + public Task ProcessWebSocketRequestAsync(WebSocket webSocket, CancellationToken disconnectToken) + { + if (webSocket == null) + { + throw new ArgumentNullException("webSocket"); + } + + var receiveContext = new ReceiveContext(webSocket, disconnectToken, MaxIncomingMessageSize, _receiveLoopBufferSize); + + return ProcessWebSocketRequestAsync(webSocket, disconnectToken, state => + { + var context = (ReceiveContext)state; + + return WebSocketMessageReader.ReadMessageAsync(context.WebSocket, context.BufferSize, context.MaxIncomingMessageSize, context.DisconnectToken); + }, + receiveContext); + } + + internal async Task ProcessWebSocketRequestAsync(WebSocket webSocket, CancellationToken disconnectToken, Func> messageRetriever, object state) + { + bool closedReceived = false; + + try + { + // first, set primitives and initialize the object + WebSocket = webSocket; + OnOpen(); + + // dispatch incoming messages + while (!disconnectToken.IsCancellationRequested && !closedReceived) + { + WebSocketMessage incomingMessage = await messageRetriever(state); + switch (incomingMessage.MessageType) + { + case WebSocketMessageType.Binary: + OnMessage((byte[])incomingMessage.Data); + break; + + case WebSocketMessageType.Text: + OnMessage((string)incomingMessage.Data); + break; + + default: + closedReceived = true; + + // If we received an incoming CLOSE message, we'll queue a CLOSE frame to be sent. + // We'll give the queued frame some amount of time to go out on the wire, and if a + // timeout occurs we'll give up and abort the connection. + await TaskEx.WhenAny(CloseAsync(), TaskEx.Delay(_closeTimeout)); + break; + } + } + + } + catch (OperationCanceledException ex) + { + // ex.CancellationToken never has the token that was actually cancelled + if (!disconnectToken.IsCancellationRequested) + { + Error = ex; + OnError(); + } + } + catch (ObjectDisposedException) + { + // If the websocket was disposed while we were reading then noop + } + catch (Exception ex) + { + if (IsFatalException(ex)) + { + Error = ex; + OnError(); + } + } + + try + { + if (WebSocket.ReadyState == WebSocketState.Closed/* || + WebSocket.ReadyState == WebSocketState.Aborted*/) + { + // No-op if the socket is already closed or aborted + } + else + { + // Close the socket + WebSocket.CloseAsync(CloseStatusCode.Normal, ""); + + await TaskEx.Run(() => + { + while (WebSocket.ReadyState != WebSocketState.Closed) { } + }, CancellationToken.None); + } + } + finally + { + OnClose(); + } + } + + // returns true if this is a fatal exception (e.g. OnError should be called) + private static bool IsFatalException(Exception ex) + { + // If this exception is due to the underlying TCP connection going away, treat as a normal close + // rather than a fatal exception. + COMException ce = ex as COMException; + if (ce != null) + { + switch ((uint)ce.ErrorCode) + { + // These are the three error codes we've seen in testing which can be caused by the TCP connection going away unexpectedly. + case 0x800703e3: + case 0x800704cd: + case 0x80070026: + return false; + } + } + + // unknown exception; treat as fatal + return true; + } + + private static bool IsClosedOrClosedSent(WebSocket webSocket) + { + return webSocket.ReadyState == WebSocketState.Closed/* || webSocket.State == WebSocketState.CloseSent || webSocket.State == WebSocketState.Aborted*/; - } - - private class CloseContext - { - public WebSocketHandler Handler; - - public CloseContext(WebSocketHandler webSocketHandler) { - Handler = webSocketHandler; - } - } - - private class SendContext - { - public WebSocketHandler Handler; - public ArraySegment Message; - public WebSocketMessageType MessageType; - public bool EndOfMessage; - - public SendContext(WebSocketHandler webSocketHandler, ArraySegment message, WebSocketMessageType messageType, bool endOfMessage) { - Handler = webSocketHandler; - Message = message; - MessageType = messageType; - EndOfMessage = endOfMessage; - } - } - - private class ReceiveContext - { - public WebSocket WebSocket; - public CancellationToken DisconnectToken; - public int? MaxIncomingMessageSize; - public int BufferSize; - - public ReceiveContext(WebSocket webSocket, CancellationToken disconnectToken, int? maxIncomingMessageSize, int bufferSize) { - WebSocket = webSocket; - DisconnectToken = disconnectToken; - MaxIncomingMessageSize = maxIncomingMessageSize; - BufferSize = bufferSize; - } - } - } + } + + private class CloseContext + { + public WebSocketHandler Handler; + + public CloseContext(WebSocketHandler webSocketHandler) + { + Handler = webSocketHandler; + } + } + + private class SendContext + { + public WebSocketHandler Handler; + public byte[] Message; + public WebSocketMessageType MessageType; + public bool EndOfMessage; + + public SendContext(WebSocketHandler webSocketHandler, byte[] message, WebSocketMessageType messageType, bool endOfMessage) + { + Handler = webSocketHandler; + Message = message; + MessageType = messageType; + EndOfMessage = endOfMessage; + } + } + + private class ReceiveContext + { + public WebSocket WebSocket; + public CancellationToken DisconnectToken; + public int? MaxIncomingMessageSize; + public int BufferSize; + + public ReceiveContext(WebSocket webSocket, CancellationToken disconnectToken, int? maxIncomingMessageSize, int bufferSize) + { + WebSocket = webSocket; + DisconnectToken = disconnectToken; + MaxIncomingMessageSize = maxIncomingMessageSize; + BufferSize = bufferSize; + } + } + } } \ No newline at end of file diff --git a/src/v2.x/packages.config b/src/v2.x/packages.config index ab16e55..b0c9884 100644 --- a/src/v2.x/packages.config +++ b/src/v2.x/packages.config @@ -3,5 +3,6 @@ + \ No newline at end of file