Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: auto dependency resolution support for maxcompute #285

Merged
merged 5 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 19 additions & 11 deletions plugin/plugin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type EvaluatorFactory interface {

type UpstreamIdentifierFactory interface {
GetBQUpstreamIdentifier(ctx context.Context, svcAcc string, evaluators ...evaluator.Evaluator) (upstreamidentifier.UpstreamIdentifier, error)
GetMaxcomputeUpstreamIdentifier(ctx context.Context, evaluators ...evaluator.Evaluator) (upstreamidentifier.UpstreamIdentifier, error)
}

type PluginService struct {
Expand Down Expand Up @@ -108,20 +109,27 @@ func (s PluginService) IdentifyUpstreams(ctx context.Context, taskName string, c
evaluators = append(evaluators, evaluator)
}

if parserType != plugin.BQParser {
switch parserType {
case plugin.MaxcomputeParser:
upstreamIdentifier, err := s.upstreamIdentifierFactory.GetMaxcomputeUpstreamIdentifier(ctx, evaluators...)
if err != nil {
return nil, err
}
upstreamIdentifiers = append(upstreamIdentifiers, upstreamIdentifier)
case plugin.BQParser:
svcAcc, ok := compiledConfig[bqSvcAccKey]
if !ok {
return nil, fmt.Errorf("secret " + bqSvcAccKey + " required to generate upstream is not found")
}
upstreamIdentifier, err := s.upstreamIdentifierFactory.GetBQUpstreamIdentifier(ctx, svcAcc, evaluators...)
if err != nil {
return nil, err
}
upstreamIdentifiers = append(upstreamIdentifiers, upstreamIdentifier)
default:
s.l.Warn("parserType %s is not supported", parserType)
continue
}
// for now parser type is only scoped for bigquery, so that it uses bigquery as upstream identifier
svcAcc, ok := compiledConfig[bqSvcAccKey]
if !ok {
return nil, fmt.Errorf("secret " + bqSvcAccKey + " required to generate upstream is not found")
}
upstreamIdentifier, err := s.upstreamIdentifierFactory.GetBQUpstreamIdentifier(ctx, svcAcc, evaluators...)
if err != nil {
return nil, err
}
upstreamIdentifiers = append(upstreamIdentifiers, upstreamIdentifier)
}

// identify all upstream resource urns by all identifier from given asset
Expand Down
37 changes: 37 additions & 0 deletions plugin/plugin_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,43 @@ func (_m *UpstreamIdentifierFactory) GetBQUpstreamIdentifier(ctx context.Context
return r0, r1
}

// GetMaxcomputeUpstreamIdentifier provides a mock function with given fields: ctx, evaluators
func (_m *UpstreamIdentifierFactory) GetMaxcomputeUpstreamIdentifier(ctx context.Context, evaluators ...evaluator.Evaluator) (upstreamidentifier.UpstreamIdentifier, error) {
	// Flatten ctx plus the variadic evaluators into one argument slice so
	// mock.Called can match them against the configured expectations.
	_va := make([]interface{}, len(evaluators))
	for _i := range evaluators {
		_va[_i] = evaluators[_i]
	}
	var _ca []interface{}
	_ca = append(_ca, ctx)
	_ca = append(_ca, _va...)
	ret := _m.Called(_ca...)

	// Mockery convention: a call with no configured return values is a
	// test-setup error, not a runtime condition — fail loudly.
	if len(ret) == 0 {
		panic("no return value specified for GetMaxcomputeUpstreamIdentifier")
	}

	var r0 upstreamidentifier.UpstreamIdentifier
	var r1 error
	// A single function returning both values takes precedence over
	// per-position return functions/values below.
	if rf, ok := ret.Get(0).(func(context.Context, ...evaluator.Evaluator) (upstreamidentifier.UpstreamIdentifier, error)); ok {
		return rf(ctx, evaluators...)
	}
	if rf, ok := ret.Get(0).(func(context.Context, ...evaluator.Evaluator) upstreamidentifier.UpstreamIdentifier); ok {
		r0 = rf(ctx, evaluators...)
	} else {
		// Plain value return; nil check avoids a failing type assertion.
		if ret.Get(0) != nil {
			r0 = ret.Get(0).(upstreamidentifier.UpstreamIdentifier)
		}
	}

	if rf, ok := ret.Get(1).(func(context.Context, ...evaluator.Evaluator) error); ok {
		r1 = rf(ctx, evaluators...)
	} else {
		r1 = ret.Error(1)
	}

	return r0, r1
}

// NewUpstreamIdentifierFactory creates a new instance of UpstreamIdentifierFactory. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewUpstreamIdentifierFactory(t interface {
Expand Down
3 changes: 2 additions & 1 deletion plugin/upstream_identifier/bq_upstream_identifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/goto/optimus/core/resource"
"github.com/goto/optimus/ext/store/bigquery"
"github.com/goto/optimus/internal/errors"
"github.com/goto/optimus/plugin/upstream_identifier/parser"
)

type (
Expand Down Expand Up @@ -129,7 +130,7 @@ func NewBQUpstreamIdentifier(logger log.Logger, parserFunc ParserFunc, bqExtract

return &BQUpstreamIdentifier{
logger: logger,
parserFunc: parserFunc,
parserFunc: parser.BQURNDecorator(parserFunc),
extractorFunc: bqExtractorDecorator(logger, bqExtractorFunc),
evaluatorFuncs: sanitizedEvaluatorFuncs,
}, nil
Expand Down
19 changes: 9 additions & 10 deletions plugin/upstream_identifier/bq_upstream_identifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestIdentifyResources(t *testing.T) {
defer bqExtractorFunc.AssertExpectations(t)

evaluatorFunc.On("Execute", assets).Return(assets["./query.sql"])
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"bigquery://project1:dataset1.name1"})
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"project1.dataset1.name1"})
bqExtractorFunc.On("Execute", ctx, mock.Anything).Return(nil, errors.New("some error"))

