Non partitioned replace #27

Closed
70 changes: 69 additions & 1 deletion task/bq2bq/executor/bumblebee/transformation.py
@@ -70,7 +70,7 @@ def transform(self):
        bq_destination_table = self.bigquery_service.get_table(self.task_config.destination_table)
        if bq_destination_table.time_partitioning is None:
            task_queries = self.sql_query.split(OPTIMUS_QUERY_BREAK_MARKER)
            transformation = TableTransformation(self.bigquery_service,
            transformation = NonPartitionedTableTransformation(self.bigquery_service,
                                                 self.task_config,
                                                 task_queries[0],
                                                 self.dstart,
@@ -199,6 +199,42 @@ def execute(self):
        logger.info("finished")



class NonPartitionedTableTransformation:
    """
    Query transformation affects the whole non-partitioned table
    """

    def __init__(self, bigquery_service: BigqueryService,
                 task_config: TaskConfig,
                 task_query: str,
                 dstart: datetime,
                 dend: datetime,
                 dry_run: bool,
                 execution_time: datetime):
        self.bigquery_service = bigquery_service
        self.task_config = task_config
        self.task_query = task_query
        self.dry_run = dry_run
        self.window = CustomWindow(dstart, dend)
        self.execution_time = execution_time

    def transform(self):
        loader = TableLoader(self.bigquery_service, self.task_config.destination_table, LoadMethod.APPEND,
                             self.task_config.allow_field_addition)
        logger.info("create transformation for table")

        task = NonPartitionTransformation(self.task_config,
                                          loader,
                                          self.task_query,
                                          self.window,
                                          self.dry_run,
                                          self.execution_time)

        self.bigquery_service.execute_query("truncate table `gtfndata-integration.playground.replace_table_target`;")
        task.execute()


class TableTransformation:
    """
    Query transformation effects whole non partitioned table
@@ -270,6 +306,38 @@ def transform(self):
        task.execute()


class NonPartitionTransformation:
    def __init__(self,
                 task_config: TaskConfig,
                 loader: BaseLoader,
                 query: str,
                 window: Window,
                 dry_run: bool,
                 execution_time: datetime):
        self.dry_run = dry_run
        self.loader = loader

        destination_parameter = DestinationParameter(task_config.destination_table)
        window_parameter = WindowParameter(window)
        execution_parameter = ExecutionParameter(execution_time)

        self.query = Query(query).apply_parameter(window_parameter).apply_parameter(
            execution_parameter).apply_parameter(destination_parameter)

    def execute(self):
        logger.info("start transformation job")
        self.query.print_with_logger(logger)

        result = None
        if not self.dry_run:
            result = self.loader.load(self.query)

        logger.info(result)
        logger.info("finished")

    async def async_execute(self):
        self.execute()

class PartitionTransformation:
    def __init__(self,
                 task_config: TaskConfig,
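
For context when reviewing, here is a minimal usage sketch of the new class; it is not part of this diff. It assumes the class resolves as bumblebee.transformation.NonPartitionedTableTransformation and that bigquery_service and task_config are the BigqueryService and TaskConfig instances the executor already builds. The wrapper function name, window bounds, and query string are illustrative only.

from datetime import datetime

from bumblebee.transformation import NonPartitionedTableTransformation  # module path assumed from this diff


def replace_non_partitioned_table(bigquery_service, task_config, query: str):
    # bigquery_service / task_config are the existing BigqueryService and TaskConfig
    # objects the executor constructs elsewhere; they are assumed here, not created.
    transformation = NonPartitionedTableTransformation(
        bigquery_service,
        task_config,
        query,
        dstart=datetime(2021, 1, 1),   # illustrative window bounds
        dend=datetime(2021, 1, 2),
        dry_run=False,
        execution_time=datetime.utcnow(),
    )
    # transform() builds an APPEND TableLoader for task_config.destination_table,
    # runs the TRUNCATE statement shown in the diff above (note the table name there
    # is hardcoded), then executes the query.
    transformation.transform()

The truncate-then-append pairing is what gives the existing APPEND load method replace semantics for a table without time partitioning.
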
5 changes: 3 additions & 2 deletions task/bq2bq/factory.go
@@ -6,6 +6,7 @@ import (

"cloud.google.com/go/bigquery"
"github.com/googleapis/google-cloud-go-testing/bigquery/bqiface"
"github.com/hashicorp/go-hclog"
"golang.org/x/oauth2/google"
"google.golang.org/api/drive/v2"
"google.golang.org/api/option"
@@ -35,6 +36,6 @@ func (fac *DefaultBQClientFactory) New(ctx context.Context, svcAccount string) (
type DefaultUpstreamExtractorFactory struct {
}

func (d *DefaultUpstreamExtractorFactory) New(client bqiface.Client) (UpstreamExtractor, error) {
return upstream.NewExtractor(client)
func (d *DefaultUpstreamExtractorFactory) New(client bqiface.Client, logger hclog.Logger) (UpstreamExtractor, error) {
return upstream.NewExtractor(client, logger)
}
14 changes: 6 additions & 8 deletions task/bq2bq/main.go
@@ -47,11 +47,11 @@ type ClientFactory interface {
}

type UpstreamExtractor interface {
ExtractUpstreams(ctx context.Context, query string, resourcesToIgnore []upstream.Resource) ([]*upstream.Upstream, error)
ExtractUpstreams(ctx context.Context, query string, resourcesToIgnore []upstream.Resource) ([]upstream.Resource, error)
}

type ExtractorFactory interface {
New(client bqiface.Client) (UpstreamExtractor, error)
New(client bqiface.Client, logger hclog.Logger) (UpstreamExtractor, error)
}

type BQ2BQ struct {
@@ -224,10 +224,7 @@ func (b *BQ2BQ) GenerateDependencies(ctx context.Context, request plugin.Generat
return response, fmt.Errorf("error extracting upstreams: %w", err)
}

flattenedUpstreams := upstream.FlattenUpstreams(upstreams)
uniqueUpstreams := upstream.UniqueFilterResources(flattenedUpstreams)

formattedUpstreams := b.formatUpstreams(uniqueUpstreams, func(r upstream.Resource) string {
formattedUpstreams := b.formatUpstreams(upstreams, func(r upstream.Resource) string {
name := fmt.Sprintf("%s:%s.%s", r.Project, r.Dataset, r.Name)
return fmt.Sprintf(plugin.DestinationURNFormat, selfTable.Type, name)
})
@@ -237,7 +234,7 @@ func (b *BQ2BQ) GenerateDependencies(ctx context.Context, request plugin.Generat
return response, nil
}

func (b *BQ2BQ) extractUpstreams(ctx context.Context, query, svcAccSecret string, resourcesToIgnore []upstream.Resource) ([]*upstream.Upstream, error) {
func (b *BQ2BQ) extractUpstreams(ctx context.Context, query, svcAccSecret string, resourcesToIgnore []upstream.Resource) ([]upstream.Resource, error) {
spanCtx, span := StartChildSpan(ctx, "extractUpstreams")
defer span.End()

@@ -247,7 +244,7 @@ func (b *BQ2BQ) extractUpstreams(ctx context.Context, query, svcAccSecret string
return nil, fmt.Errorf("error creating bigquery client: %w", err)
}

extractor, err := b.ExtractorFac.New(client)
extractor, err := b.ExtractorFac.New(client, b.logger)
if err != nil {
return nil, fmt.Errorf("error initializing upstream extractor: %w", err)
}
@@ -263,6 +260,7 @@ func (b *BQ2BQ) extractUpstreams(ctx context.Context, query, svcAccSecret string
}

b.logger.Error("error extracting upstreams", err)
return nil, err
}

return upstreams, nil
97 changes: 39 additions & 58 deletions task/bq2bq/main_test.go
@@ -9,6 +9,7 @@ import (

"github.com/googleapis/google-cloud-go-testing/bigquery/bqiface"
"github.com/goto/optimus/sdk/plugin"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

@@ -33,8 +34,8 @@ type extractorFactoryMock struct {
mock.Mock
}

func (e *extractorFactoryMock) New(client bqiface.Client) (UpstreamExtractor, error) {
args := e.Called(client)
func (e *extractorFactoryMock) New(client bqiface.Client, logger hclog.Logger) (UpstreamExtractor, error) {
args := e.Called(client, logger)

r1, ok := args.Get(0).(UpstreamExtractor)
if !ok {
@@ -48,10 +49,10 @@ type extractorMock struct {
mock.Mock
}

func (e *extractorMock) ExtractUpstreams(ctx context.Context, query string, resourcesToIgnore []upstream.Resource) ([]*upstream.Upstream, error) {
func (e *extractorMock) ExtractUpstreams(ctx context.Context, query string, resourcesToIgnore []upstream.Resource) ([]upstream.Resource, error) {
args := e.Called(ctx, query, resourcesToIgnore)

r1, ok := args.Get(0).([]*upstream.Upstream)
r1, ok := args.Get(0).([]upstream.Resource)
if !ok {
return nil, args.Error(1)
}
@@ -61,6 +62,7 @@ func (e *extractorMock) ExtractUpstreams(ctx context.Context, query string, reso

func TestBQ2BQ(t *testing.T) {
ctx := context.Background()
logger := hclog.NewNullLogger()

t.Run("GetName", func(t *testing.T) {
t.Run("should return name bq2bq", func(t *testing.T) {
@@ -263,22 +265,21 @@ Select * from table where ts > "2021-01-16T00:00:00Z"`

extractor := new(extractorMock)
extractor.On("ExtractUpstreams", mock.Anything, query, []upstream.Resource{destination}).
Return([]*upstream.Upstream{
Return([]upstream.Resource{
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
}, nil)

extractorFac := new(extractorFactoryMock)
extractorFac.On("New", client).Return(extractor, nil)
extractorFac.On("New", client, logger).Return(extractor, nil)

b := &BQ2BQ{
ClientFac: bqClientFac,
ExtractorFac: extractorFac,
logger: logger,
}
got, err := b.GenerateDependencies(ctx, data)
if err != nil {
@@ -335,45 +336,26 @@ Select * from table where ts > "2021-01-16T00:00:00Z"`

extractor := new(extractorMock)
extractor.On("ExtractUpstreams", mock.Anything, query, []upstream.Resource{destination}).
Return([]*upstream.Upstream{
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
Upstreams: []*upstream.Upstream{
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table2",
},
},
},
},
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table2",
},
},
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
Return([]upstream.Resource{
{
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
{
Project: "proj",
Dataset: "dataset",
Name: "table2",
},
}, nil)

extractorFac := new(extractorFactoryMock)
extractorFac.On("New", client).Return(extractor, nil)
extractorFac.On("New", client, logger).Return(extractor, nil)

b := &BQ2BQ{
ClientFac: bqClientFac,
ExtractorFac: extractorFac,
logger: logger,
}
got, err := b.GenerateDependencies(ctx, data)
if err != nil {
@@ -430,14 +412,15 @@ Select * from table where ts > "2021-01-16T00:00:00Z"`

extractor := new(extractorMock)
extractor.On("ExtractUpstreams", mock.Anything, query, []upstream.Resource{destination}).
Return([]*upstream.Upstream{}, nil)
Return([]upstream.Resource{}, nil)

extractorFac := new(extractorFactoryMock)
extractorFac.On("New", client).Return(extractor, nil)
extractorFac.On("New", client, logger).Return(extractor, nil)

b := &BQ2BQ{
ClientFac: bqClientFac,
ExtractorFac: extractorFac,
logger: logger,
}
got, err := b.GenerateDependencies(ctx, data)
if err != nil {
@@ -494,22 +477,21 @@ Select * from table where ts > "2021-01-16T00:00:00Z"`

extractor := new(extractorMock)
extractor.On("ExtractUpstreams", mock.Anything, query, []upstream.Resource{destination}).
Return([]*upstream.Upstream{
Return([]upstream.Resource{
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
}, nil)

extractorFac := new(extractorFactoryMock)
extractorFac.On("New", client).Return(extractor, nil)
extractorFac.On("New", client, logger).Return(extractor, nil)

b := &BQ2BQ{
ClientFac: bqClientFac,
ExtractorFac: extractorFac,
logger: logger,
}
got, err := b.GenerateDependencies(ctx, data)
if err != nil {
@@ -566,22 +548,21 @@ Select * from table where ts > "2021-01-16T00:00:00Z"`

extractor := new(extractorMock)
extractor.On("ExtractUpstreams", mock.Anything, query, []upstream.Resource{destination}).
Return([]*upstream.Upstream{
Return([]upstream.Resource{
{
Resource: upstream.Resource{
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
Project: "proj",
Dataset: "dataset",
Name: "table1",
},
}, nil)

extractorFac := new(extractorFactoryMock)
extractorFac.On("New", client).Return(extractor, nil)
extractorFac.On("New", client, logger).Return(extractor, nil)

b := &BQ2BQ{
ClientFac: bqClientFac,
ExtractorFac: extractorFac,
logger: logger,
}
got, err := b.GenerateDependencies(ctx, data)
if err != nil {
4 changes: 0 additions & 4 deletions task/bq2bq/optimus-plugin-bq2bq.yaml
@@ -2,10 +2,6 @@ name: bq2bq
description: BigQuery to BigQuery transformation task
plugintype: task
pluginversion: {{.version}}
asset_parsers:
  bq:
    - filepath: "./query.sql"
      destination_urn_template: "bigquery://<PROJECT>:<DATASET>.<TABLE>"
image: docker.io/gotocompany/optimus-task-bq2bq-executor:{{.version}}
entrypoint:
  script: "python3 /opt/bumblebee/main.py"