From a997fb4628c87b18a0e25723d54b756131227c43 Mon Sep 17 00:00:00 2001 From: James Rogers Date: Mon, 16 Dec 2024 10:25:56 +0000 Subject: [PATCH 1/5] FEAT: Parallelize the creation and addition of builders to the Pipeline. --- .../FlowElements/PipelineBuilder.cs | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs b/FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs index fcd0430c..dd0b2d68 100644 --- a/FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs +++ b/FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs @@ -27,11 +27,14 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Reflection; using System.Runtime.CompilerServices; using System.Text; +using System.Threading; +using System.Threading.Tasks; namespace FiftyOne.Pipeline.Core.FlowElements { @@ -118,25 +121,30 @@ public virtual IPipeline BuildFromConfiguration(PipelineOptions options) try { - foreach (var elementOptions in options.Elements) - { + // Create element builders in parallel + var flowElementQueue = new ConcurrentQueue(); + Parallel.ForEach(options.Elements, (elementOptions => + { if (elementOptions.SubElements != null && elementOptions.SubElements.Count > 0) { // The configuration has sub elements so create // a ParallelElements instance. - AddParallelElementsToList(FlowElements, elementOptions, counter); + AddParallelElementsToList(flowElementQueue, elementOptions, counter); } else { // The configuration has no sub elements so create // a flow element. - AddElementToList(FlowElements, elementOptions, + AddElementToList(flowElementQueue, elementOptions, $"element {counter}"); } - counter++; - } + Interlocked.Increment(ref counter); + })); + // Add created builders to flow elements + FlowElements.AddRange(flowElementQueue); + // Process any additional parameters for the pipeline // builder itself. ProcessBuildParameters( @@ -222,7 +230,7 @@ private void GetAvailableElementBuilders() /// instance. /// private void AddElementToList( - List elements, + ConcurrentQueue elements, ElementOptions elementOptions, string elementLocation) { @@ -384,7 +392,7 @@ private void AddElementToList( } // Add the element to the list. - elements.Add(element); + elements.Enqueue(element); } /// @@ -402,7 +410,7 @@ private void AddElementToList( /// The index of the element within the . /// private void AddParallelElementsToList( - List elements, + ConcurrentQueue elements, ElementOptions elementOptions, int elementIndex) { @@ -416,7 +424,7 @@ private void AddParallelElementsToList( $"SubElements and other settings values. " + $"This is invalid"); } - List parallelElements = new List(); + var parallelElements = new ConcurrentQueue(); // Iterate through the sub elements, creating them and // adding them to the list. @@ -443,7 +451,7 @@ private void AddParallelElementsToList( var parallelInstance = new ParallelElements( LoggerFactory.CreateLogger(), parallelElements.ToArray()); - elements.Add(parallelInstance); + elements.Enqueue(parallelInstance); } /// From 30eea1880028d29a261bd92ff77c6cd84eeb21e5 Mon Sep 17 00:00:00 2001 From: James Rogers Date: Mon, 16 Dec 2024 14:30:48 +0000 Subject: [PATCH 2/5] FEAT: Rename methods to express new approach. Add parallel options to limit the processor count so that it doesnt use up all resources on start up. --- .../FlowElements/PipelineBuilder.cs | 79 ++++++++++--------- 1 file changed, 42 insertions(+), 37 deletions(-) diff --git a/FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs b/FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs index dd0b2d68..284da31b 100644 --- a/FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs +++ b/FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs @@ -64,7 +64,7 @@ public class PipelineBuilder : PipelineBuilderBase, /// The to use when creating logger /// instances. /// - public PipelineBuilder(ILoggerFactory loggerFactory) + public PipelineBuilder(ILoggerFactory loggerFactory) : base(loggerFactory) { GetAvailableElementBuilders(); @@ -81,7 +81,7 @@ public PipelineBuilder(ILoggerFactory loggerFactory) /// Collection of services which contain builder instances for the /// required elements. /// - public PipelineBuilder(ILoggerFactory loggerFactory, + public PipelineBuilder(ILoggerFactory loggerFactory, IServiceProvider services) : this(loggerFactory) { @@ -122,31 +122,36 @@ public virtual IPipeline BuildFromConfiguration(PipelineOptions options) try { // Create element builders in parallel - var flowElementQueue = new ConcurrentQueue(); - Parallel.ForEach(options.Elements, (elementOptions => + var flowElementQueue = new ConcurrentQueue(); + Parallel.ForEach(options.Elements, + new ParallelOptions() + { + MaxDegreeOfParallelism = Environment.ProcessorCount / 2 + }, + (elementOptions => { if (elementOptions.SubElements != null && elementOptions.SubElements.Count > 0) { // The configuration has sub elements so create // a ParallelElements instance. - AddParallelElementsToList(flowElementQueue, elementOptions, counter); + ParallelEnqueueElement(flowElementQueue, elementOptions, counter); } else { // The configuration has no sub elements so create // a flow element. - AddElementToList(flowElementQueue, elementOptions, + EnqueueElement(flowElementQueue, elementOptions, $"element {counter}"); } Interlocked.Increment(ref counter); })); // Add created builders to flow elements - FlowElements.AddRange(flowElementQueue); - - // Process any additional parameters for the pipeline - // builder itself. + FlowElements.AddRange(flowElementQueue); + + // Process any additional parameters for the pipeline + // builder itself. ProcessBuildParameters( options.BuildParameters, GetType(), @@ -180,36 +185,36 @@ private void GetAvailableElementBuilders() // Get all loaded types where there is at least one.. foreach (var assembly in AppDomain.CurrentDomain.GetAssemblies() #if DEBUG - // Exclude VisualStudio assemblies - .Where(a => !a.FullName.StartsWith("Microsoft.VisualStudio", + // Exclude VisualStudio assemblies + .Where(a => !a.FullName.StartsWith("Microsoft.VisualStudio", StringComparison.OrdinalIgnoreCase)) #endif ) - { - // Exclude dynamic assemblies - if (assembly.IsDynamic == false) - { - try - { + { + // Exclude dynamic assemblies + if (assembly.IsDynamic == false) + { + try + { _elementBuilders.AddRange(assembly.GetTypes() - .Where(t => t.GetMethods() - // ..method called 'Build'.. - .Any(m => m.Name == "Build" && - // ..where the return type is or implements IFlowElement - (m.ReturnType == typeof(IFlowElement) || + .Where(t => t.GetMethods() + // ..method called 'Build'.. + .Any(m => m.Name == "Build" && + // ..where the return type is or implements IFlowElement + (m.ReturnType == typeof(IFlowElement) || m.ReturnType.GetInterfaces().Contains(typeof(IFlowElement))))) .ToList()); - } - // Catch type load exceptions when assembly can't be loaded - // and log a warning. - catch (ReflectionTypeLoadException ex) - { - if (Logger.IsEnabled(LogLevel.Debug)) - { - Logger.LogDebug(ex, $"Failed to get Types for {assembly.FullName}", null); - } - } - } + } + // Catch type load exceptions when assembly can't be loaded + // and log a warning. + catch (ReflectionTypeLoadException ex) + { + if (Logger.IsEnabled(LogLevel.Debug)) + { + Logger.LogDebug(ex, $"Failed to get Types for {assembly.FullName}", null); + } + } + } } } @@ -229,7 +234,7 @@ private void GetAvailableElementBuilders() /// The string description of the element's location within the /// instance. /// - private void AddElementToList( + private void EnqueueElement( ConcurrentQueue elements, ElementOptions elementOptions, string elementLocation) @@ -409,7 +414,7 @@ private void AddElementToList( /// /// The index of the element within the . /// - private void AddParallelElementsToList( + private void ParallelEnqueueElement( ConcurrentQueue elements, ElementOptions elementOptions, int elementIndex) @@ -440,7 +445,7 @@ private void AddParallelElementsToList( } else { - AddElementToList(parallelElements, subElement, + EnqueueElement(parallelElements, subElement, $"element {subCounter} in element {elementIndex}"); } subCounter++; From 5d808e968be492628d5b4ecae51169d2677b0d8d Mon Sep 17 00:00:00 2001 From: James Rogers Date: Mon, 16 Dec 2024 15:39:19 +0000 Subject: [PATCH 3/5] FEAT: Set a fixed size for the FlowElements list, add an index for each parallel interation and insert each element at the respective index. This preserves the element order that is declared in the configuration. --- .../FlowElements/PipelineBuilder.cs | 71 +++++++++++-------- 1 file changed, 43 insertions(+), 28 deletions(-) diff --git a/FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs b/FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs index 284da31b..6d527179 100644 --- a/FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs +++ b/FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs @@ -117,39 +117,46 @@ public virtual IPipeline BuildFromConfiguration(PipelineOptions options) // Clear the list of flow elements ready to be populated // from the configuration options. FlowElements.Clear(); - int counter = 0; - + // set the size of the list to be the max amount of elements + // possible + FlowElements.Capacity = options.Elements.Count; try { - // Create element builders in parallel - var flowElementQueue = new ConcurrentQueue(); + // Create elements in parallel. The index is declared in + // the foreach so that the order of elements is preserved. + // the index is passed down to the point of inserting the + // the element into the list and used as the index for + // insertion. Parallel.ForEach(options.Elements, new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount / 2 }, - (elementOptions => + (elementOptions, state, index) => { if (elementOptions.SubElements != null && elementOptions.SubElements.Count > 0) { // The configuration has sub elements so create // a ParallelElements instance. - ParallelEnqueueElement(flowElementQueue, elementOptions, counter); + ParallelEnqueueElement( + FlowElements, + elementOptions, + $"element {index}", + (int)index); } else { // The configuration has no sub elements so create // a flow element. - EnqueueElement(flowElementQueue, elementOptions, - $"element {counter}"); + EnqueueElement( + FlowElements, + elementOptions, + $"element {index}", + (int)index); } - Interlocked.Increment(ref counter); - })); + }); - // Add created builders to flow elements - FlowElements.AddRange(flowElementQueue); - // Process any additional parameters for the pipeline // builder itself. ProcessBuildParameters( @@ -216,8 +223,8 @@ private void GetAvailableElementBuilders() } } } - } - + } + /// /// Create a new using the specified /// and add it to the supplied list @@ -233,11 +240,13 @@ private void GetAvailableElementBuilders() /// /// The string description of the element's location within the /// instance. - /// + /// + /// private void EnqueueElement( - ConcurrentQueue elements, + List elements, ElementOptions elementOptions, - string elementLocation) + string elementLocation, + int elementIndex) { // Check that a builder name is set if (string.IsNullOrEmpty(elementOptions.BuilderName)) @@ -397,9 +406,9 @@ private void EnqueueElement( } // Add the element to the list. - elements.Enqueue(element); - } - + FlowElements.Add(element); + } + /// /// Create a from the specified /// configuration and add it to the _flowElements list. @@ -411,12 +420,14 @@ private void EnqueueElement( /// The instance to use when creating /// the . /// - /// + /// /// The index of the element within the . - /// + /// + /// private void ParallelEnqueueElement( - ConcurrentQueue elements, + List elements, ElementOptions elementOptions, + string elementLocation, int elementIndex) { // Element contains further sub elements, this is not allowed. @@ -429,7 +440,8 @@ private void ParallelEnqueueElement( $"SubElements and other settings values. " + $"This is invalid"); } - var parallelElements = new ConcurrentQueue(); + + var parallelElements = new List(); // Iterate through the sub elements, creating them and // adding them to the list. @@ -445,8 +457,11 @@ private void ParallelEnqueueElement( } else { - EnqueueElement(parallelElements, subElement, - $"element {subCounter} in element {elementIndex}"); + EnqueueElement( + parallelElements, + subElement, + $"element {subCounter} in element {elementIndex}", + elementIndex); } subCounter++; } @@ -456,7 +471,7 @@ private void ParallelEnqueueElement( var parallelInstance = new ParallelElements( LoggerFactory.CreateLogger(), parallelElements.ToArray()); - elements.Enqueue(parallelInstance); + elements.Insert(elementIndex, parallelInstance); } /// From 6ccaa1ee4bd733fc4a6a4a13db466e57d36bc9b2 Mon Sep 17 00:00:00 2001 From: James Rogers Date: Mon, 16 Dec 2024 16:04:08 +0000 Subject: [PATCH 4/5] CLEANUP: Removed unused param --- FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs b/FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs index 6d527179..fe2ef3a0 100644 --- a/FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs +++ b/FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs @@ -142,7 +142,6 @@ public virtual IPipeline BuildFromConfiguration(PipelineOptions options) ParallelEnqueueElement( FlowElements, elementOptions, - $"element {index}", (int)index); } else @@ -420,14 +419,10 @@ private void EnqueueElement( /// The instance to use when creating /// the . /// - /// - /// The index of the element within the . - /// /// private void ParallelEnqueueElement( List elements, ElementOptions elementOptions, - string elementLocation, int elementIndex) { // Element contains further sub elements, this is not allowed. From 2de0249c9a1bcdc4dd76e75ee0794d800f2980ee Mon Sep 17 00:00:00 2001 From: James Rogers Date: Mon, 16 Dec 2024 17:17:56 +0000 Subject: [PATCH 5/5] BUG: use concurrent dictionary to preserve the order instead of List. List runs into Out of Index exceptions because of how lists are created. --- .../FlowElements/PipelineBuilder.cs | 32 +++++++++++-------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs b/FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs index fe2ef3a0..5d5af285 100644 --- a/FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs +++ b/FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs @@ -117,16 +117,15 @@ public virtual IPipeline BuildFromConfiguration(PipelineOptions options) // Clear the list of flow elements ready to be populated // from the configuration options. FlowElements.Clear(); - // set the size of the list to be the max amount of elements - // possible - FlowElements.Capacity = options.Elements.Count; + try { + var tempElementDict = + new ConcurrentDictionary(); // Create elements in parallel. The index is declared in // the foreach so that the order of elements is preserved. // the index is passed down to the point of inserting the - // the element into the list and used as the index for - // insertion. + // the element into the dictionary. Parallel.ForEach(options.Elements, new ParallelOptions() { @@ -140,7 +139,7 @@ public virtual IPipeline BuildFromConfiguration(PipelineOptions options) // The configuration has sub elements so create // a ParallelElements instance. ParallelEnqueueElement( - FlowElements, + tempElementDict, elementOptions, (int)index); } @@ -149,13 +148,20 @@ public virtual IPipeline BuildFromConfiguration(PipelineOptions options) // The configuration has no sub elements so create // a flow element. EnqueueElement( - FlowElements, + tempElementDict, elementOptions, $"element {index}", (int)index); } }); + // order the dictionary so the elements are in the correct + // order and then add the values to FlowElements. + FlowElements + .AddRange(tempElementDict + .OrderBy(kvp => kvp.Key) + .Select(kvp => kvp.Value)); + // Process any additional parameters for the pipeline // builder itself. ProcessBuildParameters( @@ -242,7 +248,7 @@ private void GetAvailableElementBuilders() /// /// private void EnqueueElement( - List elements, + ConcurrentDictionary elements, ElementOptions elementOptions, string elementLocation, int elementIndex) @@ -405,7 +411,7 @@ private void EnqueueElement( } // Add the element to the list. - FlowElements.Add(element); + elements.TryAdd(elementIndex, element); } /// @@ -421,7 +427,7 @@ private void EnqueueElement( /// /// private void ParallelEnqueueElement( - List elements, + ConcurrentDictionary elements, ElementOptions elementOptions, int elementIndex) { @@ -436,7 +442,7 @@ private void ParallelEnqueueElement( $"This is invalid"); } - var parallelElements = new List(); + var parallelElements = new ConcurrentDictionary(); // Iterate through the sub elements, creating them and // adding them to the list. @@ -465,8 +471,8 @@ private void ParallelEnqueueElement( // elements. var parallelInstance = new ParallelElements( LoggerFactory.CreateLogger(), - parallelElements.ToArray()); - elements.Insert(elementIndex, parallelInstance); + parallelElements.Values.ToArray()); + elements.TryAdd(elementIndex, parallelInstance); } ///