forked from raystack/optimus
-
Notifications
You must be signed in to change notification settings - Fork 2
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: auto dependency resolution support for maxcompute (#285)
* feat: add upstream identifier for maxcompute * fix: linter * test: add test case for maxcompute identifier * fix: linter * feat: support tick quote in one part of table
- Loading branch information
1 parent
5a0455d
commit d943a10
Showing
11 changed files
with
385 additions
and
148 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
74 changes: 74 additions & 0 deletions
74
plugin/upstream_identifier/maxcompute_upstream_identifier.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
package upstreamidentifier | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/goto/salt/log" | ||
|
||
"github.com/goto/optimus/core/resource" | ||
"github.com/goto/optimus/internal/errors" | ||
"github.com/goto/optimus/plugin/upstream_identifier/parser" | ||
) | ||
|
||
type MaxcomputeUpstreamIdentifier struct { | ||
logger log.Logger | ||
parserFunc ParserFunc | ||
evaluatorFuncs []EvalAssetFunc | ||
} | ||
|
||
func NewMaxcomputeUpstreamIdentifier(logger log.Logger, parserFunc ParserFunc, evaluatorFuncs ...EvalAssetFunc) (*MaxcomputeUpstreamIdentifier, error) { | ||
me := errors.NewMultiError("create maxcompute upstream generator errors") | ||
if logger == nil { | ||
me.Append(fmt.Errorf("logger is nil")) | ||
} | ||
if parserFunc == nil { | ||
me.Append(fmt.Errorf("parserFunc is nil")) | ||
} | ||
sanitizedEvaluatorFuncs := []EvalAssetFunc{} | ||
for _, evaluatorFunc := range evaluatorFuncs { | ||
if evaluatorFunc != nil { | ||
sanitizedEvaluatorFuncs = append(sanitizedEvaluatorFuncs, evaluatorFunc) | ||
} | ||
} | ||
if len(sanitizedEvaluatorFuncs) == 0 { | ||
me.Append(fmt.Errorf("non-nil evaluatorFuncs is needed")) | ||
} | ||
if me.ToErr() != nil { | ||
return nil, me.ToErr() | ||
} | ||
return &MaxcomputeUpstreamIdentifier{ | ||
logger: logger, | ||
parserFunc: parser.MaxcomputeURNDecorator(parserFunc), | ||
evaluatorFuncs: evaluatorFuncs, | ||
}, nil | ||
} | ||
|
||
func (g MaxcomputeUpstreamIdentifier) IdentifyResources(_ context.Context, assets map[string]string) ([]resource.URN, error) { | ||
resourceURNs := []resource.URN{} | ||
|
||
// generate resource urn with upstream from each evaluator | ||
for _, evaluatorFunc := range g.evaluatorFuncs { | ||
query := evaluatorFunc(assets) | ||
if query == "" { | ||
continue | ||
} | ||
resources := g.identifyResources(query) | ||
resourceURNs = append(resourceURNs, resources...) | ||
} | ||
return resourceURNs, nil | ||
} | ||
|
||
func (g MaxcomputeUpstreamIdentifier) identifyResources(query string) []resource.URN { | ||
resources := g.parserFunc(query) | ||
resourceURNs := make([]resource.URN, len(resources)) | ||
for i, r := range resources { | ||
resourceURN, err := resource.ParseURN(r) | ||
if err != nil { | ||
g.logger.Error("error when parsing resource urn %s", r) | ||
continue | ||
} | ||
resourceURNs[i] = resourceURN | ||
} | ||
return resourceURNs | ||
} |
62 changes: 62 additions & 0 deletions
62
plugin/upstream_identifier/maxcompute_upstream_identifier_test.go
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
package upstreamidentifier_test | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
"github.com/goto/salt/log" | ||
"github.com/stretchr/testify/assert" | ||
|
||
upstreamidentifier "github.com/goto/optimus/plugin/upstream_identifier" | ||
) | ||
|
||
func TestNewMaxcomputeUpstreamIdentifier(t *testing.T) { | ||
logger := log.NewNoop() | ||
parserFunc := func(string) []string { return nil } | ||
evaluatorFunc := func(map[string]string) string { return "" } | ||
t.Run("return error when logger is nil", func(t *testing.T) { | ||
upstreamIdentifier, err := upstreamidentifier.NewMaxcomputeUpstreamIdentifier(nil, parserFunc, evaluatorFunc) | ||
assert.Error(t, err) | ||
assert.Nil(t, upstreamIdentifier) | ||
}) | ||
t.Run("return error when parserFunc is nil", func(t *testing.T) { | ||
upstreamIdentifier, err := upstreamidentifier.NewMaxcomputeUpstreamIdentifier(logger, nil, evaluatorFunc) | ||
assert.Error(t, err) | ||
assert.Nil(t, upstreamIdentifier) | ||
}) | ||
t.Run("return error when no evaluators", func(t *testing.T) { | ||
upstreamIdentifier, err := upstreamidentifier.NewMaxcomputeUpstreamIdentifier(logger, parserFunc) | ||
assert.Error(t, err) | ||
assert.Nil(t, upstreamIdentifier) | ||
}) | ||
t.Run("return error when evaluatorFuncs is nil", func(t *testing.T) { | ||
upstreamIdentifier, err := upstreamidentifier.NewMaxcomputeUpstreamIdentifier(logger, parserFunc, nil) | ||
assert.Error(t, err) | ||
assert.Nil(t, upstreamIdentifier) | ||
}) | ||
t.Run("return success", func(t *testing.T) { | ||
upstreamIdentifier, err := upstreamidentifier.NewMaxcomputeUpstreamIdentifier(logger, parserFunc, evaluatorFunc) | ||
assert.NoError(t, err) | ||
assert.NotNil(t, upstreamIdentifier) | ||
}) | ||
} | ||
|
||
func TestMaxcomputeUpstreamIdentifier_IdentifyResources(t *testing.T) { | ||
ctx := context.Background() | ||
logger := log.NewNoop() | ||
assets := map[string]string{ | ||
"./query.sql": "select 1 from project1.schema1.name1", | ||
} | ||
// TODO: adding failure test cases | ||
t.Run("return success", func(t *testing.T) { | ||
parserFunc := func(string) []string { return []string{"project1.schema1.name1"} } | ||
evaluatorFunc := func(map[string]string) string { return "./query.sql" } | ||
upstreamIdentifier, err := upstreamidentifier.NewMaxcomputeUpstreamIdentifier(logger, parserFunc, evaluatorFunc) | ||
assert.NoError(t, err) | ||
assert.NotNil(t, upstreamIdentifier) | ||
resourceURNs, err := upstreamIdentifier.IdentifyResources(ctx, assets) | ||
assert.NoError(t, err) | ||
assert.Len(t, resourceURNs, 1) | ||
assert.Equal(t, "maxcompute://project1.schema1.name1", resourceURNs[0].String()) | ||
}) | ||
} |
Oops, something went wrong.