Skip to content

Commit

Permalink
Merge pull request #15 from delegateas/tst/job-status
Browse files Browse the repository at this point in the history
Tst/job status
  • Loading branch information
pksorensen authored Oct 18, 2023
2 parents 00807ce + 1fe2110 commit 9d5218b
Show file tree
Hide file tree
Showing 16 changed files with 252 additions and 116 deletions.
4 changes: 2 additions & 2 deletions apps/WorkflowEngine.DemoApp/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IService
(executor) => executor.TriggerAsync(new TriggerContext {
Workflow = workflows.First(),
Trigger = new Trigger { ScheduledTime = DateTimeOffset.UtcNow, Key = workflows.First().Manifest.Triggers.First().Key,
Type =workflows.First().Manifest.Triggers.First().Value.Type }, RunId = Guid.NewGuid() }));
Type =workflows.First().Manifest.Triggers.First().Value.Type }, RunId = Guid.NewGuid() },null));

await c.Response.WriteAsync("Background JOb:" + a);

Expand Down Expand Up @@ -333,7 +333,7 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IService
Key = workflows.First(w => w.Id.ToString() == c.GetRouteValue("id") as string).Manifest.Triggers.FirstOrDefault().Key
},
Workflow = workflows.First(w=>w.Id.ToString() == c.GetRouteValue("id") as string)
}));
},null));

await c.Response.WriteAsync("Background JOb:" + a);

Expand Down
31 changes: 31 additions & 0 deletions src/WorkflowEngine.Core/ActionCompletedEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using Newtonsoft.Json;

namespace WorkflowEngine.Core
{
public class ActionCompletedEvent : Event
{
public override EventType EventType => EventType.ActionCompleted;

[JsonProperty("jobId")]
public string JobId { get; set; }
[JsonProperty("actionKey")]
public string ActionKey { get; set; }
[JsonProperty("resultPath")]
public string ResultPath { get; set; }

[JsonProperty("status")]
public string Status { get; set; }

public static ActionCompletedEvent FromAction(IActionResult result,IAction action,string jobId)
{

return new ActionCompletedEvent
{

JobId = jobId,
ActionKey = action.Key,
Status = result.Status,
};
}
}
}
78 changes: 40 additions & 38 deletions src/WorkflowEngine.Core/ActionExecutor.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using ExpressionEngine;
using ExpressionEngine;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
Expand All @@ -18,32 +18,32 @@ public class ScopeContext : IScopeContext
}
public class ActionExecutor : IActionExecutor
{
private readonly IOutputsRepository outputsRepository;
private readonly IServiceProvider serviceProvider;
private readonly ILogger logger;
private readonly IScopeContext scopeContext;
private readonly IExpressionEngine expressionEngine;
private readonly IOutputsRepository _outputsRepository;
private readonly IServiceProvider _serviceProvider;
private readonly ILogger _logger;
private readonly IScopeContext _scopeContext;
private readonly IExpressionEngine _expressionEngine;
private Dictionary<string, IActionImplementationMetadata> _implementations;

public ActionExecutor(
IEnumerable<IActionImplementationMetadata> implementations,
IEnumerable<IActionImplementationMetadata> implementations,
IOutputsRepository outputsRepository,
IServiceProvider serviceProvider,
ILogger<ActionExecutor> logger,
IScopeContext scopeContext,
IExpressionEngine expressionEngine)
{
if(implementations.GroupBy(k=>k.Type).Any(c=>c.Count() > 1))

if (implementations.GroupBy(k => k.Type).Any(c => c.Count() > 1))
{
throw new ArgumentException("Double registration of " + String.Join(",", implementations.GroupBy(k => k.Type).Where(c => c.Count() > 1).Select(c=>c.Key)));
throw new ArgumentException("Double registration of " + String.Join(",", implementations.GroupBy(k => k.Type).Where(c => c.Count() > 1).Select(c => c.Key)));
}
_implementations = implementations?.ToDictionary(k => k.Type) ?? throw new ArgumentNullException(nameof(implementations));
this.outputsRepository=outputsRepository??throw new ArgumentNullException(nameof(outputsRepository));
this.serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
this.logger=logger??throw new ArgumentNullException(nameof(logger));
this.scopeContext=scopeContext;
this.expressionEngine=expressionEngine??throw new ArgumentNullException(nameof(expressionEngine));
_outputsRepository = outputsRepository ?? throw new ArgumentNullException(nameof(outputsRepository));
_serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider));
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
_scopeContext = scopeContext;
_expressionEngine = expressionEngine ?? throw new ArgumentNullException(nameof(expressionEngine));
}
public async ValueTask<IActionResult> ExecuteAsync(IRunContext context, IWorkflow workflow, IAction action)
{
Expand All @@ -52,13 +52,13 @@ public async ValueTask<IActionResult> ExecuteAsync(IRunContext context, IWorkflo

if (action.ScopeMoveNext)
{
await outputsRepository.EndScope(context, workflow, action);
await _outputsRepository.EndScope(context, workflow, action);
}

var actionMetadata = workflow.Manifest.Actions.FindAction(action.Key);
scopeContext.Scope=action.Key;
action.Inputs = await expressionEngine.ResolveInputs(actionMetadata,logger);
_scopeContext.Scope = action.Key;
action.Inputs = await _expressionEngine.ResolveInputs(actionMetadata, _logger);

{
//if (workflow.Manifest.Actions.FindParentAction(action.Key) is ForLoopActionMetadata parent)
//{
Expand All @@ -71,45 +71,47 @@ public async ValueTask<IActionResult> ExecuteAsync(IRunContext context, IWorkflo
//}
//else
{
await outputsRepository.AddInput(context, workflow, action);
await _outputsRepository.AddInput(context, workflow, action);
}
}

var actionImplementation = serviceProvider.GetRequiredService(_implementations[actionMetadata.Type].Implementation) as IActionImplementation;


var actionImplementation = _serviceProvider.GetRequiredService(_implementations[actionMetadata.Type].Implementation) as IActionImplementation;



var result = new ActionResult {
Key = action.Key,
Status = "Succeded",
Result = await actionImplementation.ExecuteAsync(context,workflow, action)

var result = new ActionResult
{
Key = action.Key,
Status = "Succeded",
Result = await actionImplementation.ExecuteAsync(context, workflow, action)
};

await outputsRepository.AddAsync(context, workflow, action, result);


await _outputsRepository.AddAsync(context, workflow, action, result);


return result;


}catch(Exception ex)


}
catch (Exception ex)
{
var result= new ActionResult { Key = action.Key, Status = "Failed", FailedReason=ex.ToString(), ReThrow = (ex is InvalidOperationException) };
var result = new ActionResult { Key = action.Key, Status = "Failed", FailedReason = ex.ToString(), ReThrow = (ex is InvalidOperationException) };
try
{
await outputsRepository.AddAsync(context, workflow, action, result);
await _outputsRepository.AddAsync(context, workflow, action, result);
}
catch (Exception )
catch (Exception exx)
{

}
return result;
}
}
}



}
18 changes: 10 additions & 8 deletions src/WorkflowEngine.Core/DefaultOutputsRepository.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using Newtonsoft.Json;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Concurrent;
Expand Down Expand Up @@ -64,7 +64,7 @@ private JToken GetOrCreateRun(IRunContext context)
return Runs.GetOrAdd(context.RunId, (id) => new JObject(new JProperty("actions", new JObject()), new JProperty("triggers", new JObject())));
}

