diff --git a/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs b/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs index eeb3d333c..d93a356d5 100644 --- a/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs +++ b/Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs @@ -28,6 +28,7 @@ public class MqttClient : IMqttClient private Task _packetReceiverTask; private Task _keepAliveMessageSenderTask; private IMqttChannelAdapter _adapter; + private bool _cleanDisconnectInitiated; public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger) { @@ -59,7 +60,7 @@ public async Task ConnectAsync(IMqttClientOptions optio _adapter = _adapterFactory.CreateClientAdapter(options, _logger); - _logger.Verbose("Trying to connect with server."); + _logger.Verbose($"Trying to connect with server ({_options.ChannelOptions})."); await _adapter.ConnectAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token).ConfigureAwait(false); _logger.Verbose("Connection with server established."); @@ -94,6 +95,8 @@ public async Task DisconnectAsync() { try { + _cleanDisconnectInitiated = true; + if (IsConnected && !_cancellationTokenSource.IsCancellationRequested) { await SendAsync(new MqttDisconnectPacket(), _cancellationTokenSource.Token).ConfigureAwait(false); @@ -235,24 +238,7 @@ private void ThrowIfConnected(string message) private async Task DisconnectInternalAsync(Task sender, Exception exception) { - await _disconnectLock.WaitAsync().ConfigureAwait(false); - try - { - if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested) - { - return; - } - - _cancellationTokenSource.Cancel(false); - } - catch (Exception adapterException) - { - _logger.Warning(adapterException, "Error while disconnecting from adapter."); - } - finally - { - _disconnectLock.Release(); - } + await InitiateDisconnectAsync().ConfigureAwait(false); var clientWasConnected = IsConnected; IsConnected = false; @@ -279,12 +265,35 @@ private async Task DisconnectInternalAsync(Task sender, Exception exception) _adapter = null; _cancellationTokenSource?.Dispose(); _cancellationTokenSource = null; + _cleanDisconnectInitiated = false; _logger.Info("Disconnected."); Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected, exception)); } } + private async Task InitiateDisconnectAsync() + { + await _disconnectLock.WaitAsync().ConfigureAwait(false); + try + { + if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested) + { + return; + } + + _cancellationTokenSource.Cancel(false); + } + catch (Exception adapterException) + { + _logger.Warning(adapterException, "Error while initiating disconnect."); + } + finally + { + _disconnectLock.Release(); + } + } + private Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken) { return SendAsync(new[] { packet }, cancellationToken); @@ -361,6 +370,11 @@ private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToke } catch (Exception exception) { + if (_cleanDisconnectInitiated) + { + return; + } + if (exception is OperationCanceledException) { } @@ -390,11 +404,19 @@ private async Task ReceivePacketsAsync(CancellationToken cancellationToken) while (!cancellationToken.IsCancellationRequested) { var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false); - await ProcessReceivedPacketAsync(packet, cancellationToken).ConfigureAwait(false); + if (packet != null) + { + await ProcessReceivedPacketAsync(packet, cancellationToken).ConfigureAwait(false); + } } } catch (Exception exception) { + if (_cleanDisconnectInitiated) + { + return; + } + if (exception is OperationCanceledException) { } @@ -407,8 +429,8 @@ private async Task ReceivePacketsAsync(CancellationToken cancellationToken) _logger.Error(exception, "Unhandled exception while receiving packets."); } - await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false); _packetDispatcher.Dispatch(exception); + await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false); } finally { @@ -458,7 +480,7 @@ private Task ProcessReceivedPublishPacketAsync(MqttPublishPacket publishPacket, if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce) { - // QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery] + // QoS 2 is implement as method "B" (4.3.3 QoS 2: Exactly once delivery) FireApplicationMessageReceivedEvent(publishPacket); return SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, cancellationToken); }