Skip to content

Commit

Permalink
Add max pending messages options for ManagedClient.
Browse files Browse the repository at this point in the history
  • Loading branch information
chkr1011 committed Oct 29, 2018
1 parent ca7952a commit 28efbcd
Show file tree
Hide file tree
Showing 14 changed files with 308 additions and 14 deletions.
2 changes: 2 additions & 0 deletions Build/MQTTnet.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes>* [Server] Added new method which exposes all retained messages.
* [Server] Removed (wrong) setter from the server options interface.
* [ManagedClient] Added max pending messages count option.
* [ManagedClient] Add pending messages overflow strategy option.
</releaseNotes>
<copyright>Copyright Christian Kratky 2016-2018</copyright>
<tags>MQTT Message Queue Telemetry Transport MQTTClient MQTTServer Server MQTTBroker Broker NETStandard IoT InternetOfThings Messaging Hardware Arduino Sensor Actuator M2M ESP Smart Home Cities Automation Xamarin</tags>
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ MQTTnet is a high performance .NET library for MQTT based communication. It prov
* Performance optimized (processing ~70.000 messages / second)*
* Interfaces included for mocking and testing
* Access to internal trace messages
* Unit tested (~90 tests)
* Unit tested (~100 tests)

\* Tested on local machine (Intel i7 8700K) with MQTTnet client and server running in the same process using the TCP channel. The app for verification is part of this repository and stored in _/Tests/MQTTnet.TestApp.NetCore_.

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;

