Skip to content

Commit

Permalink
feat: add upstream identifier for maxcompute
Browse files Browse the repository at this point in the history
  • Loading branch information
deryrahman committed Oct 22, 2024
1 parent 541691f commit 8dedad5
Show file tree
Hide file tree
Showing 11 changed files with 280 additions and 150 deletions.
30 changes: 19 additions & 11 deletions plugin/plugin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type EvaluatorFactory interface {

// UpstreamIdentifierFactory constructs upstream identifiers for the parser
// types supported by the plugin service (BigQuery and MaxCompute).
type UpstreamIdentifierFactory interface {
	// GetBQUpstreamIdentifier returns an upstream identifier for BigQuery queries.
	// svcAcc is the BigQuery service-account secret used to access the store.
	GetBQUpstreamIdentifier(ctx context.Context, svcAcc string, evaluators ...evaluator.Evaluator) (upstreamidentifier.UpstreamIdentifier, error)
	// GetMaxcomputeUpstreamIdentifier returns an upstream identifier for
	// MaxCompute queries; unlike the BigQuery variant it needs no credential.
	GetMaxcomputeUpstreamIdentifier(ctx context.Context, evaluators ...evaluator.Evaluator) (upstreamidentifier.UpstreamIdentifier, error)
}

type PluginService struct {
Expand Down Expand Up @@ -108,20 +109,27 @@ func (s PluginService) IdentifyUpstreams(ctx context.Context, taskName string, c
evaluators = append(evaluators, evaluator)
}

if parserType != plugin.BQParser {
switch parserType {
case plugin.MaxcomputeParser:
upstreamIdentifier, err := s.upstreamIdentifierFactory.GetMaxcomputeUpstreamIdentifier(ctx, evaluators...)
if err != nil {
return nil, err
}
upstreamIdentifiers = append(upstreamIdentifiers, upstreamIdentifier)
case plugin.BQParser:
svcAcc, ok := compiledConfig[bqSvcAccKey]
if !ok {
return nil, fmt.Errorf("secret " + bqSvcAccKey + " required to generate upstream is not found")
}
upstreamIdentifier, err := s.upstreamIdentifierFactory.GetBQUpstreamIdentifier(ctx, svcAcc, evaluators...)
if err != nil {
return nil, err
}
upstreamIdentifiers = append(upstreamIdentifiers, upstreamIdentifier)
default:
s.l.Warn("parserType %s is not supported", parserType)
continue
}
// for now parser type is only scoped for bigquery, so that it uses bigquery as upstream identifier
svcAcc, ok := compiledConfig[bqSvcAccKey]
if !ok {
return nil, fmt.Errorf("secret " + bqSvcAccKey + " required to generate upstream is not found")
}
upstreamIdentifier, err := s.upstreamIdentifierFactory.GetBQUpstreamIdentifier(ctx, svcAcc, evaluators...)
if err != nil {
return nil, err
}
upstreamIdentifiers = append(upstreamIdentifiers, upstreamIdentifier)
}

// identify all upstream resource urns by all identifier from given asset
Expand Down
40 changes: 38 additions & 2 deletions plugin/plugin_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,13 +542,49 @@ func (_m *UpstreamIdentifierFactory) GetBQUpstreamIdentifier(ctx context.Context
return r0, r1
}

// GetMaxcomputeUpstreamIdentifier provides a mock function with given fields: ctx, evaluators
func (_m *UpstreamIdentifierFactory) GetMaxcomputeUpstreamIdentifier(ctx context.Context, evaluators ...evaluator.Evaluator) (upstreamidentifier.UpstreamIdentifier, error) {
	// Flatten ctx plus the variadic evaluators into a single argument list
	// so the mock records the call exactly as it was made.
	callArgs := make([]interface{}, 0, len(evaluators)+1)
	callArgs = append(callArgs, ctx)
	for _, ev := range evaluators {
		callArgs = append(callArgs, ev)
	}
	ret := _m.Called(callArgs...)

	if len(ret) == 0 {
		panic("no return value specified for GetMaxcomputeUpstreamIdentifier")
	}

	// A single registered func returning both values wins outright.
	if rf, ok := ret.Get(0).(func(context.Context, ...evaluator.Evaluator) (upstreamidentifier.UpstreamIdentifier, error)); ok {
		return rf(ctx, evaluators...)
	}

	var r0 upstreamidentifier.UpstreamIdentifier
	if rf, ok := ret.Get(0).(func(context.Context, ...evaluator.Evaluator) upstreamidentifier.UpstreamIdentifier); ok {
		r0 = rf(ctx, evaluators...)
	} else if ret.Get(0) != nil {
		r0 = ret.Get(0).(upstreamidentifier.UpstreamIdentifier)
	}

	var r1 error
	if rf, ok := ret.Get(1).(func(context.Context, ...evaluator.Evaluator) error); ok {
		r1 = rf(ctx, evaluators...)
	} else {
		r1 = ret.Error(1)
	}

	return r0, r1
}

// NewUpstreamIdentifierFactory creates a new instance of UpstreamIdentifierFactory. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
// The first argument is typically a *testing.T value.
func NewUpstreamIdentifierFactory(t interface {
mock.TestingT
Cleanup(func())
},
) *UpstreamIdentifierFactory {
}) *UpstreamIdentifierFactory {

Check failure on line 587 in plugin/plugin_service_test.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed with `-extra` (gofumpt)
mock := &UpstreamIdentifierFactory{}
mock.Mock.Test(t)

Expand Down
3 changes: 2 additions & 1 deletion plugin/upstream_identifier/bq_upstream_identifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/goto/optimus/core/resource"
"github.com/goto/optimus/ext/store/bigquery"
"github.com/goto/optimus/internal/errors"
"github.com/goto/optimus/plugin/upstream_identifier/parser"
)

type (
Expand Down Expand Up @@ -129,7 +130,7 @@ func NewBQUpstreamIdentifier(logger log.Logger, parserFunc ParserFunc, bqExtract

return &BQUpstreamIdentifier{
logger: logger,
parserFunc: parserFunc,
parserFunc: parser.BQURNDecorator(parserFunc),
extractorFunc: bqExtractorDecorator(logger, bqExtractorFunc),
evaluatorFuncs: sanitizedEvaluatorFuncs,
}, nil
Expand Down
19 changes: 9 additions & 10 deletions plugin/upstream_identifier/bq_upstream_identifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func TestIdentifyResources(t *testing.T) {
defer bqExtractorFunc.AssertExpectations(t)

evaluatorFunc.On("Execute", assets).Return(assets["./query.sql"])
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"bigquery://project1:dataset1.name1"})
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"project1.dataset1.name1"})
bqExtractorFunc.On("Execute", ctx, mock.Anything).Return(nil, errors.New("some error"))

