Skip to content

Commit

Permalink
fix: added support for not retrying invalidexceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
pksorensen committed Sep 11, 2022
1 parent e2eaf1d commit d5f9d39
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 25 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,5 @@
.vs
bin
obj
/src/WorkflowEngine.Core/test1.txt
/src/WorkflowEngine.Hangfire/test1.txt
28 changes: 13 additions & 15 deletions src/WorkflowEngine.Core/ActionExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,28 +86,26 @@ public async ValueTask<IActionResult> ExecuteAsync(IRunContext context, IWorkflo
Result = await actionImplementation.ExecuteAsync(context,workflow, action)
};

{
//if (workflow.Manifest.Actions.FindParentAction(action.Key) is ForLoopActionMetadata parent)
//{
// await outputsRepository.AddArrayItemAsync(context, workflow, action.Key, result);
//}
//else
//if (actionMetadata is ForLoopActionMetadata)
//{
// await outputsRepository.AddScopeItem(context, workflow, action, result);
//}
//else
{


await outputsRepository.AddAsync(context, workflow, action, result);
}
}


return result;


}catch(Exception ex)
{
return new ActionResult { Key = action.Key, Status = "Failed", FailedReason=ex.ToString() };
var result= new ActionResult { Key = action.Key, Status = "Failed", FailedReason=ex.ToString() };
try
{
await outputsRepository.AddAsync(context, workflow, action, result);
}
catch (Exception )
{

}
return result;
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/WorkflowEngine.Core/DefaultOutputsRepository.cs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public ValueTask EndScope(IRunContext context, IWorkflow workflow, IAction actio
body.Add(actions);


action.Index= body.Count;
action.Index= body.Count; //TODO - test if action.index is set and we could basically count it ++ instead, because then we can move it out and control it better for parallel

return new ValueTask();

Expand Down
1 change: 1 addition & 0 deletions src/WorkflowEngine.Core/WorkflowActions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ namespace WorkflowEngine.Core
public class ForLoopActionMetadata : ActionMetadata, IScopedActionMetadata
{
public object ForEach { get; set; }
public int ConcurrentCount { get; set; } = 1;

public WorkflowActions Actions { get; set; } = new WorkflowActions();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public static IServiceCollection AddWorkflowEngine<TOutputsRepository>(this ISer
services.AddHostedService<WorkflowStarterBackgroundJob>();

services.AddTransient<IWorkflowAccessor, DefaultWorkflowAccessor>();
services.AddTransient<IHangfireActionExecutorResultInspector, DefaultHangfireActionExecutorResultInspector>();
return services;
}

Expand Down
28 changes: 25 additions & 3 deletions src/WorkflowEngine.Hangfire/ForloopAction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,40 @@ public async ValueTask<object> ExecuteAsync(IRunContext context, IWorkflow workf

if (items.Type() == ExpressionEngine.ValueType.Array)
{
var aa = items.GetValue<List<ValueContainer>>();
if (action.Index < aa.Count)
var itemsToRunover = items.GetValue<List<ValueContainer>>();

if (loop.ConcurrentCount > 1)
{
var pageSize = itemsToRunover.Count / loop.ConcurrentCount;

foreach (var child in Enumerable.Range(0, loop.ConcurrentCount))
{
var nextactionmetadata = loop.Actions.SingleOrDefault(c => c.Value.RunAfter?.Count == 0);
if (nextactionmetadata.Equals(default(KeyValuePair<string, ActionMetadata>)))
throw new Exception("No action with no runafter could be found");

var nextaction = context.CopyTo(new Action { Type = nextactionmetadata.Value.Type, Key = $"{action.Key}.{nextactionmetadata.Key}", ScheduledTime = DateTimeOffset.UtcNow, Index = action.Index });

var a = backgroundJobClient.ContinueJobWith<IHangfireActionExecutor>(arrayContext.JobId,
(executor) => executor.ExecuteAsync(context, workflow, nextaction, null));

return new { item = itemsToRunover[action.Index] };
}
}
else if (action.Index < itemsToRunover.Count)
{
// var nextAction = new Action { Type = action.Type, Key=action.Key, ScheduledTime = DateTimeOffset.UtcNow, RunId = context.RunId, Index = action.Index+1 };

var nextactionmetadata = loop.Actions.SingleOrDefault(c => c.Value.RunAfter?.Count == 0);
if (nextactionmetadata.Equals( default(KeyValuePair<string, ActionMetadata>)))
throw new Exception("No action with no runafter could be found");

var nextaction = context.CopyTo( new Action { Type = nextactionmetadata.Value.Type, Key= $"{action.Key}.{nextactionmetadata.Key}", ScheduledTime = DateTimeOffset.UtcNow, Index=action.Index });

var a = backgroundJobClient.ContinueJobWith<IHangfireActionExecutor>(arrayContext.JobId,
(executor) => executor.ExecuteAsync(context, workflow, nextaction,null));

return new { item = aa[action.Index] };
return new { item = itemsToRunover[action.Index] };
}

}
Expand Down
38 changes: 33 additions & 5 deletions src/WorkflowEngine.Hangfire/HangfireWorkflowExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,32 @@ public static string TriggerAsync<TTriggerContext>(this IBackgroundJobClient bac

}
}
public interface IHangfireActionExecutorResultInspector
{
Task InspectAsync(IRunContext run, IWorkflow workflow, IActionResult result, IAction next);
}
public class DefaultHangfireActionExecutorResultInspector : IHangfireActionExecutorResultInspector
{
public Task InspectAsync(IRunContext run, IWorkflow workflow, IActionResult result, IAction next)
{
return Task.CompletedTask;
}
}
public class HangfireWorkflowExecutor : IHangfireWorkflowExecutor, IHangfireActionExecutor
{
private readonly IWorkflowAccessor workflowAccessor;
private readonly IHangfireActionExecutorResultInspector hangfireActionExecutorResultHandler;
private readonly IBackgroundJobClient backgroundJobClient;
private readonly IRunContextAccessor runContextAccessor;
private readonly IWorkflowExecutor executor;
private readonly IActionExecutor actionExecutor;
private readonly IOutputsRepository outputRepository;
private readonly IArrayContext arrayContext;

public HangfireWorkflowExecutor(IWorkflowAccessor workflowAccessor, IBackgroundJobClient backgroundJobClient, IArrayContext arrayContext, IRunContextAccessor runContextAccessor, IWorkflowExecutor executor, IActionExecutor actionExecutor, IOutputsRepository actionResultRepository)
public HangfireWorkflowExecutor(IWorkflowAccessor workflowAccessor, IHangfireActionExecutorResultInspector hangfireActionExecutorResultHandler, IBackgroundJobClient backgroundJobClient, IArrayContext arrayContext, IRunContextAccessor runContextAccessor, IWorkflowExecutor executor, IActionExecutor actionExecutor, IOutputsRepository actionResultRepository)
{
this.workflowAccessor = workflowAccessor ?? throw new ArgumentNullException(nameof(workflowAccessor));
this.hangfireActionExecutorResultHandler = hangfireActionExecutorResultHandler ?? throw new ArgumentNullException(nameof(hangfireActionExecutorResultHandler));
this.backgroundJobClient=backgroundJobClient??throw new ArgumentNullException(nameof(backgroundJobClient));
this.arrayContext=arrayContext??throw new ArgumentNullException(nameof(arrayContext));
this.runContextAccessor=runContextAccessor;
Expand Down Expand Up @@ -84,27 +97,42 @@ public async ValueTask<object> ExecuteAsync(IRunContext run, IWorkflow workflow,

var result = await actionExecutor.ExecuteAsync(run, workflow, action);





if (result != null)
{

var next = await executor.GetNextAction(run, workflow, result);


await hangfireActionExecutorResultHandler.InspectAsync(run, workflow, result, next);

if (next != null)
{
var a = backgroundJobClient.Enqueue<IHangfireActionExecutor>(
(executor) => executor.ExecuteAsync(run, workflow, next,null));
}else if(workflow.Manifest.Actions.FindParentAction(action.Key) is ForLoopActionMetadata scope)
(executor) => executor.ExecuteAsync(run, workflow, next, null));
}
else if (workflow.Manifest.Actions.FindParentAction(action.Key) is ForLoopActionMetadata scope)
{

var scopeaction= run.CopyTo( new Action {ScopeMoveNext=true, Type = scope.Type, Key=action.Key.Substring(0, action.Key.LastIndexOf('.')), ScheduledTime=DateTimeOffset.UtcNow });
var scopeaction = run.CopyTo(new Action { ScopeMoveNext = true, Type = scope.Type, Key = action.Key.Substring(0, action.Key.LastIndexOf('.')), ScheduledTime = DateTimeOffset.UtcNow });


var a = backgroundJobClient.Enqueue<IHangfireActionExecutor>(
(executor) => executor.ExecuteAsync(run, workflow, scopeaction, null));

//await outputRepository.EndScope(run, workflow, action);
} else if (result.Status == "Failed")
{
context.SetJobParameter("RetryCount", 999);
throw new InvalidOperationException("Action failed") { Data = { ["ActionResult"] = result } };
}





}

return result;
Expand Down
2 changes: 1 addition & 1 deletion src/WorkflowEngine.Hangfire/WorkflowEngine.Hangfire.csproj
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netcoreapp3.1;net5.0;net6.0</TargetFrameworks>
<TargetFrameworks>netcoreapp3.1;net5.0</TargetFrameworks>

<PackageId>Delegate.WorkflowEngine.Hangfire</PackageId>
<Authors>Delegate A/S</Authors>
Expand Down

0 comments on commit d5f9d39

Please sign in to comment.