Skip to content

Commit

Permalink
Merge pull request #1 from BPJDeLozier/master
Browse files Browse the repository at this point in the history
QueueCountAsync
  • Loading branch information
juanpgarces authored Nov 13, 2023
2 parents 5f67c02 + dcf1972 commit 3b9f4b8
Showing 1 changed file with 23 additions and 8 deletions.
31 changes: 23 additions & 8 deletions src/UPS/FuncMultipleManager.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -23,7 +24,7 @@ public static class FuncMultipleManager
private static int maxThreads = 5;

// Regular Operations
private static long currentCount = 0;
private static long currentThreadCount = 0;

// Error Handling
private static Func<Exception, Task> funcExceptionLogger = null;
Expand Down Expand Up @@ -66,12 +67,26 @@ public static async Task<Guid> EnqueueAsync(Func<Task<object>> func, Func<Task<b
return referencedTask.guid;
}

public static async Task SetErrorLoggingFunction(Func<Exception, Task> myFuncExceptionLogger)
public static Task SetErrorLoggingFunction(Func<Exception, Task> myFuncExceptionLogger)
{
if (myFuncExceptionLogger != null)
{
funcExceptionLogger = myFuncExceptionLogger;
}

return Task.CompletedTask;
}

public static Task<int> QueueCountAsync(string queueName = "default")
{
int? result = null;

if (concurrentQueues.TryGetValue(queueName, out var queue))
{
result = queue.Count;
}

return Task.FromResult(result ?? default);
}

//public static async Task<ReferencedResult> GetResultIfExistsAsync(Guid guid)
Expand All @@ -86,7 +101,7 @@ public static async Task SetErrorLoggingFunction(Func<Exception, Task> myFuncExc
// return failedFunctions.Find(rR => rR.guid == guid);
//}

private static async void checkQueue(object state)
private static async void CheckQueue(object state)
{
var queueName = (string)state;

Expand All @@ -106,7 +121,7 @@ private static void InitializeQueue(string queueName, int period)
}
if (!concurrentTimers.ContainsKey(queueName))
{
concurrentTimers.TryAdd(queueName, new Timer(checkQueue, queueName, period, period));
concurrentTimers.TryAdd(queueName, new Timer(CheckQueue, queueName, period, period));
}
}

Expand All @@ -116,13 +131,13 @@ private static async Task StartProcessing(string queueName)

if (queue.Count > 0)
{
if (Interlocked.CompareExchange(ref currentCount, 0, 0) < maxThreads)
if (Interlocked.CompareExchange(ref currentThreadCount, 0, 0) < maxThreads)
{
await Task.Factory.StartNew(async () =>
{
try
{
Interlocked.Increment(ref currentCount);
Interlocked.Increment(ref currentThreadCount);
while (!queue.IsEmpty)
{
try
Expand All @@ -136,7 +151,7 @@ await Task.Factory.StartNew(async () =>
if (queue.TryDequeue(out ReferencedFunc<object> dequeuedReferencedTask))
await ExecuteAsync(dequeuedReferencedTask);
}
else if(await (referencedTask.checkpoint?.Invoke()) == true)
else if (await (referencedTask.checkpoint?.Invoke()) == true)
{
if (queue.TryDequeue(out ReferencedFunc<object> dequeuedReferencedTask))
await ExecuteAsync(dequeuedReferencedTask);
Expand All @@ -163,7 +178,7 @@ await Task.Factory.StartNew(async () =>
}
finally
{
Interlocked.Decrement(ref currentCount);
Interlocked.Decrement(ref currentThreadCount);
}
}, TaskCreationOptions.LongRunning);
}
Expand Down

0 comments on commit 3b9f4b8

Please sign in to comment.