diff --git a/plugin/plugin_service.go b/plugin/plugin_service.go
index ae5a782a5c..79c1083568 100644
--- a/plugin/plugin_service.go
+++ b/plugin/plugin_service.go
@@ -35,6 +35,7 @@ type EvaluatorFactory interface {
 type UpstreamIdentifierFactory interface {
 	GetBQUpstreamIdentifier(ctx context.Context, svcAcc string, evaluators ...evaluator.Evaluator) (upstreamidentifier.UpstreamIdentifier, error)
+	GetMaxcomputeUpstreamIdentifier(ctx context.Context, evaluators ...evaluator.Evaluator) (upstreamidentifier.UpstreamIdentifier, error)
 }
 
 type PluginService struct {
@@ -108,20 +109,27 @@ func (s PluginService) IdentifyUpstreams(ctx context.Context, taskName string, c
 		evaluators = append(evaluators, evaluator)
 	}
 
-	if parserType != plugin.BQParser {
+	switch parserType {
+	case plugin.MaxcomputeParser:
+		upstreamIdentifier, err := s.upstreamIdentifierFactory.GetMaxcomputeUpstreamIdentifier(ctx, evaluators...)
+		if err != nil {
+			return nil, err
+		}
+		upstreamIdentifiers = append(upstreamIdentifiers, upstreamIdentifier)
+	case plugin.BQParser:
+		svcAcc, ok := compiledConfig[bqSvcAccKey]
+		if !ok {
+			return nil, fmt.Errorf("secret %s required to generate upstream is not found", bqSvcAccKey)
+		}
+		upstreamIdentifier, err := s.upstreamIdentifierFactory.GetBQUpstreamIdentifier(ctx, svcAcc, evaluators...)
+		if err != nil {
+			return nil, err
+		}
+		upstreamIdentifiers = append(upstreamIdentifiers, upstreamIdentifier)
+	default:
 		s.l.Warn("parserType %s is not supported", parserType)
 		continue
 	}
-	// for now parser type is only scoped for bigquery, so that it uses bigquery as upstream identifier
-	svcAcc, ok := compiledConfig[bqSvcAccKey]
-	if !ok {
-		return nil, fmt.Errorf("secret " + bqSvcAccKey + " required to generate upstream is not found")
-	}
-	upstreamIdentifier, err := s.upstreamIdentifierFactory.GetBQUpstreamIdentifier(ctx, svcAcc, evaluators...)
-	if err != nil {
-		return nil, err
-	}
-	upstreamIdentifiers = append(upstreamIdentifiers, upstreamIdentifier)
 	}
 
 	// identify all upstream resource urns by all identifier from given asset
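The switch above is the heart of the change: `IdentifyUpstreams` now selects an upstream identifier per parser type instead of assuming BigQuery. Distilled into a standalone sketch (a hypothetical helper, not code from this PR; `bqSvcAccKey`, the factory interface, and the parser-type constants are the ones defined in this package and `sdk/plugin`):

```go
// Sketch only: mirrors the dispatch above as a free function in the same
// package. Unlike the PR, which logs and skips unsupported parser types
// inside its loop, this sketch surfaces them as an error.
func upstreamIdentifierFor(
	ctx context.Context,
	factory UpstreamIdentifierFactory,
	parserType plugin.ParserType,
	compiledConfig map[string]string,
	evaluators ...evaluator.Evaluator,
) (upstreamidentifier.UpstreamIdentifier, error) {
	switch parserType {
	case plugin.MaxcomputeParser:
		// MaxCompute needs no secret here: upstreams are derived purely
		// by parsing the query text.
		return factory.GetMaxcomputeUpstreamIdentifier(ctx, evaluators...)
	case plugin.BQParser:
		// BigQuery still needs a service account so the identifier can
		// expand views through the authenticated extractor.
		svcAcc, ok := compiledConfig[bqSvcAccKey]
		if !ok {
			return nil, fmt.Errorf("secret %s required to generate upstream is not found", bqSvcAccKey)
		}
		return factory.GetBQUpstreamIdentifier(ctx, svcAcc, evaluators...)
	default:
		return nil, fmt.Errorf("parserType %s is not supported", parserType)
	}
}
```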
diff --git a/plugin/plugin_service_test.go b/plugin/plugin_service_test.go
index c91edd117e..1ed3771759 100644
--- a/plugin/plugin_service_test.go
+++ b/plugin/plugin_service_test.go
@@ -542,13 +542,49 @@ func (_m *UpstreamIdentifierFactory) GetBQUpstreamIdentifier(ctx context.Context
 	return r0, r1
 }
 
+// GetMaxcomputeUpstreamIdentifier provides a mock function with given fields: ctx, evaluators
+func (_m *UpstreamIdentifierFactory) GetMaxcomputeUpstreamIdentifier(ctx context.Context, evaluators ...evaluator.Evaluator) (upstreamidentifier.UpstreamIdentifier, error) {
+	_va := make([]interface{}, len(evaluators))
+	for _i := range evaluators {
+		_va[_i] = evaluators[_i]
+	}
+	var _ca []interface{}
+	_ca = append(_ca, ctx)
+	_ca = append(_ca, _va...)
+	ret := _m.Called(_ca...)
+
+	if len(ret) == 0 {
+		panic("no return value specified for GetMaxcomputeUpstreamIdentifier")
+	}
+
+	var r0 upstreamidentifier.UpstreamIdentifier
+	var r1 error
+	if rf, ok := ret.Get(0).(func(context.Context, ...evaluator.Evaluator) (upstreamidentifier.UpstreamIdentifier, error)); ok {
+		return rf(ctx, evaluators...)
+	}
+	if rf, ok := ret.Get(0).(func(context.Context, ...evaluator.Evaluator) upstreamidentifier.UpstreamIdentifier); ok {
+		r0 = rf(ctx, evaluators...)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(upstreamidentifier.UpstreamIdentifier)
+		}
+	}
+
+	if rf, ok := ret.Get(1).(func(context.Context, ...evaluator.Evaluator) error); ok {
+		r1 = rf(ctx, evaluators...)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
 // NewUpstreamIdentifierFactory creates a new instance of UpstreamIdentifierFactory. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
 // The first argument is typically a *testing.T value.
 func NewUpstreamIdentifierFactory(t interface {
 	mock.TestingT
 	Cleanup(func())
-},
-) *UpstreamIdentifierFactory {
+}) *UpstreamIdentifierFactory {
 	mock := &UpstreamIdentifierFactory{}
 	mock.Mock.Test(t)
diff --git a/plugin/upstream_identifier/bq_upstream_identifier.go b/plugin/upstream_identifier/bq_upstream_identifier.go
index 8f1a307137..5690ce77d9 100644
--- a/plugin/upstream_identifier/bq_upstream_identifier.go
+++ b/plugin/upstream_identifier/bq_upstream_identifier.go
@@ -9,6 +9,7 @@ import (
 	"github.com/goto/optimus/core/resource"
 	"github.com/goto/optimus/ext/store/bigquery"
 	"github.com/goto/optimus/internal/errors"
+	"github.com/goto/optimus/plugin/upstream_identifier/parser"
 )
 
 type (
@@ -129,7 +130,7 @@ func NewBQUpstreamIdentifier(logger log.Logger, parserFunc ParserFunc, bqExtract
 
 	return &BQUpstreamIdentifier{
 		logger:         logger,
-		parserFunc:     parserFunc,
+		parserFunc:     parser.BQURNDecorator(parserFunc),
 		extractorFunc:  bqExtractorDecorator(logger, bqExtractorFunc),
 		evaluatorFuncs: sanitizedEvaluatorFuncs,
 	}, nil
diff --git a/plugin/upstream_identifier/bq_upstream_identifier_test.go b/plugin/upstream_identifier/bq_upstream_identifier_test.go
index cd577a6ba1..650cf069a4 100644
--- a/plugin/upstream_identifier/bq_upstream_identifier_test.go
+++ b/plugin/upstream_identifier/bq_upstream_identifier_test.go
@@ -85,7 +85,7 @@ func TestIdentifyResources(t *testing.T) {
 		defer bqExtractorFunc.AssertExpectations(t)
 
 		evaluatorFunc.On("Execute", assets).Return(assets["./query.sql"])
-		parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"bigquery://project1:dataset1.name1"})
+		parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"project1.dataset1.name1"})
 		bqExtractorFunc.On("Execute", ctx, mock.Anything).Return(nil, errors.New("some error"))
 
 		bqUpstreamIdentifier, err := upstreamidentifier.NewBQUpstreamIdentifier(logger, parserFunc.Execute, bqExtractorFunc.Execute, evaluatorFunc.Execute)
@@ -105,9 +105,8 @@ func TestIdentifyResources(t *testing.T) {
 		defer bqExtractorFunc.AssertExpectations(t)
 
 		evaluatorFunc.On("Execute", assets).Return(assets["./query.sql"])
-		parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"broken://project1;dataset1.name1"})
-		// bq extractor should receives empty resource urn, since the urn construction is fail
-		bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{}).Return(map[bigquery.ResourceURN]string{}, nil)
+		parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"project1;dataset1.name1"})
+		// bq extractor should not be executed, since the parser result is empty
 
 		bqUpstreamIdentifier, err := upstreamidentifier.NewBQUpstreamIdentifier(logger, parserFunc.Execute, bqExtractorFunc.Execute, evaluatorFunc.Execute)
 		assert.NoError(t, err)
@@ -135,13 +134,13 @@ func TestIdentifyResources(t *testing.T) {
 		sqlView2 := "select 1 from `project1.dataset1.name1` join `project1.dataset1.name3` on true"
 
 		evaluatorFunc.On("Execute", assets).Return(assets["./query.sql"])
-		parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"bigquery://project1:dataset1.name1"})
+		parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"project1.dataset1.name1"})
 		bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN1}).Return(map[bigquery.ResourceURN]string{resourceURN1: sqlView1}, nil)
-		parserFunc.On("Execute", sqlView1).Return([]string{"bigquery://project1:dataset1.name2"})
+		parserFunc.On("Execute", sqlView1).Return([]string{"project1.dataset1.name2"})
 		bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN2}).Return(map[bigquery.ResourceURN]string{resourceURN2: sqlView2}, nil)
-		parserFunc.On("Execute", sqlView2).Return([]string{"bigquery://project1:dataset1.name1", "bigquery://project1:dataset1.name3"})
+		parserFunc.On("Execute", sqlView2).Return([]string{"project1.dataset1.name1", "project1.dataset1.name3"})
 		bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN1, resourceURN3}).Return(map[bigquery.ResourceURN]string{resourceURN1: sqlView1, resourceURN3: ""}, nil)
 		parserFunc.On("Execute", "").Return([]string{})
@@ -172,13 +171,13 @@ func TestIdentifyResources(t *testing.T) {
 		sqlView2 := "select 1 from `project1.dataset1.name3`"
 
 		evaluatorFunc.On("Execute", assets).Return(assets["./query.sql"])
-		parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"bigquery://project1:dataset1.name1"})
+		parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"project1.dataset1.name1"})
 		bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN1}).Return(map[bigquery.ResourceURN]string{resourceURN1: sqlView1}, nil)
-		parserFunc.On("Execute", sqlView1).Return([]string{"bigquery://project1:dataset1.name2", "bigquery://project1:dataset1.name3"})
+		parserFunc.On("Execute", sqlView1).Return([]string{"project1.dataset1.name2", "project1.dataset1.name3"})
 		bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN2, resourceURN3}).Return(map[bigquery.ResourceURN]string{resourceURN2: sqlView2, resourceURN3: ""}, nil)
-		parserFunc.On("Execute", sqlView2).Return([]string{"bigquery://project1:dataset1.name3"})
+		parserFunc.On("Execute", sqlView2).Return([]string{"project1.dataset1.name3"})
 		bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN3}).Return(map[bigquery.ResourceURN]string{resourceURN3: ""}, nil)
 		parserFunc.On("Execute", "").Return([]string{})
diff --git a/plugin/upstream_identifier/maxcompute_upstream_identifier.go b/plugin/upstream_identifier/maxcompute_upstream_identifier.go
new file mode 100644
index 0000000000..bfbb2f4cb9
--- /dev/null
+++ b/plugin/upstream_identifier/maxcompute_upstream_identifier.go
@@ -0,0 +1,53 @@
+package upstreamidentifier
+
+import (
+	"context"
+	"strings"
+
+	"github.com/goto/optimus/core/resource"
+	"github.com/goto/optimus/plugin/upstream_identifier/parser"
+	"github.com/goto/salt/log"
+)
+
+type MaxcomputeUpstreamIdentifier struct {
+	logger         log.Logger
+	parserFunc     ParserFunc
+	evaluatorFuncs []EvalAssetFunc
+}
+
+func NewMaxcomputeUpstreamIdentifier(logger log.Logger, parserFunc ParserFunc, evaluatorFuncs ...EvalAssetFunc) (*MaxcomputeUpstreamIdentifier, error) {
+	return &MaxcomputeUpstreamIdentifier{
+		logger:         logger,
+		parserFunc:     parser.MaxcomputeURNDecorator(parserFunc),
+		evaluatorFuncs: evaluatorFuncs,
+	}, nil
+}
+
+func (g MaxcomputeUpstreamIdentifier) IdentifyResources(ctx context.Context, assets map[string]string) ([]resource.URN, error) {
+	resourceURNs := []resource.URN{}
+
+	// generate resource URNs from the query produced by each evaluator
+	for _, evaluatorFunc := range g.evaluatorFuncs {
+		query := evaluatorFunc(assets)
+		if query == "" {
+			continue
+		}
+		resources := g.identifyResources(query)
+		resourceURNs = append(resourceURNs, resources...)
+	}
+	return resourceURNs, nil
+}
+
+func (g MaxcomputeUpstreamIdentifier) identifyResources(query string) []resource.URN {
+	resources := g.parserFunc(query)
+	// allocate with zero length and full capacity; make([]resource.URN, len(resources))
+	// followed by append would leave zero-value URNs at the front of the slice
+	resourceURNs := make([]resource.URN, 0, len(resources))
+	for _, r := range resources {
+		// the decorated parser already emits "maxcompute://<project>.<schema>.<table>",
+		// so strip the scheme before handing the name to resource.NewURN
+		resourceURN, _ := resource.NewURN("maxcompute", strings.TrimPrefix(r, "maxcompute://")) // TODO: use a dedicated constructor that parses a raw URN string
+		resourceURNs = append(resourceURNs, resourceURN)
+	}
+	return resourceURNs
+}
diff --git a/plugin/upstream_identifier/maxcompute_upstream_identifier_test.go b/plugin/upstream_identifier/maxcompute_upstream_identifier_test.go
new file mode 100644
index 0000000000..52a7ba24e2
--- /dev/null
+++ b/plugin/upstream_identifier/maxcompute_upstream_identifier_test.go
@@ -0,0 +1,3 @@
+package upstreamidentifier_test
+
+// TODO: Implement test
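The new test file is still a stub. A minimal sketch of what it might assert once implemented, using plain functions instead of the mock types from the BQ test; the noop logger and the table name are illustrative assumptions:

```go
package upstreamidentifier_test

import (
	"context"
	"testing"

	"github.com/goto/salt/log"
	"github.com/stretchr/testify/assert"

	"github.com/goto/optimus/core/resource"
	upstreamidentifier "github.com/goto/optimus/plugin/upstream_identifier"
)

func TestMaxcomputeIdentifyResources(t *testing.T) {
	ctx := context.Background()
	logger := log.NewNoop() // assumed noop logger from goto/salt
	assets := map[string]string{"./query.sql": "select 1 from project1.schema1.table1"}

	// plain functions stand in for the evaluator and parser mocks
	evaluatorFunc := func(assets map[string]string) string { return assets["./query.sql"] }
	parserFunc := func(query string) []string { return []string{"project1.schema1.table1"} }

	identifier, err := upstreamidentifier.NewMaxcomputeUpstreamIdentifier(logger, parserFunc, evaluatorFunc)
	assert.NoError(t, err)

	urns, err := identifier.IdentifyResources(ctx, assets)
	assert.NoError(t, err)

	// the identifier should wrap each parsed table in a maxcompute URN
	expectedURN, _ := resource.NewURN("maxcompute", "project1.schema1.table1")
	assert.Equal(t, []resource.URN{expectedURN}, urns)
}
```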
diff --git a/plugin/upstream_identifier/parser/bq_parser.go b/plugin/upstream_identifier/parser/query_parser.go
similarity index 57%
rename from plugin/upstream_identifier/parser/bq_parser.go
rename to plugin/upstream_identifier/parser/query_parser.go
index 8fd776926c..956252f812 100644
--- a/plugin/upstream_identifier/parser/bq_parser.go
+++ b/plugin/upstream_identifier/parser/query_parser.go
@@ -3,31 +3,29 @@ package parser
 import (
 	"regexp"
 	"strings"
-
-	"github.com/goto/optimus/ext/store/bigquery"
 )
 
 var (
 	topLevelUpstreamsPattern = regexp.MustCompile(
-		"(?i)(?:FROM)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-\\*?]+)`?" + //nolint:gocritic
+		"(?i)(?:FROM)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+\\.[\\w-]+\\.[\\w-\\*?]+)`?" + //nolint:gocritic
 			"|" +
-			"(?i)(?:JOIN)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?" +
+			"(?i)(?:JOIN)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+\\.[\\w-]+\\.[\\w-]+)`?" +
 			"|" +
-			"(?i)(?:WITH)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?\\s+(?:AS)" +
+			"(?i)(?:WITH)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+\\.[\\w-]+\\.[\\w-]+)`?\\s+(?:AS)" +
 			"|" +
 			// ref: https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#merge_statement
-			"(?i)(?:MERGE)\\s*(?:INTO)?\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?" + // to ignore
+			"(?i)(?:MERGE)\\s*(?:INTO)?\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+\\.[\\w-]+\\.[\\w-]+)`?" + // to ignore
 			"|" +
 			// ref: https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#insert_statement
-			"(?i)(?:INSERT)\\s*(?:INTO)?\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?" + // to ignore
+			"(?i)(?:INSERT)\\s*(?:INTO)?\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+\\.[\\w-]+\\.[\\w-]+)`?" + // to ignore
 			"|" +
 			// ref: https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#delete_statement
-			"(?i)(?:DELETE)\\s*(?:FROM)?\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?" + // to ignore
+			"(?i)(?:DELETE)\\s*(?:FROM)?\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+\\.[\\w-]+\\.[\\w-]+)`?" + // to ignore
 			"|" +
 			// ref: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language
-			"(?i)(?:CREATE)\\s*(?:OR\\s+REPLACE)?\\s*(?:VIEW|(?:TEMP\\s+)?TABLE)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?" + // to ignore
+			"(?i)(?:CREATE)\\s*(?:OR\\s+REPLACE)?\\s*(?:VIEW|(?:TEMP\\s+)?TABLE)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+\\.[\\w-]+\\.[\\w-]+)`?" + // to ignore
 			"|" +
-			"(?i)(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`\\s*(?:AS)?")
+			"(?i)(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+\\.[\\w-]+\\.[\\w-]+)`?\\s*(?:AS)?")
 
 	singleLineCommentsPattern = regexp.MustCompile(`(--.*)`)
 	multiLineCommentsPattern  = regexp.MustCompile(`(((/\*)+?[\w\W]*?(\*/)+))`)
@@ -37,41 +35,33 @@ var (
 func ParseTopLevelUpstreamsFromQuery(query string) []string {
 	cleanedQuery := cleanQueryFromComment(query)
 
-	resourcesFound := make(map[bigquery.ResourceURN]bool)
-	pseudoResources := make(map[bigquery.ResourceURN]bool)
+	tableFound := map[string]bool{}
+	pseudoTable := map[string]bool{}
 
 	matches := topLevelUpstreamsPattern.FindAllStringSubmatch(cleanedQuery, -1)
 
 	for _, match := range matches {
-		var projectIdx, datasetIdx, nameIdx, ignoreUpstreamIdx int
+		var tableIdx, ignoreUpstreamIdx int
 		tokens := strings.Fields(match[0])
 		clause := strings.ToLower(tokens[0])
 		switch clause {
 		case "from":
-			ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 1, 2, 3, 4
+			ignoreUpstreamIdx, tableIdx = 1, 2
 		case "join":
-			ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 5, 6, 7, 8
+			ignoreUpstreamIdx, tableIdx = 3, 4
 		case "with":
-			ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 9, 10, 11, 12
+			ignoreUpstreamIdx, tableIdx = 5, 6
 		case "merge":
-			ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 13, 14, 15, 16
+			ignoreUpstreamIdx, tableIdx = 7, 8
 		case "insert":
-			ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 17, 18, 19, 20
+			ignoreUpstreamIdx, tableIdx = 9, 10
 		case "delete":
-			ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 21, 22, 23, 24
+			ignoreUpstreamIdx, tableIdx = 11, 12
 		case "create":
-			ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 25, 26, 27, 28
+			ignoreUpstreamIdx, tableIdx = 13, 14
 		default:
-			ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 29, 30, 31, 32
-		}
-
-		project := match[projectIdx]
-		dataset := match[datasetIdx]
-		name := match[nameIdx]
-
-		if project == "" || dataset == "" || name == "" {
-			continue
+			ignoreUpstreamIdx, tableIdx = 15, 16
 		}
 
 		if strings.TrimSpace(match[ignoreUpstreamIdx]) == "@ignoreupstream" {
@@ -82,22 +72,21 @@ func ParseTopLevelUpstreamsFromQuery(query string) []string {
 			continue
 		}
 
-		resourceURN, _ := bigquery.NewResourceURN(project, dataset, name)
-
+		tableName := match[tableIdx]
 		if clause == "with" {
-			pseudoResources[resourceURN] = true
+			pseudoTable[tableName] = true
 		} else {
-			resourcesFound[resourceURN] = true
+			tableFound[tableName] = true
 		}
 	}
 
 	output := []string{}
-	for resourceURN := range resourcesFound {
-		if pseudoResources[resourceURN] {
+	for table := range tableFound {
+		if pseudoTable[table] {
 			continue
 		}
-		output = append(output, resourceURN.URN())
+		output = append(output, table)
 	}
 
 	return output
diff --git a/plugin/upstream_identifier/parser/bq_parser_test.go b/plugin/upstream_identifier/parser/query_parser_test.go
similarity index 55%
rename from plugin/upstream_identifier/parser/bq_parser_test.go
rename to plugin/upstream_identifier/parser/query_parser_test.go
index 0d0b42689b..69ad1546ab 100644
--- a/plugin/upstream_identifier/parser/bq_parser_test.go
+++ b/plugin/upstream_identifier/parser/query_parser_test.go
@@ -1,148 +1,148 @@
 package parser_test
 
 import (
+	"fmt"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
 
-	"github.com/goto/optimus/ext/store/bigquery"
 	"github.com/goto/optimus/plugin/upstream_identifier/parser"
 )
 
 func TestParseTopLevelUpstreamsFromQuery(t *testing.T) {
 	t.Run("parse test", func(t *testing.T) {
 		testCases := []struct {
-			Name                 string
-			InputQuery           string
-			ExpectedResourceURNs []string
+			Name           string
+			InputQuery     string
+			ExpectedTables []string
 		}{
 			{
-				Name:                 "empty query",
-				InputQuery:           "",
-				ExpectedResourceURNs: []string{},
+				Name:           "empty query",
+				InputQuery:     "",
+				ExpectedTables: []string{},
 			},
 			{
 				Name:       "simple query",
 				InputQuery: "select * from data-engineering.testing.table1",
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("data-engineering", "testing", "table1"),
+				ExpectedTables: []string{
+					newTable("data-engineering", "testing", "table1"),
 				},
 			},
 			{
 				Name:       "simple query with hyphenated table name",
 				InputQuery: "select * from data-engineering.testing.table_name-1",
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("data-engineering", "testing", "table_name-1"),
+				ExpectedTables: []string{
+					newTable("data-engineering", "testing", "table_name-1"),
 				},
 			},
 			{
 				Name:       "simple query with quotes",
 				InputQuery: "select * from `data-engineering.testing.table1`",
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("data-engineering", "testing", "table1"),
+				ExpectedTables: []string{
+					newTable("data-engineering", "testing", "table1"),
 				},
 			},
 			{
-				Name:                 "simple query without project name",
-				InputQuery:           "select * from testing.table1",
-				ExpectedResourceURNs: []string{},
+				Name:           "simple query without project name",
+				InputQuery:     "select * from testing.table1",
+				ExpectedTables: []string{},
 			},
 			{
 				Name:       "simple query with simple join",
 				InputQuery: "select * from data-engineering.testing.table1 join data-engineering.testing.table2 on some_field",
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("data-engineering", "testing", "table1"),
-					newBQResourceURN("data-engineering", "testing", "table2"),
+				ExpectedTables: []string{
+					newTable("data-engineering", "testing", "table1"),
+					newTable("data-engineering", "testing", "table2"),
 				},
 			},
 			{
 				Name:       "simple query with outer join",
 				InputQuery: "select * from data-engineering.testing.table1 outer join data-engineering.testing.table2 on some_field",
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("data-engineering", "testing", "table1"),
-					newBQResourceURN("data-engineering", "testing", "table2"),
+				ExpectedTables: []string{
+					newTable("data-engineering", "testing", "table1"),
+					newTable("data-engineering", "testing", "table2"),
 				},
 			},
 			{
 				Name:       "subquery",
 				InputQuery: "select * from (select order_id from data-engineering.testing.orders)",
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("data-engineering", "testing", "orders"),
+				ExpectedTables: []string{
+					newTable("data-engineering", "testing", "orders"),
 				},
 			},
 			{
 				Name:       "`with` clause + simple query",
 				InputQuery: "with `information.foo.bar` as (select * from `data-engineering.testing.data`) select * from `information.foo.bar`",
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("data-engineering", "testing", "data"),
+				ExpectedTables: []string{
+					newTable("data-engineering", "testing", "data"),
 				},
 			},
 			{
 				Name:       "`with` clause with missing project name",
 				InputQuery: "with `foo.bar` as (select * from `data-engineering.testing.data`) select * from `foo.bar`",
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("data-engineering", "testing", "data"),
+				ExpectedTables: []string{
+					newTable("data-engineering", "testing", "data"),
 				},
 			},
 			{
 				Name:       "project name with dashes",
 				InputQuery: "select * from `foo-bar.baz.data`",
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("foo-bar", "baz", "data"),
+				ExpectedTables: []string{
+					newTable("foo-bar", "baz", "data"),
 				},
 			},
 			{
 				Name:       "dataset and project name with dashes",
 				InputQuery: "select * from `foo-bar.bar-baz.data",
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("foo-bar", "bar-baz", "data"),
+				ExpectedTables: []string{
+					newTable("foo-bar", "bar-baz", "data"),
 				},
 			},
 			{
 				Name:       "`with` clause + join",
 				InputQuery: "with dedup_source as (select * from `project.fire.fly`) select * from dedup_source join `project.maximum.overdrive` on dedup_source.left = `project.maximum.overdrive`.right",
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("project", "fire", "fly"),
-					newBQResourceURN("project", "maximum", "overdrive"),
+				ExpectedTables: []string{
+					newTable("project", "fire", "fly"),
+					newTable("project", "maximum", "overdrive"),
 				},
 			},
 			{
 				Name:       "double `with` + pseudoreference",
 				InputQuery: "with s1 as (select * from internal.pseudo.ref), with internal.pseudo.ref as (select * from `project.another.name`) select * from s1",
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("project", "another", "name"),
+				ExpectedTables: []string{
+					newTable("project", "another", "name"),
 				},
 			},
 			{
-				Name:                 "simple query that ignores from upstream",
-				InputQuery:           "select * from /* @ignoreupstream */ data-engineering.testing.table1",
-				ExpectedResourceURNs: []string{},
+				Name:           "simple query that ignores from upstream",
+				InputQuery:     "select * from /* @ignoreupstream */ data-engineering.testing.table1",
+				ExpectedTables: []string{},
 			},
 			{
-				Name:                 "simple query that ignores from upstream with quotes",
-				InputQuery:           "select * from /* @ignoreupstream */ `data-engineering.testing.table1`",
-				ExpectedResourceURNs: []string{},
+				Name:           "simple query that ignores from upstream with quotes",
+				InputQuery:     "select * from /* @ignoreupstream */ `data-engineering.testing.table1`",
+				ExpectedTables: []string{},
 			},
 			{
 				Name:       "simple query with simple join that ignores from upstream",
 				InputQuery: "select * from /* @ignoreupstream */ data-engineering.testing.table1 join data-engineering.testing.table2 on some_field",
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("data-engineering", "testing", "table2"),
+				ExpectedTables: []string{
+					newTable("data-engineering", "testing", "table2"),
 				},
 			},
 			{
 				Name:       "simple query with simple join that has comments but does not ignores upstream",
 				InputQuery: "select * from /* */ data-engineering.testing.table1 join data-engineering.testing.table2 on some_field",
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("data-engineering", "testing", "table1"),
-					newBQResourceURN("data-engineering", "testing", "table2"),
+				ExpectedTables: []string{
+					newTable("data-engineering", "testing", "table1"),
+					newTable("data-engineering", "testing", "table2"),
 				},
 			},
 			{
 				Name:       "simple query with simple join that ignores upstream of join",
 				InputQuery: "select * from data-engineering.testing.table1 join /* @ignoreupstream */ data-engineering.testing.table2 on some_field",
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("data-engineering", "testing", "table1"),
+				ExpectedTables: []string{
+					newTable("data-engineering", "testing", "table1"),
 				},
 			},
 			{
@@ -153,8 +153,8 @@ func TestParseTopLevelUpstreamsFromQuery(t *testing.T) {
 				)
 				SELECT id FROM /* @ignoreupstream */ my_temp_table
 				`,
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("data-engineering", "testing", "an_upstream_table"),
+				ExpectedTables: []string{
+					newTable("data-engineering", "testing", "an_upstream_table"),
 				},
 			},
 			{
@@ -165,12 +165,12 @@ func TestParseTopLevelUpstreamsFromQuery(t *testing.T) {
 				)
 				SELECT id FROM my_temp_table
 				`,
-				ExpectedResourceURNs: []string{},
+				ExpectedTables: []string{},
 			},
 			{
-				Name:                 "simple query with simple join that ignores upstream of join",
-				InputQuery:           "WITH my_temp_table AS ( SELECT id, name FROM /* @ignoreupstream */ data-engineering.testing.an_upstream_table ) SELECT id FROM /* @ignoreupstream */ my_temp_table",
-				ExpectedResourceURNs: []string{},
+				Name:           "simple query with simple join that ignores upstream of join",
+				InputQuery:     "WITH my_temp_table AS ( SELECT id, name FROM /* @ignoreupstream */ data-engineering.testing.an_upstream_table ) SELECT id FROM /* @ignoreupstream */ my_temp_table",
+				ExpectedTables: []string{},
 			},
 			{
 				Name: "simple query with another query inside comment",
@@ -178,8 +178,8 @@ func TestParseTopLevelUpstreamsFromQuery(t *testing.T) {
 				select * from data-engineering.testing.tableABC
 				-- select * from data-engineering.testing.table1 join data-engineering.testing.table2 on some_field
 				`,
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("data-engineering", "testing", "tableABC"),
+				ExpectedTables: []string{
+					newTable("data-engineering", "testing", "tableABC"),
 				},
 			},
 			{
@@ -189,8 +189,8 @@ func TestParseTopLevelUpstreamsFromQuery(t *testing.T) {
 				/* select * from data-engineering.testing.table1 join data-engineering.testing.table2 on some_field */
 				join /* @ignoreupstream */ data-engineering.testing.table2 on some_field
 				`,
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("data-engineering", "testing", "tableABC"),
+				ExpectedTables: []string{
+					newTable("data-engineering", "testing", "tableABC"),
 				},
 			},
 			{
@@ -201,8 +201,8 @@ func TestParseTopLevelUpstreamsFromQuery(t *testing.T) {
 				from
 				data-engineering.testing.tableDEF,
 				`,
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("data-engineering", "testing", "tableDEF"),
+				ExpectedTables: []string{
+					newTable("data-engineering", "testing", "tableDEF"),
 				},
 			},
 			{
@@ -216,9 +216,9 @@ func TestParseTopLevelUpstreamsFromQuery(t *testing.T) {
 				` + "`data-engineering.testing.tableDEF`," + `
 				as backup_table,
 				/* @ignoreupstream */ data-engineering.testing.tableGHI as ignored_table,
 				`,
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("data-engineering", "testing", "tableABC"),
-					newBQResourceURN("data-engineering", "testing", "tableDEF"),
+				ExpectedTables: []string{
+					newTable("data-engineering", "testing", "tableABC"),
+					newTable("data-engineering", "testing", "tableDEF"),
 				},
 			},
 			{
@@ -235,9 +235,9 @@ func TestParseTopLevelUpstreamsFromQuery(t *testing.T) {
 				from /*@ignoreupstream*/ data-engineering.testing.tableC*
 				`,
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("data-engineering", "testing", "tableA*"),
-					newBQResourceURN("data-engineering", "testing", "tableB*"),
+				ExpectedTables: []string{
+					newTable("data-engineering", "testing", "tableA*"),
+					newTable("data-engineering", "testing", "tableB*"),
 				},
 			},
 			{
@@ -254,8 +254,8 @@ func TestParseTopLevelUpstreamsFromQuery(t *testing.T) {
 				from data-engineering.testing.table_a
 				join /* @ignoreupstream */ data-engineering.testing.table_d
 				`,
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("data-engineering", "testing", "table_a"),
+				ExpectedTables: []string{
+					newTable("data-engineering", "testing", "table_a"),
 				},
 			},
 			{
@@ -276,42 +276,41 @@ func TestParseTopLevelUpstreamsFromQuery(t *testing.T) {
 				join /* @ignoreupstream */ data-engineering.testing.table_e
 				`,
-				ExpectedResourceURNs: []string{
-					newBQResourceURN("data-engineering", "testing", "table_a"),
-					newBQResourceURN("data-engineering", "testing", "table_d"),
+				ExpectedTables: []string{
+					newTable("data-engineering", "testing", "table_a"),
+					newTable("data-engineering", "testing", "table_d"),
 				},
 			},
 			{
-				Name:                 "ignore merge into query",
-				InputQuery:           "merge into `data-engineering.testing.table_a` as target",
-				ExpectedResourceURNs: []string{},
+				Name:           "ignore merge into query",
+				InputQuery:     "merge into `data-engineering.testing.table_a` as target",
+				ExpectedTables: []string{},
 			},
 			{
-				Name:                 "ignore insert into query",
-				InputQuery:           "insert into `data-engineering.testing.table_a`(id,name)",
-				ExpectedResourceURNs: []string{},
+				Name:           "ignore insert into query",
+				InputQuery:     "insert into `data-engineering.testing.table_a`(id,name)",
+				ExpectedTables: []string{},
 			},
 			{
-				Name:                 "ignore delete + insert query",
-				InputQuery:           "delete from `data-engineering.testing.table_b`; create or replace table `data-engineering.testing.table_b`",
-				ExpectedResourceURNs: []string{},
+				Name:           "ignore delete + insert query",
+				InputQuery:     "delete from `data-engineering.testing.table_b`; create or replace table `data-engineering.testing.table_b`",
+				ExpectedTables: []string{},
 			},
 			{
-				Name:                 "ignore create or replace query",
-				InputQuery:           "create or replace table `data-engineering.testing.table_b`",
-				ExpectedResourceURNs: []string{},
+				Name:           "ignore create or replace query",
+				InputQuery:     "create or replace table `data-engineering.testing.table_b`",
+				ExpectedTables: []string{},
 			},
 		}
 
 		for _, test := range testCases {
 			t.Run(test.Name, func(t *testing.T) {
 				actualResourceURNs := parser.ParseTopLevelUpstreamsFromQuery(test.InputQuery)
-				assert.ElementsMatch(t, test.ExpectedResourceURNs, actualResourceURNs)
+				assert.ElementsMatch(t, test.ExpectedTables, actualResourceURNs)
 			})
 		}
 	})
 }
 
-func newBQResourceURN(project, dataset, name string) string {
-	resourceURN, _ := bigquery.NewResourceURN(project, dataset, name)
-	return resourceURN.URN()
+func newTable(project, dataset, name string) string {
+	return fmt.Sprintf("%s.%s.%s", project, dataset, name)
 }
diff --git a/plugin/upstream_identifier/parser/urn_decorator.go b/plugin/upstream_identifier/parser/urn_decorator.go
new file mode 100644
index 0000000000..3f125c471a
--- /dev/null
+++ b/plugin/upstream_identifier/parser/urn_decorator.go
@@ -0,0 +1,41 @@
+package parser
+
+import (
+	"fmt"
+	"strings"
+)
+
+// BQURNDecorator wraps a table-name parser so that each "project.dataset.table"
+// result is rendered as a BigQuery resource URN.
+func BQURNDecorator(f func(string) []string) func(string) []string {
+	return func(rawResource string) []string {
+		resourceURNs := []string{}
+		tables := f(rawResource)
+		for _, table := range tables {
+			tableSplitted := strings.Split(table, ".")
+			if len(tableSplitted) != 3 {
+				continue
+			}
+			resourceURN := fmt.Sprintf("bigquery://%s:%s.%s", tableSplitted[0], tableSplitted[1], tableSplitted[2])
+			resourceURNs = append(resourceURNs, resourceURN)
+		}
+		return resourceURNs
+	}
+}
+
+// MaxcomputeURNDecorator does the same for MaxCompute resource URNs.
+func MaxcomputeURNDecorator(f func(string) []string) func(string) []string {
+	return func(rawResource string) []string {
+		resourceURNs := []string{}
+		tables := f(rawResource)
+		for _, table := range tables {
+			tableSplitted := strings.Split(table, ".")
+			if len(tableSplitted) != 3 {
+				continue
+			}
+			resourceURN := fmt.Sprintf("maxcompute://%s.%s.%s", tableSplitted[0], tableSplitted[1], tableSplitted[2])
+			resourceURNs = append(resourceURNs, resourceURN)
+		}
+		return resourceURNs
+	}
+}
diff --git a/plugin/upstream_identifier/upstream_identifier.go b/plugin/upstream_identifier/upstream_identifier.go
index 54cbbc3684..1d9ac5b9f0 100644
--- a/plugin/upstream_identifier/upstream_identifier.go
+++ b/plugin/upstream_identifier/upstream_identifier.go
@@ -45,6 +45,14 @@ func (u *UpstreamIdentifierFactory) GetBQUpstreamIdentifier(ctx context.Context,
 	return NewBQUpstreamIdentifier(u.l, parser.ParseTopLevelUpstreamsFromQuery, e.Extract, evaluatorFuncs...)
 }
 
+func (u *UpstreamIdentifierFactory) GetMaxcomputeUpstreamIdentifier(ctx context.Context, evaluators ...evaluator.Evaluator) (UpstreamIdentifier, error) {
+	evaluatorFuncs := make([]EvalAssetFunc, len(evaluators))
+	for i, evaluator := range evaluators {
+		evaluatorFuncs[i] = evaluator.Evaluate
+	}
+	return NewMaxcomputeUpstreamIdentifier(u.l, parser.ParseTopLevelUpstreamsFromQuery, evaluatorFuncs...)
+}
+
 func NewUpstreamIdentifierFactory(logger log.Logger) (*UpstreamIdentifierFactory, error) {
 	if logger == nil {
 		return nil, fmt.Errorf("logger is nil")
diff --git a/sdk/plugin/plugin.go b/sdk/plugin/plugin.go
index 5c80a2cf47..9ff49413c2 100644
--- a/sdk/plugin/plugin.go
+++ b/sdk/plugin/plugin.go
@@ -46,7 +46,8 @@ type (
 )
 
 const (
-	BQParser ParserType = "bq"
+	BQParser         ParserType = "bq"
+	MaxcomputeParser ParserType = "maxcompute"
 )
 
 type Evaluator struct {
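Taken together: `ParseTopLevelUpstreamsFromQuery` now returns bare `project.dataset.table` strings, and each store-specific decorator stamps its own URN scheme on top before the identifiers convert the results into `resource.URN`s. A small, hypothetical demonstration of the composition (output order may vary, since the parser deduplicates through a map):

```go
package main

import (
	"fmt"

	"github.com/goto/optimus/plugin/upstream_identifier/parser"
)

func main() {
	query := "select * from `proj.ds.tbl` join proj.ds.other on true"

	// The shared parser returns plain dotted table names.
	fmt.Println(parser.ParseTopLevelUpstreamsFromQuery(query))
	// e.g. [proj.ds.tbl proj.ds.other]

	// Each decorator wraps the same parser and applies a store-specific scheme.
	bqParse := parser.BQURNDecorator(parser.ParseTopLevelUpstreamsFromQuery)
	mcParse := parser.MaxcomputeURNDecorator(parser.ParseTopLevelUpstreamsFromQuery)

	fmt.Println(bqParse(query)) // e.g. [bigquery://proj:ds.tbl bigquery://proj:ds.other]
	fmt.Println(mcParse(query)) // e.g. [maxcompute://proj.ds.tbl maxcompute://proj.ds.other]
}
```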