Skip to content

Commit

Permalink
fix: empty resource (#23)
Browse files Browse the repository at this point in the history
* refactor: update naming within upstream for more readability

* fix: remove upstream implementation to resource only

* refactor: extract out group upstream extraction

* test: add sorting actual results to ensure consistency

* fix: ignore error caused by access denied or user permission issue

* refactor: remove unnecessary implementation in test

* refactor: move out ignorable error handling to extractor

* fix: update to add logger to extractor
  • Loading branch information
irainia authored Oct 30, 2023
1 parent 76ba68d commit fe52f52
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 289 deletions.
5 changes: 3 additions & 2 deletions task/bq2bq/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"cloud.google.com/go/bigquery"
"github.com/googleapis/google-cloud-go-testing/bigquery/bqiface"
"github.com/hashicorp/go-hclog"
"golang.org/x/oauth2/google"
"google.golang.org/api/drive/v2"
"google.golang.org/api/option"
Expand Down Expand Up @@ -35,6 +36,6 @@ func (fac *DefaultBQClientFactory) New(ctx context.Context, svcAccount string) (
type DefaultUpstreamExtractorFactory struct {
}

func (d *DefaultUpstreamExtractorFactory) New(client bqiface.Client) (UpstreamExtractor, error) {
return upstream.NewExtractor(client)
func (d *DefaultUpstreamExtractorFactory) New(client bqiface.Client, logger hclog.Logger) (UpstreamExtractor, error) {
return upstream.NewExtractor(client, logger)
}
14 changes: 6 additions & 8 deletions task/bq2bq/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ type ClientFactory interface {
}

type UpstreamExtractor interface {
ExtractUpstreams(ctx context.Context, query string, resourcesToIgnore []upstream.Resource) ([]*upstream.Upstream, error)
ExtractUpstreams(ctx context.Context, query string, resourcesToIgnore []upstream.Resource) ([]upstream.Resource, error)
}

type ExtractorFactory interface {
New(client bqiface.Client) (UpstreamExtractor, error)
New(client bqiface.Client, logger hclog.Logger) (UpstreamExtractor, error)
}

type BQ2BQ struct {
Expand Down Expand Up @@ -224,10 +224,7 @@ func (b *BQ2BQ) GenerateDependencies(ctx context.Context, request plugin.Generat
return response, fmt.Errorf("error extracting upstreams: %w", err)
}

flattenedUpstreams := upstream.FlattenUpstreams(upstreams)
uniqueUpstreams := upstream.UniqueFilterResources(flattenedUpstreams)

formattedUpstreams := b.formatUpstreams(uniqueUpstreams, func(r upstream.Resource) string {
formattedUpstreams := b.formatUpstreams(upstreams, func(r upstream.Resource) string {
name := fmt.Sprintf("%s:%s.%s", r.Project, r.Dataset, r.Name)
return fmt.Sprintf(plugin.DestinationURNFormat, selfTable.Type, name)
})
Expand All @@ -237,7 +234,7 @@ func (b *BQ2BQ) GenerateDependencies(ctx context.Context, request plugin.Generat
return response, nil
}

func (b *BQ2BQ) extractUpstreams(ctx context.Context, query, svcAccSecret string, resourcesToIgnore []upstream.Resource) ([]*upstream.Upstream, error) {
func (b *BQ2BQ) extractUpstreams(ctx context.Context, query, svcAccSecret string, resourcesToIgnore []upstream.Resource) ([]upstream.Resource, error) {
spanCtx, span := StartChildSpan(ctx, "extractUpstreams")
defer span.End()

Expand All @@ -247,7 +244,7 @@ func (b *BQ2BQ) extractUpstreams(ctx context.Context, query, svcAccSecret string
return nil, fmt.Errorf("error creating bigquery client: %w", err)
}

extractor, err := b.ExtractorFac.New(client)
extractor, err := b.ExtractorFac.New(client, b.logger)
if err != nil {
return nil, fmt.Errorf("error initializing upstream extractor: %w", err)
}
Expand All @@ -263,6 +260,7 @@ func (b *BQ2BQ) extractUpstreams(ctx context.Context, query, svcAccSecret string
}

b.logger.Error("error extracting upstreams", err)
return nil, err
}

return upstreams, nil
Expand Down
97 changes: 39 additions & 58 deletions task/bq2bq/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/googleapis/google-cloud-go-testing/bigquery/bqiface"
"github.com/goto/optimus/sdk/plugin"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

Expand All @@ -33,8 +34,8 @@ type extractorFactoryMock struct {
mock.Mock
}

func (e *extractorFactoryMock) New(client bqiface.Client) (UpstreamExtractor, error) {
args := e.Called(client)
func (e *extractorFactoryMock) New(client bqiface.Client, logger hclog.Logger) (UpstreamExtractor, error) {
args := e.Called(client, logger)

r1, ok := args.Get(0).(UpstreamExtractor)
if !ok {
Expand All @@ -48,10 +49,10 @@ type extractorMock struct {
mock.Mock
}

func (e *extractorMock) ExtractUpstreams(ctx context.Context, query string, resourcesToIgnore []upstream.Resource) ([]*upstream.Upstream, error) {
func (e *extractorMock) ExtractUpstreams(ctx context.Context, query string, resourcesToIgnore []upstream.Resource) ([]upstream.Resource, error) {
args := e.Called(ctx, query, resourcesToIgnore)

r1, ok := args.Get(0).([]*upstream.Upstream)
r1, ok := args.Get(0).([]upstream.Resource)
if !ok {
return nil, args.Error(1)
}
Expand All @@ -61,6 +62,7 @@ func (e *extractorMock) ExtractUpstreams(ctx context.Context, query string, reso

func TestBQ2BQ(t *testing.T) {
ctx := context.Background()
logger := hclog.NewNullLogger()

t.Run("GetName", func(t *testing.T) {
t.Run("should return name bq2bq", func(t *testing.T) {
Expand Down Expand Up @@ -263,22 +265,21 @@ Select * from table where ts > "2021-01-16T00:00:00Z"`

extractor := new(extractorMock)
extractor.On("ExtractUpstreams", mock.Anything, query, []upstream.Resource{destination}).
Return([]*upstream.Upstream{
Return([]upstream.Resource{
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
}, nil)

extractorFac := new(extractorFactoryMock)
extractorFac.On("New", client).Return(extractor, nil)
extractorFac.On("New", client, logger).Return(extractor, nil)

b := &BQ2BQ{
ClientFac: bqClientFac,
ExtractorFac: extractorFac,
logger: logger,
}
got, err := b.GenerateDependencies(ctx, data)
if err != nil {
Expand Down Expand Up @@ -335,45 +336,26 @@ Select * from table where ts > "2021-01-16T00:00:00Z"`

extractor := new(extractorMock)
extractor.On("ExtractUpstreams", mock.Anything, query, []upstream.Resource{destination}).
Return([]*upstream.Upstream{
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
Upstreams: []*upstream.Upstream{
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table2",
},
},
},
},
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table2",
},
},
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
Return([]upstream.Resource{
{
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
{
Project: "proj",
Dataset: "dataset",
Name: "table2",
},
}, nil)

extractorFac := new(extractorFactoryMock)
extractorFac.On("New", client).Return(extractor, nil)
extractorFac.On("New", client, logger).Return(extractor, nil)

b := &BQ2BQ{
ClientFac: bqClientFac,
ExtractorFac: extractorFac,
logger: logger,
}
got, err := b.GenerateDependencies(ctx, data)
if err != nil {
Expand Down Expand Up @@ -430,14 +412,15 @@ Select * from table where ts > "2021-01-16T00:00:00Z"`

extractor := new(extractorMock)
extractor.On("ExtractUpstreams", mock.Anything, query, []upstream.Resource{destination}).
Return([]*upstream.Upstream{}, nil)
Return([]upstream.Resource{}, nil)

extractorFac := new(extractorFactoryMock)
extractorFac.On("New", client).Return(extractor, nil)
extractorFac.On("New", client, logger).Return(extractor, nil)

b := &BQ2BQ{
ClientFac: bqClientFac,
ExtractorFac: extractorFac,
logger: logger,
}
got, err := b.GenerateDependencies(ctx, data)
if err != nil {
Expand Down Expand Up @@ -494,22 +477,21 @@ Select * from table where ts > "2021-01-16T00:00:00Z"`

extractor := new(extractorMock)
extractor.On("ExtractUpstreams", mock.Anything, query, []upstream.Resource{destination}).
Return([]*upstream.Upstream{
Return([]upstream.Resource{
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
}, nil)

extractorFac := new(extractorFactoryMock)
extractorFac.On("New", client).Return(extractor, nil)
extractorFac.On("New", client, logger).Return(extractor, nil)

b := &BQ2BQ{
ClientFac: bqClientFac,
ExtractorFac: extractorFac,
logger: logger,
}
got, err := b.GenerateDependencies(ctx, data)
if err != nil {
Expand Down Expand Up @@ -566,22 +548,21 @@ Select * from table where ts > "2021-01-16T00:00:00Z"`

extractor := new(extractorMock)
extractor.On("ExtractUpstreams", mock.Anything, query, []upstream.Resource{destination}).
Return([]*upstream.Upstream{
Return([]upstream.Resource{
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
}, nil)

extractorFac := new(extractorFactoryMock)
extractorFac.On("New", client).Return(extractor, nil)
extractorFac.On("New", client, logger).Return(extractor, nil)

b := &BQ2BQ{
ClientFac: bqClientFac,
ExtractorFac: extractorFac,
logger: logger,
}
got, err := b.GenerateDependencies(ctx, data)
if err != nil {
Expand Down
Loading

0 comments on commit fe52f52

Please sign in to comment.