Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: empty resource #23

Merged
merged 8 commits into from
Oct 30, 2023
5 changes: 3 additions & 2 deletions task/bq2bq/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"cloud.google.com/go/bigquery"
"github.com/googleapis/google-cloud-go-testing/bigquery/bqiface"
"github.com/hashicorp/go-hclog"
"golang.org/x/oauth2/google"
"google.golang.org/api/drive/v2"
"google.golang.org/api/option"
Expand Down Expand Up @@ -35,6 +36,6 @@ func (fac *DefaultBQClientFactory) New(ctx context.Context, svcAccount string) (
type DefaultUpstreamExtractorFactory struct {
}

func (d *DefaultUpstreamExtractorFactory) New(client bqiface.Client) (UpstreamExtractor, error) {
return upstream.NewExtractor(client)
func (d *DefaultUpstreamExtractorFactory) New(client bqiface.Client, logger hclog.Logger) (UpstreamExtractor, error) {
return upstream.NewExtractor(client, logger)
}
14 changes: 6 additions & 8 deletions task/bq2bq/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ type ClientFactory interface {
}

type UpstreamExtractor interface {
ExtractUpstreams(ctx context.Context, query string, resourcesToIgnore []upstream.Resource) ([]*upstream.Upstream, error)
ExtractUpstreams(ctx context.Context, query string, resourcesToIgnore []upstream.Resource) ([]upstream.Resource, error)
}

type ExtractorFactory interface {
New(client bqiface.Client) (UpstreamExtractor, error)
New(client bqiface.Client, logger hclog.Logger) (UpstreamExtractor, error)
}

type BQ2BQ struct {
Expand Down Expand Up @@ -224,10 +224,7 @@ func (b *BQ2BQ) GenerateDependencies(ctx context.Context, request plugin.Generat
return response, fmt.Errorf("error extracting upstreams: %w", err)
}

flattenedUpstreams := upstream.FlattenUpstreams(upstreams)
uniqueUpstreams := upstream.UniqueFilterResources(flattenedUpstreams)

formattedUpstreams := b.formatUpstreams(uniqueUpstreams, func(r upstream.Resource) string {
formattedUpstreams := b.formatUpstreams(upstreams, func(r upstream.Resource) string {
name := fmt.Sprintf("%s:%s.%s", r.Project, r.Dataset, r.Name)
return fmt.Sprintf(plugin.DestinationURNFormat, selfTable.Type, name)
})
Expand All @@ -237,7 +234,7 @@ func (b *BQ2BQ) GenerateDependencies(ctx context.Context, request plugin.Generat
return response, nil
}

func (b *BQ2BQ) extractUpstreams(ctx context.Context, query, svcAccSecret string, resourcesToIgnore []upstream.Resource) ([]*upstream.Upstream, error) {
func (b *BQ2BQ) extractUpstreams(ctx context.Context, query, svcAccSecret string, resourcesToIgnore []upstream.Resource) ([]upstream.Resource, error) {
spanCtx, span := StartChildSpan(ctx, "extractUpstreams")
defer span.End()

Expand All @@ -247,7 +244,7 @@ func (b *BQ2BQ) extractUpstreams(ctx context.Context, query, svcAccSecret string
return nil, fmt.Errorf("error creating bigquery client: %w", err)
}

extractor, err := b.ExtractorFac.New(client)
extractor, err := b.ExtractorFac.New(client, b.logger)
if err != nil {
return nil, fmt.Errorf("error initializing upstream extractor: %w", err)
}
Expand All @@ -263,6 +260,7 @@ func (b *BQ2BQ) extractUpstreams(ctx context.Context, query, svcAccSecret string
}

b.logger.Error("error extracting upstreams", err)
return nil, err
}

return upstreams, nil
Expand Down
97 changes: 39 additions & 58 deletions task/bq2bq/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/googleapis/google-cloud-go-testing/bigquery/bqiface"
"github.com/goto/optimus/sdk/plugin"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

Expand All @@ -33,8 +34,8 @@ type extractorFactoryMock struct {
mock.Mock
}

func (e *extractorFactoryMock) New(client bqiface.Client) (UpstreamExtractor, error) {
args := e.Called(client)
func (e *extractorFactoryMock) New(client bqiface.Client, logger hclog.Logger) (UpstreamExtractor, error) {
args := e.Called(client, logger)

r1, ok := args.Get(0).(UpstreamExtractor)
if !ok {
Expand All @@ -48,10 +49,10 @@ type extractorMock struct {
mock.Mock
}

func (e *extractorMock) ExtractUpstreams(ctx context.Context, query string, resourcesToIgnore []upstream.Resource) ([]*upstream.Upstream, error) {
func (e *extractorMock) ExtractUpstreams(ctx context.Context, query string, resourcesToIgnore []upstream.Resource) ([]upstream.Resource, error) {
args := e.Called(ctx, query, resourcesToIgnore)

r1, ok := args.Get(0).([]*upstream.Upstream)
r1, ok := args.Get(0).([]upstream.Resource)
if !ok {
return nil, args.Error(1)
}
Expand All @@ -61,6 +62,7 @@ func (e *extractorMock) ExtractUpstreams(ctx context.Context, query string, reso

func TestBQ2BQ(t *testing.T) {
ctx := context.Background()
logger := hclog.NewNullLogger()

t.Run("GetName", func(t *testing.T) {
t.Run("should return name bq2bq", func(t *testing.T) {
Expand Down Expand Up @@ -263,22 +265,21 @@ Select * from table where ts > "2021-01-16T00:00:00Z"`

extractor := new(extractorMock)
extractor.On("ExtractUpstreams", mock.Anything, query, []upstream.Resource{destination}).
Return([]*upstream.Upstream{
Return([]upstream.Resource{
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
}, nil)

extractorFac := new(extractorFactoryMock)
extractorFac.On("New", client).Return(extractor, nil)
extractorFac.On("New", client, logger).Return(extractor, nil)

b := &BQ2BQ{
ClientFac: bqClientFac,
ExtractorFac: extractorFac,
logger: logger,
}
got, err := b.GenerateDependencies(ctx, data)
if err != nil {
Expand Down Expand Up @@ -335,45 +336,26 @@ Select * from table where ts > "2021-01-16T00:00:00Z"`

extractor := new(extractorMock)
extractor.On("ExtractUpstreams", mock.Anything, query, []upstream.Resource{destination}).
Return([]*upstream.Upstream{
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
Upstreams: []*upstream.Upstream{
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table2",
},
},
},
},
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table2",
},
},
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
Return([]upstream.Resource{
{
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
{
Project: "proj",
Dataset: "dataset",
Name: "table2",
},
}, nil)

extractorFac := new(extractorFactoryMock)
extractorFac.On("New", client).Return(extractor, nil)
extractorFac.On("New", client, logger).Return(extractor, nil)

b := &BQ2BQ{
ClientFac: bqClientFac,
ExtractorFac: extractorFac,
logger: logger,
}
got, err := b.GenerateDependencies(ctx, data)
if err != nil {
Expand Down Expand Up @@ -430,14 +412,15 @@ Select * from table where ts > "2021-01-16T00:00:00Z"`

extractor := new(extractorMock)
extractor.On("ExtractUpstreams", mock.Anything, query, []upstream.Resource{destination}).
Return([]*upstream.Upstream{}, nil)
Return([]upstream.Resource{}, nil)

extractorFac := new(extractorFactoryMock)
extractorFac.On("New", client).Return(extractor, nil)
extractorFac.On("New", client, logger).Return(extractor, nil)

b := &BQ2BQ{
ClientFac: bqClientFac,
ExtractorFac: extractorFac,
logger: logger,
}
got, err := b.GenerateDependencies(ctx, data)
if err != nil {
Expand Down Expand Up @@ -494,22 +477,21 @@ Select * from table where ts > "2021-01-16T00:00:00Z"`

extractor := new(extractorMock)
extractor.On("ExtractUpstreams", mock.Anything, query, []upstream.Resource{destination}).
Return([]*upstream.Upstream{
Return([]upstream.Resource{
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
}, nil)

extractorFac := new(extractorFactoryMock)
extractorFac.On("New", client).Return(extractor, nil)
extractorFac.On("New", client, logger).Return(extractor, nil)

b := &BQ2BQ{
ClientFac: bqClientFac,
ExtractorFac: extractorFac,
logger: logger,
}
got, err := b.GenerateDependencies(ctx, data)
if err != nil {
Expand Down Expand Up @@ -566,22 +548,21 @@ Select * from table where ts > "2021-01-16T00:00:00Z"`

extractor := new(extractorMock)
extractor.On("ExtractUpstreams", mock.Anything, query, []upstream.Resource{destination}).
Return([]*upstream.Upstream{
Return([]upstream.Resource{
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
}, nil)

extractorFac := new(extractorFactoryMock)
extractorFac.On("New", client).Return(extractor, nil)
extractorFac.On("New", client, logger).Return(extractor, nil)

b := &BQ2BQ{
ClientFac: bqClientFac,
ExtractorFac: extractorFac,
logger: logger,
}
got, err := b.GenerateDependencies(ctx, data)
if err != nil {
Expand Down
Loading
Loading