bqUpstreamIdentifier, err := upstreamidentifier.NewBQUpstreamIdentifier(logger, parserFunc.Execute, bqExtractorFunc.Execute, evaluatorFunc.Execute)
Expand All @@ -105,9 +105,8 @@ func TestIdentifyResources(t *testing.T) {
defer bqExtractorFunc.AssertExpectations(t)

evaluatorFunc.On("Execute", assets).Return(assets["./query.sql"])
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"broken://project1;dataset1.name1"})
// bq extractor should receives empty resource urn, since the urn construction is fail
bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{}).Return(map[bigquery.ResourceURN]string{}, nil)
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"project1;dataset1.name1"})
// bq extractor should not be executed since the result of parser is empty

bqUpstreamIdentifier, err := upstreamidentifier.NewBQUpstreamIdentifier(logger, parserFunc.Execute, bqExtractorFunc.Execute, evaluatorFunc.Execute)
assert.NoError(t, err)
Expand Down Expand Up @@ -135,13 +134,13 @@ func TestIdentifyResources(t *testing.T) {
sqlView2 := "select 1 from `project1.dataset1.name1` join `project1.dataset1.name3` on true"

evaluatorFunc.On("Execute", assets).Return(assets["./query.sql"])
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"bigquery://project1:dataset1.name1"})
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"project1.dataset1.name1"})
bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN1}).Return(map[bigquery.ResourceURN]string{resourceURN1: sqlView1}, nil)

parserFunc.On("Execute", sqlView1).Return([]string{"bigquery://project1:dataset1.name2"})
parserFunc.On("Execute", sqlView1).Return([]string{"project1.dataset1.name2"})
bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN2}).Return(map[bigquery.ResourceURN]string{resourceURN2: sqlView2}, nil)

parserFunc.On("Execute", sqlView2).Return([]string{"bigquery://project1:dataset1.name1", "bigquery://project1:dataset1.name3"})
parserFunc.On("Execute", sqlView2).Return([]string{"project1.dataset1.name1", "project1.dataset1.name3"})
bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN1, resourceURN3}).Return(map[bigquery.ResourceURN]string{resourceURN1: sqlView1, resourceURN3: ""}, nil)

parserFunc.On("Execute", "").Return([]string{})
Expand Down Expand Up @@ -172,13 +171,13 @@ func TestIdentifyResources(t *testing.T) {
sqlView2 := "select 1 from `project1.dataset1.name3`"

evaluatorFunc.On("Execute", assets).Return(assets["./query.sql"])
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"bigquery://project1:dataset1.name1"})
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"project1.dataset1.name1"})
bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN1}).Return(map[bigquery.ResourceURN]string{resourceURN1: sqlView1}, nil)

