Skip to content

Commit

Permalink
Improve connection disposal.
Browse files Browse the repository at this point in the history
  • Loading branch information
chkr1011 committed Apr 22, 2019
1 parent 5c903e6 commit 6399172
Show file tree
Hide file tree
Showing 20 changed files with 179 additions and 165 deletions.
2 changes: 1 addition & 1 deletion Source/MQTTnet.AspnetCore/ApplicationBuilderExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public static IApplicationBuilder UseMqttEndpoint(this IApplicationBuilder app,
}

var adapter = app.ApplicationServices.GetRequiredService<MqttWebSocketServerAdapter>();
using (var webSocket = await context.WebSockets.AcceptWebSocketAsync(subProtocol))
using (var webSocket = await context.WebSockets.AcceptWebSocketAsync(subProtocol).ConfigureAwait(false))
{
await adapter.RunWebSocketConnectionAsync(webSocket, context);
}
Expand Down
14 changes: 7 additions & 7 deletions Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ public MqttConnectionContext(MqttPacketFormatterAdapter packetFormatterAdapter,
_input = Connection.Transport.Input;
_output = Connection.Transport.Output;
}



_reader = new SpanBasedMqttPacketBodyReader();
}

private PipeReader _input;
private PipeWriter _output;
private readonly SpanBasedMqttPacketBodyReader _reader;

public string Endpoint
public string Endpoint
{
get {
get
{
var connection = Http?.HttpContext?.Connection;
if (connection == null)
{
Expand All @@ -53,12 +53,12 @@ public string Endpoint
public ConnectionContext Connection { get; }
public MqttPacketFormatterAdapter PacketFormatterAdapter { get; }

public long BytesSent { get; set; }
public long BytesSent { get; set; }
public long BytesReceived { get; set; }

public Action ReadingPacketStartedCallback { get; set; }
public Action ReadingPacketCompletedCallback { get; set; }

private readonly SemaphoreSlim _writerSemaphore = new SemaphoreSlim(1, 1);

public async Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationToken)
Expand Down Expand Up @@ -145,7 +145,7 @@ public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, Cancellat
public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken)
{
var formatter = PacketFormatterAdapter;


await _writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
Expand Down
11 changes: 6 additions & 5 deletions Source/MQTTnet.AspnetCore/MqttConnectionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace MQTTnet.AspNetCore
{
public class MqttConnectionHandler : ConnectionHandler, IMqttServerAdapter
{
public Action<MqttServerAdapterClientAcceptedEventArgs> ClientAcceptedHandler { get; set; }
public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; }

public override async Task OnConnectedAsync(ConnectionContext connection)
{
Expand All @@ -25,10 +25,11 @@ public override async Task OnConnectedAsync(ConnectionContext connection)
var formatter = new MqttPacketFormatterAdapter(writer);
using (var adapter = new MqttConnectionContext(formatter, connection))
{
var args = new MqttServerAdapterClientAcceptedEventArgs(adapter);
ClientAcceptedHandler?.Invoke(args);

await args.SessionTask.ConfigureAwait(false);
var clientHandler = ClientHandler;
if (clientHandler != null)
{
await clientHandler(adapter).ConfigureAwait(false);
}
}
}

Expand Down
21 changes: 10 additions & 11 deletions Source/MQTTnet.AspnetCore/MqttWebSocketServerAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public MqttWebSocketServerAdapter(IMqttNetChildLogger logger)
_logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter));
}

public Action<MqttServerAdapterClientAcceptedEventArgs> ClientAcceptedHandler { get; set; }
public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; }

