Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CAPPL-394] Some database changes #15681

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 75 additions & 59 deletions core/services/workflows/syncer/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,65 +210,73 @@ func (orm *orm) GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error) {

func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) {
var id int64
err := sqlutil.TransactDataSource(ctx, orm.ds, nil, func(tx sqlutil.DataSource) error {
txErr := tx.QueryRowxContext(
ctx,
`DELETE FROM workflow_specs WHERE workflow_owner = $1 AND workflow_name = $2 AND workflow_id != $3`,
spec.WorkflowOwner,
spec.WorkflowName,
spec.WorkflowID,
).Scan(nil)
if txErr != nil && !errors.Is(txErr, sql.ErrNoRows) {
return fmt.Errorf("failed to clean up previous workflow specs: %w", txErr)
}

query := `
INSERT INTO workflow_specs (
workflow,
config,
workflow_id,
workflow_owner,
workflow_name,
status,
binary_url,
config_url,
secrets_id,
created_at,
updated_at,
spec_type
) VALUES (
:workflow,
:config,
:workflow_id,
:workflow_owner,
:workflow_name,
:status,
:binary_url,
:config_url,
:secrets_id,
:created_at,
:updated_at,
:spec_type
) ON CONFLICT (workflow_owner, workflow_name) DO UPDATE
SET
workflow = EXCLUDED.workflow,
config = EXCLUDED.config,
workflow_id = EXCLUDED.workflow_id,
workflow_owner = EXCLUDED.workflow_owner,
workflow_name = EXCLUDED.workflow_name,
status = EXCLUDED.status,
binary_url = EXCLUDED.binary_url,
config_url = EXCLUDED.config_url,
secrets_id = EXCLUDED.secrets_id,
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at,
spec_type = EXCLUDED.spec_type
RETURNING id
`

stmt, err := orm.ds.PrepareNamedContext(ctx, query)
if err != nil {
return 0, err
}
defer stmt.Close()
query := `
INSERT INTO workflow_specs (
workflow,
config,
workflow_id,
workflow_owner,
workflow_name,
status,
binary_url,
config_url,
secrets_id,
created_at,
updated_at,
spec_type
) VALUES (
:workflow,
:config,
:workflow_id,
:workflow_owner,
:workflow_name,
:status,
:binary_url,
:config_url,
:secrets_id,
:created_at,
:updated_at,
:spec_type
) ON CONFLICT (workflow_owner, workflow_name) DO UPDATE
SET
workflow = EXCLUDED.workflow,
config = EXCLUDED.config,
workflow_id = EXCLUDED.workflow_id,
workflow_owner = EXCLUDED.workflow_owner,
workflow_name = EXCLUDED.workflow_name,
status = EXCLUDED.status,
binary_url = EXCLUDED.binary_url,
config_url = EXCLUDED.config_url,
secrets_id = EXCLUDED.secrets_id,
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at,
spec_type = EXCLUDED.spec_type
RETURNING id
`

spec.UpdatedAt = time.Now()
err = stmt.QueryRowxContext(ctx, spec).Scan(&id)
stmt, err := orm.ds.PrepareNamedContext(ctx, query)
if err != nil {
return err
}
defer stmt.Close()

if err != nil {
return 0, err
}
spec.UpdatedAt = time.Now()
return stmt.QueryRowxContext(ctx, spec).Scan(&id)
})

return id, nil
return id, err
}

func (orm *orm) UpsertWorkflowSpecWithSecrets(
Expand All @@ -293,6 +301,17 @@ func (orm *orm) UpsertWorkflowSpecWithSecrets(
return fmt.Errorf("failed to create workflow secrets: %w", txErr)
}

txErr = tx.QueryRowxContext(
ctx,
`DELETE FROM workflow_specs WHERE workflow_owner = $1 AND workflow_name = $2 AND workflow_id != $3`,
spec.WorkflowOwner,
spec.WorkflowName,
spec.WorkflowID,
).Scan(nil)
if txErr != nil && !errors.Is(txErr, sql.ErrNoRows) {
return fmt.Errorf("failed to clean up previous workflow specs: %w", txErr)
}

spec.SecretsID = sql.NullInt64{Int64: sid, Valid: true}

query := `
Expand Down Expand Up @@ -335,10 +354,7 @@ func (orm *orm) UpsertWorkflowSpecWithSecrets(
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at,
spec_type = EXCLUDED.spec_type,
secrets_id = CASE
WHEN workflow_specs.secrets_id IS NULL THEN EXCLUDED.secrets_id
ELSE workflow_specs.secrets_id
END
secrets_id = EXCLUDED.secrets_id
RETURNING id
`

Expand Down
42 changes: 42 additions & 0 deletions core/services/workflows/syncer/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"testing"
"time"

"github.com/google/uuid"

"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
Expand Down Expand Up @@ -372,4 +374,44 @@ func Test_UpsertWorkflowSpecWithSecrets(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "new contents", contents)
})

t.Run("updates existing spec and secrets if spec has executions", func(t *testing.T) {
giveURL := "https://example.com"
giveBytes, err := crypto.Keccak256([]byte(giveURL))
require.NoError(t, err)
giveHash := hex.EncodeToString(giveBytes)
giveContent := "some contents"

spec := &job.WorkflowSpec{
Workflow: "test_workflow",
Config: "test_config",
WorkflowID: "cid-123",
cedric-cordenier marked this conversation as resolved.
Show resolved Hide resolved
WorkflowOwner: "owner-123",
WorkflowName: "Test Workflow",
Status: job.WorkflowSpecStatusActive,
BinaryURL: "http://example.com/binary",
ConfigURL: "http://example.com/config",
CreatedAt: time.Now(),
SpecType: job.WASMFile,
}

_, err = orm.UpsertWorkflowSpecWithSecrets(ctx, spec, giveURL, giveHash, giveContent)
require.NoError(t, err)

_, err = db.ExecContext(
ctx,
`INSERT INTO workflow_executions (id, workflow_id, status, created_at) VALUES ($1, $2, $3, $4)`,
uuid.New().String(),
"cid-123",
"started",
time.Now(),
)
require.NoError(t, err)

// Update the status
spec.WorkflowID = "cid-456"

_, err = orm.UpsertWorkflowSpecWithSecrets(ctx, spec, giveURL, giveHash, "new contents")
require.NoError(t, err)
cedric-cordenier marked this conversation as resolved.
Show resolved Hide resolved
})
}
2 changes: 1 addition & 1 deletion core/services/workflows/syncer/workflow_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ func (w *workflowRegistry) readRegistryEvents(ctx context.Context, reader Contra
for _, event := range events {
err := w.handler.Handle(ctx, event.Event)
if err != nil {
w.lggr.Errorw("failed to handle event", "err", err)
w.lggr.Errorw("failed to handle event", "err", err, "type", event.Event.EventType)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ DROP INDEX IF EXISTS idx_secrets_url_hash;

-- Drop the workflow_artifacts table
DROP TABLE IF EXISTS workflow_secrets;
-- +goose StatementEnd
-- +goose StatementEnd

Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- +goose Up
-- +goose StatementBegin
-- unique constraint on workflow_owner and workflow_name
ALTER TABLE workflow_specs DROP CONSTRAINT workflow_specs_secrets_id_key;
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
ALTER TABLE workflow_specs ADD CONSTRAINT workflow_specs_secrets_id_key unique (secrets_id);
-- +goose StatementEnd
Loading