public ValueTask AddAsync(IRunContext context, IWorkflow workflow, ITrigger trigger)
public ValueTask AddTrigger(ITriggerContext context, IWorkflow workflow, ITrigger trigger)
{
JToken run = GetOrCreateRun(context);

Expand Down Expand Up @@ -170,7 +170,14 @@ public ValueTask EndScope(IRunContext context, IWorkflow workflow, IAction actio
return new ValueTask();

}
public ValueTask StartScope(IRunContext context, IWorkflow workflow, IAction action)

public ValueTask AddEvent(IRunContext run, IWorkflow workflow, IAction action, Event @event)
{
// Defaults to nothing to not create noise and breaks backwards compatibility
return new ValueTask();
}

public ValueTask StartScope(IRunContext context, IWorkflow workflow, IAction action)
{
JToken run = GetOrCreateRun(context);

Expand All @@ -197,10 +204,5 @@ public ValueTask AddInput(IRunContext context, IWorkflow workflow, IAction actio

return new ValueTask();
}



}


}
48 changes: 48 additions & 0 deletions src/WorkflowEngine.Core/Event.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
using Newtonsoft.Json.Linq;
using System;

namespace WorkflowEngine.Core
{
public interface IHaveFinisningStatus
{
IActionResult Result { get; }
}
[JsonConverter(typeof(BaseClassConverter))]
public abstract class Event
{
[JsonProperty("eventType")]
[Newtonsoft.Json.JsonConverter(typeof(StringEnumConverter))]
public abstract EventType EventType { get; }


}

public class BaseClassConverter : CustomCreationConverter<Event>
{
private EventType _currentObjectType;

public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
{
var jobj = JObject.ReadFrom(reader);
_currentObjectType = jobj["eventType"].ToObject<EventType>();
return base.ReadJson(jobj.CreateReader(), objectType, existingValue, serializer);
}

public override Event Create(Type objectType)
{
switch (_currentObjectType)
{
case EventType.ActionCompleted:
return new ActionCompletedEvent();
case EventType.WorkflowStarted:
return new WorkflowStarteddEvent();
case EventType.WorkflowFinished:
return new WorkflowFinishedEvent();
default:
throw new NotImplementedException();
}
}
}
}
15 changes: 15 additions & 0 deletions src/WorkflowEngine.Core/EventType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using Newtonsoft.Json.Linq;
using System.Runtime.Serialization;

