Skip to content

Commit

Permalink
Do not fail when error on deserialize
Browse files Browse the repository at this point in the history
Log and quarantine message instead
  • Loading branch information
yaroslav-tykhonchuk committed Feb 26, 2024
1 parent 62df336 commit 8c67447
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 4 deletions.
12 changes: 12 additions & 0 deletions AzureBatchQueue.Tests/MessageQueueTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,18 @@ public async Task When_quarantine_large_message()
msgFromQuarantine.MessageId.BlobName.Should().NotBeEmpty();
}

[Test]
public async Task When_exception_on_deserialize()
{
var jsonQueue = await Queue(serializer: JsonSerializer<string>.New());
var message = new string("test");
await jsonQueue.Queue.Send(message);

using var gZipQueue = await Queue(serializer: GZipJsonSerializer<string>.New());

Assert.DoesNotThrowAsync(async () => await gZipQueue.Queue.Receive());
}

static async Task<QueueTest<T>> Queue<T>(int maxDequeueCount = 5, IMessageQueueSerializer<T>? serializer = null)
{
var queue = new QueueTest<T>(maxDequeueCount, serializer);
Expand Down
24 changes: 20 additions & 4 deletions AzureBatchQueue/MessageQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public async Task<QueueMessage<T>[]> Receive(int maxMessages = MaxMessagesReceiv
var response = Task.WhenAll(validMessages.Select(x => ToQueueMessage(x, ct)));

await Task.WhenAll(quarantine, response);
return response.Result;
return response.Result.Where(x => x != null).ToArray()!;
}

public async Task<QueueMessage<T>[]> ReceiveFromQuarantine(int maxMessages = MaxMessagesReceive, TimeSpan? visibilityTimeout = null,
Expand All @@ -106,7 +106,9 @@ public async Task<QueueMessage<T>[]> ReceiveFromQuarantine(int maxMessages = Max
if (!r.HasValue)
return Array.Empty<QueueMessage<T>>();

return await Task.WhenAll(r.Value.Select(x => ToQueueMessage(x, ct)));
var queueMessages = await Task.WhenAll(r.Value.Select(x => ToQueueMessage(x, ct, fromQuarantine: true)));

return queueMessages.Where(x => x != null).ToArray()!;
}

async Task QuarantineMessage(QueueMessage queueMessage, CancellationToken ct = default)
Expand Down Expand Up @@ -151,7 +153,7 @@ public async Task QuarantineData(T item, CancellationToken ct)
await quarantineQueue.SendMessageAsync(new BinaryData(payload.Data), cancellationToken: ct);
}

async Task<QueueMessage<T>> ToQueueMessage(QueueMessage m, CancellationToken ct)
async Task<QueueMessage<T>?> ToQueueMessage(QueueMessage m, CancellationToken ct, bool fromQuarantine = false)
{
var payload = m.Body.ToMemory();
if (IsBlobRef(m.Body, out var blobRef))
Expand All @@ -168,7 +170,21 @@ async Task<QueueMessage<T>> ToQueueMessage(QueueMessage m, CancellationToken ct)
}
}

var item = serializer.Deserialize(payload);
T? item;
try
{
item = serializer.Deserialize(payload);
}
catch (Exception ex)
{
logger.LogError(ex, "Exception when deserializing a message {MessageId}", m.MessageId);

if (!fromQuarantine)
await QuarantineMessage(m, ct);

return null;
}

var metadata = new QueueMessageMetadata(m.NextVisibleOn!.Value, m.InsertedOn!.Value, m.DequeueCount);
var messageId = new MessageId(m.MessageId, m.PopReceipt, blobRef?.BlobName);
var msg = new QueueMessage<T>(item, messageId, metadata);

Check warning on line 190 in AzureBatchQueue/MessageQueue.cs

View workflow job for this annotation

GitHub Actions / Build

Possible null reference argument for parameter 'Item' in 'QueueMessage<T>.QueueMessage(T Item, MessageId MessageId, QueueMessageMetadata Metadata)'.

Check warning on line 190 in AzureBatchQueue/MessageQueue.cs

View workflow job for this annotation

GitHub Actions / Build

Possible null reference argument for parameter 'Item' in 'QueueMessage<T>.QueueMessage(T Item, MessageId MessageId, QueueMessageMetadata Metadata)'.

Check warning on line 190 in AzureBatchQueue/MessageQueue.cs

View workflow job for this annotation

GitHub Actions / Build

Possible null reference argument for parameter 'Item' in 'QueueMessage<T>.QueueMessage(T Item, MessageId MessageId, QueueMessageMetadata Metadata)'.

Check warning on line 190 in AzureBatchQueue/MessageQueue.cs

View workflow job for this annotation

GitHub Actions / Build

Possible null reference argument for parameter 'Item' in 'QueueMessage<T>.QueueMessage(T Item, MessageId MessageId, QueueMessageMetadata Metadata)'.
Expand Down

0 comments on commit 8c67447

Please sign in to comment.