Skip to content

Commit

Permalink
Refactor subscribe and unsubscribe to support MQTTv5.
Browse files Browse the repository at this point in the history
  • Loading branch information
chkr1011 committed Jan 21, 2019
1 parent 0d6e244 commit ebf0e91
Show file tree
Hide file tree
Showing 39 changed files with 1,184 additions and 477 deletions.
1 change: 1 addition & 0 deletions Build/MQTTnet.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
<releaseNotes> * [Core] Added support for MQTTv5 packages.
* [Client] Added new MQTTv5 features to options builder.
* [Client] Added uniform API across all supported MQTT versions (BREAKING CHANGE!)
* [Client] The client will now avoid sending an ACK if an exception has been thrown in message handler (thanks to @ramonsmits).
* [Server] Added support for MQTTv5 clients. The server will still return _success_ for all cases at the moment even if more granular codes are available.

* [Note] Due to MQTTv5 a lot of new classes were introduced. This required adding new namespaces as well. Most classes are backward compatible but new namespaces must be added.
Expand Down
38 changes: 19 additions & 19 deletions MQTTnet.noUWP.sln
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.27004.2010
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Core.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Core.Tests.csproj", "{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{9248C2E1-B9D6-40BF-81EC-86004D7765B4}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Source", "Source", "{32A630A7-2598-41D7-B625-204CD906F5FB}"
Expand Down Expand Up @@ -45,6 +43,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Extensions.ManagedC
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.AspNetCore.Tests", "Tests\MQTTnet.AspNetCore.Tests\MQTTnet.AspNetCore.Tests.csproj", "{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MQTTnet.Tests", "Tests\MQTTnet.Core.Tests\MQTTnet.Tests.csproj", "{9C7106CA-96B8-4ABE-B3B4-9357AB8ACB41}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -57,22 +57,6 @@ Global
Release|x86 = Release|x86
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|ARM.ActiveCfg = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|ARM.Build.0 = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.ActiveCfg = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x64.Build.0 = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.ActiveCfg = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Debug|x86.Build.0 = Debug|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|Any CPU.Build.0 = Release|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|ARM.ActiveCfg = Release|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|ARM.Build.0 = Release|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.ActiveCfg = Release|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x64.Build.0 = Release|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.ActiveCfg = Release|Any CPU
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC}.Release|x86.Build.0 = Release|Any CPU
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3587E506-55A2-4EB3-99C7-DC01E42D25D2}.Debug|ARM.ActiveCfg = Debug|Any CPU
Expand Down Expand Up @@ -201,12 +185,27 @@ Global
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|x64.Build.0 = Release|Any CPU
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|x86.ActiveCfg = Release|Any CPU
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E}.Release|x86.Build.0 = Release|Any CPU
{9C7106CA-96B8-4ABE-B3B4-9357AB8ACB41}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9C7106CA-96B8-4ABE-B3B4-9357AB8ACB41}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9C7106CA-96B8-4ABE-B3B4-9357AB8ACB41}.Debug|ARM.ActiveCfg = Debug|Any CPU
{9C7106CA-96B8-4ABE-B3B4-9357AB8ACB41}.Debug|ARM.Build.0 = Debug|Any CPU
{9C7106CA-96B8-4ABE-B3B4-9357AB8ACB41}.Debug|x64.ActiveCfg = Debug|Any CPU
{9C7106CA-96B8-4ABE-B3B4-9357AB8ACB41}.Debug|x64.Build.0 = Debug|Any CPU
{9C7106CA-96B8-4ABE-B3B4-9357AB8ACB41}.Debug|x86.ActiveCfg = Debug|Any CPU
{9C7106CA-96B8-4ABE-B3B4-9357AB8ACB41}.Debug|x86.Build.0 = Debug|Any CPU
{9C7106CA-96B8-4ABE-B3B4-9357AB8ACB41}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9C7106CA-96B8-4ABE-B3B4-9357AB8ACB41}.Release|Any CPU.Build.0 = Release|Any CPU
{9C7106CA-96B8-4ABE-B3B4-9357AB8ACB41}.Release|ARM.ActiveCfg = Release|Any CPU
{9C7106CA-96B8-4ABE-B3B4-9357AB8ACB41}.Release|ARM.Build.0 = Release|Any CPU
{9C7106CA-96B8-4ABE-B3B4-9357AB8ACB41}.Release|x64.ActiveCfg = Release|Any CPU
{9C7106CA-96B8-4ABE-B3B4-9357AB8ACB41}.Release|x64.Build.0 = Release|Any CPU
{9C7106CA-96B8-4ABE-B3B4-9357AB8ACB41}.Release|x86.ActiveCfg = Release|Any CPU
{9C7106CA-96B8-4ABE-B3B4-9357AB8ACB41}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{A7FF0C91-25DE-4BA6-B39E-F54E8DADF1CC} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{3587E506-55A2-4EB3-99C7-DC01E42D25D2} = {32A630A7-2598-41D7-B625-204CD906F5FB}
{3D283AAD-AAA8-4339-8394-52F80B6304DB} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{C6FF8AEA-0855-41EC-A1F3-AC262225BAB9} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
Expand All @@ -215,6 +214,7 @@ Global
{998D04DD-7CB0-45F5-A393-E2495C16399E} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{C400533A-8EBA-4F0B-BF4D-295C3708604B} = {12816BCC-AF9E-44A9-9AE5-C246AF2A0587}
{61B62223-F5D0-48E4-BBD6-2CBA9353CB5E} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
{9C7106CA-96B8-4ABE-B3B4-9357AB8ACB41} = {9248C2E1-B9D6-40BF-81EC-86004D7765B4}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {07536672-5CBC-4BE3-ACE0-708A431A7894}
Expand Down
11 changes: 9 additions & 2 deletions Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading.Tasks;
using MQTTnet.Client;
using MQTTnet.Client.Publishing;
using MQTTnet.Client.Receiving;
using MQTTnet.Diagnostics;
using MQTTnet.Exceptions;
using MQTTnet.Internal;
Expand Down Expand Up @@ -52,6 +53,12 @@ public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger)
public event EventHandler<MqttClientConnectedEventArgs> Connected;
public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;

