Skip to content

Commit

Permalink
Refactor session and connection handling in server. Fix QoS level 2 i…
Browse files Browse the repository at this point in the history
…ssues.
  • Loading branch information
chkr1011 committed Jan 27, 2019
1 parent ebf0e91 commit 6a2bded
Show file tree
Hide file tree
Showing 72 changed files with 2,913 additions and 2,455 deletions.
3 changes: 3 additions & 0 deletions Build/MQTTnet.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
<requireLicenseAcceptance>false</requireLicenseAcceptance>
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes> * [Core] Added support for MQTTv5 packages.
* [Core] Performance improvements (removed several exceptions).
* [Client] Added new MQTTv5 features to options builder.
* [Client] Added uniform API across all supported MQTT versions (BREAKING CHANGE!)
* [Client] The client will now avoid sending an ACK if an exception has been thrown in message handler (thanks to @ramonsmits).
* [Client] Fixed issues in QoS 2 handling which leads to message loss.
* [Server] Added support for MQTTv5 clients. The server will still return _success_ for all cases at the moment even if more granular codes are available.
* [Server] Fixed issues in QoS 2 handling which leads to message loss.

* [Note] Due to MQTTv5 a lot of new classes were introduced. This required adding new namespaces as well. Most classes are backward compatible but new namespaces must be added.
</releaseNotes>
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ MQTTnet is a high performance .NET library for MQTT based communication. It prov
* Uniform API across all supported versions of the MQTT protocol
* Interfaces included for mocking and testing
* Access to internal trace messages
* Unit tested (~120 tests)
* Unit tested (~130 tests)

\* Tested on local machine (Intel i7 8700K) with MQTTnet client and server running in the same process using the TCP channel. The app for verification is part of this repository and stored in _/Tests/MQTTnet.TestApp.NetCore_.

Expand Down
2 changes: 1 addition & 1 deletion Source/MQTTnet.AspnetCore/MqttConnectionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, Cancellat
return null;
}

public async Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken)
{
var buffer = PacketFormatterAdapter.Encode(packet).AsMemory();
var output = Connection.Transport.Output;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public Task StopAsync()
return Task.FromResult(0);
}

public async Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage)
public async Task<MqttClientPublishResult> PublishAsync(MqttApplicationMessage applicationMessage, CancellationToken cancellationToken)
{
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Client.Publishing;
using MQTTnet.Protocol;

namespace MQTTnet.Extensions.ManagedClient
Expand Down Expand Up @@ -35,5 +38,89 @@ public static Task UnsubscribeAsync(this IManagedMqttClient managedClient, param

return managedClient.UnsubscribeAsync(topicFilters);
}

public static async Task PublishAsync(this IManagedMqttClient client, IEnumerable<MqttApplicationMessage> applicationMessages)
{
if (client == null) throw new ArgumentNullException(nameof(client));
if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));

foreach (var applicationMessage in applicationMessages)
{
await client.PublishAsync(applicationMessage).ConfigureAwait(false);
}
}

public static Task<MqttClientPublishResult> PublishAsync(this IManagedMqttClient client, MqttApplicationMessage applicationMessage)
{
if (client == null) throw new ArgumentNullException(nameof(client));
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

return client.PublishAsync(applicationMessage, CancellationToken.None);
}

public static async Task PublishAsync(this IManagedMqttClient client, params MqttApplicationMessage[] applicationMessages)
{
if (client == null) throw new ArgumentNullException(nameof(client));
if (applicationMessages == null) throw new ArgumentNullException(nameof(applicationMessages));

foreach (var applicationMessage in applicationMessages)
{
await client.PublishAsync(applicationMessage, CancellationToken.None).ConfigureAwait(false);
}
}

public static Task<MqttClientPublishResult> PublishAsync(this IManagedMqttClient client, string topic)
{
if (client == null) throw new ArgumentNullException(nameof(client));
if (topic == null) throw new ArgumentNullException(nameof(topic));

return client.PublishAsync(builder => builder
.WithTopic(topic));
}

