diff --git a/AzureBatchQueue.Tests/MessageQueueTests.cs b/AzureBatchQueue.Tests/MessageQueueTests.cs index f1287fa..3cb3576 100644 --- a/AzureBatchQueue.Tests/MessageQueueTests.cs +++ b/AzureBatchQueue.Tests/MessageQueueTests.cs @@ -153,6 +153,18 @@ public async Task When_quarantine_large_message() msgFromQuarantine.MessageId.BlobName.Should().NotBeEmpty(); } + [Test] + public async Task When_exception_on_deserialize() + { + var jsonQueue = await Queue(serializer: JsonSerializer.New()); + var message = new string("test"); + await jsonQueue.Queue.Send(message); + + using var gZipQueue = await Queue(serializer: GZipJsonSerializer.New()); + + Assert.DoesNotThrowAsync(async () => await gZipQueue.Queue.Receive()); + } + static async Task> Queue(int maxDequeueCount = 5, IMessageQueueSerializer? serializer = null) { var queue = new QueueTest(maxDequeueCount, serializer); diff --git a/AzureBatchQueue/MessageQueue.cs b/AzureBatchQueue/MessageQueue.cs index fb00e34..48b0331 100644 --- a/AzureBatchQueue/MessageQueue.cs +++ b/AzureBatchQueue/MessageQueue.cs @@ -95,7 +95,7 @@ public async Task[]> Receive(int maxMessages = MaxMessagesReceiv var response = Task.WhenAll(validMessages.Select(x => ToQueueMessage(x, ct))); await Task.WhenAll(quarantine, response); - return response.Result; + return response.Result.Where(x => x != null).ToArray()!; } public async Task[]> ReceiveFromQuarantine(int maxMessages = MaxMessagesReceive, TimeSpan? visibilityTimeout = null, @@ -106,7 +106,9 @@ public async Task[]> ReceiveFromQuarantine(int maxMessages = Max if (!r.HasValue) return Array.Empty>(); - return await Task.WhenAll(r.Value.Select(x => ToQueueMessage(x, ct))); + var queueMessages = await Task.WhenAll(r.Value.Select(x => ToQueueMessage(x, ct, fromQuarantine: true))); + + return queueMessages.Where(x => x != null).ToArray()!; } async Task QuarantineMessage(QueueMessage queueMessage, CancellationToken ct = default) @@ -151,7 +153,7 @@ public async Task QuarantineData(T item, CancellationToken ct) await quarantineQueue.SendMessageAsync(new BinaryData(payload.Data), cancellationToken: ct); } - async Task> ToQueueMessage(QueueMessage m, CancellationToken ct) + async Task?> ToQueueMessage(QueueMessage m, CancellationToken ct, bool fromQuarantine = false) { var payload = m.Body.ToMemory(); if (IsBlobRef(m.Body, out var blobRef)) @@ -168,7 +170,21 @@ async Task> ToQueueMessage(QueueMessage m, CancellationToken ct) } } - var item = serializer.Deserialize(payload); + T? item; + try + { + item = serializer.Deserialize(payload); + } + catch (Exception ex) + { + logger.LogError(ex, "Exception when deserializing a message {MessageId}", m.MessageId); + + if (!fromQuarantine) + await QuarantineMessage(m, ct); + + return null; + } + var metadata = new QueueMessageMetadata(m.NextVisibleOn!.Value, m.InsertedOn!.Value, m.DequeueCount); var messageId = new MessageId(m.MessageId, m.PopReceipt, blobRef?.BlobName); var msg = new QueueMessage(item, messageId, metadata);