Skip to content

Commit

Permalink
Refactored payload creation
Browse files Browse the repository at this point in the history
There are 2 ways to create a payload:
1. We are sending a new message, and need to create a blob if the payload size is bigger than allowed.
2. We are updating already existing message, and it can have a Blob associated with it. In case updated message contents fits into a QueueMessage - the we delete the blob, if not update the blob contents.
  • Loading branch information
yaroslav-tykhonchuk committed Dec 20, 2023
1 parent 6288a9e commit 2082e46
Showing 1 changed file with 25 additions and 12 deletions.
37 changes: 25 additions & 12 deletions AzureBatchQueue/MessageQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,36 @@ public async Task Send(T item, TimeSpan? visibilityTimeout = null, CancellationT
await queue.SendMessageAsync(new BinaryData(payload.Data), visibilityTimeout ?? TimeSpan.Zero, null, ct);
}

async Task<Payload> Payload(T item, string? blobName = null, CancellationToken ct = default)
async Task<Payload> Payload(T item, CancellationToken ct = default)
{
var payload = serializer.Serialize(item);

if (payload.Length <= MaxMessageSize)
return new Payload(payload, false);
return new Payload(payload);

var blobRef = BlobRef.Create();
await container.UploadBlobAsync(blobRef.BlobName, new BinaryData(payload), ct);
payload = JsonSerializer.SerializeToUtf8Bytes(blobRef);
return new Payload(payload, blobRef.BlobName);
}

async Task<Payload> UpdatedPayload(QueueMessage<T> queueMessage, CancellationToken ct = default)
{
var payload = serializer.Serialize(queueMessage.Item);

if (payload.Length <= MaxMessageSize)
return new Payload(payload);

// Updated payload is still not small enough for a QueueMessage, will update the blob contents
var blobName = queueMessage.MessageId.BlobName;
if (blobName == null)
{
var blobRef = BlobRef.Create();
await container.UploadBlobAsync(blobRef.BlobName, new BinaryData(payload), ct);
payload = JsonSerializer.SerializeToUtf8Bytes(blobRef);
return new Payload(payload, true);
throw new Exception("Error when serializing updated QueueMessage payload. Payload size cannot increase.");
}

await container.GetBlobClient(blobName).UploadAsync(new BinaryData(payload), true, ct);
payload = JsonSerializer.SerializeToUtf8Bytes(BlobRef.Get(blobName));
return new Payload(payload, true);
return new Payload(payload, blobName);
}

public async Task DeleteMessage(MessageId id, CancellationToken ct = default)
Expand All @@ -74,10 +86,11 @@ public async Task DeleteMessage(MessageId id, CancellationToken ct = default)

public async Task UpdateMessage(QueueMessage<T> msg, TimeSpan? visibilityTimeout = null, CancellationToken ct = default)
{
var payload = await Payload(msg.Item, msg.MessageId.BlobName, ct);
var payload = await UpdatedPayload(msg, ct);

await queue.UpdateMessageAsync(msg.MessageId.Id, msg.MessageId.PopReceipt, new BinaryData(payload.Data), visibilityTimeout ?? TimeSpan.Zero, ct);
if (msg.MessageId.BlobName != null && !payload.UsingBlob)

if (msg.MessageId.BlobName != null && payload.BlobName == null)
await container.DeleteBlobAsync(msg.MessageId.BlobName, cancellationToken: ct);
}

Expand Down Expand Up @@ -148,12 +161,12 @@ public async Task QuarantineMessage(QueueMessage<T> msg, CancellationToken ct =
{
try
{
var payload = await Payload(msg.Item, msg.MessageId.BlobName, ct);
var payload = await UpdatedPayload(msg, ct);

await quarantineQueue.SendMessageAsync(new BinaryData(payload.Data), cancellationToken: ct);
await queue.DeleteMessageAsync(msg.MessageId.Id, msg.MessageId.PopReceipt, ct);

if (msg.MessageId.BlobName != null && !payload.UsingBlob)
if (msg.MessageId.BlobName != null && payload.BlobName == null)
await container.DeleteBlobAsync(msg.MessageId.BlobName, cancellationToken: ct);
}
catch (Exception ex)
Expand Down Expand Up @@ -223,4 +236,4 @@ public MessageQueue<T> WithLogger(ILogger queueLogger)
public record QueueMessage<T>(T Item, MessageId MessageId, QueueMessageMetadata Metadata, long DequeueCount = 0);
public record QueueMessageMetadata(DateTimeOffset VisibilityTime, DateTimeOffset InsertedOn);
public record MessageId(string Id, string PopReceipt, string? BlobName);
public record Payload(byte[] Data, bool UsingBlob);
public record Payload(byte[] Data, string? BlobName = null);

0 comments on commit 2082e46

Please sign in to comment.