public static Task<MqttClientPublishResult> PublishAsync(this IManagedMqttClient client, string topic, string payload)
{
if (client == null) throw new ArgumentNullException(nameof(client));
if (topic == null) throw new ArgumentNullException(nameof(topic));

return client.PublishAsync(builder => builder
.WithTopic(topic)
.WithPayload(payload));
}

public static Task<MqttClientPublishResult> PublishAsync(this IManagedMqttClient client, string topic, string payload, MqttQualityOfServiceLevel qualityOfServiceLevel)
{
if (client == null) throw new ArgumentNullException(nameof(client));
if (topic == null) throw new ArgumentNullException(nameof(topic));

return client.PublishAsync(builder => builder
.WithTopic(topic)
.WithPayload(payload)
.WithQualityOfServiceLevel(qualityOfServiceLevel));
}

public static Task<MqttClientPublishResult> PublishAsync(this IManagedMqttClient client, string topic, string payload, MqttQualityOfServiceLevel qualityOfServiceLevel, bool retain)
{
if (client == null) throw new ArgumentNullException(nameof(client));
if (topic == null) throw new ArgumentNullException(nameof(topic));

return client.PublishAsync(builder => builder
.WithTopic(topic)
.WithPayload(payload)
.WithQualityOfServiceLevel(qualityOfServiceLevel)
.WithRetainFlag(retain));
}

public static Task<MqttClientPublishResult> PublishAsync(this IManagedMqttClient client, Func<MqttApplicationMessageBuilder, MqttApplicationMessageBuilder> builder, CancellationToken cancellationToken)
{
var message = builder(new MqttApplicationMessageBuilder()).Build();
return client.PublishAsync(message, cancellationToken);
}

