Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail and delay batch items #6

Merged
merged 3 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 74 additions & 4 deletions AzureBatchQueue.Tests/BatchQueueTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,72 @@
.BatchCompletedResult.Should().Be(BatchCompletedResult.FullyProcessed);
}

[Test]
public async Task When_batch_updates_after_all_fail()
{
using var queueTest = await Queue<string>();

var messageBatch = new[] { "orange", "banana", "apple", "pear", "strawberry" };
await queueTest.BatchQueue.Send(messageBatch);

var longVisibilityTimeout = TimeSpan.FromSeconds(10);
var firstReceive = await queueTest.BatchQueue.Receive(visibilityTimeout: longVisibilityTimeout);
firstReceive.Select(x => x.Item).Should().BeEquivalentTo(messageBatch);

foreach (var item in firstReceive)
item.Fail();

// wait for message to flush
await Task.Delay(TimeSpan.FromMilliseconds(10));

// try to complete batch item again, but catch an exception that everything was already processed
Assert.Throws<BatchCompletedException>(() => firstReceive.First().Complete())!
.BatchCompletedResult.Should().Be(BatchCompletedResult.PartialFailure);

// wait for message to be visible
await Task.Delay(TimeSpan.FromMilliseconds(50));

// message was updated with failed items
var secondReceive = await queueTest.BatchQueue.Receive(visibilityTimeout: longVisibilityTimeout);
secondReceive.Select(x => x.Item).Should().BeEquivalentTo(messageBatch);
}

[Test]
public async Task When_batch_flushes_after_all_complete_or_fail()
{
using var queueTest = await Queue<string>();

var messageBatch = new[] { "orange", "banana", "apple", "pear", "strawberry" };
await queueTest.BatchQueue.Send(messageBatch);

var longVisibilityTimeout = TimeSpan.FromSeconds(10);
var response = await queueTest.BatchQueue.Receive(visibilityTimeout: longVisibilityTimeout);
response.Select(x => x.Item).Should().BeEquivalentTo(messageBatch);

var failedItems = new[] { "orange", "apple" };
foreach (var item in response)
{
if (failedItems.Contains(item.Item))
item.Fail();
else
item.Complete();
}

// wait for message to flush
await Task.Delay(TimeSpan.FromMilliseconds(10));

// try to complete batch item again, but catch an exception that everything was already processed
Assert.Throws<BatchCompletedException>(() => response.First().Complete())!
.BatchCompletedResult.Should().Be(BatchCompletedResult.PartialFailure);

// wait for message to be visible
await Task.Delay(TimeSpan.FromMilliseconds(50));

// message was updated with failed items
var secondReceive = await queueTest.BatchQueue.Receive(visibilityTimeout: longVisibilityTimeout);
secondReceive.Select(x => x.Item).Should().BeEquivalentTo(failedItems);
}

[Test]
public async Task When_many_threads_complete_in_parallel()
{
Expand Down Expand Up @@ -119,20 +185,24 @@
var response = await queueTest.BatchQueue.Receive(visibilityTimeout: visibilityTimeout);
response.Select(x => x.Item).Should().BeEquivalentTo(messageBatch);

const string failedItem = "orange";
const string notCompletedItem = "orange";
const string failedItem = "pear";
foreach (var item in response)
{
if (item.Item == failedItem)
if (item.Item == notCompletedItem)
continue;

item.Complete();
if (item.Item == failedItem)
item.Fail();
else
item.Complete();
}

// wait for message to be quarantined
await Task.Delay(visibilityTimeout);

var responseFromQuarantine = await queueTest.BatchQueue.GetItemsFromQuarantine();
responseFromQuarantine.Should().BeEquivalentTo(failedItem);
responseFromQuarantine.Should().BeEquivalentTo(notCompletedItem, failedItem);
}

[Test]
Expand Down Expand Up @@ -229,7 +299,7 @@
await BatchQueue.Init();
}

public BatchQueue<T> BatchQueue { get; private set; }

Check warning on line 302 in AzureBatchQueue.Tests/BatchQueueTests.cs

View workflow job for this annotation

GitHub Actions / Build

Non-nullable property 'BatchQueue' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.

Check warning on line 302 in AzureBatchQueue.Tests/BatchQueueTests.cs

View workflow job for this annotation

GitHub Actions / Build

Non-nullable property 'BatchQueue' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.

Check warning on line 302 in AzureBatchQueue.Tests/BatchQueueTests.cs

View workflow job for this annotation

GitHub Actions / Build

Non-nullable property 'BatchQueue' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.

Check warning on line 302 in AzureBatchQueue.Tests/BatchQueueTests.cs

View workflow job for this annotation

GitHub Actions / Build

Non-nullable property 'BatchQueue' must contain a non-null value when exiting constructor. Consider declaring the property as nullable.
public void Dispose() => BatchQueue.ClearMessages().GetAwaiter().GetResult();
}
}
1 change: 1 addition & 0 deletions AzureBatchQueue/BatchItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ internal BatchItem(BatchItemId id, TimerBatch<T> batch, T item)
public BatchItemMetadata Metadata { get; }

public void Complete() => Batch.Complete(Id);
public void Fail() => Batch.Fail(Id);
}

