Skip to content

Commit

Permalink
fix: #12
Browse files Browse the repository at this point in the history
  • Loading branch information
pksorensen committed Sep 11, 2022
1 parent d5f9d39 commit d8cdf79
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 36 deletions.
2 changes: 1 addition & 1 deletion src/WorkflowEngine.Core/ActionExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public async ValueTask<IActionResult> ExecuteAsync(IRunContext context, IWorkflo

}catch(Exception ex)
{
var result= new ActionResult { Key = action.Key, Status = "Failed", FailedReason=ex.ToString() };
var result= new ActionResult { Key = action.Key, Status = "Failed", FailedReason=ex.ToString(), ReThrow = (ex is InvalidOperationException) };
try
{
await outputsRepository.AddAsync(context, workflow, action, result);
Expand Down
1 change: 1 addition & 0 deletions src/WorkflowEngine.Core/ActionResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public class ActionResult : IActionResult
public string Status { get; set; }
public object Result { get; set; }
public string FailedReason { get; set; }
public bool ReThrow { get; set; }
}


Expand Down
1 change: 1 addition & 0 deletions src/WorkflowEngine.Core/IActionResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public interface IActionResult
string Status { get; }
object Result { get; }
string FailedReason { get; }
public bool ReThrow { get; }
}


Expand Down
84 changes: 49 additions & 35 deletions src/WorkflowEngine.Hangfire/HangfireWorkflowExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public DefaultWorkflowAccessor(IEnumerable<IWorkflow> workflows)
}
public ValueTask<WorkflowManifest> GetWorkflowManifestAsync(IWorkflow workflow)
{
return new ValueTask<WorkflowManifest>(workflows.FirstOrDefault(x=>x.Id == workflow.Id && workflow.Version == x.Version).Manifest);
return new ValueTask<WorkflowManifest>(workflows.FirstOrDefault(x => x.Id == workflow.Id && workflow.Version == x.Version).Manifest);
}
}

Expand All @@ -44,7 +44,7 @@ public static string TriggerAsync<TTriggerContext>(this IBackgroundJobClient bac
}
}
public interface IHangfireActionExecutorResultInspector
{
{
Task InspectAsync(IRunContext run, IWorkflow workflow, IActionResult result, IAction next);
}
public class DefaultHangfireActionExecutorResultInspector : IHangfireActionExecutorResultInspector
Expand All @@ -69,12 +69,12 @@ public HangfireWorkflowExecutor(IWorkflowAccessor workflowAccessor, IHangfireAct
{
this.workflowAccessor = workflowAccessor ?? throw new ArgumentNullException(nameof(workflowAccessor));
this.hangfireActionExecutorResultHandler = hangfireActionExecutorResultHandler ?? throw new ArgumentNullException(nameof(hangfireActionExecutorResultHandler));
this.backgroundJobClient=backgroundJobClient??throw new ArgumentNullException(nameof(backgroundJobClient));
this.arrayContext=arrayContext??throw new ArgumentNullException(nameof(arrayContext));
this.runContextAccessor=runContextAccessor;
this.backgroundJobClient = backgroundJobClient ?? throw new ArgumentNullException(nameof(backgroundJobClient));
this.arrayContext = arrayContext ?? throw new ArgumentNullException(nameof(arrayContext));
this.runContextAccessor = runContextAccessor;
this.executor = executor ?? throw new ArgumentNullException(nameof(executor));
this.actionExecutor = actionExecutor ?? throw new ArgumentNullException(nameof(actionExecutor));
this.outputRepository=actionResultRepository;
this.outputRepository = actionResultRepository;
}

/// <summary>
Expand All @@ -91,51 +91,65 @@ public async ValueTask<object> ExecuteAsync(IRunContext run, IWorkflow workflow,


runContextAccessor.RunContext = run;
arrayContext.JobId=context.BackgroundJob.Id;
arrayContext.JobId = context.BackgroundJob.Id;



var result = await actionExecutor.ExecuteAsync(run, workflow, action);

try
{

var result = await actionExecutor.ExecuteAsync(run, workflow, action);


if (result != null)
{

var next = await executor.GetNextAction(run, workflow, result);


await hangfireActionExecutorResultHandler.InspectAsync(run, workflow, result, next);

if (next != null)
{
var a = backgroundJobClient.Enqueue<IHangfireActionExecutor>(
(executor) => executor.ExecuteAsync(run, workflow, next, null));
}
else if (workflow.Manifest.Actions.FindParentAction(action.Key) is ForLoopActionMetadata scope)
if (result != null)
{

var scopeaction = run.CopyTo(new Action { ScopeMoveNext = true, Type = scope.Type, Key = action.Key.Substring(0, action.Key.LastIndexOf('.')), ScheduledTime = DateTimeOffset.UtcNow });
var next = await executor.GetNextAction(run, workflow, result);


var a = backgroundJobClient.Enqueue<IHangfireActionExecutor>(
(executor) => executor.ExecuteAsync(run, workflow, scopeaction, null));
await hangfireActionExecutorResultHandler.InspectAsync(run, workflow, result, next);

//await outputRepository.EndScope(run, workflow, action);
} else if (result.Status == "Failed")
{
context.SetJobParameter("RetryCount", 999);
throw new InvalidOperationException("Action failed") { Data = { ["ActionResult"] = result } };
}
if (next != null)
{
var a = backgroundJobClient.Enqueue<IHangfireActionExecutor>(
(executor) => executor.ExecuteAsync(run, workflow, next, null));
}
else if (workflow.Manifest.Actions.FindParentAction(action.Key) is ForLoopActionMetadata scope)
{

var scopeaction = run.CopyTo(new Action { ScopeMoveNext = true, Type = scope.Type, Key = action.Key.Substring(0, action.Key.LastIndexOf('.')), ScheduledTime = DateTimeOffset.UtcNow });


var a = backgroundJobClient.Enqueue<IHangfireActionExecutor>(
(executor) => executor.ExecuteAsync(run, workflow, scopeaction, null));

//await outputRepository.EndScope(run, workflow, action);
}
else if (result.Status == "Failed" && result.ReThrow)
{

throw new InvalidOperationException("Action failed: " + result.FailedReason) { Data = { ["ActionResult"] = result } };
}






}


return result;
}
catch (InvalidOperationException ex)
{
context.SetJobParameter("RetryCount", 999);
throw;
}


return result;
}
/// <summary>
/// Runs on the background process in hangfire
Expand All @@ -147,19 +161,19 @@ public async ValueTask<object> TriggerAsync(ITriggerContext context)
//TODO - avoid sending all workflow over hangfire,
context.Workflow.Manifest ??= await workflowAccessor.GetWorkflowManifestAsync(context.Workflow);

context.RunId = context.RunId == Guid.Empty? Guid.NewGuid() : context.RunId;
context.RunId = context.RunId == Guid.Empty ? Guid.NewGuid() : context.RunId;

runContextAccessor.RunContext = context;
var action = await executor.Trigger(context);

if (action != null)
{
//TODO - avoid sending all workflow over hangfire, so we should wipe the workflow.manifest before scheduling and restore it after.
context.Workflow.Manifest = null;


var a = backgroundJobClient.Enqueue<IHangfireActionExecutor>(
(executor) => executor.ExecuteAsync(context, context.Workflow, action,null));
(executor) => executor.ExecuteAsync(context, context.Workflow, action, null));
}
return action;
}
Expand Down

0 comments on commit d8cdf79

Please sign in to comment.