From 4fe22bc415d382638c3d153e38666bc1caffa32b Mon Sep 17 00:00:00 2001 From: yaroslavtykhonchuk Date: Thu, 28 Mar 2024 10:20:11 +0200 Subject: [PATCH 1/3] Do not wait for timer when items Fail --- AzureBatchQueue.Tests/BatchQueueTests.cs | 78 ++++++++++++++++++++++-- AzureBatchQueue/BatchItem.cs | 1 + AzureBatchQueue/TimerBatch.cs | 74 +++++++++++++++++----- 3 files changed, 133 insertions(+), 20 deletions(-) diff --git a/AzureBatchQueue.Tests/BatchQueueTests.cs b/AzureBatchQueue.Tests/BatchQueueTests.cs index f61c12a..489fff0 100644 --- a/AzureBatchQueue.Tests/BatchQueueTests.cs +++ b/AzureBatchQueue.Tests/BatchQueueTests.cs @@ -70,6 +70,72 @@ public async Task When_batch_flushes_after_all_complete() .BatchCompletedResult.Should().Be(BatchCompletedResult.FullyProcessed); } + [Test] + public async Task When_batch_updates_after_all_fail() + { + using var queueTest = await Queue(); + + var messageBatch = new[] { "orange", "banana", "apple", "pear", "strawberry" }; + await queueTest.BatchQueue.Send(messageBatch); + + var longVisibilityTimeout = TimeSpan.FromSeconds(10); + var firstReceive = await queueTest.BatchQueue.Receive(visibilityTimeout: longVisibilityTimeout); + firstReceive.Select(x => x.Item).Should().BeEquivalentTo(messageBatch); + + foreach (var item in firstReceive) + item.Fail(); + + // wait for message to flush + await Task.Delay(TimeSpan.FromMilliseconds(10)); + + // try to complete batch item again, but catch an exception that everything was already processed + Assert.Throws(() => firstReceive.First().Complete())! + .BatchCompletedResult.Should().Be(BatchCompletedResult.PartialFailure); + + // wait for message to be visible + await Task.Delay(TimeSpan.FromMilliseconds(50)); + + // message was updated with failed items + var secondReceive = await queueTest.BatchQueue.Receive(visibilityTimeout: longVisibilityTimeout); + secondReceive.Select(x => x.Item).Should().BeEquivalentTo(messageBatch); + } + + [Test] + public async Task When_batch_flushes_after_all_complete_or_fail() + { + using var queueTest = await Queue(); + + var messageBatch = new[] { "orange", "banana", "apple", "pear", "strawberry" }; + await queueTest.BatchQueue.Send(messageBatch); + + var longVisibilityTimeout = TimeSpan.FromSeconds(10); + var response = await queueTest.BatchQueue.Receive(visibilityTimeout: longVisibilityTimeout); + response.Select(x => x.Item).Should().BeEquivalentTo(messageBatch); + + var failedItems = new[] { "orange", "apple" }; + foreach (var item in response) + { + if (failedItems.Contains(item.Item)) + item.Fail(); + else + item.Complete(); + } + + // wait for message to flush + await Task.Delay(TimeSpan.FromMilliseconds(10)); + + // try to complete batch item again, but catch an exception that everything was already processed + Assert.Throws(() => response.First().Complete())! + .BatchCompletedResult.Should().Be(BatchCompletedResult.PartialFailure); + + // wait for message to be visible + await Task.Delay(TimeSpan.FromMilliseconds(50)); + + // message was updated with failed items + var secondReceive = await queueTest.BatchQueue.Receive(visibilityTimeout: longVisibilityTimeout); + secondReceive.Select(x => x.Item).Should().BeEquivalentTo(failedItems); + } + [Test] public async Task When_many_threads_complete_in_parallel() { @@ -119,20 +185,24 @@ public async Task When_quarantine_only_not_completed_items_from_batch() var response = await queueTest.BatchQueue.Receive(visibilityTimeout: visibilityTimeout); response.Select(x => x.Item).Should().BeEquivalentTo(messageBatch); - const string failedItem = "orange"; + const string notCompletedItem = "orange"; + const string failedItem = "pear"; foreach (var item in response) { - if (item.Item == failedItem) + if (item.Item == notCompletedItem) continue; - item.Complete(); + if (item.Item == failedItem) + item.Fail(); + else + item.Complete(); } // wait for message to be quarantined await Task.Delay(visibilityTimeout); var responseFromQuarantine = await queueTest.BatchQueue.GetItemsFromQuarantine(); - responseFromQuarantine.Should().BeEquivalentTo(failedItem); + responseFromQuarantine.Should().BeEquivalentTo(notCompletedItem, failedItem); } [Test] diff --git a/AzureBatchQueue/BatchItem.cs b/AzureBatchQueue/BatchItem.cs index 89af140..b60dabc 100644 --- a/AzureBatchQueue/BatchItem.cs +++ b/AzureBatchQueue/BatchItem.cs @@ -17,6 +17,7 @@ internal BatchItem(BatchItemId id, TimerBatch batch, T item) public BatchItemMetadata Metadata { get; } public void Complete() => Batch.Complete(Id); + public void Fail() => Batch.Fail(Id); } public record BatchItemId(string BatchId, int Idx) diff --git a/AzureBatchQueue/TimerBatch.cs b/AzureBatchQueue/TimerBatch.cs index d00bb54..874eb90 100644 --- a/AzureBatchQueue/TimerBatch.cs +++ b/AzureBatchQueue/TimerBatch.cs @@ -59,7 +59,7 @@ async Task Flush() catch (Azure.RequestFailedException ex) when (ex.ErrorCode == "QueueNotFound") { logger.LogWarning(ex, "Queue {queueName} was not found when flushing {messageId} with {itemsCount} items left.", - batchQueue.Name, msg.MessageId, items.RemainingCount()); + batchQueue.Name, msg.MessageId, items.NotProcessedCount()); } catch (Azure.RequestFailedException ex) when (ex.ErrorCode == "MessageNotFound") { @@ -83,7 +83,7 @@ async Task DoFlush() return; } - completedResult = BatchCompletedResult.TriggeredByFlush; + completedResult = items.FailedCount() > 0 ? BatchCompletedResult.PartialFailure : BatchCompletedResult.TriggeredByFlush; if (msg.Metadata.DequeueCount >= maxDequeueCount) await Quarantine(); @@ -94,11 +94,15 @@ async Task Update() { var remaining = Remaining(); await batchQueue.UpdateMessage(msg.MessageId, remaining); - logger.LogWarning("Message {msgId} was not fully processed within a timeout ({FlushPeriod}) sec in queue {QueueName}. {remainingCount} items left not completed from {totalCount} total", + + logger.LogWarning("Message {MsgId} was not fully processed within a timeout ({FlushPeriod}) sec in queue {QueueName}." + + " {RemainingCount} items were not completed ({NotProcessed} not processed on time and {FailedCount} failed) from {TotalCount} total", msg.MessageId, FlushPeriod.TotalSeconds, batchQueue.Name, remaining.Length, + NotProcessedCount(), + FailedCount(), items.Items().Length); } @@ -114,29 +118,52 @@ async Task Quarantine() var remaining = Remaining(); await batchQueue.QuarantineData(msg.MessageId, remaining); - logger.LogInformation("Message {msgId} was quarantined after {dequeueCount} unsuccessful attempts in queue {QueueName}." + - " With {remainingCount} unprocessed from {totalCount} total", + logger.LogInformation("Message {MsgId} was quarantined after {DequeueCount} unsuccessful attempts in queue {QueueName}." + + " {RemainingCount} items were not completed ({NotProcessed} not processed on time and {FailedCount} failed) from {TotalCount} total", msg.MessageId, msg.Metadata.DequeueCount, batchQueue.Name, - remaining, + remaining.Length, + NotProcessedCount(), + FailedCount(), items.Items().Length); } } } - T[] Remaining() => items.NotEmptyItems().Select(x => x.Item).ToArray(); + T[] Remaining() => items.Remaining().Select(x => x.Item).ToArray(); + int NotProcessedCount() => items.NotProcessedCount(); + int FailedCount() => items.FailedCount(); public void Complete(BatchItemId itemId) + { + ThrowIfCompleted(itemId); + + var remaining = items.Complete(itemId); + + FlushIfEmpty(remaining); + } + + public void Fail(BatchItemId itemId) + { + ThrowIfCompleted(itemId); + + var remaining = items.Fail(itemId); + + FlushIfEmpty(remaining); + } + + void ThrowIfCompleted(BatchItemId itemId) { if (completedResult != null) { throw new BatchCompletedException("Failed to complete item on an already finalized batch.", new BatchItemMetadata(itemId.ToString(), msg.MessageId, msg.Metadata.VisibilityTime, FlushPeriod, msg.Metadata.InsertedOn), completedResult.Value); } + } - var remaining = items.Remove(itemId); - + void FlushIfEmpty(int remaining) + { if (remaining > 0 || flushTriggered) return; @@ -181,31 +208,46 @@ TimeSpan CalculateFlushPeriod(TimeSpan visibilityTimeout) internal class BatchItemsCollection { readonly BatchItem?[] items; - int remainingCount; + readonly List?> failedItems; + int notProcessedCount; public BatchItemsCollection(BatchItem[] items) { this.items = items; - remainingCount = this.items.Length; + failedItems = new List?>(); + notProcessedCount = this.items.Length; + } + + public int Complete(BatchItemId id) + { + if (items[id.Idx] == null) + throw new ItemNotFoundException(id.ToString()); + + items[id.Idx] = null; + return Interlocked.Decrement(ref notProcessedCount); } - public int Remove(BatchItemId id) + public int Fail(BatchItemId id) { if (items[id.Idx] == null) throw new ItemNotFoundException(id.ToString()); + failedItems.Add(items[id.Idx]); items[id.Idx] = null; - return Interlocked.Decrement(ref remainingCount); + return Interlocked.Decrement(ref notProcessedCount); } - public int RemainingCount() => remainingCount; + public int NotProcessedCount() => notProcessedCount; + public int FailedCount() => failedItems.Count; + public int RemainingCount() => notProcessedCount + FailedCount(); - public IEnumerable> NotEmptyItems() => items.Where(x => x != null)!; + public IEnumerable> Remaining() => failedItems.Concat(items.Where(x => x != null))!; public BatchItem?[] Items() => items; } public enum BatchCompletedResult { FullyProcessed, - TriggeredByFlush + TriggeredByFlush, + PartialFailure } From 1959ab0f0aa2a5d74f575253236eefebd6f30b17 Mon Sep 17 00:00:00 2001 From: yaroslavtykhonchuk Date: Thu, 28 Mar 2024 10:37:12 +0200 Subject: [PATCH 2/3] Fail with visibilityTimeout for updated message --- AzureBatchQueue.Tests/BatchQueueTests.cs | 44 ++++++++++++++++++++++++ AzureBatchQueue/BatchItem.cs | 1 + AzureBatchQueue/BatchQueue.cs | 2 +- AzureBatchQueue/TimerBatch.cs | 21 +++++++++-- 4 files changed, 64 insertions(+), 4 deletions(-) diff --git a/AzureBatchQueue.Tests/BatchQueueTests.cs b/AzureBatchQueue.Tests/BatchQueueTests.cs index 489fff0..9474631 100644 --- a/AzureBatchQueue.Tests/BatchQueueTests.cs +++ b/AzureBatchQueue.Tests/BatchQueueTests.cs @@ -100,6 +100,50 @@ public async Task When_batch_updates_after_all_fail() secondReceive.Select(x => x.Item).Should().BeEquivalentTo(messageBatch); } + [Test] + public async Task When_batch_updates_with_smallest_visibility_timeout_if_present() + { + using var queueTest = await Queue(); + + var messageBatch = new[] { "orange", "banana", "apple", "pear", "strawberry" }; + await queueTest.BatchQueue.Send(messageBatch); + + var longVisibilityTimeout = TimeSpan.FromSeconds(10); + var firstReceive = await queueTest.BatchQueue.Receive(visibilityTimeout: longVisibilityTimeout); + firstReceive.Select(x => x.Item).Should().BeEquivalentTo(messageBatch); + + foreach (var item in firstReceive) + { + if (item.Item == "orange") + { + item.Delay(TimeSpan.FromSeconds(1)); // small delay + continue; + } + + if (item.Item == "pear") + { + item.Delay(TimeSpan.FromHours(1)); // huge delay + continue; + } + + item.Complete(); + } + + // wait for message to flush + await Task.Delay(TimeSpan.FromMilliseconds(10)); + + // try to complete batch item again, but catch an exception that everything was already processed + Assert.Throws(() => firstReceive.First().Complete())! + .BatchCompletedResult.Should().Be(BatchCompletedResult.PartialFailure); + + // wait for message to be visible after smallest delay time + await Task.Delay(TimeSpan.FromSeconds(1.1)); + + // message was updated with delayed items + var secondReceive = await queueTest.BatchQueue.Receive(visibilityTimeout: longVisibilityTimeout); + secondReceive.Select(x => x.Item).Should().BeEquivalentTo("orange", "pear"); + } + [Test] public async Task When_batch_flushes_after_all_complete_or_fail() { diff --git a/AzureBatchQueue/BatchItem.cs b/AzureBatchQueue/BatchItem.cs index b60dabc..1ea4f51 100644 --- a/AzureBatchQueue/BatchItem.cs +++ b/AzureBatchQueue/BatchItem.cs @@ -18,6 +18,7 @@ internal BatchItem(BatchItemId id, TimerBatch batch, T item) public void Complete() => Batch.Complete(Id); public void Fail() => Batch.Fail(Id); + public void Delay(TimeSpan delay) => Batch.Delay(Id, delay); } public record BatchItemId(string BatchId, int Idx) diff --git a/AzureBatchQueue/BatchQueue.cs b/AzureBatchQueue/BatchQueue.cs index 53d90d8..731ff17 100644 --- a/AzureBatchQueue/BatchQueue.cs +++ b/AzureBatchQueue/BatchQueue.cs @@ -47,7 +47,7 @@ public async Task GetItemsFromQuarantine(int maxMessages = 32, Cancellation public async Task ClearMessages() => await queue.ClearMessages(); public async Task DeleteMessage(MessageId msgId, CancellationToken ct = default) => await queue.DeleteMessage(msgId, ct); - public async Task UpdateMessage(MessageId id, T[] items, CancellationToken ct = default) => await queue.UpdateMessage(id, items, ct: ct); + public async Task UpdateMessage(MessageId id, T[] items, TimeSpan visibilityTimeout = default, CancellationToken ct = default) => await queue.UpdateMessage(id, items, visibilityTimeout, ct: ct); public async Task QuarantineData(MessageId id, T[] items) { diff --git a/AzureBatchQueue/TimerBatch.cs b/AzureBatchQueue/TimerBatch.cs index 874eb90..43b1067 100644 --- a/AzureBatchQueue/TimerBatch.cs +++ b/AzureBatchQueue/TimerBatch.cs @@ -17,6 +17,7 @@ internal class TimerBatch bool flushTriggered; readonly object locker = new(); + TimeSpan minVisibilityTimeout; public TimerBatch(BatchQueue batchQueue, QueueMessage msg, int maxDequeueCount, ILogger logger) { @@ -93,17 +94,19 @@ async Task DoFlush() async Task Update() { var remaining = Remaining(); - await batchQueue.UpdateMessage(msg.MessageId, remaining); + await batchQueue.UpdateMessage(msg.MessageId, remaining, visibilityTimeout: minVisibilityTimeout); logger.LogWarning("Message {MsgId} was not fully processed within a timeout ({FlushPeriod}) sec in queue {QueueName}." + - " {RemainingCount} items were not completed ({NotProcessed} not processed on time and {FailedCount} failed) from {TotalCount} total", + " {RemainingCount} items were not completed ({NotProcessed} not processed on time and {FailedCount} failed) from {TotalCount} total." + + " VisibilityTimeout for updated message is {VisibilityTimeout}", msg.MessageId, FlushPeriod.TotalSeconds, batchQueue.Name, remaining.Length, NotProcessedCount(), FailedCount(), - items.Items().Length); + items.Items().Length, + minVisibilityTimeout); } async Task Delete() @@ -153,6 +156,18 @@ public void Fail(BatchItemId itemId) FlushIfEmpty(remaining); } + public void Delay(BatchItemId itemId, TimeSpan delayTime) + { + ThrowIfCompleted(itemId); + + var remaining = items.Fail(itemId); + + if (minVisibilityTimeout == default || minVisibilityTimeout > delayTime) + minVisibilityTimeout = delayTime; + + FlushIfEmpty(remaining); + } + void ThrowIfCompleted(BatchItemId itemId) { if (completedResult != null) From 1447483a89ff5cd3dbf02a719e24e166a9188d7e Mon Sep 17 00:00:00 2001 From: yaroslavtykhonchuk Date: Thu, 28 Mar 2024 13:57:46 +0200 Subject: [PATCH 3/3] Revert "Fail with visibilityTimeout for updated message" This reverts commit 1959ab0f0aa2a5d74f575253236eefebd6f30b17. --- AzureBatchQueue.Tests/BatchQueueTests.cs | 44 ------------------------ AzureBatchQueue/BatchItem.cs | 1 - AzureBatchQueue/BatchQueue.cs | 2 +- AzureBatchQueue/TimerBatch.cs | 21 ++--------- 4 files changed, 4 insertions(+), 64 deletions(-) diff --git a/AzureBatchQueue.Tests/BatchQueueTests.cs b/AzureBatchQueue.Tests/BatchQueueTests.cs index 9474631..489fff0 100644 --- a/AzureBatchQueue.Tests/BatchQueueTests.cs +++ b/AzureBatchQueue.Tests/BatchQueueTests.cs @@ -100,50 +100,6 @@ public async Task When_batch_updates_after_all_fail() secondReceive.Select(x => x.Item).Should().BeEquivalentTo(messageBatch); } - [Test] - public async Task When_batch_updates_with_smallest_visibility_timeout_if_present() - { - using var queueTest = await Queue(); - - var messageBatch = new[] { "orange", "banana", "apple", "pear", "strawberry" }; - await queueTest.BatchQueue.Send(messageBatch); - - var longVisibilityTimeout = TimeSpan.FromSeconds(10); - var firstReceive = await queueTest.BatchQueue.Receive(visibilityTimeout: longVisibilityTimeout); - firstReceive.Select(x => x.Item).Should().BeEquivalentTo(messageBatch); - - foreach (var item in firstReceive) - { - if (item.Item == "orange") - { - item.Delay(TimeSpan.FromSeconds(1)); // small delay - continue; - } - - if (item.Item == "pear") - { - item.Delay(TimeSpan.FromHours(1)); // huge delay - continue; - } - - item.Complete(); - } - - // wait for message to flush - await Task.Delay(TimeSpan.FromMilliseconds(10)); - - // try to complete batch item again, but catch an exception that everything was already processed - Assert.Throws(() => firstReceive.First().Complete())! - .BatchCompletedResult.Should().Be(BatchCompletedResult.PartialFailure); - - // wait for message to be visible after smallest delay time - await Task.Delay(TimeSpan.FromSeconds(1.1)); - - // message was updated with delayed items - var secondReceive = await queueTest.BatchQueue.Receive(visibilityTimeout: longVisibilityTimeout); - secondReceive.Select(x => x.Item).Should().BeEquivalentTo("orange", "pear"); - } - [Test] public async Task When_batch_flushes_after_all_complete_or_fail() { diff --git a/AzureBatchQueue/BatchItem.cs b/AzureBatchQueue/BatchItem.cs index 1ea4f51..b60dabc 100644 --- a/AzureBatchQueue/BatchItem.cs +++ b/AzureBatchQueue/BatchItem.cs @@ -18,7 +18,6 @@ internal BatchItem(BatchItemId id, TimerBatch batch, T item) public void Complete() => Batch.Complete(Id); public void Fail() => Batch.Fail(Id); - public void Delay(TimeSpan delay) => Batch.Delay(Id, delay); } public record BatchItemId(string BatchId, int Idx) diff --git a/AzureBatchQueue/BatchQueue.cs b/AzureBatchQueue/BatchQueue.cs index 731ff17..53d90d8 100644 --- a/AzureBatchQueue/BatchQueue.cs +++ b/AzureBatchQueue/BatchQueue.cs @@ -47,7 +47,7 @@ public async Task GetItemsFromQuarantine(int maxMessages = 32, Cancellation public async Task ClearMessages() => await queue.ClearMessages(); public async Task DeleteMessage(MessageId msgId, CancellationToken ct = default) => await queue.DeleteMessage(msgId, ct); - public async Task UpdateMessage(MessageId id, T[] items, TimeSpan visibilityTimeout = default, CancellationToken ct = default) => await queue.UpdateMessage(id, items, visibilityTimeout, ct: ct); + public async Task UpdateMessage(MessageId id, T[] items, CancellationToken ct = default) => await queue.UpdateMessage(id, items, ct: ct); public async Task QuarantineData(MessageId id, T[] items) { diff --git a/AzureBatchQueue/TimerBatch.cs b/AzureBatchQueue/TimerBatch.cs index 43b1067..874eb90 100644 --- a/AzureBatchQueue/TimerBatch.cs +++ b/AzureBatchQueue/TimerBatch.cs @@ -17,7 +17,6 @@ internal class TimerBatch bool flushTriggered; readonly object locker = new(); - TimeSpan minVisibilityTimeout; public TimerBatch(BatchQueue batchQueue, QueueMessage msg, int maxDequeueCount, ILogger logger) { @@ -94,19 +93,17 @@ async Task DoFlush() async Task Update() { var remaining = Remaining(); - await batchQueue.UpdateMessage(msg.MessageId, remaining, visibilityTimeout: minVisibilityTimeout); + await batchQueue.UpdateMessage(msg.MessageId, remaining); logger.LogWarning("Message {MsgId} was not fully processed within a timeout ({FlushPeriod}) sec in queue {QueueName}." + - " {RemainingCount} items were not completed ({NotProcessed} not processed on time and {FailedCount} failed) from {TotalCount} total." + - " VisibilityTimeout for updated message is {VisibilityTimeout}", + " {RemainingCount} items were not completed ({NotProcessed} not processed on time and {FailedCount} failed) from {TotalCount} total", msg.MessageId, FlushPeriod.TotalSeconds, batchQueue.Name, remaining.Length, NotProcessedCount(), FailedCount(), - items.Items().Length, - minVisibilityTimeout); + items.Items().Length); } async Task Delete() @@ -156,18 +153,6 @@ public void Fail(BatchItemId itemId) FlushIfEmpty(remaining); } - public void Delay(BatchItemId itemId, TimeSpan delayTime) - { - ThrowIfCompleted(itemId); - - var remaining = items.Fail(itemId); - - if (minVisibilityTimeout == default || minVisibilityTimeout > delayTime) - minVisibilityTimeout = delayTime; - - FlushIfEmpty(remaining); - } - void ThrowIfCompleted(BatchItemId itemId) { if (completedResult != null)