bqUpstreamIdentifier, err := upstreamidentifier.NewBQUpstreamIdentifier(logger, parserFunc.Execute, bqExtractorFunc.Execute, evaluatorFunc.Execute)
Expand All @@ -105,9 +105,8 @@ func TestIdentifyResources(t *testing.T) {
defer bqExtractorFunc.AssertExpectations(t)

evaluatorFunc.On("Execute", assets).Return(assets["./query.sql"])
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"broken://project1;dataset1.name1"})
// bq extractor should receives empty resource urn, since the urn construction is fail
bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{}).Return(map[bigquery.ResourceURN]string{}, nil)
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"project1;dataset1.name1"})
// bq extractor should not be executed since the result of parser is empty

bqUpstreamIdentifier, err := upstreamidentifier.NewBQUpstreamIdentifier(logger, parserFunc.Execute, bqExtractorFunc.Execute, evaluatorFunc.Execute)
assert.NoError(t, err)
Expand Down Expand Up @@ -135,13 +134,13 @@ func TestIdentifyResources(t *testing.T) {
sqlView2 := "select 1 from `project1.dataset1.name1` join `project1.dataset1.name3` on true"

evaluatorFunc.On("Execute", assets).Return(assets["./query.sql"])
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"bigquery://project1:dataset1.name1"})
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"project1.dataset1.name1"})
bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN1}).Return(map[bigquery.ResourceURN]string{resourceURN1: sqlView1}, nil)

parserFunc.On("Execute", sqlView1).Return([]string{"bigquery://project1:dataset1.name2"})
parserFunc.On("Execute", sqlView1).Return([]string{"project1.dataset1.name2"})
bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN2}).Return(map[bigquery.ResourceURN]string{resourceURN2: sqlView2}, nil)

parserFunc.On("Execute", sqlView2).Return([]string{"bigquery://project1:dataset1.name1", "bigquery://project1:dataset1.name3"})
parserFunc.On("Execute", sqlView2).Return([]string{"project1.dataset1.name1", "project1.dataset1.name3"})
bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN1, resourceURN3}).Return(map[bigquery.ResourceURN]string{resourceURN1: sqlView1, resourceURN3: ""}, nil)

