Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT: Parallelize the creation and addition of builders to the Pipeline. #160

Draft
wants to merge 5 commits into
base: version/4.5
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 84 additions & 55 deletions FiftyOne.Pipeline.Core/FlowElements/PipelineBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace FiftyOne.Pipeline.Core.FlowElements
{
Expand Down Expand Up @@ -61,7 +64,7 @@ public class PipelineBuilder : PipelineBuilderBase<PipelineBuilder>,
/// The <see cref="ILoggerFactory"/> to use when creating logger
/// instances.
/// </param>
public PipelineBuilder(ILoggerFactory loggerFactory)
public PipelineBuilder(ILoggerFactory loggerFactory)
: base(loggerFactory)
{
GetAvailableElementBuilders();
Expand All @@ -78,7 +81,7 @@ public PipelineBuilder(ILoggerFactory loggerFactory)
/// Collection of services which contain builder instances for the
/// required elements.
/// </param>
public PipelineBuilder(ILoggerFactory loggerFactory,
public PipelineBuilder(ILoggerFactory loggerFactory,
IServiceProvider services)
: this(loggerFactory)
{
Expand Down Expand Up @@ -114,31 +117,53 @@ public virtual IPipeline BuildFromConfiguration(PipelineOptions options)
// Clear the list of flow elements ready to be populated
// from the configuration options.
FlowElements.Clear();
int counter = 0;

try
{
foreach (var elementOptions in options.Elements)
{
var tempElementDict =
new ConcurrentDictionary<int, IFlowElement>();
// Create elements in parallel. The index is declared in
// the foreach so that the order of elements is preserved.
// the index is passed down to the point of inserting the
// the element into the dictionary.
Parallel.ForEach(options.Elements,
new ParallelOptions()
{
MaxDegreeOfParallelism = Environment.ProcessorCount / 2
},
(elementOptions, state, index) =>
{
if (elementOptions.SubElements != null &&
elementOptions.SubElements.Count > 0)
{
// The configuration has sub elements so create
// a ParallelElements instance.
AddParallelElementsToList(FlowElements, elementOptions, counter);
ParallelEnqueueElement(
tempElementDict,
elementOptions,
(int)index);
}
else
{
// The configuration has no sub elements so create
// a flow element.
AddElementToList(FlowElements, elementOptions,
$"element {counter}");
EnqueueElement(
tempElementDict,
elementOptions,
$"element {index}",
(int)index);
}
counter++;
}
});

// order the dictionary so the elements are in the correct
// order and then add the values to FlowElements.
FlowElements
.AddRange(tempElementDict
.OrderBy(kvp => kvp.Key)
.Select(kvp => kvp.Value));

// Process any additional parameters for the pipeline
// builder itself.
// Process any additional parameters for the pipeline
// builder itself.
ProcessBuildParameters(
options.BuildParameters,
GetType(),
Expand Down Expand Up @@ -172,39 +197,39 @@ private void GetAvailableElementBuilders()
// Get all loaded types where there is at least one..
foreach (var assembly in AppDomain.CurrentDomain.GetAssemblies()
#if DEBUG
// Exclude VisualStudio assemblies
.Where(a => !a.FullName.StartsWith("Microsoft.VisualStudio",
// Exclude VisualStudio assemblies
.Where(a => !a.FullName.StartsWith("Microsoft.VisualStudio",
StringComparison.OrdinalIgnoreCase))
#endif
)
{
// Exclude dynamic assemblies
if (assembly.IsDynamic == false)
{
try
{
{
// Exclude dynamic assemblies
if (assembly.IsDynamic == false)
{
try
{
_elementBuilders.AddRange(assembly.GetTypes()
.Where(t => t.GetMethods()
// ..method called 'Build'..
.Any(m => m.Name == "Build" &&
// ..where the return type is or implements IFlowElement
(m.ReturnType == typeof(IFlowElement) ||
.Where(t => t.GetMethods()
// ..method called 'Build'..
.Any(m => m.Name == "Build" &&
// ..where the return type is or implements IFlowElement
(m.ReturnType == typeof(IFlowElement) ||
m.ReturnType.GetInterfaces().Contains(typeof(IFlowElement)))))
.ToList());
}
// Catch type load exceptions when assembly can't be loaded
// and log a warning.
catch (ReflectionTypeLoadException ex)
{
if (Logger.IsEnabled(LogLevel.Debug))
{
Logger.LogDebug(ex, $"Failed to get Types for {assembly.FullName}", null);
}
}
}
}
// Catch type load exceptions when assembly can't be loaded
// and log a warning.
catch (ReflectionTypeLoadException ex)
{
if (Logger.IsEnabled(LogLevel.Debug))
{
Logger.LogDebug(ex, $"Failed to get Types for {assembly.FullName}", null);
}
}
}
}
}

}
/// <summary>
/// Create a new <see cref="IFlowElement"/> using the specified
/// <see cref="ElementOptions"/> and add it to the supplied list
Expand All @@ -220,11 +245,13 @@ private void GetAvailableElementBuilders()
/// <param name="elementLocation">
/// The string description of the element's location within the
/// <see cref="PipelineOptions"/> instance.
/// </param>
private void AddElementToList(
List<IFlowElement> elements,
/// </param>
/// <param name="elementIndex"></param>
private void EnqueueElement(
ConcurrentDictionary<int, IFlowElement> elements,
ElementOptions elementOptions,
string elementLocation)
string elementLocation,
int elementIndex)
{
// Check that a builder name is set
if (string.IsNullOrEmpty(elementOptions.BuilderName))
Expand Down Expand Up @@ -384,9 +411,9 @@ private void AddElementToList(
}

// Add the element to the list.
elements.Add(element);
}

elements.TryAdd(elementIndex, element);
}
/// <summary>
/// Create a <see cref="ParallelElements"/> from the specified
/// configuration and add it to the _flowElements list.
Expand All @@ -398,11 +425,9 @@ private void AddElementToList(
/// The <see cref="ElementOptions"/> instance to use when creating
/// the <see cref="ParallelElements"/>.
/// </param>
/// <param name="elementIndex">
/// The index of the element within the <see cref="PipelineOptions"/>.
/// </param>
private void AddParallelElementsToList(
List<IFlowElement> elements,
/// <param name="elementIndex"></param>
private void ParallelEnqueueElement(
ConcurrentDictionary<int, IFlowElement> elements,
ElementOptions elementOptions,
int elementIndex)
{
Expand All @@ -416,7 +441,8 @@ private void AddParallelElementsToList(
$"SubElements and other settings values. " +
$"This is invalid");
}
List<IFlowElement> parallelElements = new List<IFlowElement>();

var parallelElements = new ConcurrentDictionary<int, IFlowElement>();

// Iterate through the sub elements, creating them and
// adding them to the list.
Expand All @@ -432,8 +458,11 @@ private void AddParallelElementsToList(
}
else
{
AddElementToList(parallelElements, subElement,
$"element {subCounter} in element {elementIndex}");
EnqueueElement(
parallelElements,
subElement,
$"element {subCounter} in element {elementIndex}",
elementIndex);
}
subCounter++;
}
Expand All @@ -442,8 +471,8 @@ private void AddParallelElementsToList(
// elements.
var parallelInstance = new ParallelElements(
LoggerFactory.CreateLogger<ParallelElements>(),
parallelElements.ToArray());
elements.Add(parallelInstance);
parallelElements.Values.ToArray());
elements.TryAdd(elementIndex, parallelInstance);
}

/// <summary>
Expand Down