Skip to content

Commit

Permalink
fix(workflow/syncer): upsert spec with existing secrets (#15655)
Browse files Browse the repository at this point in the history
  • Loading branch information
MStreet3 authored Dec 12, 2024
1 parent 86ccd47 commit 771151b
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 2 deletions.
7 changes: 5 additions & 2 deletions core/services/workflows/syncer/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,10 +332,13 @@ func (orm *orm) UpsertWorkflowSpecWithSecrets(
status = EXCLUDED.status,
binary_url = EXCLUDED.binary_url,
config_url = EXCLUDED.config_url,
secrets_id = EXCLUDED.secrets_id,
created_at = EXCLUDED.created_at,
updated_at = EXCLUDED.updated_at,
spec_type = EXCLUDED.spec_type
spec_type = EXCLUDED.spec_type,
secrets_id = CASE
WHEN workflow_specs.secrets_id IS NULL THEN EXCLUDED.secrets_id
ELSE workflow_specs.secrets_id
END
RETURNING id
`

Expand Down
83 changes: 83 additions & 0 deletions core/services/workflows/syncer/orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,86 @@ func Test_GetContentsByWorkflowID_SecretsProvidedButEmpty(t *testing.T) {
_, _, err = orm.GetContentsByWorkflowID(ctx, workflowID)
require.ErrorIs(t, err, ErrEmptySecrets)
}

func Test_UpsertWorkflowSpecWithSecrets(t *testing.T) {
db := pgtest.NewSqlxDB(t)
ctx := testutils.Context(t)
lggr := logger.TestLogger(t)
orm := &orm{ds: db, lggr: lggr}

t.Run("inserts new spec and new secrets", func(t *testing.T) {
giveURL := "https://example.com"
giveBytes, err := crypto.Keccak256([]byte(giveURL))
require.NoError(t, err)
giveHash := hex.EncodeToString(giveBytes)
giveContent := "some contents"

spec := &job.WorkflowSpec{
Workflow: "test_workflow",
Config: "test_config",
WorkflowID: "cid-123",
WorkflowOwner: "owner-123",
WorkflowName: "Test Workflow",
Status: job.WorkflowSpecStatusActive,
BinaryURL: "http://example.com/binary",
ConfigURL: "http://example.com/config",
CreatedAt: time.Now(),
SpecType: job.WASMFile,
}

_, err = orm.UpsertWorkflowSpecWithSecrets(ctx, spec, giveURL, giveHash, giveContent)
require.NoError(t, err)

// Verify the record exists in the database
var dbSpec job.WorkflowSpec
err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE workflow_owner = $1 AND workflow_name = $2`, spec.WorkflowOwner, spec.WorkflowName)
require.NoError(t, err)
require.Equal(t, spec.Workflow, dbSpec.Workflow)

// Verify the secrets exists in the database
contents, err := orm.GetContents(ctx, giveURL)
require.NoError(t, err)
require.Equal(t, giveContent, contents)
})

t.Run("updates existing spec and secrets", func(t *testing.T) {
giveURL := "https://example.com"
giveBytes, err := crypto.Keccak256([]byte(giveURL))
require.NoError(t, err)
giveHash := hex.EncodeToString(giveBytes)
giveContent := "some contents"

spec := &job.WorkflowSpec{
Workflow: "test_workflow",
Config: "test_config",
WorkflowID: "cid-123",
WorkflowOwner: "owner-123",
WorkflowName: "Test Workflow",
Status: job.WorkflowSpecStatusActive,
BinaryURL: "http://example.com/binary",
ConfigURL: "http://example.com/config",
CreatedAt: time.Now(),
SpecType: job.WASMFile,
}

_, err = orm.UpsertWorkflowSpecWithSecrets(ctx, spec, giveURL, giveHash, giveContent)
require.NoError(t, err)

// Update the status
spec.Status = job.WorkflowSpecStatusPaused

_, err = orm.UpsertWorkflowSpecWithSecrets(ctx, spec, giveURL, giveHash, "new contents")
require.NoError(t, err)

// Verify the record is updated in the database
var dbSpec job.WorkflowSpec
err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE workflow_owner = $1 AND workflow_name = $2`, spec.WorkflowOwner, spec.WorkflowName)
require.NoError(t, err)
require.Equal(t, spec.Config, dbSpec.Config)

// Verify the secrets is updated in the database
contents, err := orm.GetContents(ctx, giveURL)
require.NoError(t, err)
require.Equal(t, "new contents", contents)
})
}

0 comments on commit 771151b

Please sign in to comment.