parserFunc.On("Execute", sqlView1).Return([]string{"bigquery://project1:dataset1.name2", "bigquery://project1:dataset1.name3"})
parserFunc.On("Execute", sqlView1).Return([]string{"project1.dataset1.name2", "project1.dataset1.name3"})
bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN2, resourceURN3}).Return(map[bigquery.ResourceURN]string{resourceURN2: sqlView2, resourceURN3: ""}, nil)

parserFunc.On("Execute", sqlView2).Return([]string{"bigquery://project1:dataset1.name3"})
parserFunc.On("Execute", sqlView2).Return([]string{"project1.dataset1.name3"})
bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN3}).Return(map[bigquery.ResourceURN]string{resourceURN3: ""}, nil)

parserFunc.On("Execute", "").Return([]string{})
Expand Down
74 changes: 74 additions & 0 deletions plugin/upstream_identifier/maxcompute_upstream_identifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package upstreamidentifier

import (
"context"
"fmt"

"github.com/goto/salt/log"

"github.com/goto/optimus/core/resource"
"github.com/goto/optimus/internal/errors"
"github.com/goto/optimus/plugin/upstream_identifier/parser"
)

// MaxcomputeUpstreamIdentifier identifies upstream resource URNs referenced
// by maxcompute SQL assets: evaluator funcs select the query text out of the
// raw assets, and the parser func extracts table references from that query.
type MaxcomputeUpstreamIdentifier struct {
	logger         log.Logger      // reports non-fatal URN parse failures
	parserFunc     ParserFunc      // extracts resource URN strings from a query
	evaluatorFuncs []EvalAssetFunc // each selects the query to parse from the assets map
}

// NewMaxcomputeUpstreamIdentifier constructs a MaxcomputeUpstreamIdentifier.
//
// parserFunc is wrapped with parser.MaxcomputeURNDecorator so the raw table
// references it yields come back as maxcompute URN strings. Nil entries in
// evaluatorFuncs are dropped; at least one non-nil evaluator is required.
// All argument validation failures are collected into a single multi-error.
func NewMaxcomputeUpstreamIdentifier(logger log.Logger, parserFunc ParserFunc, evaluatorFuncs ...EvalAssetFunc) (*MaxcomputeUpstreamIdentifier, error) {
	me := errors.NewMultiError("create maxcompute upstream generator errors")
	if logger == nil {
		me.Append(fmt.Errorf("logger is nil"))
	}
	if parserFunc == nil {
		me.Append(fmt.Errorf("parserFunc is nil"))
	}
	sanitizedEvaluatorFuncs := []EvalAssetFunc{}
	for _, evaluatorFunc := range evaluatorFuncs {
		if evaluatorFunc != nil {
			sanitizedEvaluatorFuncs = append(sanitizedEvaluatorFuncs, evaluatorFunc)
		}
	}
	if len(sanitizedEvaluatorFuncs) == 0 {
		me.Append(fmt.Errorf("non-nil evaluatorFuncs is needed"))
	}
	if err := me.ToErr(); err != nil {
		return nil, err
	}
	return &MaxcomputeUpstreamIdentifier{
		logger:     logger,
		parserFunc: parser.MaxcomputeURNDecorator(parserFunc),
		// Bugfix: store the sanitized slice (nil evaluators removed), not the
		// raw input — previously a nil evaluator passed alongside a valid one
		// survived validation and panicked when invoked. This also matches
		// NewBQUpstreamIdentifier, which stores its sanitized slice.
		evaluatorFuncs: sanitizedEvaluatorFuncs,
	}, nil
}

// IdentifyResources extracts upstream resource URNs from the given assets.
// Each registered evaluator selects a query out of the assets; every
// non-empty query is parsed and its resource URNs are accumulated.
func (g MaxcomputeUpstreamIdentifier) IdentifyResources(_ context.Context, assets map[string]string) ([]resource.URN, error) {
	collected := []resource.URN{}
	for _, evaluate := range g.evaluatorFuncs {
		script := evaluate(assets)
		if script == "" {
			// this evaluator found nothing to parse in the assets
			continue
		}
		collected = append(collected, g.identifyResources(script)...)
	}
	return collected, nil
}

