diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 292ad9c6468..2b497057ada 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -122,37 +122,37 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error { return capabilityRegistrationErr } -func (e *Engine) initializeCapability(ctx context.Context, s *step) error { +func (e *Engine) initializeCapability(ctx context.Context, step *step) error { // If the capability already exists, that means we've already registered it - if s.capability != nil { + if step.capability != nil { return nil } - cp, err := e.registry.Get(ctx, s.ID) + cp, err := e.registry.Get(ctx, step.ID) if err != nil { - return fmt.Errorf("failed to get capability with ref %s: %s", s.ID, err) + return fmt.Errorf("failed to get capability with ref %s: %s", step.ID, err) } // We configure actions, consensus and targets here, and // they all satisfy the `CallbackCapability` interface cc, ok := cp.(capabilities.CallbackCapability) if !ok { - return fmt.Errorf("could not coerce capability %s to CallbackCapability", s.ID) + return fmt.Errorf("could not coerce capability %s to CallbackCapability", step.ID) } - if s.config == nil { - configMap, newMapErr := values.NewMap(s.Config) + if step.config == nil { + configMap, newMapErr := values.NewMap(step.Config) if newMapErr != nil { return fmt.Errorf("failed to convert config to values.Map: %s", newMapErr) } - s.config = configMap + step.config = configMap } registrationRequest := capabilities.RegisterToWorkflowRequest{ Metadata: capabilities.RegistrationMetadata{ WorkflowID: e.workflow.id, }, - Config: s.config, + Config: step.config, } err = cc.RegisterToWorkflow(ctx, registrationRequest) @@ -160,7 +160,7 @@ func (e *Engine) initializeCapability(ctx context.Context, s *step) error { return fmt.Errorf("failed to register to workflow (%+v): %w", registrationRequest, err) } - s.capability = cc + step.capability = cc return nil } @@ -260,8 +260,8 @@ func (e *Engine) resumeInProgressExecutions(ctx context.Context) error { // and `scheduledExecution` for targets. If we don't have the necessary // config to initialize a scheduledExecution for a target, we'll fallback to // using `immediateExecution`. -func (e *Engine) initializeExecutionStrategy(step *step) error { - if step.executionStrategy != nil { +func (e *Engine) initializeExecutionStrategy(s *step) error { + if s.executionStrategy != nil { return nil } @@ -272,16 +272,16 @@ func (e *Engine) initializeExecutionStrategy(step *step) error { } ie := immediateExecution{} - if step.CapabilityType != capabilities.CapabilityTypeTarget { - e.logger.Debugf("initializing step %+v with immediate execution strategy: not a target", step) - step.executionStrategy = ie + if s.CapabilityType != capabilities.CapabilityTypeTarget { + e.logger.Debugf("initializing step %+v with immediate execution strategy: not a target", s) + s.executionStrategy = ie return nil } dinfo := e.donInfo if dinfo.DON == nil { e.logger.Debugf("initializing target step with immediate execution strategy: donInfo %+v", e.donInfo) - step.executionStrategy = ie + s.executionStrategy = ie return nil } @@ -294,17 +294,17 @@ func (e *Engine) initializeExecutionStrategy(step *step) error { } if position == nil { - e.logger.Debugf("initializing step %+v with immediate execution strategy: position not found in donInfo %+v", step, e.donInfo) - step.executionStrategy = ie + e.logger.Debugf("initializing step %+v with immediate execution strategy: position not found in donInfo %+v", s, e.donInfo) + s.executionStrategy = ie return nil } - step.executionStrategy = scheduledExecution{ + s.executionStrategy = scheduledExecution{ DON: e.donInfo.DON, Position: *position, PeerID: e.donInfo.PeerID(), } - e.logger.Debugf("initializing step %+v with scheduled execution strategy", step) + e.logger.Debugf("initializing step %+v with scheduled execution strategy", s) return nil } diff --git a/core/services/workflows/models.go b/core/services/workflows/models.go index 8dce11cabe5..dadadc8ba0e 100644 --- a/core/services/workflows/models.go +++ b/core/services/workflows/models.go @@ -16,10 +16,10 @@ type stepRequest struct { state store.WorkflowExecution } -// stepDefinition is the parsed representation of a step in a workflow. +// StepDefinition is the parsed representation of a step in a workflow. // // Within the workflow spec, they are called "Capability Properties". -type stepDefinition struct { +type StepDefinition struct { ID string `json:"id" jsonschema:"required"` Ref string `json:"ref,omitempty" jsonschema:"pattern=^[a-z0-9_]+$"` Inputs map[string]any `json:"inputs,omitempty"` @@ -28,16 +28,16 @@ type stepDefinition struct { CapabilityType capabilities.CapabilityType `json:"-"` } -// workflowSpec is the parsed representation of a workflow. -type workflowSpec struct { - Triggers []stepDefinition `json:"triggers" jsonschema:"required"` - Actions []stepDefinition `json:"actions,omitempty"` - Consensus []stepDefinition `json:"consensus" jsonschema:"required"` - Targets []stepDefinition `json:"targets" jsonschema:"required"` +// WorkflowSpec is the parsed representation of a workflow. +type WorkflowSpec struct { + Triggers []StepDefinition `json:"triggers" jsonschema:"required"` + Actions []StepDefinition `json:"actions,omitempty"` + Consensus []StepDefinition `json:"consensus" jsonschema:"required"` + Targets []StepDefinition `json:"targets" jsonschema:"required"` } -func (w *workflowSpec) steps() []stepDefinition { - s := []stepDefinition{} +func (w *WorkflowSpec) Steps() []StepDefinition { + s := []StepDefinition{} s = append(s, w.Actions...) s = append(s, w.Consensus...) s = append(s, w.Targets...) @@ -55,7 +55,7 @@ type workflow struct { triggers []*triggerCapability - spec *workflowSpec + spec *WorkflowSpec } func (w *workflow) walkDo(start string, do func(s *step) error) error { @@ -106,17 +106,39 @@ func (w *workflow) dependents(start string) ([]*step, error) { return steps, nil } -// step wraps a stepDefinition with additional context for dependencies and execution +// step wraps a Vertex with additional context for execution that is mutated by the engine type step struct { - stepDefinition - dependencies []string + Vertex capability capabilities.CallbackCapability config *values.Map executionStrategy executionStrategy } +type Vertex struct { + StepDefinition + dependencies []string +} + +// DependencyGraph is an intermediate representation of a workflow wherein all the graph +// vertices are represented and validated. It is a static representation of the workflow dependencies. +type DependencyGraph struct { + ID string + graph.Graph[string, *Vertex] + + Triggers []*StepDefinition + + Spec *WorkflowSpec +} + +// VID is an identifier for a Vertex that can be used to uniquely identify it in a graph. +// it represents the notion `hash` in the graph package AddVertex method. +// we refrain from naming it `hash` to avoid confusion with the hash function. +func (v *Vertex) VID() string { + return v.Ref +} + type triggerCapability struct { - stepDefinition + StepDefinition trigger capabilities.TriggerCapability config *values.Map } @@ -126,6 +148,14 @@ const ( ) func Parse(yamlWorkflow string) (*workflow, error) { + wf2, err := ParseDepedencyGraph(yamlWorkflow) + if err != nil { + return nil, err + } + return createWorkflow(wf2) +} + +func ParseDepedencyGraph(yamlWorkflow string) (*DependencyGraph, error) { spec, err := ParseWorkflowSpecYaml(yamlWorkflow) if err != nil { return nil, err @@ -138,23 +168,23 @@ func Parse(yamlWorkflow string) (*workflow, error) { // Note: all triggers are represented by a single step called // `trigger`. This is because for workflows with multiple triggers // only one trigger will have started the workflow. - stepHash := func(s *step) string { - return s.Ref + stepHash := func(s *Vertex) string { + return s.VID() } g := graph.New( stepHash, graph.PreventCycles(), graph.Directed(), ) - err = g.AddVertex(&step{ - stepDefinition: stepDefinition{Ref: keywordTrigger}, + err = g.AddVertex(&Vertex{ + StepDefinition: StepDefinition{Ref: keywordTrigger}, }) if err != nil { return nil, err } // Next, let's populate the other entries in the graph. - for _, s := range spec.steps() { + for _, s := range spec.Steps() { // TODO: The workflow format spec doesn't always require a `Ref` // to be provided (triggers and targets don't have a `Ref` for example). // To handle this, we default the `Ref` to the type, but ideally we @@ -163,7 +193,7 @@ func Parse(yamlWorkflow string) (*workflow, error) { s.Ref = s.ID } - innerErr := g.AddVertex(&step{stepDefinition: s}) + innerErr := g.AddVertex(&Vertex{StepDefinition: s}) if innerErr != nil { return nil, fmt.Errorf("cannot add vertex %s: %w", s.Ref, innerErr) } @@ -200,16 +230,72 @@ func Parse(yamlWorkflow string) (*workflow, error) { } } - triggerSteps := []*triggerCapability{} + triggerSteps := []*StepDefinition{} for _, t := range spec.Triggers { - triggerSteps = append(triggerSteps, &triggerCapability{ - stepDefinition: t, - }) + tt := t + triggerSteps = append(triggerSteps, &tt) } - wf := &workflow{ - spec: &spec, + wf := &DependencyGraph{ + Spec: &spec, Graph: g, - triggers: triggerSteps, + Triggers: triggerSteps, } return wf, err } + +// createWorkflow converts a StaticWorkflow to an executable workflow +// by adding metadata to the vertices that is owned by the workflow runtime. +func createWorkflow(wf2 *DependencyGraph) (*workflow, error) { + out := &workflow{ + id: wf2.ID, + triggers: []*triggerCapability{}, + spec: wf2.Spec, + } + + for _, t := range wf2.Triggers { + out.triggers = append(out.triggers, &triggerCapability{ + StepDefinition: *t, + }) + } + + stepHash := func(s *step) string { + // must use the same hash function as the DependencyGraph. + // this ensures that the intermediate representation (DependencyGraph) and the workflow + // representation label vertices with the same identifier, which in turn allows us to + // to copy the edges from the intermediate representation to the executable representation. + return s.Vertex.VID() + } + g := graph.New( + stepHash, + graph.PreventCycles(), + graph.Directed(), + ) + adjMap, err := wf2.Graph.AdjacencyMap() + if err != nil { + return nil, fmt.Errorf("failed to convert intermediate representation to adjacency map: %w", err) + } + + // copy the all the vertices from the intermediate graph to the executable workflow graph + for vertexRef := range adjMap { + v, innerErr := wf2.Graph.Vertex(vertexRef) + if innerErr != nil { + return nil, fmt.Errorf("failed to retrieve vertex for %s: %w", vertexRef, innerErr) + } + innerErr = g.AddVertex(&step{Vertex: *v}) + if innerErr != nil { + return nil, fmt.Errorf("failed to add vertex to executable workflow %s: %w", vertexRef, innerErr) + } + } + // now we can add all the edges. this works because we are using vertex hash function is the same in both graphs. + // see comment on `stepHash` function. + for vertexRef, edgeRefs := range adjMap { + for edgeRef := range edgeRefs { + innerErr := g.AddEdge(vertexRef, edgeRef) + if innerErr != nil { + return nil, fmt.Errorf("failed to add edge from '%s' to '%s': %w", vertexRef, edgeRef, innerErr) + } + } + } + out.Graph = g + return out, nil +} diff --git a/core/services/workflows/models_yaml.go b/core/services/workflows/models_yaml.go index 74ed8ee466d..90d3f109c06 100644 --- a/core/services/workflows/models_yaml.go +++ b/core/services/workflows/models_yaml.go @@ -20,7 +20,7 @@ func GenerateJsonSchema() ([]byte, error) { return json.MarshalIndent(schema, "", " ") } -func ParseWorkflowSpecYaml(data string) (workflowSpec, error) { +func ParseWorkflowSpecYaml(data string) (WorkflowSpec, error) { w := workflowSpecYaml{} err := yaml.Unmarshal([]byte(data), &w) @@ -46,36 +46,36 @@ type workflowSpecYaml struct { // // We support multiple ways of defining a workflow spec yaml, // but internally we want to work with a single representation. -func (w workflowSpecYaml) toWorkflowSpec() workflowSpec { - triggers := make([]stepDefinition, 0, len(w.Triggers)) +func (w workflowSpecYaml) toWorkflowSpec() WorkflowSpec { + triggers := make([]StepDefinition, 0, len(w.Triggers)) for _, t := range w.Triggers { sd := t.toStepDefinition() sd.CapabilityType = capabilities.CapabilityTypeTrigger triggers = append(triggers, sd) } - actions := make([]stepDefinition, 0, len(w.Actions)) + actions := make([]StepDefinition, 0, len(w.Actions)) for _, a := range w.Actions { sd := a.toStepDefinition() sd.CapabilityType = capabilities.CapabilityTypeAction actions = append(actions, sd) } - consensus := make([]stepDefinition, 0, len(w.Consensus)) + consensus := make([]StepDefinition, 0, len(w.Consensus)) for _, c := range w.Consensus { sd := c.toStepDefinition() sd.CapabilityType = capabilities.CapabilityTypeConsensus consensus = append(consensus, sd) } - targets := make([]stepDefinition, 0, len(w.Targets)) + targets := make([]StepDefinition, 0, len(w.Targets)) for _, t := range w.Targets { sd := t.toStepDefinition() sd.CapabilityType = capabilities.CapabilityTypeTarget targets = append(targets, sd) } - return workflowSpec{ + return WorkflowSpec{ Triggers: triggers, Actions: actions, Consensus: consensus, @@ -247,8 +247,8 @@ type stepDefinitionYaml struct { // toStepDefinition converts a stepDefinitionYaml to a stepDefinition. // // `stepDefinition` is the converged representation of a step in a workflow. -func (s stepDefinitionYaml) toStepDefinition() stepDefinition { - return stepDefinition{ +func (s stepDefinitionYaml) toStepDefinition() StepDefinition { + return StepDefinition{ Ref: s.Ref, ID: s.ID.String(), Inputs: s.Inputs,