namespace WorkflowEngine.Core
{
public enum EventType
{
[EnumMember(Value = "workflow_started")]
WorkflowStarted = 0,
[EnumMember(Value = "workflow_finished")]
WorkflowFinished = 1,
[EnumMember(Value = "action_completed")]
ActionCompleted = 2
}
}
32 changes: 21 additions & 11 deletions src/WorkflowEngine.Core/ExpressionEngineExtensions.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
using ExpressionEngine;
using ExpressionEngine;
using ExpressionEngine.Functions.Base;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.Linq;
Expand All @@ -24,30 +25,39 @@ public static IServiceCollection AddFunctions(this IServiceCollection services)

return services;
}
public static async ValueTask<IDictionary<string,object>> ResolveInputs(this IExpressionEngine engine, ActionMetadata actionMetadata, ILogger logger)
public static async ValueTask<IDictionary<string, object>> ResolveInputs(this IExpressionEngine engine, IDictionary<string, object> inputs, ILogger logger)
{

var resolvedInputs = new Dictionary<string, object>();

foreach (var input in actionMetadata.Inputs)
foreach (var input in inputs)
{
if (input.Value is string str && str.Contains("@"))
{
resolvedInputs[input.Key] = await engine.ParseToValueContainer(str);
resolvedInputs[input.Key] = await engine.ParseToValueContainer(input.Value.ToString());
}
else
{
resolvedInputs[input.Key] = input.Value;
}
//else
//{
if (input.Value is IDictionary<string, object> obj)
{
resolvedInputs[input.Key] = await engine.ResolveInputs(obj, logger);
}
else
{
resolvedInputs[input.Key] = input.Value;
}

// logger.LogWarning("{Key}: {Type}", input, inputs[input].GetType());
//}
// inputs[input] = inputs[input];
}

}

return resolvedInputs;

}
public static ValueTask<IDictionary<string,object>> ResolveInputs(this IExpressionEngine engine, ActionMetadata actionMetadata, ILogger logger)
{
return engine.ResolveInputs(actionMetadata.Inputs, logger);

}
}
}
9 changes: 4 additions & 5 deletions src/WorkflowEngine.Core/IOutputsRepository.cs
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
using System;
using System;
using System.Threading.Tasks;

namespace WorkflowEngine.Core
{
public interface IOutputsRepository
{
ValueTask AddAsync(IRunContext context, IWorkflow workflow, IAction action, IActionResult result);
ValueTask AddAsync(IRunContext context, IWorkflow workflow, ITrigger trigger);
ValueTask AddTrigger(ITriggerContext context, IWorkflow workflow, ITrigger trigger);
ValueTask<object> GetTriggerData(Guid id);
ValueTask AddInput(IRunContext context, IWorkflow workflow, IAction action);
ValueTask<object> GetOutputData(Guid id, string v);
ValueTask AddArrayItemAsync(IRunContext run, IWorkflow workflow, string key, IActionResult result);
ValueTask AddArrayInput(IRunContext context, IWorkflow workflow, IAction action);
ValueTask StartScope(IRunContext context, IWorkflow workflow, IAction action);
// ValueTask StartScope(IRunContext context, IWorkflow workflow, IAction action);
ValueTask AddScopeItem(IRunContext context, IWorkflow workflow, IAction action, IActionResult result);
ValueTask EndScope(IRunContext run, IWorkflow workflow, IAction action);
ValueTask AddEvent(IRunContext run, IWorkflow workflow, IAction action, Event @event);
}


}
3 changes: 2 additions & 1 deletion src/WorkflowEngine.Core/ITriggerContext.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;

namespace WorkflowEngine.Core
{
Expand All @@ -7,6 +7,7 @@ public interface ITriggerContext:IRunContext
IWorkflow Workflow { get; }
ITrigger Trigger { get; set; }

string JobId { get; set; }

}

Expand Down
Loading

0 comments on commit 9d5218b

Please sign in to comment.