Skip to content

Commit

Permalink
fix: ignore destination (#176)
Browse files Browse the repository at this point in the history
  • Loading branch information
deryrahman committed Nov 3, 2023
1 parent e1e57da commit 1c0335e
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 1 deletion.
16 changes: 15 additions & 1 deletion plugin/plugin_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,21 @@ func (s PluginService) IdentifyUpstreams(ctx context.Context, taskName string, c
resourceURNs = append(resourceURNs, currentResourceURNs...)
}

return resourceURNs, me.ToErr()
// ignore destination urns
destinationURN, err := s.ConstructDestinationURN(ctx, taskName, compiledConfig)
if err != nil {
return nil, err
}
filteredResourceURNs := []string{}
for _, resourceURN := range resourceURNs {
if resourceURN == destinationURN {
s.l.Warn("ignore destination resource %s", resourceURN)
continue
}
filteredResourceURNs = append(filteredResourceURNs, resourceURN)
}

return filteredResourceURNs, me.ToErr()
}

func (s PluginService) ConstructDestinationURN(_ context.Context, taskName string, compiledConfig map[string]string) (string, error) {
Expand Down
35 changes: 35 additions & 0 deletions plugin/plugin_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,41 @@ func TestIdentifyUpstreams(t *testing.T) {
assert.NotEmpty(t, resourceURNs)
assert.Len(t, resourceURNs, 1)
})
t.Run("should generate clean dependencies without destination in it", func(t *testing.T) {
pluginGetter := new(PluginGetter)
defer pluginGetter.AssertExpectations(t)
upstreamIdentifierFactory := new(UpstreamIdentifierFactory)
defer upstreamIdentifierFactory.AssertExpectations(t)
evaluatorFactory := new(EvaluatorFactory)
defer evaluatorFactory.AssertExpectations(t)
evaluator := new(Evaluator)
defer evaluator.AssertExpectations(t)
upstreamIdentifier := new(UpstreamIdentifier)
defer upstreamIdentifier.AssertExpectations(t)

pluginYamlTestWithDestinationTemplate, err := yaml.NewPluginSpec("./yaml/tests/sample_plugin_with_parser_and_destination_template.yaml")
assert.NoError(t, err)
pluginTestWithDestinationTemplate := &p.Plugin{
YamlMod: pluginYamlTestWithDestinationTemplate,
}
pluginGetter.On("GetByName", mock.Anything).Return(pluginTestWithDestinationTemplate, nil)
evaluatorFactory.On("GetFileEvaluator", mock.Anything).Return(evaluator, nil)
upstreamIdentifierFactory.On("GetBQUpstreamIdentifier", ctx, mock.Anything, evaluator).Return(upstreamIdentifier, nil)
upstreamIdentifier.On("IdentifyResources", ctx, assets).Return([]string{"bigquery://proj:datas:tabl", "bigquery://projectA:datasetB.tableC"}, nil)
pluginService, err := plugin.NewPluginService(logger, pluginGetter, upstreamIdentifierFactory, evaluatorFactory)
assert.NoError(t, err)
assert.NotNil(t, pluginService)

configTask := map[string]string{}
configTask["BQ_SERVICE_ACCOUNT"] = "service_account_value"
configTask["PROJECT"] = "projectA"
configTask["DATASET"] = "datasetB"
configTask["TABLE"] = "tableC"
resourceURNs, err := pluginService.IdentifyUpstreams(ctx, taskName, configTask, assets)
assert.NoError(t, err)
assert.NotEmpty(t, resourceURNs)
assert.Len(t, resourceURNs, 1)
})
}

func TestConstructDestinationURN(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
name: bq2bqtest
description: Testing
plugintype: task
pluginversion: latest
asset_parsers:
bq:
- filepath: "./query.sql"
image: docker.io/goto/optimus-task-bq2bq-executor:latest
destination_urn_template: "bigquery://<PROJECT>:<DATASET>.<TABLE>"
entrypoint:
shell: "/bin/bash"
script: |-
sleep 100
sleep 150
questions:
- name: PROJECT
prompt: Project ID
regexp: ^[a-zA-Z0-9_\-]+$
minlength: 3

defaultconfig:
- name: TEST
value: "{{.test}}"

defaultassets:
- name: query.sql
value: Select * from "project.dataset.table";

0 comments on commit 1c0335e

Please sign in to comment.