diff --git a/src/UPS/FuncMultipleManager.cs b/src/UPS/FuncMultipleManager.cs index 3a51e9f..297ed92 100644 --- a/src/UPS/FuncMultipleManager.cs +++ b/src/UPS/FuncMultipleManager.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -23,7 +24,7 @@ public static class FuncMultipleManager private static int maxThreads = 5; // Regular Operations - private static long currentCount = 0; + private static long currentThreadCount = 0; // Error Handling private static Func funcExceptionLogger = null; @@ -66,12 +67,26 @@ public static async Task EnqueueAsync(Func> func, Func myFuncExceptionLogger) + public static Task SetErrorLoggingFunction(Func myFuncExceptionLogger) { if (myFuncExceptionLogger != null) { funcExceptionLogger = myFuncExceptionLogger; } + + return Task.CompletedTask; + } + + public static Task QueueCountAsync(string queueName = "default") + { + int? result = null; + + if (concurrentQueues.TryGetValue(queueName, out var queue)) + { + result = queue.Count; + } + + return Task.FromResult(result ?? default); } //public static async Task GetResultIfExistsAsync(Guid guid) @@ -86,7 +101,7 @@ public static async Task SetErrorLoggingFunction(Func myFuncExc // return failedFunctions.Find(rR => rR.guid == guid); //} - private static async void checkQueue(object state) + private static async void CheckQueue(object state) { var queueName = (string)state; @@ -106,7 +121,7 @@ private static void InitializeQueue(string queueName, int period) } if (!concurrentTimers.ContainsKey(queueName)) { - concurrentTimers.TryAdd(queueName, new Timer(checkQueue, queueName, period, period)); + concurrentTimers.TryAdd(queueName, new Timer(CheckQueue, queueName, period, period)); } } @@ -116,13 +131,13 @@ private static async Task StartProcessing(string queueName) if (queue.Count > 0) { - if (Interlocked.CompareExchange(ref currentCount, 0, 0) < maxThreads) + if (Interlocked.CompareExchange(ref currentThreadCount, 0, 0) < maxThreads) { await Task.Factory.StartNew(async () => { try { - Interlocked.Increment(ref currentCount); + Interlocked.Increment(ref currentThreadCount); while (!queue.IsEmpty) { try @@ -136,7 +151,7 @@ await Task.Factory.StartNew(async () => if (queue.TryDequeue(out ReferencedFunc dequeuedReferencedTask)) await ExecuteAsync(dequeuedReferencedTask); } - else if(await (referencedTask.checkpoint?.Invoke()) == true) + else if (await (referencedTask.checkpoint?.Invoke()) == true) { if (queue.TryDequeue(out ReferencedFunc dequeuedReferencedTask)) await ExecuteAsync(dequeuedReferencedTask); @@ -163,7 +178,7 @@ await Task.Factory.StartNew(async () => } finally { - Interlocked.Decrement(ref currentCount); + Interlocked.Decrement(ref currentThreadCount); } }, TaskCreationOptions.LongRunning); }