diff --git a/AzureBatchQueue/MessageQueue.cs b/AzureBatchQueue/MessageQueue.cs index 4b67f39..a8a1e79 100644 --- a/AzureBatchQueue/MessageQueue.cs +++ b/AzureBatchQueue/MessageQueue.cs @@ -19,6 +19,7 @@ public class MessageQueue readonly QueueClient queue; readonly QueueClient quarantineQueue; + readonly QueueClient brokenQueue; readonly BlobContainerClient container; public MessageQueue(string connectionString, string queueName, @@ -29,6 +30,7 @@ public MessageQueue(string connectionString, string queueName, this.maxDequeueCount = maxDequeueCount; queue = new QueueClient(connectionString, queueName, new QueueClientOptions { MessageEncoding = QueueMessageEncoding.Base64 }); quarantineQueue = new QueueClient(connectionString, $"{queueName}-quarantine", new QueueClientOptions { MessageEncoding = QueueMessageEncoding.Base64 }); + brokenQueue = new QueueClient(connectionString, $"{queueName}-broken", new QueueClientOptions { MessageEncoding = QueueMessageEncoding.Base64 }); container = new BlobContainerClient(connectionString, $"overflow-{queueName}"); this.serializer = serializer ?? new JsonSerializer(); @@ -156,7 +158,30 @@ public async Task Dequarantine(CancellationToken ct = default) foreach (var msg in response.Value) { - await queue.SendMessageAsync(msg.Body, TimeSpan.Zero, cancellationToken: ct); + var blobSuccess = false; + + if (IsBlobRef(msg.Body, out var blobRef)) + { + try + { + await container.GetBlobClient(blobRef!.BlobName).DownloadContentAsync(ct); + blobSuccess = true; + } + catch (Exception ex) + { + logger.LogError(ex, "Exception when loading blob {BlobName} for {MessageId}", blobRef!.BlobName, msg.MessageId); + } + } + + if (blobSuccess) + { + await queue.SendMessageAsync(msg.Body, TimeSpan.Zero, cancellationToken: ct); + } + else + { + await brokenQueue.SendMessageAsync(msg.Body, TimeSpan.Zero, cancellationToken: ct); + } + await quarantineQueue.DeleteMessageAsync(msg.MessageId, msg.PopReceipt, ct); } } while (!ct.IsCancellationRequested); @@ -191,14 +216,11 @@ async Task> ToQueueMessage(QueueMessage m, CancellationToken ct) var blobData = await container.GetBlobClient(blobRef!.BlobName).DownloadContentAsync(ct); payload = blobData.Value.Content.ToMemory(); } - catch (RequestFailedException ex) when (ex.Status == 404) - { - logger.LogError(ex, "Blob with name {BlobName} is not found for {MessageId}", blobRef!.BlobName, m.MessageId); - throw; - } catch (Exception ex) { logger.LogError(ex, "Exception when loading blob {BlobName} for {MessageId}", blobRef!.BlobName, m.MessageId); + + await brokenQueue.SendMessageAsync(m.Body, cancellationToken: ct); throw; } } @@ -231,11 +253,15 @@ record BlobRef([property: JsonPropertyName("__MSQ_QUEUE_BLOBNAME__")] string Blo static string FileName() => $"{DateTime.UtcNow:yyyy-MM-dd}/{DateTime.UtcNow:s}{Guid.NewGuid():N}.json.gzip"; } - public async Task Init() => await Task.WhenAll(queue.CreateIfNotExistsAsync(), quarantineQueue.CreateIfNotExistsAsync(), container.CreateIfNotExistsAsync()); + public async Task Init() => await Task.WhenAll( + queue.CreateIfNotExistsAsync(), + quarantineQueue.CreateIfNotExistsAsync(), + brokenQueue.CreateIfNotExistsAsync(), + container.CreateIfNotExistsAsync()); public async Task Delete(bool deleteBlobs = false) { - var tasks = new List { queue.DeleteAsync(), quarantineQueue.DeleteAsync() }; + var tasks = new List { queue.DeleteAsync(), quarantineQueue.DeleteAsync(), brokenQueue.DeleteAsync() }; if (deleteBlobs) tasks.Add(container.DeleteAsync());