// identifyResources parses a single query and converts every table reference
// found by the parser into a resource.URN. References that fail URN parsing
// are logged and skipped rather than aborting the whole extraction.
func (g MaxcomputeUpstreamIdentifier) identifyResources(query string) []resource.URN {
	resources := g.parserFunc(query)
	// Bugfix: allocate with zero length and append. The previous
	// make([]resource.URN, len(resources)) combined with continue-on-error
	// left zero-value URNs in the result whenever a reference failed to parse.
	resourceURNs := make([]resource.URN, 0, len(resources))
	for _, r := range resources {
		resourceURN, err := resource.ParseURN(r)
		if err != nil {
			g.logger.Error("error when parsing resource urn %s", r)
			continue
		}
		resourceURNs = append(resourceURNs, resourceURN)
	}
	return resourceURNs
}
62 changes: 62 additions & 0 deletions plugin/upstream_identifier/maxcompute_upstream_identifier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package upstreamidentifier_test

import (
"context"
"testing"

"github.com/goto/salt/log"
"github.com/stretchr/testify/assert"

upstreamidentifier "github.com/goto/optimus/plugin/upstream_identifier"
)

// TestNewMaxcomputeUpstreamIdentifier verifies constructor argument validation:
// nil logger, nil parser, and missing/nil evaluators must all fail, while a
// fully valid argument set must succeed.
func TestNewMaxcomputeUpstreamIdentifier(t *testing.T) {
	noopLogger := log.NewNoop()
	parse := func(string) []string { return nil }
	evaluate := func(map[string]string) string { return "" }

	t.Run("return error when logger is nil", func(t *testing.T) {
		identifier, err := upstreamidentifier.NewMaxcomputeUpstreamIdentifier(nil, parse, evaluate)
		assert.Nil(t, identifier)
		assert.Error(t, err)
	})
	t.Run("return error when parserFunc is nil", func(t *testing.T) {
		identifier, err := upstreamidentifier.NewMaxcomputeUpstreamIdentifier(noopLogger, nil, evaluate)
		assert.Nil(t, identifier)
		assert.Error(t, err)
	})
	t.Run("return error when no evaluators", func(t *testing.T) {
		identifier, err := upstreamidentifier.NewMaxcomputeUpstreamIdentifier(noopLogger, parse)
		assert.Nil(t, identifier)
		assert.Error(t, err)
	})
	t.Run("return error when evaluatorFuncs is nil", func(t *testing.T) {
		identifier, err := upstreamidentifier.NewMaxcomputeUpstreamIdentifier(noopLogger, parse, nil)
		assert.Nil(t, identifier)
		assert.Error(t, err)
	})
	t.Run("return success", func(t *testing.T) {
		identifier, err := upstreamidentifier.NewMaxcomputeUpstreamIdentifier(noopLogger, parse, evaluate)
		assert.NoError(t, err)
		assert.NotNil(t, identifier)
	})
}

// TestMaxcomputeUpstreamIdentifier_IdentifyResources covers the happy path:
// a single parsed table reference comes back as one maxcompute resource URN.
func TestMaxcomputeUpstreamIdentifier_IdentifyResources(t *testing.T) {
	ctx := context.Background()
	noopLogger := log.NewNoop()
	assets := map[string]string{
		"./query.sql": "select 1 from project1.schema1.name1",
	}
	// TODO: adding failure test cases
	t.Run("return success", func(t *testing.T) {
		parse := func(string) []string { return []string{"project1.schema1.name1"} }
		evaluate := func(map[string]string) string { return "./query.sql" }

		identifier, err := upstreamidentifier.NewMaxcomputeUpstreamIdentifier(noopLogger, parse, evaluate)
		assert.NotNil(t, identifier)
		assert.NoError(t, err)

		urns, err := identifier.IdentifyResources(ctx, assets)
		assert.NoError(t, err)
		if assert.Len(t, urns, 1) {
			assert.Equal(t, "maxcompute://project1.schema1.name1", urns[0].String())
		}
	})
}
Loading
Loading