public IMqttApplicationMessageHandler ReceivedApplicationMessageHandler
{
get => _mqttClient.ReceivedApplicationMessageHandler;
set => _mqttClient.ReceivedApplicationMessageHandler = value;
}

public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
public event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed;
public event EventHandler<ApplicationMessageSkippedEventArgs> ApplicationMessageSkipped;
Expand Down Expand Up @@ -372,12 +379,12 @@ private async Task SynchronizeSubscriptionsAsync()
{
if (unsubscriptions.Any())
{
await _mqttClient.UnsubscribeAsync(unsubscriptions).ConfigureAwait(false);
await _mqttClient.UnsubscribeAsync(unsubscriptions.ToArray()).ConfigureAwait(false);
}

if (subscriptions.Any())
{
await _mqttClient.SubscribeAsync(subscriptions).ConfigureAwait(false);
await _mqttClient.SubscribeAsync(subscriptions.ToArray()).ConfigureAwait(false);
}
}
catch (Exception exception)
Expand Down
19 changes: 16 additions & 3 deletions Source/MQTTnet/Adapter/MqttChannelAdapter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,18 @@ public async Task<MqttBasePacket> ReceivePacketAsync(TimeSpan timeout, Cancellat

private async Task<ReceivedMqttPacket> ReceiveAsync(CancellationToken cancellationToken)
{
var fixedHeader = await _packetReader.ReadFixedHeaderAsync(_fixedHeaderBuffer, cancellationToken).ConfigureAwait(false);
var readFixedHeaderResult = await _packetReader.ReadFixedHeaderAsync(_fixedHeaderBuffer, cancellationToken).ConfigureAwait(false);

try
{
if (readFixedHeaderResult.ConnectionClosed)
{
return null;
}

ReadingPacketStarted?.Invoke(this, EventArgs.Empty);

var fixedHeader = readFixedHeaderResult.FixedHeader;
if (fixedHeader.RemainingLength == 0)
{
return new ReceivedMqttPacket(fixedHeader.Flags, null, 2);
Expand All @@ -205,9 +211,16 @@ private async Task<ReceivedMqttPacket> ReceiveAsync(CancellationToken cancellati
var readBytes = _channel.ReadAsync(body, bodyOffset, chunkSize, cancellationToken).ConfigureAwait(false).GetAwaiter().GetResult();
#endif

cancellationToken.ThrowIfCancellationRequested();
ExceptionHelper.ThrowIfGracefulSocketClose(readBytes);
if (cancellationToken.IsCancellationRequested)
{
return null;
}

if (readBytes == 0)
{
return null;
}

bodyOffset += readBytes;
} while (bodyOffset < body.Length);

Expand Down
5 changes: 2 additions & 3 deletions Source/MQTTnet/Client/IMqttClient.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
Expand All @@ -20,7 +19,7 @@ public interface IMqttClient : IApplicationMessageReceiver, IApplicationMessageP
Task<MqttClientConnectResult> ConnectAsync(IMqttClientOptions options);
Task DisconnectAsync(MqttClientDisconnectOptions options);

Task<MqttClientSubscribeResult> SubscribeAsync(IEnumerable<TopicFilter> topicFilters);
Task<MqttClientUnsubscribeResult> UnsubscribeAsync(IEnumerable<string> topics);
Task<MqttClientSubscribeResult> SubscribeAsync(MqttClientSubscribeOptions options);
Task<MqttClientUnsubscribeResult> UnsubscribeAsync(MqttClientUnsubscribeOptions options);
}
}
Loading

0 comments on commit ebf0e91

Please sign in to comment.