diff --git a/core/services/job/models.go b/core/services/job/models.go index 26d563c7ac8..423a297c8da 100644 --- a/core/services/job/models.go +++ b/core/services/job/models.go @@ -869,30 +869,21 @@ const ( DefaultSpecType = "" ) -type WorkflowSpecStatus string - -const ( - WorkflowSpecStatusActive WorkflowSpecStatus = "active" - WorkflowSpecStatusPaused WorkflowSpecStatus = "paused" - WorkflowSpecStatusDefault WorkflowSpecStatus = "" -) - type WorkflowSpec struct { ID int32 `toml:"-"` Workflow string `toml:"workflow"` // the raw representation of the workflow Config string `toml:"config" db:"config"` // the raw representation of the config // fields derived from the yaml spec, used for indexing the database // note: i tried to make these private, but translating them to the database seems to require them to be public - WorkflowID string `toml:"-" db:"workflow_id"` // Derived. Do not modify. the CID of the workflow. - WorkflowOwner string `toml:"-" db:"workflow_owner"` // Derived. Do not modify. the owner of the workflow. - WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow. - Status WorkflowSpecStatus `db:"status"` - BinaryURL string `db:"binary_url"` - ConfigURL string `db:"config_url"` - SecretsID sql.NullInt64 `db:"secrets_id"` - CreatedAt time.Time `toml:"-"` - UpdatedAt time.Time `toml:"-"` - SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"` + WorkflowID string `toml:"-" db:"workflow_id"` // Derived. Do not modify. the CID of the workflow. + WorkflowOwner string `toml:"-" db:"workflow_owner"` // Derived. Do not modify. the owner of the workflow. + WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow. + BinaryURL string `db:"binary_url"` + ConfigURL string `db:"config_url"` + SecretsID sql.NullInt64 `db:"secrets_id"` + CreatedAt time.Time `toml:"-"` + UpdatedAt time.Time `toml:"-"` + SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"` sdkWorkflow *sdk.WorkflowSpec rawSpec []byte config []byte diff --git a/core/services/workflows/syncer/mocks/orm.go b/core/services/workflows/syncer/mocks/orm.go index 2bb116cba4f..19c459fa0ee 100644 --- a/core/services/workflows/syncer/mocks/orm.go +++ b/core/services/workflows/syncer/mocks/orm.go @@ -81,50 +81,59 @@ func (_c *ORM_Create_Call) RunAndReturn(run func(context.Context, string, string return _c } -// DeleteWorkflowSpec provides a mock function with given fields: ctx, owner, name -func (_m *ORM) DeleteWorkflowSpec(ctx context.Context, owner string, name string) error { - ret := _m.Called(ctx, owner, name) +// CreateWorkflowSpec provides a mock function with given fields: ctx, spec +func (_m *ORM) CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) { + ret := _m.Called(ctx, spec) if len(ret) == 0 { - panic("no return value specified for DeleteWorkflowSpec") + panic("no return value specified for CreateWorkflowSpec") } - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { - r0 = rf(ctx, owner, name) + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *job.WorkflowSpec) (int64, error)); ok { + return rf(ctx, spec) + } + if rf, ok := ret.Get(0).(func(context.Context, *job.WorkflowSpec) int64); ok { + r0 = rf(ctx, spec) } else { - r0 = ret.Error(0) + r0 = ret.Get(0).(int64) } - return r0 + if rf, ok := ret.Get(1).(func(context.Context, *job.WorkflowSpec) error); ok { + r1 = rf(ctx, spec) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } -// ORM_DeleteWorkflowSpec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteWorkflowSpec' -type ORM_DeleteWorkflowSpec_Call struct { +// ORM_CreateWorkflowSpec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateWorkflowSpec' +type ORM_CreateWorkflowSpec_Call struct { *mock.Call } -// DeleteWorkflowSpec is a helper method to define mock.On call +// CreateWorkflowSpec is a helper method to define mock.On call // - ctx context.Context -// - owner string -// - name string -func (_e *ORM_Expecter) DeleteWorkflowSpec(ctx interface{}, owner interface{}, name interface{}) *ORM_DeleteWorkflowSpec_Call { - return &ORM_DeleteWorkflowSpec_Call{Call: _e.mock.On("DeleteWorkflowSpec", ctx, owner, name)} +// - spec *job.WorkflowSpec +func (_e *ORM_Expecter) CreateWorkflowSpec(ctx interface{}, spec interface{}) *ORM_CreateWorkflowSpec_Call { + return &ORM_CreateWorkflowSpec_Call{Call: _e.mock.On("CreateWorkflowSpec", ctx, spec)} } -func (_c *ORM_DeleteWorkflowSpec_Call) Run(run func(ctx context.Context, owner string, name string)) *ORM_DeleteWorkflowSpec_Call { +func (_c *ORM_CreateWorkflowSpec_Call) Run(run func(ctx context.Context, spec *job.WorkflowSpec)) *ORM_CreateWorkflowSpec_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string)) + run(args[0].(context.Context), args[1].(*job.WorkflowSpec)) }) return _c } -func (_c *ORM_DeleteWorkflowSpec_Call) Return(_a0 error) *ORM_DeleteWorkflowSpec_Call { - _c.Call.Return(_a0) +func (_c *ORM_CreateWorkflowSpec_Call) Return(_a0 int64, _a1 error) *ORM_CreateWorkflowSpec_Call { + _c.Call.Return(_a0, _a1) return _c } -func (_c *ORM_DeleteWorkflowSpec_Call) RunAndReturn(run func(context.Context, string, string) error) *ORM_DeleteWorkflowSpec_Call { +func (_c *ORM_CreateWorkflowSpec_Call) RunAndReturn(run func(context.Context, *job.WorkflowSpec) (int64, error)) *ORM_CreateWorkflowSpec_Call { _c.Call.Return(run) return _c } @@ -416,66 +425,6 @@ func (_c *ORM_GetSecretsURLHash_Call) RunAndReturn(run func([]byte, []byte) ([]b return _c } -// GetWorkflowSpec provides a mock function with given fields: ctx, owner, name -func (_m *ORM) GetWorkflowSpec(ctx context.Context, owner string, name string) (*job.WorkflowSpec, error) { - ret := _m.Called(ctx, owner, name) - - if len(ret) == 0 { - panic("no return value specified for GetWorkflowSpec") - } - - var r0 *job.WorkflowSpec - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, string) (*job.WorkflowSpec, error)); ok { - return rf(ctx, owner, name) - } - if rf, ok := ret.Get(0).(func(context.Context, string, string) *job.WorkflowSpec); ok { - r0 = rf(ctx, owner, name) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*job.WorkflowSpec) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { - r1 = rf(ctx, owner, name) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_GetWorkflowSpec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetWorkflowSpec' -type ORM_GetWorkflowSpec_Call struct { - *mock.Call -} - -// GetWorkflowSpec is a helper method to define mock.On call -// - ctx context.Context -// - owner string -// - name string -func (_e *ORM_Expecter) GetWorkflowSpec(ctx interface{}, owner interface{}, name interface{}) *ORM_GetWorkflowSpec_Call { - return &ORM_GetWorkflowSpec_Call{Call: _e.mock.On("GetWorkflowSpec", ctx, owner, name)} -} - -func (_c *ORM_GetWorkflowSpec_Call) Run(run func(ctx context.Context, owner string, name string)) *ORM_GetWorkflowSpec_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string)) - }) - return _c -} - -func (_c *ORM_GetWorkflowSpec_Call) Return(_a0 *job.WorkflowSpec, _a1 error) *ORM_GetWorkflowSpec_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_GetWorkflowSpec_Call) RunAndReturn(run func(context.Context, string, string) (*job.WorkflowSpec, error)) *ORM_GetWorkflowSpec_Call { - _c.Call.Return(run) - return _c -} - // Update provides a mock function with given fields: ctx, secretsURL, contents func (_m *ORM) Update(ctx context.Context, secretsURL string, contents string) (int64, error) { ret := _m.Called(ctx, secretsURL, contents) @@ -534,63 +483,6 @@ func (_c *ORM_Update_Call) RunAndReturn(run func(context.Context, string, string return _c } -// UpsertWorkflowSpec provides a mock function with given fields: ctx, spec -func (_m *ORM) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) { - ret := _m.Called(ctx, spec) - - if len(ret) == 0 { - panic("no return value specified for UpsertWorkflowSpec") - } - - var r0 int64 - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *job.WorkflowSpec) (int64, error)); ok { - return rf(ctx, spec) - } - if rf, ok := ret.Get(0).(func(context.Context, *job.WorkflowSpec) int64); ok { - r0 = rf(ctx, spec) - } else { - r0 = ret.Get(0).(int64) - } - - if rf, ok := ret.Get(1).(func(context.Context, *job.WorkflowSpec) error); ok { - r1 = rf(ctx, spec) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_UpsertWorkflowSpec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpsertWorkflowSpec' -type ORM_UpsertWorkflowSpec_Call struct { - *mock.Call -} - -// UpsertWorkflowSpec is a helper method to define mock.On call -// - ctx context.Context -// - spec *job.WorkflowSpec -func (_e *ORM_Expecter) UpsertWorkflowSpec(ctx interface{}, spec interface{}) *ORM_UpsertWorkflowSpec_Call { - return &ORM_UpsertWorkflowSpec_Call{Call: _e.mock.On("UpsertWorkflowSpec", ctx, spec)} -} - -func (_c *ORM_UpsertWorkflowSpec_Call) Run(run func(ctx context.Context, spec *job.WorkflowSpec)) *ORM_UpsertWorkflowSpec_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*job.WorkflowSpec)) - }) - return _c -} - -func (_c *ORM_UpsertWorkflowSpec_Call) Return(_a0 int64, _a1 error) *ORM_UpsertWorkflowSpec_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_UpsertWorkflowSpec_Call) RunAndReturn(run func(context.Context, *job.WorkflowSpec) (int64, error)) *ORM_UpsertWorkflowSpec_Call { - _c.Call.Return(run) - return _c -} - // NewORM creates a new instance of ORM. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewORM(t interface { diff --git a/core/services/workflows/syncer/orm.go b/core/services/workflows/syncer/orm.go index 4a5be9d1a58..d43dbe09b78 100644 --- a/core/services/workflows/syncer/orm.go +++ b/core/services/workflows/syncer/orm.go @@ -2,8 +2,7 @@ package syncer import ( "context" - "database/sql" - "time" + "errors" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -34,15 +33,7 @@ type WorkflowSecretsDS interface { } type WorkflowSpecsDS interface { - // UpsertWorkflowSpec inserts or updates a workflow spec. Updates on conflict of workflow name - // and owner - UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) - - // GetWorkflowSpec returns the workflow spec for the given owner and name. - GetWorkflowSpec(ctx context.Context, owner, name string) (*job.WorkflowSpec, error) - - // DeleteWorkflowSpec deletes the workflow spec for the given owner and name. - DeleteWorkflowSpec(ctx context.Context, owner, name string) error + CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) } type ORM interface { @@ -158,104 +149,6 @@ func (orm *orm) GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error) { return crypto.Keccak256(append(owner, secretsURL...)) } -func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) { - var id int64 - - query := ` - INSERT INTO workflow_specs ( - workflow, - config, - workflow_id, - workflow_owner, - workflow_name, - status, - binary_url, - config_url, - secrets_id, - created_at, - updated_at, - spec_type - ) VALUES ( - :workflow, - :config, - :workflow_id, - :workflow_owner, - :workflow_name, - :status, - :binary_url, - :config_url, - :secrets_id, - :created_at, - :updated_at, - :spec_type - ) ON CONFLICT (workflow_owner, workflow_name) DO UPDATE - SET - workflow = EXCLUDED.workflow, - config = EXCLUDED.config, - workflow_id = EXCLUDED.workflow_id, - workflow_owner = EXCLUDED.workflow_owner, - workflow_name = EXCLUDED.workflow_name, - status = EXCLUDED.status, - binary_url = EXCLUDED.binary_url, - config_url = EXCLUDED.config_url, - secrets_id = EXCLUDED.secrets_id, - created_at = EXCLUDED.created_at, - updated_at = EXCLUDED.updated_at, - spec_type = EXCLUDED.spec_type - RETURNING id - ` - - stmt, err := orm.ds.PrepareNamedContext(ctx, query) - if err != nil { - return 0, err - } - defer stmt.Close() - - spec.UpdatedAt = time.Now() - err = stmt.QueryRowxContext(ctx, spec).Scan(&id) - - if err != nil { - return 0, err - } - - return id, nil -} - -func (orm *orm) GetWorkflowSpec(ctx context.Context, owner, name string) (*job.WorkflowSpec, error) { - query := ` - SELECT * - FROM workflow_specs - WHERE workflow_owner = $1 AND workflow_name = $2 - ` - - var spec job.WorkflowSpec - err := orm.ds.GetContext(ctx, &spec, query, owner, name) - if err != nil { - return nil, err - } - - return &spec, nil -} - -func (orm *orm) DeleteWorkflowSpec(ctx context.Context, owner, name string) error { - query := ` - DELETE FROM workflow_specs - WHERE workflow_owner = $1 AND workflow_name = $2 - ` - - result, err := orm.ds.ExecContext(ctx, query, owner, name) - if err != nil { - return err - } - - rowsAffected, err := result.RowsAffected() - if err != nil { - return err - } - - if rowsAffected == 0 { - return sql.ErrNoRows // No spec deleted - } - - return nil +func (orm *orm) CreateWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) { + return 0, errors.New("not implemented") } diff --git a/core/services/workflows/syncer/orm_test.go b/core/services/workflows/syncer/orm_test.go index 1be4e54f472..8b9f685bb52 100644 --- a/core/services/workflows/syncer/orm_test.go +++ b/core/services/workflows/syncer/orm_test.go @@ -1,15 +1,12 @@ package syncer import ( - "database/sql" "encoding/hex" "testing" - "time" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" "github.com/stretchr/testify/assert" @@ -54,145 +51,3 @@ func TestWorkflowArtifactsORM_GetAndUpdate(t *testing.T) { require.NoError(t, err) assert.Equal(t, "new contents", contents) } - -func Test_UpsertWorkflowSpec(t *testing.T) { - db := pgtest.NewSqlxDB(t) - ctx := testutils.Context(t) - lggr := logger.TestLogger(t) - orm := &orm{ds: db, lggr: lggr} - - t.Run("inserts new spec", func(t *testing.T) { - spec := &job.WorkflowSpec{ - Workflow: "test_workflow", - Config: "test_config", - WorkflowID: "cid-123", - WorkflowOwner: "owner-123", - WorkflowName: "Test Workflow", - Status: job.WorkflowSpecStatusActive, - BinaryURL: "http://example.com/binary", - ConfigURL: "http://example.com/config", - CreatedAt: time.Now(), - SpecType: job.WASMFile, - } - - _, err := orm.UpsertWorkflowSpec(ctx, spec) - require.NoError(t, err) - - // Verify the record exists in the database - var dbSpec job.WorkflowSpec - err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE workflow_owner = $1 AND workflow_name = $2`, spec.WorkflowOwner, spec.WorkflowName) - require.NoError(t, err) - require.Equal(t, spec.Workflow, dbSpec.Workflow) - }) - - t.Run("updates existing spec", func(t *testing.T) { - spec := &job.WorkflowSpec{ - Workflow: "test_workflow", - Config: "test_config", - WorkflowID: "cid-123", - WorkflowOwner: "owner-123", - WorkflowName: "Test Workflow", - Status: job.WorkflowSpecStatusActive, - BinaryURL: "http://example.com/binary", - ConfigURL: "http://example.com/config", - CreatedAt: time.Now(), - SpecType: job.WASMFile, - } - - _, err := orm.UpsertWorkflowSpec(ctx, spec) - require.NoError(t, err) - - // Update the status - spec.Status = job.WorkflowSpecStatusPaused - - _, err = orm.UpsertWorkflowSpec(ctx, spec) - require.NoError(t, err) - - // Verify the record is updated in the database - var dbSpec job.WorkflowSpec - err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE workflow_owner = $1 AND workflow_name = $2`, spec.WorkflowOwner, spec.WorkflowName) - require.NoError(t, err) - require.Equal(t, spec.Config, dbSpec.Config) - require.Equal(t, spec.Status, dbSpec.Status) - }) -} - -func Test_DeleteWorkflowSpec(t *testing.T) { - db := pgtest.NewSqlxDB(t) - ctx := testutils.Context(t) - lggr := logger.TestLogger(t) - orm := &orm{ds: db, lggr: lggr} - - t.Run("deletes a workflow spec", func(t *testing.T) { - spec := &job.WorkflowSpec{ - Workflow: "test_workflow", - Config: "test_config", - WorkflowID: "cid-123", - WorkflowOwner: "owner-123", - WorkflowName: "Test Workflow", - Status: job.WorkflowSpecStatusActive, - BinaryURL: "http://example.com/binary", - ConfigURL: "http://example.com/config", - CreatedAt: time.Now(), - SpecType: job.WASMFile, - } - - id, err := orm.UpsertWorkflowSpec(ctx, spec) - require.NoError(t, err) - require.NotZero(t, id) - - err = orm.DeleteWorkflowSpec(ctx, spec.WorkflowOwner, spec.WorkflowName) - require.NoError(t, err) - - // Verify the record is deleted from the database - var dbSpec job.WorkflowSpec - err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE id = $1`, id) - require.Error(t, err) - require.Equal(t, sql.ErrNoRows, err) - }) - - t.Run("fails if no workflow spec exists", func(t *testing.T) { - err := orm.DeleteWorkflowSpec(ctx, "owner-123", "Test Workflow") - require.Error(t, err) - require.Equal(t, sql.ErrNoRows, err) - }) -} - -func Test_GetWorkflowSpec(t *testing.T) { - db := pgtest.NewSqlxDB(t) - ctx := testutils.Context(t) - lggr := logger.TestLogger(t) - orm := &orm{ds: db, lggr: lggr} - - t.Run("gets a workflow spec", func(t *testing.T) { - spec := &job.WorkflowSpec{ - Workflow: "test_workflow", - Config: "test_config", - WorkflowID: "cid-123", - WorkflowOwner: "owner-123", - WorkflowName: "Test Workflow", - Status: job.WorkflowSpecStatusActive, - BinaryURL: "http://example.com/binary", - ConfigURL: "http://example.com/config", - CreatedAt: time.Now(), - SpecType: job.WASMFile, - } - - id, err := orm.UpsertWorkflowSpec(ctx, spec) - require.NoError(t, err) - require.NotZero(t, id) - - dbSpec, err := orm.GetWorkflowSpec(ctx, spec.WorkflowOwner, spec.WorkflowName) - require.NoError(t, err) - require.Equal(t, spec.Workflow, dbSpec.Workflow) - - err = orm.DeleteWorkflowSpec(ctx, spec.WorkflowOwner, spec.WorkflowName) - require.NoError(t, err) - }) - - t.Run("fails if no workflow spec exists", func(t *testing.T) { - dbSpec, err := orm.GetWorkflowSpec(ctx, "owner-123", "Test Workflow") - require.Error(t, err) - require.Nil(t, dbSpec) - }) -} diff --git a/core/store/migrate/migrations/0260_add_status_workflow_spec.sql b/core/store/migrate/migrations/0260_add_status_workflow_spec.sql deleted file mode 100644 index 66c38eef2f7..00000000000 --- a/core/store/migrate/migrations/0260_add_status_workflow_spec.sql +++ /dev/null @@ -1,9 +0,0 @@ --- +goose Up --- Add a `status` column to the `workflow_specs` table. -ALTER TABLE workflow_specs -ADD COLUMN status TEXT DEFAULT '' NOT NULL; - --- +goose Down --- Remove the `status` column from the `workflow_specs` table. -ALTER TABLE workflow_specs -DROP COLUMN status; \ No newline at end of file