diff --git a/AzureBatchQueue/TimerBatch.cs b/AzureBatchQueue/TimerBatch.cs index 00998da..58ea54f 100644 --- a/AzureBatchQueue/TimerBatch.cs +++ b/AzureBatchQueue/TimerBatch.cs @@ -12,9 +12,12 @@ internal class TimerBatch readonly ILogger logger; readonly ConcurrentDictionary> items; - Timer? timer; + readonly Timer timer; BatchCompletedResult? completedResult; + bool flushTriggered; + readonly object locker = new(); + public TimerBatch(BatchQueue batchQueue, QueueMessage msg, int maxDequeueCount, ILogger logger) { this.batchQueue = batchQueue; @@ -34,10 +37,21 @@ public TimerBatch(BatchQueue batchQueue, QueueMessage msg, int maxDequeu async Task Flush() { - try + // check if value is already set before acquiring a lock + if (flushTriggered) + return; + + lock (locker) { - DisposeTimer(); + if (flushTriggered) + return; + flushTriggered = true; + timer.Dispose(); + } + + try + { await DoFlush(); } catch (Azure.RequestFailedException ex) when (ex.ErrorCode == "QueueNotFound") @@ -80,16 +94,6 @@ async Task DoFlush() } } - /// - /// Set timer reference to null, so that call to timer in Complete() will not throw ObjectDisposedException - /// - void DisposeTimer() - { - var timerCopy = timer; - timer = null; - timerCopy.Dispose(); - } - QueueMessage Message() { var notCompletedItems = items.Values.Select(x => x.Item).ToArray(); @@ -109,7 +113,12 @@ public BatchItemCompleteResult Complete(string itemId) if (!items.IsEmpty) return BatchItemCompleteResult.Completed; - timer?.Change(TimeSpan.Zero, Timeout.InfiniteTimeSpan); + lock (locker) + { + if (!flushTriggered) + timer.Change(TimeSpan.Zero, Timeout.InfiniteTimeSpan); + } + return BatchItemCompleteResult.BatchFullyProcessed; }