Skip to content

Commit

Permalink
Merge pull request #8 from Azure/haitao/fix_err_case
Browse files Browse the repository at this point in the history
Haitao/fix_err_case
  • Loading branch information
haitch authored Nov 12, 2022
2 parents b1b1221 + 9b1bede commit 73c27d5
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 59 deletions.
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,11 @@ module github.com/Azure/go-asyncjob
go 1.18

require github.com/Azure/go-asynctask v1.3.0

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/stretchr/testify v1.8.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
13 changes: 13 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
github.com/Azure/go-asynctask v1.3.0 h1:QBx9mGbGi4Urz4YeZ3o1c7cLGL4iUch+mGgNGupTLMI=
github.com/Azure/go-asynctask v1.3.0/go.mod h1:S1Ee5SVnt6ZUJ84brodPiHvoNfN2wgDyVO7UYTI5WeM=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
17 changes: 12 additions & 5 deletions graph/error.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@
package graph

type GraphErrorCode string
type GraphCodeError string

const (
ErrDuplicateNode GraphErrorCode = "node with same key already exists in this graph"
ErrConnectNotExistingNode GraphErrorCode = "node to connect does not exist in this graph"
ErrDuplicateNode GraphCodeError = "node with same key already exists in this graph"
ErrConnectNotExistingNode GraphCodeError = "node to connect does not exist in this graph"
)