namespace MQTTnet.Extensions.ManagedClient
{
public class ApplicationMessageSkippedEventArgs : EventArgs
{
public ApplicationMessageSkippedEventArgs(ManagedMqttApplicationMessage applicationMessage)
{
ApplicationMessage = applicationMessage ?? throw new ArgumentNullException(nameof(applicationMessage));
}

public ManagedMqttApplicationMessage ApplicationMessage { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public interface IManagedMqttClient : IApplicationMessageReceiver, IApplicationM
event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;

event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed;
event EventHandler<ApplicationMessageSkippedEventArgs> ApplicationMessageSkipped;

event EventHandler<MqttManagedProcessFailedEventArgs> ConnectingFailed;
event EventHandler<MqttManagedProcessFailedEventArgs> SynchronizingSubscriptionsFailed;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using MQTTnet.Client;
using MQTTnet.Server;

namespace MQTTnet.Extensions.ManagedClient
{
Expand All @@ -12,5 +13,9 @@ public interface IManagedMqttClientOptions
TimeSpan ConnectionCheckInterval { get; }

IManagedMqttClientStorage Storage { get; }

int MaxPendingMessages { get; }

MqttPendingMessagesOverflowStrategy PendingMessagesOverflowStrategy { get; }
}
}
50 changes: 38 additions & 12 deletions Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MQTTnet.Client;
using MQTTnet.Diagnostics;
using MQTTnet.Exceptions;
using MQTTnet.Internal;
using MQTTnet.Protocol;
using MQTTnet.Server;
using MqttClientConnectedEventArgs = MQTTnet.Client.MqttClientConnectedEventArgs;
using MqttClientDisconnectedEventArgs = MQTTnet.Client.MqttClientDisconnectedEventArgs;

namespace MQTTnet.Extensions.ManagedClient
{
public class ManagedMqttClient : IManagedMqttClient
{
private readonly BlockingCollection<ManagedMqttApplicationMessage> _messageQueue = new BlockingCollection<ManagedMqttApplicationMessage>();
private readonly BlockingQueue<ManagedMqttApplicationMessage> _messageQueue = new BlockingQueue<ManagedMqttApplicationMessage>();
private readonly Dictionary<string, MqttQualityOfServiceLevel> _subscriptions = new Dictionary<string, MqttQualityOfServiceLevel>();
private readonly HashSet<string> _unsubscriptions = new HashSet<string>();

Expand Down Expand Up @@ -50,6 +53,7 @@ public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger)

public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;
public event EventHandler<ApplicationMessageProcessedEventArgs> ApplicationMessageProcessed;
public event EventHandler<ApplicationMessageSkippedEventArgs> ApplicationMessageSkipped;

public event EventHandler<MqttManagedProcessFailedEventArgs> ConnectingFailed;
public event EventHandler<MqttManagedProcessFailedEventArgs> SynchronizingSubscriptionsFailed;
Expand All @@ -75,7 +79,7 @@ public async Task StartAsync(IManagedMqttClientOptions options)

foreach (var message in messages)
{
_messageQueue.Add(message);
_messageQueue.Enqueue(message);
}
}

Expand All @@ -93,10 +97,7 @@ public Task StopAsync()
StopPublishing();
StopMaintainingConnection();

while (_messageQueue.Any())
{
_messageQueue.Take();
}
_messageQueue.Clear();

return Task.FromResult(0);
}
Expand All @@ -112,12 +113,38 @@ public async Task PublishAsync(ManagedMqttApplicationMessage applicationMessage)
{
if (applicationMessage == null) throw new ArgumentNullException(nameof(applicationMessage));

ManagedMqttApplicationMessage skippedMessage = null;
lock (_messageQueue)
{
if (_messageQueue.Count > _options.MaxPendingMessages)
{
if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
{
_logger.Verbose("Skipping publish of new application message because internal queue is full.");
ApplicationMessageSkipped?.Invoke(this, new ApplicationMessageSkippedEventArgs(applicationMessage));
return;
}

if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
{
skippedMessage = _messageQueue.RemoveFirst();
_logger.Verbose("Removed oldest application message from internal queue because it is full.");
ApplicationMessageSkipped?.Invoke(this, new ApplicationMessageSkippedEventArgs(skippedMessage));
}
}

_messageQueue.Enqueue(applicationMessage);
}

if (_storageManager != null)
{
if (skippedMessage != null)
{
await _storageManager.RemoveAsync(skippedMessage).ConfigureAwait(false);
}

await _storageManager.AddAsync(applicationMessage).ConfigureAwait(false);
}

_messageQueue.Add(applicationMessage);
}

public Task SubscribeAsync(IEnumerable<TopicFilter> topicFilters)
Expand Down Expand Up @@ -157,7 +184,6 @@ public Task UnsubscribeAsync(IEnumerable<string> topics)

public void Dispose()
{
_messageQueue?.Dispose();
_connectionCancellationToken?.Dispose();
_publishingCancellationToken?.Dispose();
}
Expand Down Expand Up @@ -228,7 +254,7 @@ private void PublishQueuedMessages(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested && _mqttClient.IsConnected)
{
var message = _messageQueue.Take(cancellationToken);
var message = _messageQueue.Dequeue();
if (message == null)
{
continue;
Expand Down Expand Up @@ -268,7 +294,7 @@ private void TryPublishQueuedMessage(ManagedMqttApplicationMessage message)

if (message.ApplicationMessage.QualityOfServiceLevel > MqttQualityOfServiceLevel.AtMostOnce)
{
_messageQueue.Add(message);
_messageQueue.Enqueue(message);
}
}
catch (Exception exception)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using MQTTnet.Client;
using MQTTnet.Server;

namespace MQTTnet.Extensions.ManagedClient
{
Expand All @@ -12,5 +13,9 @@ public class ManagedMqttClientOptions : IManagedMqttClientOptions
public TimeSpan ConnectionCheckInterval { get; set; } = TimeSpan.FromSeconds(1);

public IManagedMqttClientStorage Storage { get; set; }

public int MaxPendingMessages { get; set; } = int.MaxValue;

public MqttPendingMessagesOverflowStrategy PendingMessagesOverflowStrategy { get; set; } = MqttPendingMessagesOverflowStrategy.DropNewMessage;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using MQTTnet.Client;
using MQTTnet.Server;

namespace MQTTnet.Extensions.ManagedClient
{
Expand All @@ -8,6 +9,18 @@ public class ManagedMqttClientOptionsBuilder
private readonly ManagedMqttClientOptions _options = new ManagedMqttClientOptions();
private MqttClientOptionsBuilder _clientOptionsBuilder;

public ManagedMqttClientOptionsBuilder WithMaxPendingMessages(int value)
{
_options.MaxPendingMessages = value;
return this;
}

public ManagedMqttClientOptionsBuilder WithPendingMessagesOverflowStrategy(MqttPendingMessagesOverflowStrategy value)
{
_options.PendingMessagesOverflowStrategy = value;
return this;
}

public ManagedMqttClientOptionsBuilder WithAutoReconnectDelay(TimeSpan value)
{
_options.AutoReconnectDelay = value;
Expand Down
77 changes: 77 additions & 0 deletions Source/MQTTnet/Internal/BlockingQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
using System;
using System.Collections.Generic;
using System.Threading;

namespace MQTTnet.Internal
{
public class BlockingQueue<TItem>
{
private readonly object _syncRoot = new object();
private readonly LinkedList<TItem> _items = new LinkedList<TItem>();
private readonly ManualResetEvent _gate = new ManualResetEvent(false);

public int Count
{
get
{
lock (_syncRoot)
{
return _items.Count;
}
}
}

public void Enqueue(TItem item)
{
if (item == null) throw new ArgumentNullException(nameof(item));

lock (_syncRoot)
{
_items.AddLast(item);
_gate.Set();
}
}

public TItem Dequeue()
{
while (true)
{
lock (_syncRoot)
{
if (_items.Count > 0)
{
var item = _items.First.Value;
_items.RemoveFirst();

return item;
}

if (_items.Count == 0)
{
_gate.Reset();
}
}

_gate.WaitOne();
}
}

public TItem RemoveFirst()
{
lock (_syncRoot)
{
var item = _items.First;
_items.RemoveFirst();
return item.Value;
}
}

public void Clear()
{
lock (_syncRoot)
{
_items.Clear();
}
}
}
}
Loading

0 comments on commit 28efbcd

Please sign in to comment.