Skip to content

Commit

Permalink
Copy stream when content fits into a queue message
Browse files Browse the repository at this point in the history
To reduce places where magic could happen
  • Loading branch information
vansha committed Mar 20, 2024
1 parent 27dfed5 commit 023516c
Showing 1 changed file with 24 additions and 18 deletions.
42 changes: 24 additions & 18 deletions AzureBatchQueue/MessageQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using Azure.Storage.Queues.Models;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using JsonSerializer = System.Text.Json.JsonSerializer;

namespace AzureBatchQueue;

Expand Down Expand Up @@ -39,11 +38,11 @@ public MessageQueue(string connectionString, string queueName,
public async Task Send(T item, TimeSpan? visibilityTimeout = null, bool doubleCheckSerialization = false, CancellationToken ct = default)
{
var (data, offloaded) = await SerializeAndOffloadIfBig(item, ct: ct);
await queue.SendMessageAsync(new BinaryData(data), visibilityTimeout ?? TimeSpan.Zero, null, ct);
await queue.SendMessageAsync(data, visibilityTimeout ?? TimeSpan.Zero, null, ct);

// Don't ask. Checking our sanity
if (!offloaded && doubleCheckSerialization)
serializer.Deserialize(data.Span);
serializer.Deserialize(data);
}

public async Task DeleteMessage(MessageId id, CancellationToken ct = default)
Expand All @@ -62,7 +61,7 @@ public async Task UpdateMessage(MessageId id, T? item, TimeSpan visibilityTimeou
}

var (data, offloaded) = await SerializeAndOffloadIfBig(item, id.BlobName, ct);
await queue.UpdateMessageAsync(id.Id, id.PopReceipt, new BinaryData(data), visibilityTimeout, ct);
await queue.UpdateMessageAsync(id.Id, id.PopReceipt, data, visibilityTimeout, ct);
if (!offloaded && id.BlobName != null)
await container.DeleteBlobAsync(id.BlobName, cancellationToken: ct);
}
Expand All @@ -72,14 +71,14 @@ async Task<Payload> SerializeAndOffloadIfBig(T item, string? blobName = null, Ca
using var stream = MemoryStreamManager.RecyclableMemory.GetStream("QueueMessage");
serializer.Serialize(stream, item);

stream.Position = 0;

if (stream.Length <= MaxMessageSize)
return new Payload(stream.GetBuffer().AsMemory(0, (int)stream.Length), false);
return Payload.FitsInMessage(stream);

var blobRef = blobName != null ? BlobRef.Get(blobName) : BlobRef.Create();
stream.Position = 0;
await container.GetBlobClient(blobRef.BlobName).UploadAsync(stream, overwrite: true, ct);

return new Payload(JsonSerializer.SerializeToUtf8Bytes(blobRef), true);
return Payload.OffloadedToBlob(blobRef);
}

public async Task<QueueMessage<T>[]> Receive(int maxMessages = MaxMessagesReceive, TimeSpan? visibilityTimeout = null,
Expand Down Expand Up @@ -151,8 +150,7 @@ public async Task Dequarantine(CancellationToken ct = default)
public async Task QuarantineData(T item, CancellationToken ct)
{
var payload = await SerializeAndOffloadIfBig(item, ct: ct);

await quarantineQueue.SendMessageAsync(new BinaryData(payload.Data), cancellationToken: ct);
await quarantineQueue.SendMessageAsync(payload.Data, cancellationToken: ct);
}

async Task<QueueMessage<T>?> ToQueueMessage(QueueMessage m, CancellationToken ct, bool fromQuarantine = false)
Expand Down Expand Up @@ -207,13 +205,6 @@ static bool IsBlobRef(BinaryData data, out BlobRef? o)
}
}

record BlobRef([property: JsonPropertyName("__MSQ_QUEUE_BLOBNAME__")] string BlobName)
{
public static BlobRef Get(string blobName) => new(blobName);
public static BlobRef Create() => new(FileName());
static string FileName() => $"{DateTime.UtcNow:yyyy-MM-dd}/{DateTime.UtcNow:s}{Guid.NewGuid():N}.json.gzip";
}

public async Task Init() => await Task.WhenAll(queue.CreateIfNotExistsAsync(), quarantineQueue.CreateIfNotExistsAsync(), container.CreateIfNotExistsAsync());

public async Task Delete(bool deleteBlobs = false)
Expand All @@ -233,9 +224,24 @@ public MessageQueue<T> WithLogger(ILogger queueLogger)
logger = queueLogger;
return this;
}

record BlobRef([property: JsonPropertyName("__MSQ_QUEUE_BLOBNAME__")] string BlobName)
{
public static BlobRef Get(string blobName) => new(blobName);
public static BlobRef Create() => new(FileName());
static string FileName() => $"{DateTime.UtcNow:yyyy-MM-dd}/{DateTime.UtcNow:s}{Guid.NewGuid():N}.json.gzip";
}

record Payload(BinaryData Data, bool Offloaded)
{
public static Payload FitsInMessage(MemoryStream stream) =>
new(BinaryData.FromStream(stream), false);

public static Payload OffloadedToBlob(BlobRef blobRef) =>
new(BinaryData.FromObjectAsJson(blobRef), true);
}
}

public record QueueMessage<T>(T Item, MessageId MessageId, QueueMessageMetadata Metadata);
public record QueueMessageMetadata(DateTimeOffset VisibilityTime, DateTimeOffset InsertedOn, long DequeueCount = 0);
public record MessageId(string Id, string PopReceipt, string? BlobName);
public record Payload(ReadOnlyMemory<byte> Data, bool Offloaded);

0 comments on commit 023516c

Please sign in to comment.