public Task StartAsync(IMqttServerOptions options)
{
Expand All @@ -43,17 +43,16 @@ public async Task RunWebSocketConnectionAsync(WebSocket webSocket, HttpContext h
var isSecureConnection = clientCertificate != null;
clientCertificate?.Dispose();

var writer = new SpanBasedMqttPacketWriter();
var formatter = new MqttPacketFormatterAdapter(writer);
var channel = new MqttWebSocketChannel(webSocket, endpoint, isSecureConnection);
var channelAdapter = new MqttChannelAdapter(channel, formatter, _logger.CreateChildLogger(nameof(MqttWebSocketServerAdapter)));

var eventArgs = new MqttServerAdapterClientAcceptedEventArgs(channelAdapter);
ClientAcceptedHandler?.Invoke(eventArgs);

if (eventArgs.SessionTask != null)
var clientHandler = ClientHandler;
if (clientHandler != null)
{
await eventArgs.SessionTask.ConfigureAwait(false);
var writer = new SpanBasedMqttPacketWriter();
var formatter = new MqttPacketFormatterAdapter(writer);
var channel = new MqttWebSocketChannel(webSocket, endpoint, isSecureConnection);
using (var channelAdapter = new MqttChannelAdapter(channel, formatter, _logger.CreateChildLogger(nameof(MqttWebSocketServerAdapter))))
{
await clientHandler(channelAdapter).ConfigureAwait(false);
}
}
}

Expand Down
51 changes: 51 additions & 0 deletions Source/MQTTnet.AspnetCore/TopicFilterBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using MQTTnet.Exceptions;
using MQTTnet.Protocol;

namespace MQTTnet
{
public class TopicFilterBuilder
{
private MqttQualityOfServiceLevel _qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce;
private string _topic;

public TopicFilterBuilder WithTopic(string topic)
{
_topic = topic;
return this;
}

public TopicFilterBuilder WithQualityOfServiceLevel(MqttQualityOfServiceLevel qualityOfServiceLevel)
{
_qualityOfServiceLevel = qualityOfServiceLevel;
return this;
}

public TopicFilterBuilder WithAtLeastOnceQoS()
{
_qualityOfServiceLevel = MqttQualityOfServiceLevel.AtLeastOnce;
return this;
}

public TopicFilterBuilder WithAtMostOnceQoS()
{
_qualityOfServiceLevel = MqttQualityOfServiceLevel.AtMostOnce;
return this;
}

public TopicFilterBuilder WithExactlyOnceQoS()
{
_qualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce;
return this;
}

public TopicFilter Build()
{
if (string.IsNullOrEmpty(_topic))
{
throw new MqttProtocolViolationException("Topic is not set.");
}

return new TopicFilter { Topic = _topic, QualityOfServiceLevel = _qualityOfServiceLevel };
}
}
}
3 changes: 3 additions & 0 deletions Source/MQTTnet.Server/Configuration/SettingsModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public class SettingsModel
/// </summary>
public RetainedApplicationMessagesModel RetainedApplicationMessages { get; set; } = new RetainedApplicationMessagesModel();

/// <summary>
/// Enables or disables the MQTTnet internal logging.
/// </summary>
public bool EnableDebugLogging { get; set; } = false;
}
}
20 changes: 8 additions & 12 deletions Source/MQTTnet.Server/Configuration/TcpEndpointModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@ public bool TryReadIPv4(out IPAddress address)
{
if (IPv4 == "*")
{
address = IPAddress.Parse("0.0.0.0");
address = IPAddress.Any;
return true;
}

if (IPv4 == "localhost")
{
address = IPAddress.Parse("127.0.0.1");
address = IPAddress.Loopback;
return true;
}

Expand All @@ -81,10 +81,8 @@ public bool TryReadIPv4(out IPAddress address)
address = ip;
return true;
}
else
{
throw new System.Exception($"Could not parse IPv4 address: {IPv4}");
}

throw new System.Exception($"Could not parse IPv4 address: {IPv4}");
}

/// <summary>
Expand All @@ -95,13 +93,13 @@ public bool TryReadIPv6(out IPAddress address)
{
if (IPv6 == "*")
{
address = IPAddress.Parse("::");
address = IPAddress.IPv6Any;
return true;
}

if (IPv6 == "localhost")
{
address = IPAddress.Parse("::1");
address = IPAddress.IPv6Loopback;
return true;
}

Expand All @@ -116,10 +114,8 @@ public bool TryReadIPv6(out IPAddress address)
address = ip;
return true;
}
else
{
throw new System.Exception($"Could not parse IPv6 address: {IPv6}");
}

