-
Notifications
You must be signed in to change notification settings - Fork 6
/
job_definition.go
113 lines (91 loc) · 2.79 KB
/
job_definition.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package asyncjob
import (
"context"
"errors"
"fmt"
"github.com/Azure/go-asyncjob/graph"
)
// Interface for a job definition
type JobDefinitionMeta interface {
GetName() string
GetStep(stepName string) (StepDefinitionMeta, bool) // TODO: switch bool to error
Seal()
Sealed() bool
Visualize() (string, error)
// not exposing for now.
addStep(step StepDefinitionMeta, precedingSteps ...StepDefinitionMeta) error
getRootStep() StepDefinitionMeta
}
// JobDefinition defines a job with child steps, and step is organized in a Directed Acyclic Graph (DAG).
type JobDefinition[T any] struct {
name string
sealed bool
steps map[string]StepDefinitionMeta
stepsDag *graph.Graph[StepDefinitionMeta]
rootStep *StepDefinition[T]
}
// Create new JobDefinition
//
// it is suggest to build jobDefinition statically on process start, and reuse it for each job instance.
func NewJobDefinition[T any](name string) *JobDefinition[T] {
j := &JobDefinition[T]{
name: name,
steps: make(map[string]StepDefinitionMeta),
stepsDag: graph.NewGraph(connectStepDefinition),
}
rootStep := newStepDefinition[T](name, stepTypeRoot)
j.rootStep = rootStep
j.steps[j.rootStep.GetName()] = j.rootStep
j.stepsDag.AddNode(j.rootStep)
return j
}
// Start execution of the job definition.
//
// this will create and return new instance of the job
// caller will then be able to wait for the job instance
func (jd *JobDefinition[T]) Start(ctx context.Context, input T, jobOptions ...JobOptionPreparer) *JobInstance[T] {
if !jd.Sealed() {
jd.Seal()
}
ji := newJobInstance(jd, input, jobOptions...)
ji.start(ctx)
return ji
}
func (jd *JobDefinition[T]) getRootStep() StepDefinitionMeta {
return jd.rootStep
}
func (jd *JobDefinition[T]) GetName() string {
return jd.name
}
func (jd *JobDefinition[T]) Seal() {
if jd.sealed {
return
}
jd.sealed = true
}
func (jd *JobDefinition[T]) Sealed() bool {
return jd.sealed
}
// GetStep returns the stepDefinition by name
func (jd *JobDefinition[T]) GetStep(stepName string) (StepDefinitionMeta, bool) {
stepMeta, ok := jd.steps[stepName]
return stepMeta, ok
}
// AddStep adds a step to the job definition, with optional preceding steps
func (jd *JobDefinition[T]) addStep(step StepDefinitionMeta, precedingSteps ...StepDefinitionMeta) error {
jd.steps[step.GetName()] = step
jd.stepsDag.AddNode(step)
for _, precedingStep := range precedingSteps {
if err := jd.stepsDag.Connect(precedingStep, step); err != nil {
if errors.Is(err, graph.ErrConnectNotExistingNode) {
return ErrRefStepNotInJob.WithMessage(fmt.Sprintf("referenced step %s not found", precedingStep.GetName()))
}
return err
}
}
return nil
}
// Visualize the job definition in graphviz dot format
func (jd *JobDefinition[T]) Visualize() (string, error) {
return jd.stepsDag.ToDotGraph()
}