Skip to content

Commit

Permalink
Add readonly property for client options.
Browse files Browse the repository at this point in the history
  • Loading branch information
chkr1011 committed Dec 3, 2018
1 parent 86c348d commit 619eaf2
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 27 deletions.
2 changes: 2 additions & 0 deletions Build/MQTTnet.nuspec
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
<description>MQTTnet is a high performance .NET library for MQTT based communication. It provides a MQTT client and a MQTT server (broker).</description>
<releaseNotes> * [Core] Updated nuget packages due to security issues.
* [Client] Fixed wrong behavior of publish method when client is disconnecting (thanks to @PaulFake).
* [Client] Added readonly property for accessing options.
* [ManagedClient] Added max pending messages count option.
* [ManagedClient] Add pending messages overflow strategy option.
* [ManagedClient] Fixed an issue which deletes the wrong message from the internal queue (thanks to @PaulFake).
* [ManagedClient] Added readonly property for accessing options.
* [Server] Added new method which exposes all retained messages.
* [Server] Removed (wrong) setter from the server options interface.
* [Server] Fixed cpu spike in case a client disconnects (issue 421).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ public interface IManagedMqttClient : IApplicationMessageReceiver, IApplicationM
bool IsStarted { get; }
bool IsConnected { get; }
int PendingApplicationMessagesCount { get; }
IManagedMqttClientOptions Options { get; }

event EventHandler<MqttClientConnectedEventArgs> Connected;
event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;
Expand Down
22 changes: 11 additions & 11 deletions Source/MQTTnet.Extensions.ManagedClient/ManagedMqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ public class ManagedMqttClient : IManagedMqttClient
private CancellationTokenSource _publishingCancellationToken;

private ManagedMqttClientStorageManager _storageManager;
private IManagedMqttClientOptions _options;


private bool _subscriptionsNotPushed;

public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger)
Expand All @@ -47,6 +46,7 @@ public ManagedMqttClient(IMqttClient mqttClient, IMqttNetChildLogger logger)
public bool IsConnected => _mqttClient.IsConnected;
public bool IsStarted => _connectionCancellationToken != null;
public int PendingApplicationMessagesCount => _messageQueue.Count;
public IManagedMqttClientOptions Options { get; private set; }

public event EventHandler<MqttClientConnectedEventArgs> Connected;
public event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;
Expand All @@ -70,11 +70,11 @@ public async Task StartAsync(IManagedMqttClientOptions options)

if (_connectionCancellationToken != null) throw new InvalidOperationException("The managed client is already started.");

_options = options;
Options = options;

