From 023516c98fe314d994e90cdfe57b000c5867ae64 Mon Sep 17 00:00:00 2001 From: Ivan Korneliuk Date: Wed, 20 Mar 2024 17:44:49 +0200 Subject: [PATCH] Copy stream when content fits into a queue message To reduce places where magic could happen --- AzureBatchQueue/MessageQueue.cs | 42 +++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/AzureBatchQueue/MessageQueue.cs b/AzureBatchQueue/MessageQueue.cs index 9c47b31..d0879a8 100644 --- a/AzureBatchQueue/MessageQueue.cs +++ b/AzureBatchQueue/MessageQueue.cs @@ -4,7 +4,6 @@ using Azure.Storage.Queues.Models; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; -using JsonSerializer = System.Text.Json.JsonSerializer; namespace AzureBatchQueue; @@ -39,11 +38,11 @@ public MessageQueue(string connectionString, string queueName, public async Task Send(T item, TimeSpan? visibilityTimeout = null, bool doubleCheckSerialization = false, CancellationToken ct = default) { var (data, offloaded) = await SerializeAndOffloadIfBig(item, ct: ct); - await queue.SendMessageAsync(new BinaryData(data), visibilityTimeout ?? TimeSpan.Zero, null, ct); + await queue.SendMessageAsync(data, visibilityTimeout ?? TimeSpan.Zero, null, ct); // Don't ask. Checking our sanity if (!offloaded && doubleCheckSerialization) - serializer.Deserialize(data.Span); + serializer.Deserialize(data); } public async Task DeleteMessage(MessageId id, CancellationToken ct = default) @@ -62,7 +61,7 @@ public async Task UpdateMessage(MessageId id, T? item, TimeSpan visibilityTimeou } var (data, offloaded) = await SerializeAndOffloadIfBig(item, id.BlobName, ct); - await queue.UpdateMessageAsync(id.Id, id.PopReceipt, new BinaryData(data), visibilityTimeout, ct); + await queue.UpdateMessageAsync(id.Id, id.PopReceipt, data, visibilityTimeout, ct); if (!offloaded && id.BlobName != null) await container.DeleteBlobAsync(id.BlobName, cancellationToken: ct); } @@ -72,14 +71,14 @@ async Task SerializeAndOffloadIfBig(T item, string? blobName = null, Ca using var stream = MemoryStreamManager.RecyclableMemory.GetStream("QueueMessage"); serializer.Serialize(stream, item); + stream.Position = 0; + if (stream.Length <= MaxMessageSize) - return new Payload(stream.GetBuffer().AsMemory(0, (int)stream.Length), false); + return Payload.FitsInMessage(stream); var blobRef = blobName != null ? BlobRef.Get(blobName) : BlobRef.Create(); - stream.Position = 0; await container.GetBlobClient(blobRef.BlobName).UploadAsync(stream, overwrite: true, ct); - - return new Payload(JsonSerializer.SerializeToUtf8Bytes(blobRef), true); + return Payload.OffloadedToBlob(blobRef); } public async Task[]> Receive(int maxMessages = MaxMessagesReceive, TimeSpan? visibilityTimeout = null, @@ -151,8 +150,7 @@ public async Task Dequarantine(CancellationToken ct = default) public async Task QuarantineData(T item, CancellationToken ct) { var payload = await SerializeAndOffloadIfBig(item, ct: ct); - - await quarantineQueue.SendMessageAsync(new BinaryData(payload.Data), cancellationToken: ct); + await quarantineQueue.SendMessageAsync(payload.Data, cancellationToken: ct); } async Task?> ToQueueMessage(QueueMessage m, CancellationToken ct, bool fromQuarantine = false) @@ -207,13 +205,6 @@ static bool IsBlobRef(BinaryData data, out BlobRef? o) } } - record BlobRef([property: JsonPropertyName("__MSQ_QUEUE_BLOBNAME__")] string BlobName) - { - public static BlobRef Get(string blobName) => new(blobName); - public static BlobRef Create() => new(FileName()); - static string FileName() => $"{DateTime.UtcNow:yyyy-MM-dd}/{DateTime.UtcNow:s}{Guid.NewGuid():N}.json.gzip"; - } - public async Task Init() => await Task.WhenAll(queue.CreateIfNotExistsAsync(), quarantineQueue.CreateIfNotExistsAsync(), container.CreateIfNotExistsAsync()); public async Task Delete(bool deleteBlobs = false) @@ -233,9 +224,24 @@ public MessageQueue WithLogger(ILogger queueLogger) logger = queueLogger; return this; } + + record BlobRef([property: JsonPropertyName("__MSQ_QUEUE_BLOBNAME__")] string BlobName) + { + public static BlobRef Get(string blobName) => new(blobName); + public static BlobRef Create() => new(FileName()); + static string FileName() => $"{DateTime.UtcNow:yyyy-MM-dd}/{DateTime.UtcNow:s}{Guid.NewGuid():N}.json.gzip"; + } + + record Payload(BinaryData Data, bool Offloaded) + { + public static Payload FitsInMessage(MemoryStream stream) => + new(BinaryData.FromStream(stream), false); + + public static Payload OffloadedToBlob(BlobRef blobRef) => + new(BinaryData.FromObjectAsJson(blobRef), true); + } } public record QueueMessage(T Item, MessageId MessageId, QueueMessageMetadata Metadata); public record QueueMessageMetadata(DateTimeOffset VisibilityTime, DateTimeOffset InsertedOn, long DequeueCount = 0); public record MessageId(string Id, string PopReceipt, string? BlobName); -public record Payload(ReadOnlyMemory Data, bool Offloaded);