parserFunc.On("Execute", "").Return([]string{})
Expand Down Expand Up @@ -172,13 +171,13 @@ func TestIdentifyResources(t *testing.T) {
sqlView2 := "select 1 from `project1.dataset1.name3`"

evaluatorFunc.On("Execute", assets).Return(assets["./query.sql"])
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"bigquery://project1:dataset1.name1"})
parserFunc.On("Execute", assets["./query.sql"]).Return([]string{"project1.dataset1.name1"})
bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN1}).Return(map[bigquery.ResourceURN]string{resourceURN1: sqlView1}, nil)

parserFunc.On("Execute", sqlView1).Return([]string{"bigquery://project1:dataset1.name2", "bigquery://project1:dataset1.name3"})
parserFunc.On("Execute", sqlView1).Return([]string{"project1.dataset1.name2", "project1.dataset1.name3"})
bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN2, resourceURN3}).Return(map[bigquery.ResourceURN]string{resourceURN2: sqlView2, resourceURN3: ""}, nil)

parserFunc.On("Execute", sqlView2).Return([]string{"bigquery://project1:dataset1.name3"})
parserFunc.On("Execute", sqlView2).Return([]string{"project1.dataset1.name3"})
bqExtractorFunc.On("Execute", ctx, []bigquery.ResourceURN{resourceURN3}).Return(map[bigquery.ResourceURN]string{resourceURN3: ""}, nil)

parserFunc.On("Execute", "").Return([]string{})
Expand Down
48 changes: 48 additions & 0 deletions plugin/upstream_identifier/maxcompute_upstream_identifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package upstreamidentifier

import (
	"context"
	"fmt"

	"github.com/goto/salt/log"

	"github.com/goto/optimus/core/resource"
	"github.com/goto/optimus/plugin/upstream_identifier/parser"
)

// MaxcomputeUpstreamIdentifier identifies upstream MaxCompute resources
// referenced by a job's query assets.
type MaxcomputeUpstreamIdentifier struct {
	logger         log.Logger
	parserFunc     ParserFunc      // extracts table references from a query; decorated to emit MaxCompute URN strings
	evaluatorFuncs []EvalAssetFunc // each selects the query string to parse from the job assets
}

func NewMaxcomputeUpstreamIdentifier(logger log.Logger, parserFunc ParserFunc, evaluatorFuncs ...EvalAssetFunc) (*MaxcomputeUpstreamIdentifier, error) {

Check failure on line 17 in plugin/upstream_identifier/maxcompute_upstream_identifier.go

View workflow job for this annotation

GitHub Actions / lint

NewMaxcomputeUpstreamIdentifier - result 1 (error) is always nil (unparam)
return &MaxcomputeUpstreamIdentifier{
logger: logger,
parserFunc: parser.MaxcomputeURNDecorator(parserFunc),
evaluatorFuncs: evaluatorFuncs,
}, nil
}

func (g MaxcomputeUpstreamIdentifier) IdentifyResources(ctx context.Context, assets map[string]string) ([]resource.URN, error) {

Check failure on line 25 in plugin/upstream_identifier/maxcompute_upstream_identifier.go

View workflow job for this annotation

GitHub Actions / lint

unused-parameter: parameter 'ctx' seems to be unused, consider removing or renaming it as _ (revive)
resourceURNs := []resource.URN{}

// generate resource urn with upstream from each evaluator
for _, evaluatorFunc := range g.evaluatorFuncs {
query := evaluatorFunc(assets)
if query == "" {
continue
}
resources := g.identifyResources(query)
resourceURNs = append(resourceURNs, resources...)
}
return resourceURNs, nil
}

