diff --git a/core/services/job/job_orm_test.go b/core/services/job/job_orm_test.go index 9db99fcd48d..fd54a39d431 100644 --- a/core/services/job/job_orm_test.go +++ b/core/services/job/job_orm_test.go @@ -45,6 +45,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/relay" "github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrfcommon" "github.com/smartcontractkit/chainlink/v2/core/services/webhook" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" "github.com/smartcontractkit/chainlink/v2/core/testdata/testspecs" "github.com/smartcontractkit/chainlink/v2/core/utils/testutils/heavyweight" ) @@ -1873,6 +1874,7 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) { c.ID = s.ID c.Workflow = pkgworkflows.WFYamlSpec(t, "workflow99", addr1) // insert with mismatched name c.SpecType = job.YamlSpec + c.SecretsID = s.SecretsID return mustInsertWFJob(t, o, &c) }, }, @@ -1892,6 +1894,7 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) { var c job.WorkflowSpec c.ID = s.ID c.Workflow = pkgworkflows.WFYamlSpec(t, "workflow03", addr2) // insert with mismatched owner + c.SecretsID = s.SecretsID return mustInsertWFJob(t, o, &c) }, }, @@ -1899,22 +1902,32 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) { }, } - for _, tt := range tests { + for i, tt := range tests { t.Run(tt.name, func(t *testing.T) { + ctx := testutils.Context(t) ks := cltest.NewKeyStore(t, tt.fields.ds) + + secretsORM := syncer.NewWorkflowRegistryDS(tt.fields.ds, logger.TestLogger(t)) + + sid, err := secretsORM.Create(ctx, "some-url.com", fmt.Sprintf("some-hash-%d", i), "some-contentz") + require.NoError(t, err) + tt.args.spec.SecretsID = sql.NullInt64{Int64: sid, Valid: true} + pipelineORM := pipeline.NewORM(tt.fields.ds, logger.TestLogger(t), configtest.NewTestGeneralConfig(t).JobPipeline().MaxSuccessfulRuns()) bridgesORM := bridges.NewORM(tt.fields.ds) o := NewTestORM(t, tt.fields.ds, pipelineORM, bridgesORM, ks) + var wantJobID int32 if tt.args.before != nil { wantJobID = tt.args.before(t, o, tt.args.spec) } - ctx := testutils.Context(t) + gotJ, err := o.FindJobIDByWorkflow(ctx, *tt.args.spec) if (err != nil) != tt.wantErr { t.Errorf("orm.FindJobByWorkflow() error = %v, wantErr %v", err, tt.wantErr) return } + if err == nil { assert.Equal(t, wantJobID, gotJ, "mismatch job id") } @@ -1936,25 +1949,36 @@ func Test_ORM_FindJobByWorkflow_Multiple(t *testing.T) { bridges.NewORM(db), cltest.NewKeyStore(t, db)) ctx := testutils.Context(t) + secretsORM := syncer.NewWorkflowRegistryDS(db, logger.TestLogger(t)) + + var sids []int64 + for i := 0; i < 3; i++ { + sid, err := secretsORM.Create(ctx, "some-url.com", fmt.Sprintf("some-hash-%d", i), "some-contentz") + require.NoError(t, err) + sids = append(sids, sid) + } wfYaml1 := pkgworkflows.WFYamlSpec(t, "workflow00", addr1) s1 := job.WorkflowSpec{ - Workflow: wfYaml1, - SpecType: job.YamlSpec, + Workflow: wfYaml1, + SpecType: job.YamlSpec, + SecretsID: sql.NullInt64{Int64: sids[0], Valid: true}, } wantJobID1 := mustInsertWFJob(t, o, &s1) wfYaml2 := pkgworkflows.WFYamlSpec(t, "workflow01", addr1) s2 := job.WorkflowSpec{ - Workflow: wfYaml2, - SpecType: job.YamlSpec, + Workflow: wfYaml2, + SpecType: job.YamlSpec, + SecretsID: sql.NullInt64{Int64: sids[1], Valid: true}, } wantJobID2 := mustInsertWFJob(t, o, &s2) wfYaml3 := pkgworkflows.WFYamlSpec(t, "workflow00", addr2) s3 := job.WorkflowSpec{ - Workflow: wfYaml3, - SpecType: job.YamlSpec, + Workflow: wfYaml3, + SpecType: job.YamlSpec, + SecretsID: sql.NullInt64{Int64: sids[2], Valid: true}, } wantJobID3 := mustInsertWFJob(t, o, &s3) @@ -1992,7 +2016,7 @@ func mustInsertWFJob(t *testing.T, orm job.ORM, s *job.WorkflowSpec) int32 { } err = orm.CreateJob(ctx, &j) - require.NoError(t, err, "failed to insert job with wf spec %v %s", s, s.Workflow) + require.NoError(t, err, "failed to insert job with wf spec %+v %s", s, err) return j.ID } diff --git a/core/services/job/models.go b/core/services/job/models.go index 84ff2f5d7f1..423a297c8da 100644 --- a/core/services/job/models.go +++ b/core/services/job/models.go @@ -2,6 +2,7 @@ package job import ( "context" + "database/sql" "database/sql/driver" "encoding/json" "fmt" @@ -879,7 +880,7 @@ type WorkflowSpec struct { WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow. BinaryURL string `db:"binary_url"` ConfigURL string `db:"config_url"` - SecretsID string `db:"secrets_id"` + SecretsID sql.NullInt64 `db:"secrets_id"` CreatedAt time.Time `toml:"-"` UpdatedAt time.Time `toml:"-"` SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"` diff --git a/core/services/job/orm.go b/core/services/job/orm.go index 5e8b5ce127f..92ec9b2e83c 100644 --- a/core/services/job/orm.go +++ b/core/services/job/orm.go @@ -433,8 +433,8 @@ func (o *orm) CreateJob(ctx context.Context, jb *Job) error { case Stream: // 'stream' type has no associated spec, nothing to do here case Workflow: - sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, created_at, updated_at, spec_type, config) - VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, NOW(), NOW(), :spec_type, :config) + sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, binary_url, config_url, secrets_id, created_at, updated_at, spec_type, config) + VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, :binary_url, :config_url, :secrets_id, NOW(), NOW(), :spec_type, :config) RETURNING id;` specID, err := tx.prepareQuerySpecID(ctx, sql, jb.WorkflowSpec) if err != nil { diff --git a/core/store/migrate/migrations/0259_add_workflow_secrets.sql b/core/store/migrate/migrations/0259_add_workflow_secrets.sql index d6f82033b41..fb76d945571 100644 --- a/core/store/migrate/migrations/0259_add_workflow_secrets.sql +++ b/core/store/migrate/migrations/0259_add_workflow_secrets.sql @@ -13,8 +13,8 @@ CREATE INDEX idx_secrets_url ON workflow_secrets(secrets_url); -- Alter the workflow_specs table ALTER TABLE workflow_specs -ADD COLUMN binary_url TEXT, -ADD COLUMN config_url TEXT, +ADD COLUMN binary_url TEXT DEFAULT '', +ADD COLUMN config_url TEXT DEFAULT '', ADD COLUMN secrets_id INT UNIQUE REFERENCES workflow_secrets(id) ON DELETE CASCADE; -- Alter the config column type