diff --git a/AzureBatchQueue.Tests/BatchQueueTests.cs b/AzureBatchQueue.Tests/BatchQueueTests.cs index f61c12a..489fff0 100644 --- a/AzureBatchQueue.Tests/BatchQueueTests.cs +++ b/AzureBatchQueue.Tests/BatchQueueTests.cs @@ -70,6 +70,72 @@ public async Task When_batch_flushes_after_all_complete() .BatchCompletedResult.Should().Be(BatchCompletedResult.FullyProcessed); } + [Test] + public async Task When_batch_updates_after_all_fail() + { + using var queueTest = await Queue(); + + var messageBatch = new[] { "orange", "banana", "apple", "pear", "strawberry" }; + await queueTest.BatchQueue.Send(messageBatch); + + var longVisibilityTimeout = TimeSpan.FromSeconds(10); + var firstReceive = await queueTest.BatchQueue.Receive(visibilityTimeout: longVisibilityTimeout); + firstReceive.Select(x => x.Item).Should().BeEquivalentTo(messageBatch); + + foreach (var item in firstReceive) + item.Fail(); + + // wait for message to flush + await Task.Delay(TimeSpan.FromMilliseconds(10)); + + // try to complete batch item again, but catch an exception that everything was already processed + Assert.Throws(() => firstReceive.First().Complete())! + .BatchCompletedResult.Should().Be(BatchCompletedResult.PartialFailure); + + // wait for message to be visible + await Task.Delay(TimeSpan.FromMilliseconds(50)); + + // message was updated with failed items + var secondReceive = await queueTest.BatchQueue.Receive(visibilityTimeout: longVisibilityTimeout); + secondReceive.Select(x => x.Item).Should().BeEquivalentTo(messageBatch); + } + + [Test] + public async Task When_batch_flushes_after_all_complete_or_fail() + { + using var queueTest = await Queue(); + + var messageBatch = new[] { "orange", "banana", "apple", "pear", "strawberry" }; + await queueTest.BatchQueue.Send(messageBatch); + + var longVisibilityTimeout = TimeSpan.FromSeconds(10); + var response = await queueTest.BatchQueue.Receive(visibilityTimeout: longVisibilityTimeout); + response.Select(x => x.Item).Should().BeEquivalentTo(messageBatch); + + var failedItems = new[] { "orange", "apple" }; + foreach (var item in response) + { + if (failedItems.Contains(item.Item)) + item.Fail(); + else + item.Complete(); + } + + // wait for message to flush + await Task.Delay(TimeSpan.FromMilliseconds(10)); + + // try to complete batch item again, but catch an exception that everything was already processed + Assert.Throws(() => response.First().Complete())! + .BatchCompletedResult.Should().Be(BatchCompletedResult.PartialFailure); + + // wait for message to be visible + await Task.Delay(TimeSpan.FromMilliseconds(50)); + + // message was updated with failed items + var secondReceive = await queueTest.BatchQueue.Receive(visibilityTimeout: longVisibilityTimeout); + secondReceive.Select(x => x.Item).Should().BeEquivalentTo(failedItems); + } + [Test] public async Task When_many_threads_complete_in_parallel() { @@ -119,20 +185,24 @@ public async Task When_quarantine_only_not_completed_items_from_batch() var response = await queueTest.BatchQueue.Receive(visibilityTimeout: visibilityTimeout); response.Select(x => x.Item).Should().BeEquivalentTo(messageBatch); - const string failedItem = "orange"; + const string notCompletedItem = "orange"; + const string failedItem = "pear"; foreach (var item in response) { - if (item.Item == failedItem) + if (item.Item == notCompletedItem) continue; - item.Complete(); + if (item.Item == failedItem) + item.Fail(); + else + item.Complete(); } // wait for message to be quarantined await Task.Delay(visibilityTimeout); var responseFromQuarantine = await queueTest.BatchQueue.GetItemsFromQuarantine(); - responseFromQuarantine.Should().BeEquivalentTo(failedItem); + responseFromQuarantine.Should().BeEquivalentTo(notCompletedItem, failedItem); } [Test] diff --git a/AzureBatchQueue/BatchItem.cs b/AzureBatchQueue/BatchItem.cs index 89af140..b60dabc 100644 --- a/AzureBatchQueue/BatchItem.cs +++ b/AzureBatchQueue/BatchItem.cs @@ -17,6 +17,7 @@ internal BatchItem(BatchItemId id, TimerBatch batch, T item) public BatchItemMetadata Metadata { get; } public void Complete() => Batch.Complete(Id); + public void Fail() => Batch.Fail(Id); } public record BatchItemId(string BatchId, int Idx) diff --git a/AzureBatchQueue/TimerBatch.cs b/AzureBatchQueue/TimerBatch.cs index d00bb54..874eb90 100644 --- a/AzureBatchQueue/TimerBatch.cs +++ b/AzureBatchQueue/TimerBatch.cs @@ -59,7 +59,7 @@ async Task Flush() catch (Azure.RequestFailedException ex) when (ex.ErrorCode == "QueueNotFound") { logger.LogWarning(ex, "Queue {queueName} was not found when flushing {messageId} with {itemsCount} items left.", - batchQueue.Name, msg.MessageId, items.RemainingCount()); + batchQueue.Name, msg.MessageId, items.NotProcessedCount()); } catch (Azure.RequestFailedException ex) when (ex.ErrorCode == "MessageNotFound") { @@ -83,7 +83,7 @@ async Task DoFlush() return; } - completedResult = BatchCompletedResult.TriggeredByFlush; + completedResult = items.FailedCount() > 0 ? BatchCompletedResult.PartialFailure : BatchCompletedResult.TriggeredByFlush; if (msg.Metadata.DequeueCount >= maxDequeueCount) await Quarantine(); @@ -94,11 +94,15 @@ async Task Update() { var remaining = Remaining(); await batchQueue.UpdateMessage(msg.MessageId, remaining); - logger.LogWarning("Message {msgId} was not fully processed within a timeout ({FlushPeriod}) sec in queue {QueueName}. {remainingCount} items left not completed from {totalCount} total", + + logger.LogWarning("Message {MsgId} was not fully processed within a timeout ({FlushPeriod}) sec in queue {QueueName}." + + " {RemainingCount} items were not completed ({NotProcessed} not processed on time and {FailedCount} failed) from {TotalCount} total", msg.MessageId, FlushPeriod.TotalSeconds, batchQueue.Name, remaining.Length, + NotProcessedCount(), + FailedCount(), items.Items().Length); } @@ -114,29 +118,52 @@ async Task Quarantine() var remaining = Remaining(); await batchQueue.QuarantineData(msg.MessageId, remaining); - logger.LogInformation("Message {msgId} was quarantined after {dequeueCount} unsuccessful attempts in queue {QueueName}." + - " With {remainingCount} unprocessed from {totalCount} total", + logger.LogInformation("Message {MsgId} was quarantined after {DequeueCount} unsuccessful attempts in queue {QueueName}." + + " {RemainingCount} items were not completed ({NotProcessed} not processed on time and {FailedCount} failed) from {TotalCount} total", msg.MessageId, msg.Metadata.DequeueCount, batchQueue.Name, - remaining, + remaining.Length, + NotProcessedCount(), + FailedCount(), items.Items().Length); } } } - T[] Remaining() => items.NotEmptyItems().Select(x => x.Item).ToArray(); + T[] Remaining() => items.Remaining().Select(x => x.Item).ToArray(); + int NotProcessedCount() => items.NotProcessedCount(); + int FailedCount() => items.FailedCount(); public void Complete(BatchItemId itemId) + { + ThrowIfCompleted(itemId); + + var remaining = items.Complete(itemId); + + FlushIfEmpty(remaining); + } + + public void Fail(BatchItemId itemId) + { + ThrowIfCompleted(itemId); + + var remaining = items.Fail(itemId); + + FlushIfEmpty(remaining); + } + + void ThrowIfCompleted(BatchItemId itemId) { if (completedResult != null) { throw new BatchCompletedException("Failed to complete item on an already finalized batch.", new BatchItemMetadata(itemId.ToString(), msg.MessageId, msg.Metadata.VisibilityTime, FlushPeriod, msg.Metadata.InsertedOn), completedResult.Value); } + } - var remaining = items.Remove(itemId); - + void FlushIfEmpty(int remaining) + { if (remaining > 0 || flushTriggered) return; @@ -181,31 +208,46 @@ TimeSpan CalculateFlushPeriod(TimeSpan visibilityTimeout) internal class BatchItemsCollection { readonly BatchItem?[] items; - int remainingCount; + readonly List?> failedItems; + int notProcessedCount; public BatchItemsCollection(BatchItem[] items) { this.items = items; - remainingCount = this.items.Length; + failedItems = new List?>(); + notProcessedCount = this.items.Length; + } + + public int Complete(BatchItemId id) + { + if (items[id.Idx] == null) + throw new ItemNotFoundException(id.ToString()); + + items[id.Idx] = null; + return Interlocked.Decrement(ref notProcessedCount); } - public int Remove(BatchItemId id) + public int Fail(BatchItemId id) { if (items[id.Idx] == null) throw new ItemNotFoundException(id.ToString()); + failedItems.Add(items[id.Idx]); items[id.Idx] = null; - return Interlocked.Decrement(ref remainingCount); + return Interlocked.Decrement(ref notProcessedCount); } - public int RemainingCount() => remainingCount; + public int NotProcessedCount() => notProcessedCount; + public int FailedCount() => failedItems.Count; + public int RemainingCount() => notProcessedCount + FailedCount(); - public IEnumerable> NotEmptyItems() => items.Where(x => x != null)!; + public IEnumerable> Remaining() => failedItems.Concat(items.Where(x => x != null))!; public BatchItem?[] Items() => items; } public enum BatchCompletedResult { FullyProcessed, - TriggeredByFlush + TriggeredByFlush, + PartialFailure }