Skip to content

Commit

Permalink
Fix wrong exceptions during a clean disconnect.
Browse files Browse the repository at this point in the history
  • Loading branch information
chkr1011 committed Jun 6, 2018
1 parent c8bf360 commit ae72bc3
Showing 1 changed file with 44 additions and 22 deletions.
66 changes: 44 additions & 22 deletions Frameworks/MQTTnet.NetStandard/Client/MqttClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class MqttClient : IMqttClient
private Task _packetReceiverTask;
private Task _keepAliveMessageSenderTask;
private IMqttChannelAdapter _adapter;
private bool _cleanDisconnectInitiated;

public MqttClient(IMqttClientAdapterFactory channelFactory, IMqttNetLogger logger)
{
Expand Down Expand Up @@ -59,7 +60,7 @@ public async Task<MqttClientConnectResult> ConnectAsync(IMqttClientOptions optio

_adapter = _adapterFactory.CreateClientAdapter(options, _logger);

_logger.Verbose("Trying to connect with server.");
_logger.Verbose($"Trying to connect with server ({_options.ChannelOptions}).");
await _adapter.ConnectAsync(_options.CommunicationTimeout, _cancellationTokenSource.Token).ConfigureAwait(false);
_logger.Verbose("Connection with server established.");

Expand Down Expand Up @@ -94,6 +95,8 @@ public async Task DisconnectAsync()
{
try
{
_cleanDisconnectInitiated = true;

if (IsConnected && !_cancellationTokenSource.IsCancellationRequested)
{
await SendAsync(new MqttDisconnectPacket(), _cancellationTokenSource.Token).ConfigureAwait(false);
Expand Down Expand Up @@ -235,24 +238,7 @@ private void ThrowIfConnected(string message)

private async Task DisconnectInternalAsync(Task sender, Exception exception)
{
await _disconnectLock.WaitAsync().ConfigureAwait(false);
try
{
if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested)
{
return;
}

_cancellationTokenSource.Cancel(false);
}
catch (Exception adapterException)
{
_logger.Warning(adapterException, "Error while disconnecting from adapter.");
}
finally
{
_disconnectLock.Release();
}
await InitiateDisconnectAsync().ConfigureAwait(false);

var clientWasConnected = IsConnected;
IsConnected = false;
Expand All @@ -279,12 +265,35 @@ private async Task DisconnectInternalAsync(Task sender, Exception exception)
_adapter = null;
_cancellationTokenSource?.Dispose();
_cancellationTokenSource = null;
_cleanDisconnectInitiated = false;

_logger.Info("Disconnected.");
Disconnected?.Invoke(this, new MqttClientDisconnectedEventArgs(clientWasConnected, exception));
}
}

private async Task InitiateDisconnectAsync()
{
await _disconnectLock.WaitAsync().ConfigureAwait(false);
try
{
if (_cancellationTokenSource == null || _cancellationTokenSource.IsCancellationRequested)
{
return;
}

_cancellationTokenSource.Cancel(false);
}
catch (Exception adapterException)
{
_logger.Warning(adapterException, "Error while initiating disconnect.");
}
finally
{
_disconnectLock.Release();
}
}

private Task SendAsync(MqttBasePacket packet, CancellationToken cancellationToken)
{
return SendAsync(new[] { packet }, cancellationToken);
Expand Down Expand Up @@ -361,6 +370,11 @@ private async Task SendKeepAliveMessagesAsync(CancellationToken cancellationToke
}
catch (Exception exception)
{
if (_cleanDisconnectInitiated)
{
return;
}

if (exception is OperationCanceledException)
{
}
Expand Down Expand Up @@ -390,11 +404,19 @@ private async Task ReceivePacketsAsync(CancellationToken cancellationToken)
while (!cancellationToken.IsCancellationRequested)
{
var packet = await _adapter.ReceivePacketAsync(TimeSpan.Zero, cancellationToken).ConfigureAwait(false);
await ProcessReceivedPacketAsync(packet, cancellationToken).ConfigureAwait(false);
if (packet != null)
{
await ProcessReceivedPacketAsync(packet, cancellationToken).ConfigureAwait(false);
}
}
}
catch (Exception exception)
{
if (_cleanDisconnectInitiated)
{
return;
}

if (exception is OperationCanceledException)
{
}
Expand All @@ -407,8 +429,8 @@ private async Task ReceivePacketsAsync(CancellationToken cancellationToken)
_logger.Error(exception, "Unhandled exception while receiving packets.");
}

await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false);
_packetDispatcher.Dispatch(exception);
await DisconnectInternalAsync(_packetReceiverTask, exception).ConfigureAwait(false);
}
finally
{
Expand Down Expand Up @@ -458,7 +480,7 @@ private Task ProcessReceivedPublishPacketAsync(MqttPublishPacket publishPacket,

if (publishPacket.QualityOfServiceLevel == MqttQualityOfServiceLevel.ExactlyOnce)
{
// QoS 2 is implement as method "B" [4.3.3 QoS 2: Exactly once delivery]
// QoS 2 is implement as method "B" (4.3.3 QoS 2: Exactly once delivery)
FireApplicationMessageReceivedEvent(publishPacket);
return SendAsync(new MqttPubRecPacket { PacketIdentifier = publishPacket.PacketIdentifier }, cancellationToken);
}
Expand Down

0 comments on commit ae72bc3

Please sign in to comment.