func (g MaxcomputeUpstreamIdentifier) identifyResources(query string) []resource.URN {
resources := g.parserFunc(query)
resourceURNs := make([]resource.URN, len(resources))
for _, r := range resources {
resourceURN, _ := resource.NewURN("maxcompute", r) // TODO: use dedicated function new resource from string
resourceURNs = append(resourceURNs, resourceURN)

Check failure on line 45 in plugin/upstream_identifier/maxcompute_upstream_identifier.go

View workflow job for this annotation

GitHub Actions / lint

append to slice `resourceURNs` with non-zero initialized length (makezero)
}
return resourceURNs
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package upstreamidentifier_test

// TODO: Implement test
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,29 @@ package parser
import (
"regexp"
"strings"

"github.com/goto/optimus/ext/store/bigquery"
)

var (
topLevelUpstreamsPattern = regexp.MustCompile(
"(?i)(?:FROM)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-\\*?]+)`?" + //nolint:gocritic
"(?i)(?:FROM)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+\\.[\\w-]+\\.[\\w-\\*?]+)`?" + //nolint:gocritic
"|" +
"(?i)(?:JOIN)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?" +
"(?i)(?:JOIN)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+\\.[\\w-]+\\.[\\w-]+)`?" +
"|" +
"(?i)(?:WITH)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?\\s+(?:AS)" +
"(?i)(?:WITH)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+\\.[\\w-]+\\.[\\w-]+)`?\\s+(?:AS)" +
"|" +
// ref: https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#merge_statement
"(?i)(?:MERGE)\\s*(?:INTO)?\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?" + // to ignore
"(?i)(?:MERGE)\\s*(?:INTO)?\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+\\.[\\w-]+\\.[\\w-]+)`?" + // to ignore
"|" +
// ref: https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#insert_statement
"(?i)(?:INSERT)\\s*(?:INTO)?\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?" + // to ignore
"(?i)(?:INSERT)\\s*(?:INTO)?\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+\\.[\\w-]+\\.[\\w-]+)`?" + // to ignore
"|" +
// ref: https://cloud.google.com/bigquery/docs/reference/standard-sql/dml-syntax#delete_statement
"(?i)(?:DELETE)\\s*(?:FROM)?\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?" + // to ignore
"(?i)(?:DELETE)\\s*(?:FROM)?\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+\\.[\\w-]+\\.[\\w-]+)`?" + // to ignore
"|" +
// ref: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language
"(?i)(?:CREATE)\\s*(?:OR\\s+REPLACE)?\\s*(?:VIEW|(?:TEMP\\s+)?TABLE)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?" + // to ignore
"(?i)(?:CREATE)\\s*(?:OR\\s+REPLACE)?\\s*(?:VIEW|(?:TEMP\\s+)?TABLE)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+\\.[\\w-]+\\.[\\w-]+)`?" + // to ignore
"|" +
"(?i)(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`\\s*(?:AS)?")
"(?i)(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+\\.[\\w-]+\\.[\\w-]+)`?\\s*(?:AS)?")

singleLineCommentsPattern = regexp.MustCompile(`(--.*)`)
multiLineCommentsPattern = regexp.MustCompile(`(((/\*)+?[\w\W]*?(\*/)+))`)
Expand All @@ -37,41 +35,33 @@ var (
func ParseTopLevelUpstreamsFromQuery(query string) []string {
cleanedQuery := cleanQueryFromComment(query)

resourcesFound := make(map[bigquery.ResourceURN]bool)
pseudoResources := make(map[bigquery.ResourceURN]bool)
tableFound := map[string]bool{}
pseudoTable := map[string]bool{}

matches := topLevelUpstreamsPattern.FindAllStringSubmatch(cleanedQuery, -1)

for _, match := range matches {
var projectIdx, datasetIdx, nameIdx, ignoreUpstreamIdx int
var tableIdx, ignoreUpstreamIdx int
tokens := strings.Fields(match[0])
clause := strings.ToLower(tokens[0])

switch clause {
case "from":
ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 1, 2, 3, 4
ignoreUpstreamIdx, tableIdx = 1, 2
case "join":
ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 5, 6, 7, 8
ignoreUpstreamIdx, tableIdx = 3, 4
case "with":
ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 9, 10, 11, 12
ignoreUpstreamIdx, tableIdx = 5, 6
case "merge":
ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 13, 14, 15, 16
ignoreUpstreamIdx, tableIdx = 7, 8
case "insert":
ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 17, 18, 19, 20
ignoreUpstreamIdx, tableIdx = 9, 10
case "delete":
ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 21, 22, 23, 24
ignoreUpstreamIdx, tableIdx = 11, 12
case "create":
ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 25, 26, 27, 28
ignoreUpstreamIdx, tableIdx = 13, 14
default:
ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 29, 30, 31, 32
}

project := match[projectIdx]
dataset := match[datasetIdx]
name := match[nameIdx]

if project == "" || dataset == "" || name == "" {
continue
ignoreUpstreamIdx, tableIdx = 15, 16
}

if strings.TrimSpace(match[ignoreUpstreamIdx]) == "@ignoreupstream" {
Expand All @@ -82,22 +72,21 @@ func ParseTopLevelUpstreamsFromQuery(query string) []string {
continue
}

resourceURN, _ := bigquery.NewResourceURN(project, dataset, name)

tableName := match[tableIdx]
if clause == "with" {
pseudoResources[resourceURN] = true
pseudoTable[tableName] = true
} else {
resourcesFound[resourceURN] = true
tableFound[tableName] = true
}
}

output := []string{}

for resourceURN := range resourcesFound {
if pseudoResources[resourceURN] {
for table := range tableFound {
if pseudoTable[table] {
continue
}
output = append(output, resourceURN.URN())
output = append(output, table)
}

return output
Expand Down
Loading

0 comments on commit 8dedad5

Please sign in to comment.