throw new System.Exception($"Could not parse IPv6 address: {IPv6}");
}
}
}
1 change: 1 addition & 0 deletions Source/MQTTnet.Server/Mqtt/MqttServerService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ private IMqttServerOptions CreateMqttServerOptions()
if (_settings.TcpEndPoint.Enabled)
{
options.WithDefaultEndpoint();

if (_settings.TcpEndPoint.TryReadIPv4(out var address4))
{
options.WithDefaultEndpointBoundIPAddress(address4);
Expand Down
10 changes: 5 additions & 5 deletions Source/MQTTnet.Server/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@
*/
"TcpEndPoint": {
"Enabled": true,
"IPv4": "localhost",
"IPv6": "localhost",
"IPv4": "*",
"IPv6": "*",
"Port": 1883
},
"EncryptedTcpEndPoint": {
"Enabled": false,
"IPv4": "localhost",
"IPv6": "localhost",
"IPv4": "*",
"IPv6": "*",
"Port": 8883,
"CertificatePath": "/absolute/path/to/pfx"
},
Expand All @@ -45,7 +45,7 @@
"Filename": "RetainedApplicationMessages.json",
"WriteInterval": 10 // In seconds.
},
"EnableDebugLogging": false
"EnableDebugLogging": true
},
"Logging": {
"LogLevel": {
Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet/Adapter/IMqttServerAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace MQTTnet.Adapter
{
public interface IMqttServerAdapter : IDisposable
{
Action<MqttServerAdapterClientAcceptedEventArgs> ClientAcceptedHandler { get; set; }
Func<IMqttChannelAdapter, Task> ClientHandler { get; set; }

Task StartAsync(IMqttServerOptions options);
Task StopAsync();
Expand Down
17 changes: 0 additions & 17 deletions Source/MQTTnet/Adapter/MqttServerAdapterClientAcceptedEventArgs.cs

This file was deleted.

43 changes: 32 additions & 11 deletions Source/MQTTnet/Implementations/MqttTcpServerAdapter.Uwp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public MqttTcpServerAdapter(IMqttNetChildLogger logger)
_logger = logger.CreateChildLogger(nameof(MqttTcpServerAdapter));
}

public Action<MqttServerAdapterClientAcceptedEventArgs> ClientAcceptedHandler { get; set; }
public Func<IMqttChannelAdapter, Task> ClientHandler { get; set; }

public async Task StartAsync(IMqttServerOptions options)
{
Expand All @@ -39,7 +39,7 @@ public async Task StartAsync(IMqttServerOptions options)
_listener.Control.NoDelay = options.DefaultEndpointOptions.NoDelay;
_listener.Control.KeepAlive = true;
_listener.Control.QualityOfService = SocketQualityOfService.LowLatency;
_listener.ConnectionReceived += AcceptDefaultEndpointConnectionsAsync;
_listener.ConnectionReceived += OnConnectionReceivedAsync;

await _listener.BindServiceNameAsync(options.DefaultEndpointOptions.Port.ToString(), SocketProtectionLevel.PlainSocket);
}
Expand All @@ -54,30 +54,51 @@ public Task StopAsync()
{
if (_listener != null)
{
_listener.ConnectionReceived -= AcceptDefaultEndpointConnectionsAsync;
_listener.ConnectionReceived -= OnConnectionReceivedAsync;
}

_listener?.Dispose();
_listener = null;

return Task.FromResult(0);
}

public void Dispose()
{
StopAsync().GetAwaiter().GetResult();
_listener?.Dispose();
_listener = null;
}

private void AcceptDefaultEndpointConnectionsAsync(StreamSocketListener sender, StreamSocketListenerConnectionReceivedEventArgs args)
private async void OnConnectionReceivedAsync(StreamSocketListener sender, StreamSocketListenerConnectionReceivedEventArgs args)
{
try
{
var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(args.Socket, _options), new MqttPacketFormatterAdapter(), _logger);
ClientAcceptedHandler?.Invoke(new MqttServerAdapterClientAcceptedEventArgs(clientAdapter));
var clientHandler = ClientHandler;
if (clientHandler != null)
{
using (var clientAdapter = new MqttChannelAdapter(new MqttTcpChannel(args.Socket, _options), new MqttPacketFormatterAdapter(), _logger))
{
await clientHandler(clientAdapter).ConfigureAwait(false);
}
}
}
catch (Exception exception)
{
_logger.Error(exception, "Error while accepting connection at default endpoint.");
if (exception is ObjectDisposedException)
{
// It can happen that the listener socket is accessed after the cancellation token is already set and the listener socket is disposed.
return;
}

_logger.Error(exception, "Error while handling client connection.");
}
finally
{
try
{
args.Socket.Dispose();
}
catch (Exception exception)
{
_logger.Error(exception, "Error while cleaning up client connection");
}
}
}
}
Expand Down
Loading

0 comments on commit 6399172

Please sign in to comment.