-
Notifications
You must be signed in to change notification settings - Fork 6
/
job_instance.go
144 lines (117 loc) · 3.6 KB
/
job_instance.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package asyncjob
import (
"context"
"errors"
"github.com/Azure/go-asyncjob/graph"
"github.com/Azure/go-asynctask"
"github.com/google/uuid"
)
// JobInstanceMeta is the type-parameter-free view of a job instance: it exposes
// the instance id, the job definition, step lookup, waiting and visualization
// without mentioning the job's input type.
// Because it contains an unexported method, it can only be implemented by types
// declared in this package.
type JobInstanceMeta interface {
	// GetJobInstanceId returns the id of this job instance.
	GetJobInstanceId() string
	// GetJobDefinition returns the definition this instance belongs to.
	GetJobDefinition() JobDefinitionMeta
	// GetStepInstance looks up a step instance by step name; the bool reports
	// whether a step with that name is registered.
	GetStepInstance(stepName string) (StepInstanceMeta, bool)
	// Wait waits for the steps of the job and returns the resulting error, if any.
	Wait(context.Context) error
	// Visualize returns a textual rendering of the instance's step graph.
	Visualize() (string, error)
	// addStepInstance registers a step instance together with the steps that
	// precede it. It is unexported on purpose: not exposing for now.
	addStepInstance(step StepInstanceMeta, precedingSteps ...StepInstanceMeta)
}
// JobExecutionOptions holds the per-instance execution settings of a job.
// It is populated by JobOptionPreparer functions such as WithJobId and
// WithSequentialExecution.
type JobExecutionOptions struct {
	// Id is the job instance id. newJobInstance fills in a generated UUID
	// when it is still empty after all preparers have run.
	Id string
	// RunSequentially makes start wait on each step instance right after
	// creating it, before the next step instance is created.
	RunSequentially bool
}
// JobOptionPreparer customizes a JobExecutionOptions. It receives the current
// options and returns the options to use from then on; newJobInstance applies
// the preparers in the order they are passed and keeps the last returned value.
type JobOptionPreparer func(*JobExecutionOptions) *JobExecutionOptions
// WithJobId returns a JobOptionPreparer that sets the job instance id to jobId.
// The preparer mutates the options it is given and returns that same pointer.
func WithJobId(jobId string) JobOptionPreparer {
	assignID := func(opts *JobExecutionOptions) *JobExecutionOptions {
		opts.Id = jobId
		return opts
	}
	return assignID
}
// WithSequentialExecution returns a JobOptionPreparer that turns on
// RunSequentially. The preparer mutates the options it is given and returns
// that same pointer.
func WithSequentialExecution() JobOptionPreparer {
	enableSequential := func(opts *JobExecutionOptions) *JobExecutionOptions {
		opts.RunSequentially = true
		return opts
	}
	return enableSequential
}
// JobInstance is one instance of a JobDefinition, created for a single input
// value of type T.
type JobInstance[T any] struct {
	// jobOptions are the execution options after all preparers were applied;
	// newJobInstance guarantees a non-empty Id.
	jobOptions *JobExecutionOptions
	// input is the job input; start hands it to the root step as an already
	// completed task result.
	input T
	// Definition is the job definition this instance was created from.
	Definition *JobDefinition[T]
	// rootStep is the root step instance; it is nil until start runs.
	rootStep *StepInstance[T]
	// steps holds the step instances keyed by step name.
	steps map[string]StepInstanceMeta
	// stepsDag is the graph of step instances; Visualize renders it.
	stepsDag *graph.Graph[StepInstanceMeta]
}
// newJobInstance builds a JobInstance for the definition jd and the given input.
//
// The execution options start out empty and are passed through every preparer
// in jobInstanceOptions, in order; each preparer's return value replaces the
// current options. If no preparer set an id, a fresh UUID string is used.
// The returned instance has an empty step map and an empty step graph; steps
// are only created by start.
func newJobInstance[T any](jd *JobDefinition[T], input T, jobInstanceOptions ...JobOptionPreparer) *JobInstance[T] {
	opts := &JobExecutionOptions{}
	for i := range jobInstanceOptions {
		opts = jobInstanceOptions[i](opts)
	}

	// Every job instance must carry an id, generated when the caller gave none.
	if opts.Id == "" {
		opts.Id = uuid.New().String()
	}

	return &JobInstance[T]{
		jobOptions: opts,
		input:      input,
		Definition: jd,
		steps:      map[string]StepInstanceMeta{},
		stepsDag:   graph.NewGraph(connectStepInstance),
	}
}
// start materializes the step instances of the job.
//
// It first installs the root step as an already-completed step whose task
// result is the job input, registering it in both the step map and the step
// graph. It then walks the definition's steps in the order returned by
// TopologicalSort and creates an instance for each of them. When
// RunSequentially is set, each step is waited on before the next one is created.
func (ji *JobInstance[T]) start(ctx context.Context) {
	// The root step is never executed: it is marked completed up front and
	// simply carries the job input.
	root := newStepInstance(ji.Definition.rootStep, ji)
	ji.rootStep = root
	root.task = asynctask.NewCompletedTask(ji.input)
	root.state = StepStateCompleted
	ji.steps[root.GetName()] = root
	ji.stepsDag.AddNode(root)

	for _, def := range ji.Definition.stepsDag.TopologicalSort() {
		name := def.GetName()
		// Skip the definition named after the job itself.
		// NOTE(review): presumably this is the root step installed above — confirm.
		if name == ji.Definition.GetName() {
			continue
		}

		inst := def.createStepInstance(ctx, ji)
		ji.steps[name] = inst

		if !ji.jobOptions.RunSequentially {
			continue
		}
		// Sequential mode: block on this step before creating the next one.
		// The result of this wait is not inspected here; JobInstance.Wait
		// waits on the same Waitable again.
		inst.Waitable().Wait(ctx)
	}
}
// GetJobInstanceId returns the instance id stored in the execution options,
// i.e. the id set by a preparer such as WithJobId or the UUID generated by
// newJobInstance.
func (ji *JobInstance[T]) GetJobInstanceId() string {
	opts := ji.jobOptions
	return opts.Id
}
// GetJobDefinition returns the instance's Definition as a JobDefinitionMeta.
func (ji *JobInstance[T]) GetJobDefinition() JobDefinitionMeta {
	return JobDefinitionMeta(ji.Definition)
}
// GetStepInstance looks up the step instance registered under stepName.
// The boolean is false, and the returned step nil, when no such step exists.
func (ji *JobInstance[T]) GetStepInstance(stepName string) (StepInstanceMeta, bool) {
	if step, found := ji.steps[stepName]; found {
		return step, true
	}
	return nil, false
}
// addStepInstance registers step under its name (replacing any existing entry
// with that name), adds it as a node of the step graph and connects every
// step in precedingSteps to it.
func (ji *JobInstance[T]) addStepInstance(step StepInstanceMeta, precedingSteps ...StepInstanceMeta) {
	name := step.GetName()
	ji.steps[name] = step

	ji.stepsDag.AddNode(step)
	for i := range precedingSteps {
		ji.stepsDag.Connect(precedingSteps[i], step)
	}
}
// Wait waits, through asynctask.WaitAll, on the Waitable of every step
// instance currently registered in the job.
//
// It returns nil when WaitAll reports no error. When the reported error is
// (or wraps) a *JobError, the JobError's root cause is returned instead; any
// other error is returned unchanged.
func (ji *JobInstance[T]) Wait(ctx context.Context) error {
	var waitables []asynctask.Waitable
	for _, s := range ji.steps {
		waitables = append(waitables, s.Waitable())
	}

	err := asynctask.WaitAll(ctx, &asynctask.WaitAllOptions{}, waitables...)
	if err == nil {
		return nil
	}

	// Prefer the root cause over the JobError wrapper when one is available.
	var jobErr *JobError
	if errors.As(err, &jobErr) {
		return jobErr.RootCause()
	}
	return err
}
// Visualize renders the job instance's step graph in graphviz dot format.
// It returns whatever the graph's ToDotGraph produces, including its error.
func (ji *JobInstance[T]) Visualize() (string, error) {
	// Receiver is named ji like every other JobInstance method; the previous
	// name (jd) suggested a job definition.
	return ji.stepsDag.ToDotGraph()
}