public record BatchItemId(string BatchId, int Idx)
Expand Down
74 changes: 58 additions & 16 deletions AzureBatchQueue/TimerBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ async Task Flush()
catch (Azure.RequestFailedException ex) when (ex.ErrorCode == "QueueNotFound")
{
logger.LogWarning(ex, "Queue {queueName} was not found when flushing {messageId} with {itemsCount} items left.",
batchQueue.Name, msg.MessageId, items.RemainingCount());
batchQueue.Name, msg.MessageId, items.NotProcessedCount());
}
catch (Azure.RequestFailedException ex) when (ex.ErrorCode == "MessageNotFound")
{
Expand All @@ -83,7 +83,7 @@ async Task DoFlush()
return;
}

completedResult = BatchCompletedResult.TriggeredByFlush;
completedResult = items.FailedCount() > 0 ? BatchCompletedResult.PartialFailure : BatchCompletedResult.TriggeredByFlush;

if (msg.Metadata.DequeueCount >= maxDequeueCount)
await Quarantine();
Expand All @@ -94,11 +94,15 @@ async Task Update()
{
var remaining = Remaining();
await batchQueue.UpdateMessage(msg.MessageId, remaining);
logger.LogWarning("Message {msgId} was not fully processed within a timeout ({FlushPeriod}) sec in queue {QueueName}. {remainingCount} items left not completed from {totalCount} total",

logger.LogWarning("Message {MsgId} was not fully processed within a timeout ({FlushPeriod}) sec in queue {QueueName}." +
" {RemainingCount} items were not completed ({NotProcessed} not processed on time and {FailedCount} failed) from {TotalCount} total",
msg.MessageId,
FlushPeriod.TotalSeconds,
batchQueue.Name,
remaining.Length,
NotProcessedCount(),
FailedCount(),
items.Items().Length);
}

Expand All @@ -114,29 +118,52 @@ async Task Quarantine()
var remaining = Remaining();
await batchQueue.QuarantineData(msg.MessageId, remaining);

logger.LogInformation("Message {msgId} was quarantined after {dequeueCount} unsuccessful attempts in queue {QueueName}." +
" With {remainingCount} unprocessed from {totalCount} total",
logger.LogInformation("Message {MsgId} was quarantined after {DequeueCount} unsuccessful attempts in queue {QueueName}." +
" {RemainingCount} items were not completed ({NotProcessed} not processed on time and {FailedCount} failed) from {TotalCount} total",
msg.MessageId,
msg.Metadata.DequeueCount,
batchQueue.Name,
remaining,
remaining.Length,
NotProcessedCount(),
FailedCount(),
items.Items().Length);
}
}
}

T[] Remaining() => items.NotEmptyItems().Select(x => x.Item).ToArray();
T[] Remaining() => items.Remaining().Select(x => x.Item).ToArray();
int NotProcessedCount() => items.NotProcessedCount();
int FailedCount() => items.FailedCount();

public void Complete(BatchItemId itemId)
{
ThrowIfCompleted(itemId);

var remaining = items.Complete(itemId);

FlushIfEmpty(remaining);
}

public void Fail(BatchItemId itemId)
{
ThrowIfCompleted(itemId);

var remaining = items.Fail(itemId);

FlushIfEmpty(remaining);
}

void ThrowIfCompleted(BatchItemId itemId)
{
if (completedResult != null)
{
throw new BatchCompletedException("Failed to complete item on an already finalized batch.",
new BatchItemMetadata(itemId.ToString(), msg.MessageId, msg.Metadata.VisibilityTime, FlushPeriod, msg.Metadata.InsertedOn), completedResult.Value);
}
}

var remaining = items.Remove(itemId);

void FlushIfEmpty(int remaining)
{
if (remaining > 0 || flushTriggered)
return;

Expand Down Expand Up @@ -181,31 +208,46 @@ TimeSpan CalculateFlushPeriod(TimeSpan visibilityTimeout)
internal class BatchItemsCollection<T>
{
readonly BatchItem<T>?[] items;
int remainingCount;
readonly List<BatchItem<T>?> failedItems;
int notProcessedCount;

public BatchItemsCollection(BatchItem<T>[] items)
{
this.items = items;
remainingCount = this.items.Length;
failedItems = new List<BatchItem<T>?>();
notProcessedCount = this.items.Length;
}

public int Complete(BatchItemId id)
{
if (items[id.Idx] == null)
throw new ItemNotFoundException(id.ToString());

items[id.Idx] = null;
return Interlocked.Decrement(ref notProcessedCount);
}

public int Remove(BatchItemId id)
public int Fail(BatchItemId id)
{
if (items[id.Idx] == null)
throw new ItemNotFoundException(id.ToString());

failedItems.Add(items[id.Idx]);
items[id.Idx] = null;
return Interlocked.Decrement(ref remainingCount);
return Interlocked.Decrement(ref notProcessedCount);
}

public int RemainingCount() => remainingCount;
public int NotProcessedCount() => notProcessedCount;
public int FailedCount() => failedItems.Count;
public int RemainingCount() => notProcessedCount + FailedCount();

public IEnumerable<BatchItem<T>> NotEmptyItems() => items.Where(x => x != null)!;
public IEnumerable<BatchItem<T>> Remaining() => failedItems.Concat(items.Where(x => x != null))!;
public BatchItem<T>?[] Items() => items;
}

public enum BatchCompletedResult
{
FullyProcessed,
TriggeredByFlush
TriggeredByFlush,
PartialFailure
}