Skip to content

Commit

Permalink
Put broken messages into a separate queue
Browse files Browse the repository at this point in the history
  • Loading branch information
yaroslav-tykhonchuk committed Jan 18, 2024
1 parent f7adb37 commit fc89a5a
Showing 1 changed file with 34 additions and 8 deletions.
42 changes: 34 additions & 8 deletions AzureBatchQueue/MessageQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class MessageQueue<T>

readonly QueueClient queue;
readonly QueueClient quarantineQueue;
readonly QueueClient brokenQueue;
readonly BlobContainerClient container;

public MessageQueue(string connectionString, string queueName,
Expand All @@ -29,6 +30,7 @@ public MessageQueue(string connectionString, string queueName,
this.maxDequeueCount = maxDequeueCount;
queue = new QueueClient(connectionString, queueName, new QueueClientOptions { MessageEncoding = QueueMessageEncoding.Base64 });
quarantineQueue = new QueueClient(connectionString, $"{queueName}-quarantine", new QueueClientOptions { MessageEncoding = QueueMessageEncoding.Base64 });
brokenQueue = new QueueClient(connectionString, $"{queueName}-broken", new QueueClientOptions { MessageEncoding = QueueMessageEncoding.Base64 });
container = new BlobContainerClient(connectionString, $"overflow-{queueName}");

this.serializer = serializer ?? new JsonSerializer<T>();
Expand Down Expand Up @@ -156,7 +158,30 @@ public async Task Dequarantine(CancellationToken ct = default)

foreach (var msg in response.Value)
{
await queue.SendMessageAsync(msg.Body, TimeSpan.Zero, cancellationToken: ct);
var blobSuccess = false;

if (IsBlobRef(msg.Body, out var blobRef))
{
try
{
await container.GetBlobClient(blobRef!.BlobName).DownloadContentAsync(ct);
blobSuccess = true;
}
catch (Exception ex)
{
logger.LogError(ex, "Exception when loading blob {BlobName} for {MessageId}", blobRef!.BlobName, msg.MessageId);
}
}

if (blobSuccess)
{
await queue.SendMessageAsync(msg.Body, TimeSpan.Zero, cancellationToken: ct);
}
else
{
await brokenQueue.SendMessageAsync(msg.Body, TimeSpan.Zero, cancellationToken: ct);
}

await quarantineQueue.DeleteMessageAsync(msg.MessageId, msg.PopReceipt, ct);
}
} while (!ct.IsCancellationRequested);
Expand Down Expand Up @@ -191,14 +216,11 @@ async Task<QueueMessage<T>> ToQueueMessage(QueueMessage m, CancellationToken ct)
var blobData = await container.GetBlobClient(blobRef!.BlobName).DownloadContentAsync(ct);
payload = blobData.Value.Content.ToMemory();
}
catch (RequestFailedException ex) when (ex.Status == 404)
{
logger.LogError(ex, "Blob with name {BlobName} is not found for {MessageId}", blobRef!.BlobName, m.MessageId);
throw;
}
catch (Exception ex)
{
logger.LogError(ex, "Exception when loading blob {BlobName} for {MessageId}", blobRef!.BlobName, m.MessageId);

await brokenQueue.SendMessageAsync(m.Body, cancellationToken: ct);
throw;
}
}
Expand Down Expand Up @@ -231,11 +253,15 @@ record BlobRef([property: JsonPropertyName("__MSQ_QUEUE_BLOBNAME__")] string Blo
static string FileName() => $"{DateTime.UtcNow:yyyy-MM-dd}/{DateTime.UtcNow:s}{Guid.NewGuid():N}.json.gzip";
}

public async Task Init() => await Task.WhenAll(queue.CreateIfNotExistsAsync(), quarantineQueue.CreateIfNotExistsAsync(), container.CreateIfNotExistsAsync());
public async Task Init() => await Task.WhenAll(
queue.CreateIfNotExistsAsync(),
quarantineQueue.CreateIfNotExistsAsync(),
brokenQueue.CreateIfNotExistsAsync(),
container.CreateIfNotExistsAsync());

public async Task Delete(bool deleteBlobs = false)
{
var tasks = new List<Task> { queue.DeleteAsync(), quarantineQueue.DeleteAsync() };
var tasks = new List<Task> { queue.DeleteAsync(), quarantineQueue.DeleteAsync(), brokenQueue.DeleteAsync() };

if (deleteBlobs)
tasks.Add(container.DeleteAsync());
Expand Down

0 comments on commit fc89a5a

Please sign in to comment.