if (_options.Storage != null)
if (Options.Storage != null)
{
_storageManager = new ManagedMqttClientStorageManager(_options.Storage);
_storageManager = new ManagedMqttClientStorageManager(Options.Storage);
var messages = await _storageManager.LoadQueuedMessagesAsync().ConfigureAwait(false);

foreach (var message in messages)
Expand Down Expand Up @@ -116,16 +116,16 @@ public async Task PublishAsync(ManagedMqttApplicationMessage applicationMessage)
ManagedMqttApplicationMessage removedMessage = null;
lock (_messageQueue)
{
if (_messageQueue.Count >= _options.MaxPendingMessages)
if (_messageQueue.Count >= Options.MaxPendingMessages)
{
if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropNewMessage)
{
_logger.Verbose("Skipping publish of new application message because internal queue is full.");
ApplicationMessageSkipped?.Invoke(this, new ApplicationMessageSkippedEventArgs(applicationMessage));
return;
}

if (_options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
if (Options.PendingMessagesOverflowStrategy == MqttPendingMessagesOverflowStrategy.DropOldestQueuedMessage)
{
removedMessage = _messageQueue.RemoveFirst();
_logger.Verbose("Removed oldest application message from internal queue because it is full.");
Expand Down Expand Up @@ -219,7 +219,7 @@ private async Task TryMaintainConnectionAsync(CancellationToken cancellationToke
if (connectionState == ReconnectionResult.NotConnected)
{
StopPublishing();
await Task.Delay(_options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
await Task.Delay(Options.AutoReconnectDelay, cancellationToken).ConfigureAwait(false);
return;
}

Expand All @@ -232,7 +232,7 @@ private async Task TryMaintainConnectionAsync(CancellationToken cancellationToke

if (connectionState == ReconnectionResult.StillConnected)
{
await Task.Delay(_options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
await Task.Delay(Options.ConnectionCheckInterval, cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
Expand Down Expand Up @@ -388,7 +388,7 @@ private async Task<ReconnectionResult> ReconnectIfRequiredAsync()

try
{
await _mqttClient.ConnectAsync(_options.ClientOptions).ConfigureAwait(false);
await _mqttClient.ConnectAsync(Options.ClientOptions).ConfigureAwait(false);
return ReconnectionResult.Reconnected;
}
catch (Exception exception)
Expand Down
1 change: 1 addition & 0 deletions Source/MQTTnet/Client/IMqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ namespace MQTTnet.Client
public interface IMqttClient : IApplicationMessageReceiver, IApplicationMessagePublisher, IDisposable
{
bool IsConnected { get; }
IMqttClientOptions Options { get; }

event EventHandler<MqttClientConnectedEventArgs> Connected;
event EventHandler<MqttClientDisconnectedEventArgs> Disconnected;
Expand Down
33 changes: 17 additions & 16 deletions Source/MQTTnet/Client/MqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public class MqttClient : IMqttClient
private readonly IMqttClientAdapterFactory _adapterFactory;
private readonly IMqttNetChildLogger _logger;

private IMqttClientOptions _options;
private CancellationTokenSource _cancellationTokenSource;
internal Task _packetReceiverTask;
internal Task _keepAliveMessageSenderTask;
Expand All @@ -44,6 +43,7 @@ public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logge
public event EventHandler<MqttApplicationMessageReceivedEventArgs> ApplicationMessageReceived;

public bool IsConnected { get; private set; }
public IMqttClientOptions Options { get; private set; }

public async Task<MqttClientConnectResult> ConnectAsync(IMqttClientOptions options)
{
Expand All @@ -54,16 +54,17 @@ public async Task<MqttClientConnectResult> ConnectAsync(IMqttClientOptions optio

try
{
_options = options;
Options = options;

_packetIdentifierProvider.Reset();
_packetDispatcher.Reset();

_cancellationTokenSource = new CancellationTokenSource();
_disconnectGate = 0;
_adapter = _adapterFactory.CreateClientAdapter(options, _logger);

_logger.Verbose($"Trying to connect with server ({_options.ChannelOptions}).");
await _adapter.ConnectAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token).ConfigureAwait(false);
_logger.Verbose($"Trying to connect with server ({Options.ChannelOptions}).");
await _adapter.ConnectAsync(Options.CommunicationTimeout, _cancellationTokenSource.Token).ConfigureAwait(false);
_logger.Verbose("Connection with server established.");

StartReceivingPackets(_cancellationTokenSource.Token);
Expand All @@ -73,7 +74,7 @@ public async Task<MqttClientConnectResult> ConnectAsync(IMqttClientOptions optio

_sendTracker.Restart();

if (_options.KeepAlivePeriod != TimeSpan.Zero)
if (Options.KeepAlivePeriod != TimeSpan.Zero)
{
StartSendingKeepAliveMessages(_cancellationTokenSource.Token);
}
Expand Down Expand Up @@ -197,11 +198,11 @@ private async Task<MqttConnAckPacket> AuthenticateAsync(MqttApplicationMessage w
{
var connectPacket = new MqttConnectPacket
{
ClientId = _options.ClientId,
Username = _options.Credentials?.Username,
Password = _options.Credentials?.Password,
CleanSession = _options.CleanSession,
KeepAlivePeriod = (ushort)_options.KeepAlivePeriod.TotalSeconds,
ClientId = Options.ClientId,
Username = Options.Credentials?.Username,
Password = Options.Credentials?.Password,
CleanSession = Options.CleanSession,
KeepAlivePeriod = (ushort)Options.KeepAlivePeriod.TotalSeconds,
WillMessage = willApplicationMessage
};

Expand Down Expand Up @@ -236,7 +237,7 @@ private async Task DisconnectInternalAsync(Task sender, Exception exception)
{
if (_adapter != null)
{
await _adapter.DisconnectAsync(_options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
await _adapter.DisconnectAsync(Options.CommunicationTimeout, CancellationToken.None).ConfigureAwait(false);
}

await WaitForTaskAsync(_packetReceiverTask, sender).ConfigureAwait(false);
Expand Down Expand Up @@ -303,7 +304,7 @@ private async Task<TResponsePacket> SendAndReceiveAsync<TResponsePacket>(MqttBas
try
{
await _adapter.SendPacketAsync(requestPacket, cancellationToken).ConfigureAwait(false);
var respone = await Internal.TaskExtensions.TimeoutAfterAsync(ct => packetAwaiter.Task, _options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);
var respone = await Internal.TaskExtensions.TimeoutAfterAsync(ct => packetAwaiter.Task, Options.CommunicationTimeout, cancellationToken).ConfigureAwait(false);

return (TResponsePacket)respone;
}
Expand All @@ -326,10 +327,10 @@ private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToke

while (!cancellationToken.IsCancellationRequested)
{
var keepAliveSendInterval = TimeSpan.FromSeconds(_options.KeepAlivePeriod.TotalSeconds * 0.75);
if (_options.KeepAliveSendInterval.HasValue)
var keepAliveSendInterval = TimeSpan.FromSeconds(Options.KeepAlivePeriod.TotalSeconds * 0.75);
if (Options.KeepAliveSendInterval.HasValue)
{
keepAliveSendInterval = _options.KeepAliveSendInterval.Value;
keepAliveSendInterval = Options.KeepAliveSendInterval.Value;
}

var waitTime = keepAliveSendInterval - _sendTracker.Elapsed;
Expand Down Expand Up @@ -517,7 +518,7 @@ private void FireApplicationMessageReceivedEvent(MqttPublishPacket publishPacket
try
{
var applicationMessage = publishPacket.ToApplicationMessage();
ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(_options.ClientId, applicationMessage));
ApplicationMessageReceived?.Invoke(this, new MqttApplicationMessageReceivedEventArgs(Options.ClientId, applicationMessage));
}
catch (Exception exception)
{
Expand Down

0 comments on commit 619eaf2

Please sign in to comment.