public static Task<MqttClientPublishResult> PublishAsync(this IManagedMqttClient client, Func<MqttApplicationMessageBuilder, MqttApplicationMessageBuilder> builder)
{
var message = builder(new MqttApplicationMessageBuilder()).Build();
return client.PublishAsync(message, CancellationToken.None);
}
}
}
2 changes: 1 addition & 1 deletion Source/MQTTnet/Adapter/IMqttChannelAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public interface IMqttChannelAdapter : IDisposable

Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellationToken);

Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken);
Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken);

Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, CancellationToken cancellationToken);
}
Expand Down
48 changes: 32 additions & 16 deletions Source/MQTTnet/Adapter/MqttChannelAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,14 @@ public async Task ConnectAsync(TimeSpan timeout, CancellationToken cancellationT

try
{
_logger.Verbose("Connecting [Timeout={0}]", timeout);

await Internal.TaskExtensions
.TimeoutAfterAsync(ct => _channel.ConnectAsync(ct), timeout, cancellationToken)
.ConfigureAwait(false);
if (timeout == TimeSpan.Zero)
{
await _channel.ConnectAsync(cancellationToken).ConfigureAwait(false);
}
else
{
await MqttTaskTimeout.WaitAsync(t => _channel.ConnectAsync(t), timeout, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception exception)
{
Expand All @@ -76,11 +79,15 @@ public async Task DisconnectAsync(TimeSpan timeout, CancellationToken cancellati

try
{
_logger.Verbose("Disconnecting [Timeout={0}]", timeout);

await Internal.TaskExtensions
.TimeoutAfterAsync(ct => _channel.DisconnectAsync(), timeout, cancellationToken)
.ConfigureAwait(false);
if (timeout == TimeSpan.Zero)
{
await _channel.DisconnectAsync(cancellationToken).ConfigureAwait(false);
}
else
{
await MqttTaskTimeout.WaitAsync(
t => _channel.DisconnectAsync(t), timeout, cancellationToken).ConfigureAwait(false);
}
}
catch (Exception exception)
{
Expand All @@ -93,13 +100,23 @@ await Internal.TaskExtensions
}
}

public async Task SendPacketAsync(MqttBasePacket packet, CancellationToken cancellationToken)
public async Task SendPacketAsync(MqttBasePacket packet, TimeSpan timeout, CancellationToken cancellationToken)
{
await _writerSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
var packetData = PacketFormatterAdapter.Encode(packet);
await _channel.WriteAsync(packetData.Array, packetData.Offset, packetData.Count, cancellationToken).ConfigureAwait(false);

if (timeout == TimeSpan.Zero)
{
await _channel.WriteAsync(packetData.Array, packetData.Offset, packetData.Count, cancellationToken).ConfigureAwait(false);
}
else
{
await MqttTaskTimeout.WaitAsync(
t => _channel.WriteAsync(packetData.Array, packetData.Offset, packetData.Count, t), timeout, cancellationToken).ConfigureAwait(false);
}

PacketFormatterAdapter.FreeBuffer();

_logger.Verbose("TX ({0} bytes) >>> {1}", packetData.Count, packet);
Expand All @@ -126,14 +143,13 @@ public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, Cancellat
try
{
ReceivedMqttPacket receivedMqttPacket;

if (timeout > TimeSpan.Zero)
if (timeout == TimeSpan.Zero)
{
receivedMqttPacket = await Internal.TaskExtensions.TimeoutAfterAsync(ReceiveAsync, timeout, cancellationToken).ConfigureAwait(false);
receivedMqttPacket = await ReceiveAsync(cancellationToken).ConfigureAwait(false);
}
else
{
receivedMqttPacket = await ReceiveAsync(cancellationToken).ConfigureAwait(false);
receivedMqttPacket = await MqttTaskTimeout.WaitAsync(ReceiveAsync, timeout, cancellationToken).ConfigureAwait(false);
}

if (receivedMqttPacket == null || cancellationToken.IsCancellationRequested)
Expand Down
81 changes: 0 additions & 81 deletions Source/MQTTnet/ApplicationMessagePublisherExtensions.cs

This file was deleted.

2 changes: 1 addition & 1 deletion Source/MQTTnet/Channel/IMqttChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public interface IMqttChannel : IDisposable
string Endpoint { get; }

Task ConnectAsync(CancellationToken cancellationToken);
Task DisconnectAsync();
Task DisconnectAsync(CancellationToken cancellationToken);

Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken);
Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
namespace MQTTnet.Client.Connecting
{
public class MqttClientConnectResult
public class MqttClientAuthenticateResult
{
public bool IsSessionPresent { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ namespace MQTTnet.Client.Connecting
{
public class MqttClientConnectedEventArgs : EventArgs
{
public MqttClientConnectedEventArgs(MqttClientConnectResult connectResult)
public MqttClientConnectedEventArgs(MqttClientAuthenticateResult authenticateResult)
{
ConnectResult = connectResult ?? throw new ArgumentNullException(nameof(connectResult));
AuthenticateResult = authenticateResult ?? throw new ArgumentNullException(nameof(authenticateResult));
}

public MqttClientConnectResult ConnectResult { get; }
public MqttClientAuthenticateResult AuthenticateResult { get; }
}
}
9 changes: 5 additions & 4 deletions Source/MQTTnet/Client/IMqttClient.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
Expand All @@ -16,10 +17,10 @@ public interface IMqttClient : IApplicationMessageReceiver, IApplicationMessageP
event EventHandler<MqttClientConnectedEventArgs> Connected;
event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;

Task<MqttClientConnectResult> ConnectAsync(IMqttClientOptions options);
Task DisconnectAsync(MqttClientDisconnectOptions options);
Task<MqttClientAuthenticateResult> ConnectAsync(IMqttClientOptions options, CancellationToken cancellationToken);
Task DisconnectAsync(MqttClientDisconnectOptions options, CancellationToken cancellationToken);

Task<MqttClientSubscribeResult> SubscribeAsync(MqttClientSubscribeOptions options);
Task<MqttClientUnsubscribeResult> UnsubscribeAsync(MqttClientUnsubscribeOptions options);
Task<MqttClientSubscribeResult> SubscribeAsync(MqttClientSubscribeOptions options, CancellationToken cancellationToken);
Task<MqttClientUnsubscribeResult> UnsubscribeAsync(MqttClientUnsubscribeOptions options, CancellationToken cancellationToken);
}
}
Loading

0 comments on commit 6a2bded

Please sign in to comment.