func (ge GraphErrorCode) Error() string {
func (ge GraphCodeError) Error() string {
return string(ge)
}

type GraphError struct {
Code GraphErrorCode
Code GraphCodeError
Message string
}

func NewGraphError(code GraphCodeError, message string) *GraphError {
return &GraphError{
Code: code,
Message: message,
}
}

func (ge *GraphError) Error() string {
return ge.Code.Error() + ": " + ge.Message
}
Expand Down
18 changes: 10 additions & 8 deletions graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package graph

import (
"bytes"
"fmt"
)

// NodeConstrain is a constraint for a node in a graph
Expand Down Expand Up @@ -56,25 +57,26 @@ func NewGraph[NT NodeConstrain](edgeSpecFunc EdgeSpecFunc[NT]) *Graph[NT] {
func (g *Graph[NT]) AddNode(n NT) error {
nodeKey := n.DotSpec().ID
if _, ok := g.nodes[nodeKey]; ok {
return ErrDuplicateNode
return NewGraphError(ErrDuplicateNode, fmt.Sprintf("node with key %s already exists in this graph", nodeKey))
}
g.nodes[nodeKey] = n

return nil
}

func (g *Graph[NT]) Connect(from, to string) error {
var nodeFrom, nodeTo NT
func (g *Graph[NT]) Connect(from, to NT) error {
fromNodeKey := from.DotSpec().ID
toNodeKey := to.DotSpec().ID
var ok bool
if nodeFrom, ok = g.nodes[from]; !ok {
return ErrConnectNotExistingNode
if from, ok = g.nodes[fromNodeKey]; !ok {
return NewGraphError(ErrConnectNotExistingNode, fmt.Sprintf("cannot connect node %s, it's not added in this graph yet", fromNodeKey))
}

if nodeTo, ok = g.nodes[to]; !ok {
return ErrConnectNotExistingNode
if to, ok = g.nodes[toNodeKey]; !ok {
return NewGraphError(ErrConnectNotExistingNode, fmt.Sprintf("cannot connect node %s, it's not added in this graph yet", toNodeKey))
}

g.nodeEdges[from] = append(g.nodeEdges[from], &Edge[NT]{From: nodeFrom, To: nodeTo})
g.nodeEdges[fromNodeKey] = append(g.nodeEdges[fromNodeKey], &Edge[NT]{From: from, To: to})
return nil
}

Expand Down
29 changes: 20 additions & 9 deletions graph/graph_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package graph_test

import (
"errors"
"fmt"
"testing"

"github.com/Azure/go-asyncjob/graph"
"github.com/stretchr/testify/assert"
)

func TestSimpleJob(t *testing.T) {
g := graph.NewGraph[*testNode](edgeSpecFromConnection)
func TestSimpleGraph(t *testing.T) {
g := graph.NewGraph(edgeSpecFromConnection)
root := &testNode{Name: "root"}
g.AddNode(root)
calc1 := &testNode{Name: "calc1"}
Expand All @@ -18,16 +20,25 @@ func TestSimpleJob(t *testing.T) {
summary := &testNode{Name: "summary"}
g.AddNode(summary)

g.Connect(root.DotSpec().ID, calc1.DotSpec().ID)
g.Connect(root.DotSpec().ID, calc2.DotSpec().ID)
g.Connect(calc1.DotSpec().ID, summary.DotSpec().ID)
g.Connect(calc2.DotSpec().ID, summary.DotSpec().ID)
g.Connect(root, calc1)
g.Connect(root, calc2)
g.Connect(calc1, summary)
g.Connect(calc2, summary)

graph, err := g.ToDotGraph()
graphStr, err := g.ToDotGraph()
if err != nil {
t.Fatal(err)
assert.NoError(t, err)
}
fmt.Println(graph)
t.Log(graphStr)

err = g.AddNode(calc1)
assert.Error(t, err)
assert.True(t, errors.Is(err, graph.ErrDuplicateNode))

calc3 := &testNode{Name: "calc3"}
err = g.Connect(root, calc3)
assert.Error(t, err)
assert.True(t, errors.Is(err, graph.ErrConnectNotExistingNode))
}

type testNode struct {
Expand Down
2 changes: 1 addition & 1 deletion job.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (j *Job) AddStep(step StepMeta, precedingSteps ...StepMeta) {
stepNode := newStepNode(step)
j.stepsDag.AddNode(stepNode)
for _, precedingStep := range precedingSteps {
j.stepsDag.Connect(precedingStep.getID(), step.getID())
j.stepsDag.Connect(newStepNode(precedingStep), stepNode)
}
}

Expand Down
30 changes: 21 additions & 9 deletions job_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package asyncjob
package asyncjob_test

import (
"context"
"fmt"
"testing"
"time"

"github.com/Azure/go-asyncjob"
"github.com/stretchr/testify/assert"
)

func TestSimpleJob(t *testing.T) {
Expand All @@ -14,15 +17,18 @@ func TestSimpleJob(t *testing.T) {
Query1: "query1",
Table2: "table2",
Query2: "query2",
RetryPolicies: map[string]RetryPolicy{},
RetryPolicies: map[string]asyncjob.RetryPolicy{},
}
jb := sb.BuildJob(context.Background())

jb.Start(context.Background())
jb.Wait(context.Background())
jobErr := jb.Wait(context.Background())
if jobErr != nil {
assert.NoError(t, jobErr)
}

dotGraph, err := jb.Visualize()
if err != nil {
dotGraph, vizErr := jb.Visualize()
if vizErr != nil {
t.FailNow()
}
fmt.Println(dotGraph)
Expand All @@ -36,12 +42,16 @@ func TestSimpleJobError(t *testing.T) {
Table2: "table2",
Query2: "query2",
ErrorInjection: map[string]func() error{"ExecuteQuery.query2": getErrorFunc(fmt.Errorf("table2 schema error"), 1)},
RetryPolicies: map[string]RetryPolicy{},
RetryPolicies: map[string]asyncjob.RetryPolicy{},
}
jb := sb.BuildJob(context.Background())

jb.Start(context.Background())
jb.Wait(context.Background())
jobErr := jb.Wait(context.Background())
if jobErr != nil {
assert.Error(t, jobErr)
}

dotGraph, err := jb.Visualize()
if err != nil {
Expand All @@ -63,7 +73,7 @@ func TestSimpleJobPanic(t *testing.T) {
"GetConnection": getErrorFunc(fmt.Errorf("InternalServerError"), 1),
"ExecuteQuery.panicQuery1": getPanicFunc(4),
},
RetryPolicies: map[string]RetryPolicy{
RetryPolicies: map[string]asyncjob.RetryPolicy{
"CheckAuth": linearRetry, // coverage for AddStep
"GetConnection": linearRetry, // coverage for StepAfter
"QueryTable1": linearRetry, // coverage for StepAfterBoth
Expand All @@ -72,8 +82,10 @@ func TestSimpleJobPanic(t *testing.T) {
jb := sb.BuildJob(context.Background())

jb.Start(context.Background())
err := jb.Wait(context.Background())
fmt.Print(err)
jobErr := jb.Wait(context.Background())
if jobErr != nil {
assert.Error(t, jobErr)
}

dotGraph, err := jb.Visualize()
if err != nil {
Expand Down
21 changes: 12 additions & 9 deletions job_builder.go → step_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,15 @@ func AddStep[T any](bCtx context.Context, j JobInterface, stepName string, stepF
result, err = stepFunc(j.RuntimeContext())
}

step.executionData.Duration = time.Since(step.executionData.StartTime)

if err != nil {
step.state = StepStateFailed
return nil, newStepError(stepName, err)
} else {
step.state = StepStateCompleted
return result, nil
}

step.executionData.Duration = time.Since(step.executionData.StartTime)
return result, newStepError(stepName, err)
}

step.task = asynctask.Start(bCtx, instrumentedFunc)
Expand Down Expand Up @@ -136,14 +137,15 @@ func StepAfter[T, S any](bCtx context.Context, j JobInterface, stepName string,
result, err = stepFunc(j.RuntimeContext(), t)
}

step.executionData.Duration = time.Since(step.executionData.StartTime)

if err != nil {
step.state = StepStateFailed
return nil, newStepError(stepName, err)
} else {
step.state = StepStateCompleted
return result, nil
}

step.executionData.Duration = time.Since(step.executionData.StartTime)
return result, newStepError(stepName, err)
}

step.task = asynctask.ContinueWith(bCtx, parentStep.task, instrumentedFunc)
Expand Down Expand Up @@ -203,14 +205,15 @@ func StepAfterBoth[T, S, R any](bCtx context.Context, j JobInterface, stepName s
result, err = stepFunc(j.RuntimeContext(), t, s)
}

step.executionData.Duration = time.Since(step.executionData.StartTime)

if err != nil {
step.state = StepStateFailed
return nil, newStepError(stepName, err)
} else {
step.state = StepStateCompleted
return result, nil
}

step.executionData.Duration = time.Since(step.executionData.StartTime)
return result, newStepError(stepName, err)
}

step.task = asynctask.AfterBoth(bCtx, parentStepT.task, parentStepS.task, instrumentedFunc)
Expand Down
37 changes: 19 additions & 18 deletions util_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package asyncjob
package asyncjob_test

import (
"context"
"fmt"
"time"

"github.com/Azure/go-asyncjob"
"github.com/Azure/go-asynctask"
)

Expand All @@ -15,7 +16,7 @@ type SqlSummaryJobLib struct {
Table2 string
Query2 string
ErrorInjection map[string]func() error
RetryPolicies map[string]RetryPolicy
RetryPolicies map[string]asyncjob.RetryPolicy
}

type SqlConnection struct {
Expand Down Expand Up @@ -102,26 +103,26 @@ func (sql *SqlSummaryJobLib) EmailNotification(ctx context.Context) error {
return nil
}

func (sql *SqlSummaryJobLib) BuildJob(bCtx context.Context) *Job {
job := NewJob("sqlSummaryJob")
func (sql *SqlSummaryJobLib) BuildJob(bCtx context.Context) *asyncjob.Job {
job := asyncjob.NewJob("sqlSummaryJob")

serverNameParamTask := InputParam(bCtx, job, "serverName", &sql.ServerName)
connTsk, _ := StepAfter(bCtx, job, "GetConnection", serverNameParamTask, sql.GetConnection, WithRetry(sql.RetryPolicies["GetConnection"]))
serverNameParamTask := asyncjob.InputParam(bCtx, job, "serverName", &sql.ServerName)
connTsk, _ := asyncjob.StepAfter(bCtx, job, "GetConnection", serverNameParamTask, sql.GetConnection, asyncjob.WithRetry(sql.RetryPolicies["GetConnection"]))

checkAuthTask, _ := AddStep(bCtx, job, "CheckAuth", asynctask.ActionToFunc(sql.CheckAuth), WithRetry(sql.RetryPolicies["CheckAuth"]))
checkAuthTask, _ := asyncjob.AddStep(bCtx, job, "CheckAuth", asynctask.ActionToFunc(sql.CheckAuth), asyncjob.WithRetry(sql.RetryPolicies["CheckAuth"]))

table1ParamTsk := InputParam(bCtx, job, "table1", &sql.Table1)
table1ClientTsk, _ := StepAfterBoth(bCtx, job, "getTableClient1", connTsk, table1ParamTsk, sql.GetTableClient)
query1ParamTsk := InputParam(bCtx, job, "query1", &sql.Query1)
qery1ResultTsk, _ := StepAfterBoth(bCtx, job, "QueryTable1", table1ClientTsk, query1ParamTsk, sql.ExecuteQuery, WithRetry(sql.RetryPolicies["QueryTable1"]), ExecuteAfter(checkAuthTask))
table1ParamTsk := asyncjob.InputParam(bCtx, job, "table1", &sql.Table1)
table1ClientTsk, _ := asyncjob.StepAfterBoth(bCtx, job, "getTableClient1", connTsk, table1ParamTsk, sql.GetTableClient)
query1ParamTsk := asyncjob.InputParam(bCtx, job, "query1", &sql.Query1)
qery1ResultTsk, _ := asyncjob.StepAfterBoth(bCtx, job, "QueryTable1", table1ClientTsk, query1ParamTsk, sql.ExecuteQuery, asyncjob.WithRetry(sql.RetryPolicies["QueryTable1"]), asyncjob.ExecuteAfter(checkAuthTask))

table2ParamTsk := InputParam(bCtx, job, "table2", &sql.Table2)
table2ClientTsk, _ := StepAfterBoth(bCtx, job, "getTableClient2", connTsk, table2ParamTsk, sql.GetTableClient)
query2ParamTsk := InputParam(bCtx, job, "query2", &sql.Query2)
qery2ResultTsk, _ := StepAfterBoth(bCtx, job, "QueryTable2", table2ClientTsk, query2ParamTsk, sql.ExecuteQuery, WithRetry(sql.RetryPolicies["QueryTable2"]), ExecuteAfter(checkAuthTask))
table2ParamTsk := asyncjob.InputParam(bCtx, job, "table2", &sql.Table2)
table2ClientTsk, _ := asyncjob.StepAfterBoth(bCtx, job, "getTableClient2", connTsk, table2ParamTsk, sql.GetTableClient)
query2ParamTsk := asyncjob.InputParam(bCtx, job, "query2", &sql.Query2)
qery2ResultTsk, _ := asyncjob.StepAfterBoth(bCtx, job, "QueryTable2", table2ClientTsk, query2ParamTsk, sql.ExecuteQuery, asyncjob.WithRetry(sql.RetryPolicies["QueryTable2"]), asyncjob.ExecuteAfter(checkAuthTask))

summaryTsk, _ := StepAfterBoth(bCtx, job, "summarize", qery1ResultTsk, qery2ResultTsk, sql.SummarizeQueryResult)
AddStep(bCtx, job, "emailNotification", asynctask.ActionToFunc(sql.EmailNotification), ExecuteAfter(summaryTsk))
summaryTsk, _ := asyncjob.StepAfterBoth(bCtx, job, "summarize", qery1ResultTsk, qery2ResultTsk, sql.SummarizeQueryResult)
asyncjob.AddStep(bCtx, job, "emailNotification", asynctask.ActionToFunc(sql.EmailNotification), asyncjob.ExecuteAfter(summaryTsk))
return job
}

Expand All @@ -131,7 +132,7 @@ type linearRetryPolicy struct {
tried int
}

func newLinearRetryPolicy(sleepInterval time.Duration, maxRetryCount int) RetryPolicy {
func newLinearRetryPolicy(sleepInterval time.Duration, maxRetryCount int) asyncjob.RetryPolicy {
return &linearRetryPolicy{
sleepInterval: sleepInterval,
maxRetryCount: maxRetryCount,
Expand Down

0 comments on commit 73c27d5

Please sign in to comment.