diff --git a/.goreleaser.develop.yaml b/.goreleaser.develop.yaml index c633e22fe62..67c5e65c71f 100644 --- a/.goreleaser.develop.yaml +++ b/.goreleaser.develop.yaml @@ -70,6 +70,7 @@ dockers: extra_files: - tmp/libs - tmp/plugins + - tmp/workflows build_flag_templates: - --platform=linux/amd64 - --pull @@ -121,6 +122,7 @@ dockers: extra_files: - tmp/libs - tmp/plugins + - tmp/workflows build_flag_templates: - --platform=linux/arm64 - --pull @@ -149,6 +151,7 @@ dockers: - '{{ .Env.IMG_PRE }}/ccip:sha-{{ .ShortCommit }}-amd64' extra_files: - tmp/libs + - tmp/workflows - ccip/config build_flag_templates: - --platform=linux/amd64 @@ -176,6 +179,7 @@ dockers: extra_files: - tmp/libs - tmp/plugins + - tmp/workflows - ccip/config build_flag_templates: - --platform=linux/amd64 @@ -205,6 +209,7 @@ dockers: - '{{ .Env.IMG_PRE }}/ccip:sha-{{ .ShortCommit }}-arm64' extra_files: - tmp/libs + - tmp/workflows - ccip/config build_flag_templates: - --platform=linux/arm64 @@ -231,6 +236,7 @@ dockers: extra_files: - tmp/libs - tmp/plugins + - tmp/workflows - ccip/config build_flag_templates: - --platform=linux/arm64 diff --git a/.mockery.yaml b/.mockery.yaml index 70b7a9947f6..711d70f59e9 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -579,21 +579,6 @@ packages: github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer: interfaces: ORM: - github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer: - interfaces: - ORM: - ContractReader: - config: - mockname: "Mock{{ .InterfaceName }}" - filename: contract_reader_mock.go - inpackage: true - dir: "{{ .InterfaceDir }}" - Handler: - config: - mockname: "Mock{{ .InterfaceName }}" - filename: handler_mock.go - inpackage: true - dir: "{{ .InterfaceDir }}" github.com/smartcontractkit/chainlink/v2/core/capabilities/targets: interfaces: ContractValueGetter: \ No newline at end of file diff --git a/config.yaml b/config.yaml new file mode 100644 index 00000000000..e69de29bb2d diff --git a/contracts/gas-snapshots/ccip.gas-snapshot b/contracts/gas-snapshots/ccip.gas-snapshot index 487265ef580..e53d6a5ab3b 100644 --- a/contracts/gas-snapshots/ccip.gas-snapshot +++ b/contracts/gas-snapshots/ccip.gas-snapshot @@ -765,4 +765,4 @@ USDCTokenPool_releaseOrMint:test_TokenMaxCapacityExceeded_Revert() (gas: 47231) USDCTokenPool_releaseOrMint:test_UnlockingUSDCFailed_Revert() (gas: 95315) USDCTokenPool_setDomains:test_InvalidDomain_Revert() (gas: 66437) USDCTokenPool_setDomains:test_OnlyOwner_Revert() (gas: 11314) -USDCTokenPool_supportsInterface:test_SupportsInterface_Success() (gas: 10107) \ No newline at end of file +USDCTokenPool_supportsInterface:test_SupportsInterface_Success() (gas: 10107) diff --git a/core/capabilities/compute/compute.go b/core/capabilities/compute/compute.go index 32e43e8d62e..68f0d4d9121 100644 --- a/core/capabilities/compute/compute.go +++ b/core/capabilities/compute/compute.go @@ -308,6 +308,7 @@ func (c *Compute) createFetcher() func(ctx context.Context, req *wasmpb.FetchReq headersReq[k] = v.String() } + fmt.Printf("LEN: %s, %d", req.Body, len(req.Body)) payloadBytes, err := json.Marshal(ghcapabilities.Request{ URL: req.Url, Method: req.Method, diff --git a/core/chainlink.goreleaser.Dockerfile b/core/chainlink.goreleaser.Dockerfile index eb359376006..3f64a4585c4 100644 --- a/core/chainlink.goreleaser.Dockerfile +++ b/core/chainlink.goreleaser.Dockerfile @@ -20,6 +20,9 @@ COPY ./chainlink /usr/local/bin/ # Copy native libs if cgo is enabled COPY ./tmp/libs /usr/local/bin/libs +# Copy workflows +COPY ./tm[p]/workflow[s] /usr/local/workflows + # Copy plugins if exist and enable them # https://stackoverflow.com/questions/70096208/dockerfile-copy-folder-if-it-exists-conditional-copy/70096420#70096420 COPY ./tm[p]/plugin[s]/ /usr/local/bin/ diff --git a/core/cmd/shell_local.go b/core/cmd/shell_local.go index bead4ba5afd..63db51fd2e6 100644 --- a/core/cmd/shell_local.go +++ b/core/cmd/shell_local.go @@ -474,6 +474,12 @@ func (s *Shell) runNode(c *cli.Context) error { if err2 != nil { return errors.Wrap(err2, "failed to ensure workflow key") } + keys, err := app.GetKeyStore().Workflow().GetAll() + if err != nil { + lggr.Errorf("WorkflowPublicKey: failed to get keys") + } else { + lggr.Infof("WorkflowPublicKey: %s", keys[0].PublicKeyString()) + } } err2 := app.GetKeyStore().CSA().EnsureKey(rootCtx) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index fef741c8c9b..4bb81eb2db2 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -215,9 +215,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) { // TODO: wire this up to config so we only instantiate it // if a workflow registry address is provided. - workflowRegistrySyncer := syncer.NewNullWorkflowRegistrySyncer() - srvcs = append(srvcs, workflowRegistrySyncer) - var externalPeerWrapper p2ptypes.PeerWrapper if cfg.Capabilities().Peering().Enabled() { var dispatcher remotetypes.Dispatcher @@ -475,6 +472,14 @@ func NewApplication(opts ApplicationOpts) (Application, error) { webhookJobRunner = delegates[job.Webhook].(*webhook.Delegate).WebhookJobRunner() ) + workflowRegistrySyncer := &syncer.WorkflowRegistry{ + DS: opts.DS, + Logger: globalLogger.Named("WorkflowRegistrySyncer"), + Store: workflowORM, + Registry: opts.CapabilitiesRegistry, + } + srvcs = append(srvcs, workflowRegistrySyncer) + delegates[job.Workflow] = workflows.NewDelegate( globalLogger, opts.CapabilitiesRegistry, diff --git a/core/services/gateway/handlers/capabilities/handler.go b/core/services/gateway/handlers/capabilities/handler.go index 90bc2065edd..01f2db6dbd3 100644 --- a/core/services/gateway/handlers/capabilities/handler.go +++ b/core/services/gateway/handlers/capabilities/handler.go @@ -20,10 +20,9 @@ import ( const ( // NOTE: more methods will go here. HTTP trigger/action/target; etc. - MethodWebAPITarget = "web_api_target" - MethodWebAPITrigger = "web_api_trigger" - MethodComputeAction = "compute_action" - MethodWorkflowSyncer = "workflow_syncer" + MethodWebAPITarget = "web_api_target" + MethodWebAPITrigger = "web_api_trigger" + MethodComputeAction = "compute_action" ) type handler struct { @@ -77,6 +76,7 @@ func NewHandler(handlerConfig json.RawMessage, donConfig *config.DONConfig, don // returns message to be sent back to the capability node func (h *handler) sendHTTPMessageToClient(ctx context.Context, req network.HTTPRequest, msg *api.Message) (*api.Message, error) { var payload Response + fmt.Printf("req: %v, %d", req) resp, err := h.httpClient.Send(ctx, req) if err != nil { return nil, err diff --git a/core/services/gateway/network/httpclient.go b/core/services/gateway/network/httpclient.go index 4aecaaed3cd..d560981c88d 100644 --- a/core/services/gateway/network/httpclient.go +++ b/core/services/gateway/network/httpclient.go @@ -3,6 +3,7 @@ package network import ( "bytes" "context" + "fmt" "io" "net/http" "strings" @@ -67,6 +68,7 @@ func (c *httpClient) Send(ctx context.Context, req HTTPRequest) (*HTTPResponse, } defer resp.Body.Close() + fmt.Printf("bytes: %d", c.config.MaxResponseBytes) reader := http.MaxBytesReader(nil, resp.Body, int64(c.config.MaxResponseBytes)) body, err := io.ReadAll(reader) if err != nil { diff --git a/core/services/job/job_orm_test.go b/core/services/job/job_orm_test.go index fd54a39d431..9db99fcd48d 100644 --- a/core/services/job/job_orm_test.go +++ b/core/services/job/job_orm_test.go @@ -45,7 +45,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/relay" "github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrfcommon" "github.com/smartcontractkit/chainlink/v2/core/services/webhook" - "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" "github.com/smartcontractkit/chainlink/v2/core/testdata/testspecs" "github.com/smartcontractkit/chainlink/v2/core/utils/testutils/heavyweight" ) @@ -1874,7 +1873,6 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) { c.ID = s.ID c.Workflow = pkgworkflows.WFYamlSpec(t, "workflow99", addr1) // insert with mismatched name c.SpecType = job.YamlSpec - c.SecretsID = s.SecretsID return mustInsertWFJob(t, o, &c) }, }, @@ -1894,7 +1892,6 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) { var c job.WorkflowSpec c.ID = s.ID c.Workflow = pkgworkflows.WFYamlSpec(t, "workflow03", addr2) // insert with mismatched owner - c.SecretsID = s.SecretsID return mustInsertWFJob(t, o, &c) }, }, @@ -1902,32 +1899,22 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) { }, } - for i, tt := range tests { + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - ctx := testutils.Context(t) ks := cltest.NewKeyStore(t, tt.fields.ds) - - secretsORM := syncer.NewWorkflowRegistryDS(tt.fields.ds, logger.TestLogger(t)) - - sid, err := secretsORM.Create(ctx, "some-url.com", fmt.Sprintf("some-hash-%d", i), "some-contentz") - require.NoError(t, err) - tt.args.spec.SecretsID = sql.NullInt64{Int64: sid, Valid: true} - pipelineORM := pipeline.NewORM(tt.fields.ds, logger.TestLogger(t), configtest.NewTestGeneralConfig(t).JobPipeline().MaxSuccessfulRuns()) bridgesORM := bridges.NewORM(tt.fields.ds) o := NewTestORM(t, tt.fields.ds, pipelineORM, bridgesORM, ks) - var wantJobID int32 if tt.args.before != nil { wantJobID = tt.args.before(t, o, tt.args.spec) } - + ctx := testutils.Context(t) gotJ, err := o.FindJobIDByWorkflow(ctx, *tt.args.spec) if (err != nil) != tt.wantErr { t.Errorf("orm.FindJobByWorkflow() error = %v, wantErr %v", err, tt.wantErr) return } - if err == nil { assert.Equal(t, wantJobID, gotJ, "mismatch job id") } @@ -1949,36 +1936,25 @@ func Test_ORM_FindJobByWorkflow_Multiple(t *testing.T) { bridges.NewORM(db), cltest.NewKeyStore(t, db)) ctx := testutils.Context(t) - secretsORM := syncer.NewWorkflowRegistryDS(db, logger.TestLogger(t)) - - var sids []int64 - for i := 0; i < 3; i++ { - sid, err := secretsORM.Create(ctx, "some-url.com", fmt.Sprintf("some-hash-%d", i), "some-contentz") - require.NoError(t, err) - sids = append(sids, sid) - } wfYaml1 := pkgworkflows.WFYamlSpec(t, "workflow00", addr1) s1 := job.WorkflowSpec{ - Workflow: wfYaml1, - SpecType: job.YamlSpec, - SecretsID: sql.NullInt64{Int64: sids[0], Valid: true}, + Workflow: wfYaml1, + SpecType: job.YamlSpec, } wantJobID1 := mustInsertWFJob(t, o, &s1) wfYaml2 := pkgworkflows.WFYamlSpec(t, "workflow01", addr1) s2 := job.WorkflowSpec{ - Workflow: wfYaml2, - SpecType: job.YamlSpec, - SecretsID: sql.NullInt64{Int64: sids[1], Valid: true}, + Workflow: wfYaml2, + SpecType: job.YamlSpec, } wantJobID2 := mustInsertWFJob(t, o, &s2) wfYaml3 := pkgworkflows.WFYamlSpec(t, "workflow00", addr2) s3 := job.WorkflowSpec{ - Workflow: wfYaml3, - SpecType: job.YamlSpec, - SecretsID: sql.NullInt64{Int64: sids[2], Valid: true}, + Workflow: wfYaml3, + SpecType: job.YamlSpec, } wantJobID3 := mustInsertWFJob(t, o, &s3) @@ -2016,7 +1992,7 @@ func mustInsertWFJob(t *testing.T, orm job.ORM, s *job.WorkflowSpec) int32 { } err = orm.CreateJob(ctx, &j) - require.NoError(t, err, "failed to insert job with wf spec %+v %s", s, err) + require.NoError(t, err, "failed to insert job with wf spec %v %s", s, s.Workflow) return j.ID } diff --git a/core/services/job/models.go b/core/services/job/models.go index 26d563c7ac8..231bf10fda0 100644 --- a/core/services/job/models.go +++ b/core/services/job/models.go @@ -2,7 +2,6 @@ package job import ( "context" - "database/sql" "database/sql/driver" "encoding/json" "fmt" @@ -869,30 +868,18 @@ const ( DefaultSpecType = "" ) -type WorkflowSpecStatus string - -const ( - WorkflowSpecStatusActive WorkflowSpecStatus = "active" - WorkflowSpecStatusPaused WorkflowSpecStatus = "paused" - WorkflowSpecStatusDefault WorkflowSpecStatus = "" -) - type WorkflowSpec struct { ID int32 `toml:"-"` Workflow string `toml:"workflow"` // the raw representation of the workflow Config string `toml:"config" db:"config"` // the raw representation of the config // fields derived from the yaml spec, used for indexing the database // note: i tried to make these private, but translating them to the database seems to require them to be public - WorkflowID string `toml:"-" db:"workflow_id"` // Derived. Do not modify. the CID of the workflow. - WorkflowOwner string `toml:"-" db:"workflow_owner"` // Derived. Do not modify. the owner of the workflow. - WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow. - Status WorkflowSpecStatus `db:"status"` - BinaryURL string `db:"binary_url"` - ConfigURL string `db:"config_url"` - SecretsID sql.NullInt64 `db:"secrets_id"` - CreatedAt time.Time `toml:"-"` - UpdatedAt time.Time `toml:"-"` - SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"` + WorkflowID string `toml:"-" db:"workflow_id"` // Derived. Do not modify. the CID of the workflow. + WorkflowOwner string `toml:"-" db:"workflow_owner"` // Derived. Do not modify. the owner of the workflow. + WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow. + CreatedAt time.Time `toml:"-"` + UpdatedAt time.Time `toml:"-"` + SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"` sdkWorkflow *sdk.WorkflowSpec rawSpec []byte config []byte diff --git a/core/services/job/orm.go b/core/services/job/orm.go index 92ec9b2e83c..5e8b5ce127f 100644 --- a/core/services/job/orm.go +++ b/core/services/job/orm.go @@ -433,8 +433,8 @@ func (o *orm) CreateJob(ctx context.Context, jb *Job) error { case Stream: // 'stream' type has no associated spec, nothing to do here case Workflow: - sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, binary_url, config_url, secrets_id, created_at, updated_at, spec_type, config) - VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, :binary_url, :config_url, :secrets_id, NOW(), NOW(), :spec_type, :config) + sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, created_at, updated_at, spec_type, config) + VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, NOW(), NOW(), :spec_type, :config) RETURNING id;` specID, err := tx.prepareQuerySpecID(ctx, sql, jb.WorkflowSpec) if err != nil { diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go deleted file mode 100644 index ba29e98526e..00000000000 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ /dev/null @@ -1,223 +0,0 @@ -package workflow_registry_syncer_test - -import ( - "context" - "crypto/rand" - "encoding/hex" - "encoding/json" - "testing" - "time" - - "github.com/ethereum/go-ethereum/accounts/abi/bind" - "github.com/ethereum/go-ethereum/common" - - "github.com/smartcontractkit/chainlink-common/pkg/custmsg" - "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" - "github.com/smartcontractkit/chainlink-common/pkg/types" - "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper" - coretestutils "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/capabilities/testutils" - evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" - "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" - "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" - - "github.com/stretchr/testify/require" -) - -func Test_SecretsWorker(t *testing.T) { - var ( - ctx = coretestutils.Context(t) - lggr = logger.TestLogger(t) - emitter = custmsg.NewLabeler() - backendTH = testutils.NewEVMBackendTH(t) - db = pgtest.NewSqlxDB(t) - orm = syncer.NewWorkflowRegistryDS(db, lggr) - - giveTicker = time.NewTicker(500 * time.Millisecond) - giveSecretsURL = "https://original-url.com" - donID = uint32(1) - giveWorkflow = RegisterWorkflowCMD{ - Name: "test-wf", - DonID: donID, - Status: uint8(1), - SecretsURL: giveSecretsURL, - } - giveContents = "contents" - wantContents = "updated contents" - fetcherFn = func(_ context.Context, _ string) ([]byte, error) { - return []byte(wantContents), nil - } - contractName = syncer.ContractName - forceUpdateSecretsEvent = string(syncer.ForceUpdateSecretsEvent) - ) - - defer giveTicker.Stop() - - // fill ID with randomd data - var giveID [32]byte - _, err := rand.Read((giveID)[:]) - require.NoError(t, err) - giveWorkflow.ID = giveID - - // Deploy a test workflow_registry - wfRegistryAddr, _, wfRegistryC, err := workflow_registry_wrapper.DeployWorkflowRegistry(backendTH.ContractsOwner, backendTH.Backend.Client()) - backendTH.Backend.Commit() - require.NoError(t, err) - - lggr.Infof("deployed workflow registry at %s\n", wfRegistryAddr.Hex()) - - // Build the ContractReader config - contractReaderCfg := evmtypes.ChainReaderConfig{ - Contracts: map[string]evmtypes.ChainContractReader{ - contractName: { - ContractPollingFilter: evmtypes.ContractPollingFilter{ - GenericEventNames: []string{forceUpdateSecretsEvent}, - }, - ContractABI: workflow_registry_wrapper.WorkflowRegistryABI, - Configs: map[string]*evmtypes.ChainReaderDefinition{ - forceUpdateSecretsEvent: { - ChainSpecificName: forceUpdateSecretsEvent, - ReadType: evmtypes.Event, - }, - }, - }, - }, - } - - contractReaderCfgBytes, err := json.Marshal(contractReaderCfg) - require.NoError(t, err) - - contractReader, err := backendTH.NewContractReader(ctx, t, contractReaderCfgBytes) - require.NoError(t, err) - - err = contractReader.Bind(ctx, []types.BoundContract{{Name: contractName, Address: wfRegistryAddr.Hex()}}) - require.NoError(t, err) - - // Seed the DB - hash, err := crypto.Keccak256(append(backendTH.ContractsOwner.From[:], []byte(giveSecretsURL)...)) - require.NoError(t, err) - giveHash := hex.EncodeToString(hash) - - gotID, err := orm.Create(ctx, giveSecretsURL, giveHash, giveContents) - require.NoError(t, err) - - gotSecretsURL, err := orm.GetSecretsURLByID(ctx, gotID) - require.NoError(t, err) - require.Equal(t, giveSecretsURL, gotSecretsURL) - - // verify the DB - contents, err := orm.GetContents(ctx, giveSecretsURL) - require.NoError(t, err) - require.Equal(t, contents, giveContents) - - // Create the worker - worker := syncer.NewWorkflowRegistry( - lggr, - orm, - contractReader, - fetcherFn, - wfRegistryAddr.Hex(), - nil, - nil, - emitter, - syncer.WithTicker(giveTicker.C), - ) - - servicetest.Run(t, worker) - - // setup contract state to allow the secrets to be updated - updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true) - updateAuthorizedAddress(t, backendTH, wfRegistryC, []common.Address{backendTH.ContractsOwner.From}, true) - registerWorkflow(t, backendTH, wfRegistryC, giveWorkflow) - - // generate a log event - requestForceUpdateSecrets(t, backendTH, wfRegistryC, giveSecretsURL) - - // Require the secrets contents to eventually be updated - require.Eventually(t, func() bool { - secrets, err := orm.GetContents(ctx, giveSecretsURL) - lggr.Debugf("got secrets %v", secrets) - require.NoError(t, err) - return secrets == wantContents - }, 5*time.Second, time.Second) -} - -func updateAuthorizedAddress( - t *testing.T, - th *testutils.EVMBackendTH, - wfRegC *workflow_registry_wrapper.WorkflowRegistry, - addresses []common.Address, - _ bool, -) { - t.Helper() - _, err := wfRegC.UpdateAuthorizedAddresses(th.ContractsOwner, addresses, true) - require.NoError(t, err, "failed to update authorised addresses") - th.Backend.Commit() - th.Backend.Commit() - th.Backend.Commit() - gotAddresses, err := wfRegC.GetAllAuthorizedAddresses(&bind.CallOpts{ - From: th.ContractsOwner.From, - }) - require.NoError(t, err) - require.ElementsMatch(t, addresses, gotAddresses) -} - -func updateAllowedDONs( - t *testing.T, - th *testutils.EVMBackendTH, - wfRegC *workflow_registry_wrapper.WorkflowRegistry, - donIDs []uint32, - allowed bool, -) { - t.Helper() - _, err := wfRegC.UpdateAllowedDONs(th.ContractsOwner, donIDs, allowed) - require.NoError(t, err, "failed to update DONs") - th.Backend.Commit() - th.Backend.Commit() - th.Backend.Commit() - gotDons, err := wfRegC.GetAllAllowedDONs(&bind.CallOpts{ - From: th.ContractsOwner.From, - }) - require.NoError(t, err) - require.ElementsMatch(t, donIDs, gotDons) -} - -type RegisterWorkflowCMD struct { - Name string - ID [32]byte - DonID uint32 - Status uint8 - BinaryURL string - ConfigURL string - SecretsURL string -} - -func registerWorkflow( - t *testing.T, - th *testutils.EVMBackendTH, - wfRegC *workflow_registry_wrapper.WorkflowRegistry, - input RegisterWorkflowCMD, -) { - t.Helper() - _, err := wfRegC.RegisterWorkflow(th.ContractsOwner, input.Name, input.ID, input.DonID, - input.Status, input.BinaryURL, input.ConfigURL, input.SecretsURL) - require.NoError(t, err, "failed to register workflow") - th.Backend.Commit() - th.Backend.Commit() - th.Backend.Commit() -} - -func requestForceUpdateSecrets( - t *testing.T, - th *testutils.EVMBackendTH, - wfRegC *workflow_registry_wrapper.WorkflowRegistry, - secretsURL string, -) { - _, err := wfRegC.RequestForceUpdateSecrets(th.ContractsOwner, secretsURL) - require.NoError(t, err) - th.Backend.Commit() - th.Backend.Commit() - th.Backend.Commit() -} diff --git a/core/services/standardcapabilities/standard_capabilities_test.go b/core/services/standardcapabilities/standard_capabilities_test.go index 538e08c65ad..f410b8efb85 100644 --- a/core/services/standardcapabilities/standard_capabilities_test.go +++ b/core/services/standardcapabilities/standard_capabilities_test.go @@ -7,11 +7,15 @@ import ( "github.com/stretchr/testify/require" + "github.com/test-go/testify/require" + "github.com/smartcontractkit/chainlink-common/pkg/loop" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/types/core" "github.com/smartcontractkit/chainlink-common/pkg/types/core/mocks" "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 69655b5b39c..6de761f0841 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -96,7 +96,7 @@ func (sucm *stepUpdateManager) len() int64 { } type secretsFetcher interface { - SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) + SecretsFor(workflowOwner, workflowName string) (map[string]string, error) } // Engine handles the lifecycle of a single workflow and its executions. @@ -850,7 +850,7 @@ func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*value // registry (for capability-level configuration). It doesn't perform any caching of the config values, since // the two registries perform their own caching. func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) { - secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.name) + secrets, err := e.secretsFetcher.SecretsFor(e.workflow.owner, e.workflow.name) if err != nil { return nil, fmt.Errorf("failed to fetch secrets: %w", err) } diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index 70216ac8c78..f89f82e9486 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -153,7 +153,7 @@ func newTestEngineWithYAMLSpec(t *testing.T, reg *coreCap.Registry, spec string, type mockSecretsFetcher struct{} -func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) { +func (s mockSecretsFetcher) SecretsFor(workflowOwner, workflowName string) (map[string]string, error) { return map[string]string{}, nil } @@ -1606,7 +1606,7 @@ type mockFetcher struct { retval map[string]string } -func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) { +func (m *mockFetcher) SecretsFor(workflowOwner, workflowName string) (map[string]string, error) { return m.retval, nil } diff --git a/core/services/workflows/syncer/config.yaml b/core/services/workflows/syncer/config.yaml new file mode 100644 index 00000000000..e69de29bb2d diff --git a/core/services/workflows/syncer/contract_reader_mock.go b/core/services/workflows/syncer/contract_reader_mock.go deleted file mode 100644 index 61f59fa4e69..00000000000 --- a/core/services/workflows/syncer/contract_reader_mock.go +++ /dev/null @@ -1,148 +0,0 @@ -// Code generated by mockery v2.46.3. DO NOT EDIT. - -package syncer - -import ( - context "context" - - query "github.com/smartcontractkit/chainlink-common/pkg/types/query" - mock "github.com/stretchr/testify/mock" - - types "github.com/smartcontractkit/chainlink-common/pkg/types" -) - -// MockContractReader is an autogenerated mock type for the ContractReader type -type MockContractReader struct { - mock.Mock -} - -type MockContractReader_Expecter struct { - mock *mock.Mock -} - -func (_m *MockContractReader) EXPECT() *MockContractReader_Expecter { - return &MockContractReader_Expecter{mock: &_m.Mock} -} - -// Bind provides a mock function with given fields: _a0, _a1 -func (_m *MockContractReader) Bind(_a0 context.Context, _a1 []types.BoundContract) error { - ret := _m.Called(_a0, _a1) - - if len(ret) == 0 { - panic("no return value specified for Bind") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []types.BoundContract) error); ok { - r0 = rf(_a0, _a1) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// MockContractReader_Bind_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Bind' -type MockContractReader_Bind_Call struct { - *mock.Call -} - -// Bind is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 []types.BoundContract -func (_e *MockContractReader_Expecter) Bind(_a0 interface{}, _a1 interface{}) *MockContractReader_Bind_Call { - return &MockContractReader_Bind_Call{Call: _e.mock.On("Bind", _a0, _a1)} -} - -func (_c *MockContractReader_Bind_Call) Run(run func(_a0 context.Context, _a1 []types.BoundContract)) *MockContractReader_Bind_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]types.BoundContract)) - }) - return _c -} - -func (_c *MockContractReader_Bind_Call) Return(_a0 error) *MockContractReader_Bind_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockContractReader_Bind_Call) RunAndReturn(run func(context.Context, []types.BoundContract) error) *MockContractReader_Bind_Call { - _c.Call.Return(run) - return _c -} - -// QueryKey provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4 -func (_m *MockContractReader) QueryKey(_a0 context.Context, _a1 types.BoundContract, _a2 query.KeyFilter, _a3 query.LimitAndSort, _a4 any) ([]types.Sequence, error) { - ret := _m.Called(_a0, _a1, _a2, _a3, _a4) - - if len(ret) == 0 { - panic("no return value specified for QueryKey") - } - - var r0 []types.Sequence - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) ([]types.Sequence, error)); ok { - return rf(_a0, _a1, _a2, _a3, _a4) - } - if rf, ok := ret.Get(0).(func(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) []types.Sequence); ok { - r0 = rf(_a0, _a1, _a2, _a3, _a4) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]types.Sequence) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) error); ok { - r1 = rf(_a0, _a1, _a2, _a3, _a4) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// MockContractReader_QueryKey_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'QueryKey' -type MockContractReader_QueryKey_Call struct { - *mock.Call -} - -// QueryKey is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 types.BoundContract -// - _a2 query.KeyFilter -// - _a3 query.LimitAndSort -// - _a4 any -func (_e *MockContractReader_Expecter) QueryKey(_a0 interface{}, _a1 interface{}, _a2 interface{}, _a3 interface{}, _a4 interface{}) *MockContractReader_QueryKey_Call { - return &MockContractReader_QueryKey_Call{Call: _e.mock.On("QueryKey", _a0, _a1, _a2, _a3, _a4)} -} - -func (_c *MockContractReader_QueryKey_Call) Run(run func(_a0 context.Context, _a1 types.BoundContract, _a2 query.KeyFilter, _a3 query.LimitAndSort, _a4 any)) *MockContractReader_QueryKey_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(types.BoundContract), args[2].(query.KeyFilter), args[3].(query.LimitAndSort), args[4].(any)) - }) - return _c -} - -func (_c *MockContractReader_QueryKey_Call) Return(_a0 []types.Sequence, _a1 error) *MockContractReader_QueryKey_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *MockContractReader_QueryKey_Call) RunAndReturn(run func(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) ([]types.Sequence, error)) *MockContractReader_QueryKey_Call { - _c.Call.Return(run) - return _c -} - -// NewMockContractReader creates a new instance of MockContractReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewMockContractReader(t interface { - mock.TestingT - Cleanup(func()) -}) *MockContractReader { - mock := &MockContractReader{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/core/services/workflows/syncer/engine_registry.go b/core/services/workflows/syncer/engine_registry.go deleted file mode 100644 index 809381c191c..00000000000 --- a/core/services/workflows/syncer/engine_registry.go +++ /dev/null @@ -1,76 +0,0 @@ -package syncer - -import ( - "errors" - "sync" - - "github.com/smartcontractkit/chainlink/v2/core/services/workflows" -) - -type engineRegistry struct { - engines map[string]*workflows.Engine - mu sync.RWMutex -} - -func newEngineRegistry() *engineRegistry { - return &engineRegistry{ - engines: make(map[string]*workflows.Engine), - } -} - -// Add adds an engine to the registry. -func (r *engineRegistry) Add(id string, engine *workflows.Engine) { - r.mu.Lock() - defer r.mu.Unlock() - r.engines[id] = engine -} - -// Get retrieves an engine from the registry. -func (r *engineRegistry) Get(id string) (*workflows.Engine, error) { - r.mu.RLock() - defer r.mu.RUnlock() - engine, found := r.engines[id] - if !found { - return nil, errors.New("engine not found") - } - return engine, nil -} - -// IsRunning is true if the engine exists and is ready. -func (r *engineRegistry) IsRunning(id string) bool { - r.mu.RLock() - defer r.mu.RUnlock() - engine, found := r.engines[id] - if !found { - return false - } - - return engine.Ready() == nil -} - -// Pop removes an engine from the registry and returns the engine if found. -func (r *engineRegistry) Pop(id string) (*workflows.Engine, error) { - r.mu.Lock() - defer r.mu.Unlock() - engine, ok := r.engines[id] - if !ok { - return nil, errors.New("remove failed: engine not found") - } - delete(r.engines, id) - return engine, nil -} - -// Close closes all engines in the registry. -func (r *engineRegistry) Close() error { - r.mu.Lock() - defer r.mu.Unlock() - var err error - for id, engine := range r.engines { - closeErr := engine.Close() - if closeErr != nil { - err = errors.Join(err, closeErr) - } - delete(r.engines, id) - } - return err -} diff --git a/core/services/workflows/syncer/fetcher.go b/core/services/workflows/syncer/fetcher.go deleted file mode 100644 index ed815a240ba..00000000000 --- a/core/services/workflows/syncer/fetcher.go +++ /dev/null @@ -1,43 +0,0 @@ -package syncer - -import ( - "context" - "encoding/json" - "fmt" - "net/http" - "strings" - - "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi" - "github.com/smartcontractkit/chainlink/v2/core/logger" - ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities" -) - -func NewFetcherFunc( - ctx context.Context, - lggr logger.Logger, - och *webapi.OutgoingConnectorHandler) FetcherFunc { - return func(ctx context.Context, url string) ([]byte, error) { - payloadBytes, err := json.Marshal(ghcapabilities.Request{ - URL: url, - Method: http.MethodGet, - }) - if err != nil { - return nil, fmt.Errorf("failed to marshal fetch request: %w", err) - } - - messageID := strings.Join([]string{ghcapabilities.MethodWorkflowSyncer, url}, "/") - resp, err := och.HandleSingleNodeRequest(ctx, messageID, payloadBytes) - if err != nil { - return nil, err - } - - lggr.Debugw("received gateway response", "resp", resp) - var payload ghcapabilities.Response - err = json.Unmarshal(resp.Body.Payload, &payload) - if err != nil { - return nil, err - } - - return payload.Body, nil - } -} diff --git a/core/services/workflows/syncer/fetcher_test.go b/core/services/workflows/syncer/fetcher_test.go deleted file mode 100644 index 846a9186b5a..00000000000 --- a/core/services/workflows/syncer/fetcher_test.go +++ /dev/null @@ -1,76 +0,0 @@ -package syncer - -import ( - "context" - "encoding/json" - "strings" - "testing" - - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - - "github.com/smartcontractkit/chainlink/v2/core/capabilities/webapi" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/gateway/api" - gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks" - ghcapabilities "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/capabilities" - "github.com/smartcontractkit/chainlink/v2/core/services/gateway/handlers/common" -) - -func TestNewFetcherFunc(t *testing.T) { - ctx := context.Background() - lggr := logger.TestLogger(t) - - config := webapi.ServiceConfig{ - RateLimiter: common.RateLimiterConfig{ - GlobalRPS: 100.0, - GlobalBurst: 100, - PerSenderRPS: 100.0, - PerSenderBurst: 100, - }, - } - - connector := gcmocks.NewGatewayConnector(t) - och, err := webapi.NewOutgoingConnectorHandler(connector, config, ghcapabilities.MethodComputeAction, lggr) - require.NoError(t, err) - - url := "http://example.com" - - msgID := strings.Join([]string{ghcapabilities.MethodWorkflowSyncer, url}, "/") - - t.Run("OK-valid_request", func(t *testing.T) { - gatewayResp := gatewayResponse(t, msgID) - connector.EXPECT().SignAndSendToGateway(mock.Anything, "gateway1", mock.Anything).Run(func(ctx context.Context, gatewayID string, msg *api.MessageBody) { - och.HandleGatewayMessage(ctx, "gateway1", gatewayResp) - }).Return(nil).Times(1) - connector.EXPECT().DonID().Return("don-id") - connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"}) - - fetcher := NewFetcherFunc(ctx, lggr, och) - - payload, err := fetcher(ctx, url) - require.NoError(t, err) - - expectedPayload := []byte("response body") - require.Equal(t, expectedPayload, payload) - }) -} - -func gatewayResponse(t *testing.T, msgID string) *api.Message { - headers := map[string]string{"Content-Type": "application/json"} - body := []byte("response body") - responsePayload, err := json.Marshal(ghcapabilities.Response{ - StatusCode: 200, - Headers: headers, - Body: body, - ExecutionError: false, - }) - require.NoError(t, err) - return &api.Message{ - Body: api.MessageBody{ - MessageId: msgID, - Method: ghcapabilities.MethodWebAPITarget, - Payload: responsePayload, - }, - } -} diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go deleted file mode 100644 index 7004c740c97..00000000000 --- a/core/services/workflows/syncer/handler.go +++ /dev/null @@ -1,525 +0,0 @@ -package syncer - -import ( - "context" - "crypto/sha256" - "encoding/hex" - "errors" - "fmt" - - "github.com/smartcontractkit/chainlink-common/pkg/custmsg" - "github.com/smartcontractkit/chainlink-common/pkg/types/core" - "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/platform" - "github.com/smartcontractkit/chainlink/v2/core/services/job" - "github.com/smartcontractkit/chainlink/v2/core/services/workflows" - "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" -) - -var ErrNotImplemented = errors.New("not implemented") - -// WorkflowRegistryrEventType is the type of event that is emitted by the WorkflowRegistry -type WorkflowRegistryEventType string - -var ( - // ForceUpdateSecretsEvent is emitted when a request to force update a workflows secrets is made - ForceUpdateSecretsEvent WorkflowRegistryEventType = "WorkflowForceUpdateSecretsRequestedV1" - - // WorkflowRegisteredEvent is emitted when a workflow is registered - WorkflowRegisteredEvent WorkflowRegistryEventType = "WorkflowRegisteredV1" - - // WorkflowUpdatedEvent is emitted when a workflow is updated - WorkflowUpdatedEvent WorkflowRegistryEventType = "WorkflowUpdatedV1" - - // WorkflowPausedEvent is emitted when a workflow is paused - WorkflowPausedEvent WorkflowRegistryEventType = "WorkflowPausedV1" - - // WorkflowActivatedEvent is emitted when a workflow is activated - WorkflowActivatedEvent WorkflowRegistryEventType = "WorkflowActivatedV1" - - // WorkflowDeletedEvent is emitted when a workflow is deleted - WorkflowDeletedEvent WorkflowRegistryEventType = "WorkflowDeletedV1" -) - -// WorkflowRegistryForceUpdateSecretsRequestedV1 is a chain agnostic definition of the WorkflowRegistry -// ForceUpdateSecretsRequested event. -type WorkflowRegistryForceUpdateSecretsRequestedV1 struct { - SecretsURLHash []byte - Owner []byte - WorkflowName string -} - -type WorkflowRegistryWorkflowRegisteredV1 struct { - WorkflowID [32]byte - WorkflowOwner []byte - DonID uint32 - Status uint8 - WorkflowName string - BinaryURL string - ConfigURL string - SecretsURL string -} - -type WorkflowRegistryWorkflowUpdatedV1 struct { - OldWorkflowID [32]byte - WorkflowOwner []byte - DonID uint32 - NewWorkflowID [32]byte - WorkflowName string - BinaryURL string - ConfigURL string - SecretsURL string -} - -type WorkflowRegistryWorkflowPausedV1 struct { - WorkflowID [32]byte - WorkflowOwner []byte - DonID uint32 - WorkflowName string -} - -type WorkflowRegistryWorkflowActivatedV1 struct { - WorkflowID [32]byte - WorkflowOwner []byte - DonID uint32 - WorkflowName string -} - -type WorkflowRegistryWorkflowDeletedV1 struct { - WorkflowID [32]byte - WorkflowOwner []byte - DonID uint32 - WorkflowName string -} - -type secretsFetcher interface { - SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) -} - -// secretsFetcherFunc implements the secretsFetcher interface for a function. -type secretsFetcherFunc func(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) - -func (f secretsFetcherFunc) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) { - return f(ctx, workflowOwner, workflowName) -} - -// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding -// method that handles the event. -type eventHandler struct { - lggr logger.Logger - orm WorkflowRegistryDS - fetcher FetcherFunc - workflowStore store.Store - capRegistry core.CapabilitiesRegistry - engineRegistry *engineRegistry - emitter custmsg.MessageEmitter - secretsFetcher secretsFetcher -} - -// newEventHandler returns a new eventHandler instance. -func newEventHandler( - lggr logger.Logger, - orm ORM, - gateway FetcherFunc, - workflowStore store.Store, - capRegistry core.CapabilitiesRegistry, - engineRegistry *engineRegistry, - emitter custmsg.MessageEmitter, - secretsFetcher secretsFetcher, -) *eventHandler { - return &eventHandler{ - lggr: lggr, - orm: orm, - fetcher: gateway, - workflowStore: workflowStore, - capRegistry: capRegistry, - engineRegistry: engineRegistry, - emitter: emitter, - secretsFetcher: secretsFetcher, - } -} - -func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) error { - switch event.EventType { - case ForceUpdateSecretsEvent: - payload, ok := event.Data.(WorkflowRegistryForceUpdateSecretsRequestedV1) - if !ok { - return newHandlerTypeError(event.Data) - } - - cma := h.emitter.With( - platform.KeyWorkflowName, payload.WorkflowName, - platform.KeyWorkflowOwner, hex.EncodeToString(payload.Owner), - ) - - if err := h.forceUpdateSecretsEvent(ctx, payload); err != nil { - logCustMsg(ctx, cma, fmt.Sprintf("failed to handle force update secrets event: %v", err), h.lggr) - return err - } - - return nil - case WorkflowRegisteredEvent: - payload, ok := event.Data.(WorkflowRegistryWorkflowRegisteredV1) - if !ok { - return newHandlerTypeError(event.Data) - } - wfID := hex.EncodeToString(payload.WorkflowID[:]) - - cma := h.emitter.With( - platform.KeyWorkflowID, wfID, - platform.KeyWorkflowName, payload.WorkflowName, - platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), - ) - - if err := h.workflowRegisteredEvent(ctx, payload); err != nil { - logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow registered event: %v", err), h.lggr) - return err - } - - h.lggr.Debugf("workflow 0x%x registered and started", wfID) - return nil - case WorkflowUpdatedEvent: - payload, ok := event.Data.(WorkflowRegistryWorkflowUpdatedV1) - if !ok { - return fmt.Errorf("invalid data type %T for event", event.Data) - } - - newWorkflowID := hex.EncodeToString(payload.NewWorkflowID[:]) - cma := h.emitter.With( - platform.KeyWorkflowID, newWorkflowID, - platform.KeyWorkflowName, payload.WorkflowName, - platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), - ) - - if err := h.workflowUpdatedEvent(ctx, payload); err != nil { - logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow updated event: %v", err), h.lggr) - return err - } - - return nil - case WorkflowPausedEvent: - payload, ok := event.Data.(WorkflowRegistryWorkflowPausedV1) - if !ok { - return fmt.Errorf("invalid data type %T for event", event.Data) - } - - wfID := hex.EncodeToString(payload.WorkflowID[:]) - - cma := h.emitter.With( - platform.KeyWorkflowID, wfID, - platform.KeyWorkflowName, payload.WorkflowName, - platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), - ) - - if err := h.workflowPausedEvent(ctx, payload); err != nil { - logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow paused event: %v", err), h.lggr) - return err - } - return nil - case WorkflowActivatedEvent: - payload, ok := event.Data.(WorkflowRegistryWorkflowActivatedV1) - if !ok { - return fmt.Errorf("invalid data type %T for event", event.Data) - } - - wfID := hex.EncodeToString(payload.WorkflowID[:]) - - cma := h.emitter.With( - platform.KeyWorkflowID, wfID, - platform.KeyWorkflowName, payload.WorkflowName, - platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), - ) - if err := h.workflowActivatedEvent(ctx, payload); err != nil { - logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow activated event: %v", err), h.lggr) - return err - } - - return nil - case WorkflowDeletedEvent: - payload, ok := event.Data.(WorkflowRegistryWorkflowDeletedV1) - if !ok { - return fmt.Errorf("invalid data type %T for event", event.Data) - } - - wfID := hex.EncodeToString(payload.WorkflowID[:]) - - cma := h.emitter.With( - platform.KeyWorkflowID, wfID, - platform.KeyWorkflowName, payload.WorkflowName, - platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), - ) - - if err := h.workflowDeletedEvent(ctx, payload); err != nil { - logCustMsg(ctx, cma, fmt.Sprintf("failed to handle workflow deleted event: %v", err), h.lggr) - return err - } - - return nil - default: - return fmt.Errorf("event type unsupported: %v", event.EventType) - } -} - -// workflowRegisteredEvent handles the WorkflowRegisteredEvent event type. -func (h *eventHandler) workflowRegisteredEvent( - ctx context.Context, - payload WorkflowRegistryWorkflowRegisteredV1, -) error { - wfID := hex.EncodeToString(payload.WorkflowID[:]) - - // Download the contents of binaryURL, configURL and secretsURL and cache them locally. - binary, err := h.fetcher(ctx, payload.BinaryURL) - if err != nil { - return fmt.Errorf("failed to fetch binary from %s : %w", payload.BinaryURL, err) - } - - config, err := h.fetcher(ctx, payload.ConfigURL) - if err != nil { - return fmt.Errorf("failed to fetch config from %s : %w", payload.ConfigURL, err) - } - - secrets, err := h.fetcher(ctx, payload.SecretsURL) - if err != nil { - return fmt.Errorf("failed to fetch secrets from %s : %w", payload.SecretsURL, err) - } - - // Calculate the hash of the binary and config files - hash := workflowID(binary, config, []byte(payload.SecretsURL)) - - // Pre-check: verify that the workflowID matches; if it doesn’t abort and log an error via Beholder. - if hash != wfID { - return fmt.Errorf("workflowID mismatch: %s != %s", hash, wfID) - } - - // Save the workflow secrets - urlHash, err := h.orm.GetSecretsURLHash(payload.WorkflowOwner, []byte(payload.SecretsURL)) - if err != nil { - return fmt.Errorf("failed to get secrets URL hash: %w", err) - } - - // Create a new entry in the workflow_spec table corresponding for the new workflow, with the contents of the binaryURL + configURL in the table - status := job.WorkflowSpecStatusActive - if payload.Status == 1 { - status = job.WorkflowSpecStatusPaused - } - - entry := &job.WorkflowSpec{ - Workflow: hex.EncodeToString(binary), - Config: string(config), - WorkflowID: wfID, - Status: status, - WorkflowOwner: hex.EncodeToString(payload.WorkflowOwner), - WorkflowName: payload.WorkflowName, - SpecType: job.WASMFile, - BinaryURL: payload.BinaryURL, - ConfigURL: payload.ConfigURL, - } - if _, err = h.orm.UpsertWorkflowSpecWithSecrets(ctx, entry, payload.SecretsURL, hex.EncodeToString(urlHash), string(secrets)); err != nil { - return fmt.Errorf("failed to upsert workflow spec with secrets: %w", err) - } - - if status != job.WorkflowSpecStatusActive { - return nil - } - - // If status == active, start a new WorkflowEngine instance, and add it to local engine registry - moduleConfig := &host.ModuleConfig{Logger: h.lggr, Labeler: h.emitter} - sdkSpec, err := host.GetWorkflowSpec(ctx, moduleConfig, binary, config) - if err != nil { - return fmt.Errorf("failed to get workflow sdk spec: %w", err) - } - - cfg := workflows.Config{ - Lggr: h.lggr, - Workflow: *sdkSpec, - WorkflowID: wfID, - WorkflowOwner: hex.EncodeToString(payload.WorkflowOwner), - WorkflowName: payload.WorkflowName, - Registry: h.capRegistry, - Store: h.workflowStore, - Config: config, - Binary: binary, - SecretsFetcher: h.secretsFetcher, - } - e, err := workflows.NewEngine(ctx, cfg) - if err != nil { - return fmt.Errorf("failed to create workflow engine: %w", err) - } - - if err := e.Start(ctx); err != nil { - return fmt.Errorf("failed to start workflow engine: %w", err) - } - - h.engineRegistry.Add(wfID, e) - return nil -} - -// workflowUpdatedEvent handles the WorkflowUpdatedEvent event type by first finding the -// current workflow engine, stopping it, and then starting a new workflow engine with the -// updated workflow spec. -func (h *eventHandler) workflowUpdatedEvent( - ctx context.Context, - payload WorkflowRegistryWorkflowUpdatedV1, -) error { - // Remove the old workflow engine from the local registry if it exists - if err := h.tryEngineCleanup(hex.EncodeToString(payload.OldWorkflowID[:])); err != nil { - return err - } - - registeredEvent := WorkflowRegistryWorkflowRegisteredV1{ - WorkflowID: payload.NewWorkflowID, - WorkflowOwner: payload.WorkflowOwner, - DonID: payload.DonID, - Status: 0, - WorkflowName: payload.WorkflowName, - BinaryURL: payload.BinaryURL, - ConfigURL: payload.ConfigURL, - SecretsURL: payload.SecretsURL, - } - - return h.workflowRegisteredEvent(ctx, registeredEvent) -} - -// workflowPausedEvent handles the WorkflowPausedEvent event type. -func (h *eventHandler) workflowPausedEvent( - ctx context.Context, - payload WorkflowRegistryWorkflowPausedV1, -) error { - // Remove the workflow engine from the local registry if it exists - if err := h.tryEngineCleanup(hex.EncodeToString(payload.WorkflowID[:])); err != nil { - return err - } - - // get existing workflow spec from DB - spec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName) - if err != nil { - return fmt.Errorf("failed to get workflow spec: %w", err) - } - - // update the status of the workflow spec - spec.Status = job.WorkflowSpecStatusPaused - if _, err := h.orm.UpsertWorkflowSpec(ctx, spec); err != nil { - return fmt.Errorf("failed to update workflow spec: %w", err) - } - - return nil -} - -// workflowActivatedEvent handles the WorkflowActivatedEvent event type. -func (h *eventHandler) workflowActivatedEvent( - ctx context.Context, - payload WorkflowRegistryWorkflowActivatedV1, -) error { - // fetch the workflow spec from the DB - spec, err := h.orm.GetWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName) - if err != nil { - return fmt.Errorf("failed to get workflow spec: %w", err) - } - - // Do nothing if the workflow is already active - if spec.Status == job.WorkflowSpecStatusActive && h.engineRegistry.IsRunning(hex.EncodeToString(payload.WorkflowID[:])) { - return nil - } - - // get the secrets url by the secrets id - secretsURL, err := h.orm.GetSecretsURLByID(ctx, spec.SecretsID.Int64) - if err != nil { - return fmt.Errorf("failed to get secrets URL by ID: %w", err) - } - - // start a new workflow engine - registeredEvent := WorkflowRegistryWorkflowRegisteredV1{ - WorkflowID: payload.WorkflowID, - WorkflowOwner: payload.WorkflowOwner, - DonID: payload.DonID, - Status: 0, - WorkflowName: payload.WorkflowName, - BinaryURL: spec.BinaryURL, - ConfigURL: spec.ConfigURL, - SecretsURL: secretsURL, - } - - return h.workflowRegisteredEvent(ctx, registeredEvent) -} - -// workflowDeletedEvent handles the WorkflowDeletedEvent event type. -func (h *eventHandler) workflowDeletedEvent( - ctx context.Context, - payload WorkflowRegistryWorkflowDeletedV1, -) error { - if err := h.tryEngineCleanup(hex.EncodeToString(payload.WorkflowID[:])); err != nil { - return err - } - - if err := h.orm.DeleteWorkflowSpec(ctx, hex.EncodeToString(payload.WorkflowOwner), payload.WorkflowName); err != nil { - return fmt.Errorf("failed to delete workflow spec: %w", err) - } - return nil -} - -// forceUpdateSecretsEvent handles the ForceUpdateSecretsEvent event type. -func (h *eventHandler) forceUpdateSecretsEvent( - ctx context.Context, - payload WorkflowRegistryForceUpdateSecretsRequestedV1, -) error { - // Get the URL of the secrets file from the event data - hash := hex.EncodeToString(payload.SecretsURLHash) - - url, err := h.orm.GetSecretsURLByHash(ctx, hash) - if err != nil { - return fmt.Errorf("failed to get URL by hash %s : %w", hash, err) - } - - // Fetch the contents of the secrets file from the url via the fetcher - secrets, err := h.fetcher(ctx, url) - if err != nil { - return fmt.Errorf("failed to fetch secrets from url %s : %w", url, err) - } - - // Update the secrets in the ORM - if _, err := h.orm.Update(ctx, hash, string(secrets)); err != nil { - return fmt.Errorf("failed to update secrets: %w", err) - } - - return nil -} - -// tryEngineCleanup attempts to stop the workflow engine for the given workflow ID. Does nothing if the -// workflow engine is not running. -func (h *eventHandler) tryEngineCleanup(wfID string) error { - if h.engineRegistry.IsRunning(wfID) { - // Remove the engine from the registry - e, err := h.engineRegistry.Pop(wfID) - if err != nil { - return fmt.Errorf("failed to get workflow engine: %w", err) - } - - // Stop the engine - if err := e.Close(); err != nil { - return fmt.Errorf("failed to close workflow engine: %w", err) - } - } - return nil -} - -// workflowID returns a hex encoded sha256 hash of the wasm, config and secretsURL. -func workflowID(wasm, config, secretsURL []byte) string { - sum := sha256.New() - sum.Write(wasm) - sum.Write(config) - sum.Write(secretsURL) - return hex.EncodeToString(sum.Sum(nil)) -} - -// logCustMsg emits a custom message to the external sink and logs an error if that fails. -func logCustMsg(ctx context.Context, cma custmsg.MessageEmitter, msg string, log logger.Logger) { - err := cma.Emit(ctx, msg) - if err != nil { - log.Helper(1).Errorf("failed to send custom message with msg: %s, err: %v", msg, err) - } -} - -func newHandlerTypeError(data any) error { - return fmt.Errorf("invalid data type %T for event", data) -} diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go deleted file mode 100644 index eb8b89ad7e1..00000000000 --- a/core/services/workflows/syncer/handler_test.go +++ /dev/null @@ -1,540 +0,0 @@ -package syncer - -import ( - "context" - "encoding/hex" - "testing" - - "github.com/smartcontractkit/chainlink-common/pkg/custmsg" - "github.com/smartcontractkit/chainlink/v2/core/capabilities" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/wasmtest" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/job" - wfstore "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" - "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer/mocks" - "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" - "github.com/smartcontractkit/chainlink/v2/core/utils/matches" - - "github.com/jonboulle/clockwork" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -type mockFetchResp struct { - Body []byte - Err error -} - -type mockFetcher struct { - responseMap map[string]mockFetchResp -} - -func (m *mockFetcher) Fetch(_ context.Context, url string) ([]byte, error) { - return m.responseMap[url].Body, m.responseMap[url].Err -} - -func newMockFetcher(m map[string]mockFetchResp) FetcherFunc { - return (&mockFetcher{responseMap: m}).Fetch -} - -func Test_Handler(t *testing.T) { - lggr := logger.TestLogger(t) - emitter := custmsg.NewLabeler() - t.Run("success", func(t *testing.T) { - mockORM := mocks.NewORM(t) - ctx := testutils.Context(t) - giveURL := "https://original-url.com" - giveBytes, err := crypto.Keccak256([]byte(giveURL)) - require.NoError(t, err) - - giveHash := hex.EncodeToString(giveBytes) - - giveEvent := WorkflowRegistryEvent{ - EventType: ForceUpdateSecretsEvent, - Data: WorkflowRegistryForceUpdateSecretsRequestedV1{ - SecretsURLHash: giveBytes, - }, - } - - fetcher := func(_ context.Context, _ string) ([]byte, error) { - return []byte("contents"), nil - } - mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) - mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(int64(1), nil) - h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil) - err = h.Handle(ctx, giveEvent) - require.NoError(t, err) - }) - - t.Run("fails with unsupported event type", func(t *testing.T) { - mockORM := mocks.NewORM(t) - ctx := testutils.Context(t) - - giveEvent := WorkflowRegistryEvent{} - fetcher := func(_ context.Context, _ string) ([]byte, error) { - return []byte("contents"), nil - } - - h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil) - err := h.Handle(ctx, giveEvent) - require.Error(t, err) - require.Contains(t, err.Error(), "event type unsupported") - }) - - t.Run("fails to get secrets url", func(t *testing.T) { - mockORM := mocks.NewORM(t) - ctx := testutils.Context(t) - h := newEventHandler(lggr, mockORM, nil, nil, nil, nil, emitter, nil) - giveURL := "https://original-url.com" - giveBytes, err := crypto.Keccak256([]byte(giveURL)) - require.NoError(t, err) - - giveHash := hex.EncodeToString(giveBytes) - - giveEvent := WorkflowRegistryEvent{ - EventType: ForceUpdateSecretsEvent, - Data: WorkflowRegistryForceUpdateSecretsRequestedV1{ - SecretsURLHash: giveBytes, - }, - } - mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return("", assert.AnError) - err = h.Handle(ctx, giveEvent) - require.Error(t, err) - require.ErrorContains(t, err, assert.AnError.Error()) - }) - - t.Run("fails to fetch contents", func(t *testing.T) { - mockORM := mocks.NewORM(t) - ctx := testutils.Context(t) - giveURL := "http://example.com" - - giveBytes, err := crypto.Keccak256([]byte(giveURL)) - require.NoError(t, err) - - giveHash := hex.EncodeToString(giveBytes) - - giveEvent := WorkflowRegistryEvent{ - EventType: ForceUpdateSecretsEvent, - Data: WorkflowRegistryForceUpdateSecretsRequestedV1{ - SecretsURLHash: giveBytes, - }, - } - - fetcher := func(_ context.Context, _ string) ([]byte, error) { - return nil, assert.AnError - } - mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) - h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil) - err = h.Handle(ctx, giveEvent) - require.Error(t, err) - require.ErrorIs(t, err, assert.AnError) - }) - - t.Run("fails to update secrets", func(t *testing.T) { - mockORM := mocks.NewORM(t) - ctx := testutils.Context(t) - giveURL := "http://example.com" - giveBytes, err := crypto.Keccak256([]byte(giveURL)) - require.NoError(t, err) - - giveHash := hex.EncodeToString(giveBytes) - - giveEvent := WorkflowRegistryEvent{ - EventType: ForceUpdateSecretsEvent, - Data: WorkflowRegistryForceUpdateSecretsRequestedV1{ - SecretsURLHash: giveBytes, - }, - } - - fetcher := func(_ context.Context, _ string) ([]byte, error) { - return []byte("contents"), nil - } - mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) - mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(0, assert.AnError) - h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil) - err = h.Handle(ctx, giveEvent) - require.Error(t, err) - require.ErrorIs(t, err, assert.AnError) - }) -} - -const ( - binaryLocation = "test/simple/cmd/testmodule.wasm" - binaryCmd = "core/capabilities/compute/test/simple/cmd" -) - -func Test_workflowRegisteredHandler(t *testing.T) { - t.Run("success with paused workflow registered", func(t *testing.T) { - var ( - ctx = testutils.Context(t) - lggr = logger.TestLogger(t) - db = pgtest.NewSqlxDB(t) - orm = NewWorkflowRegistryDS(db, lggr) - emitter = custmsg.NewLabeler() - - binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t) - config = []byte("") - secretsURL = "http://example.com" - binaryURL = "http://example.com/binary" - configURL = "http://example.com/config" - wfOwner = []byte("0xOwner") - - fetcher = newMockFetcher(map[string]mockFetchResp{ - binaryURL: {Body: binary, Err: nil}, - configURL: {Body: config, Err: nil}, - secretsURL: {Body: []byte("secrets"), Err: nil}, - }) - ) - - giveWFID := workflowID(binary, config, []byte(secretsURL)) - - b, err := hex.DecodeString(giveWFID) - require.NoError(t, err) - wfID := make([]byte, 32) - copy(wfID, b) - - paused := WorkflowRegistryWorkflowRegisteredV1{ - Status: uint8(1), - WorkflowID: [32]byte(wfID), - WorkflowOwner: wfOwner, - WorkflowName: "workflow-name", - BinaryURL: binaryURL, - ConfigURL: configURL, - SecretsURL: secretsURL, - } - - h := &eventHandler{ - lggr: lggr, - orm: orm, - fetcher: fetcher, - emitter: emitter, - } - err = h.workflowRegisteredEvent(ctx, paused) - require.NoError(t, err) - - // Verify the record is updated in the database - dbSpec, err := orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") - require.NoError(t, err) - require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner) - require.Equal(t, "workflow-name", dbSpec.WorkflowName) - require.Equal(t, job.WorkflowSpecStatusPaused, dbSpec.Status) - }) - - t.Run("success with active workflow registered", func(t *testing.T) { - var ( - ctx = testutils.Context(t) - lggr = logger.TestLogger(t) - db = pgtest.NewSqlxDB(t) - orm = NewWorkflowRegistryDS(db, lggr) - emitter = custmsg.NewLabeler() - - binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t) - config = []byte("") - secretsURL = "http://example.com" - binaryURL = "http://example.com/binary" - configURL = "http://example.com/config" - wfOwner = []byte("0xOwner") - - fetcher = newMockFetcher(map[string]mockFetchResp{ - binaryURL: {Body: binary, Err: nil}, - configURL: {Body: config, Err: nil}, - secretsURL: {Body: []byte("secrets"), Err: nil}, - }) - ) - - giveWFID := workflowID(binary, config, []byte(secretsURL)) - - b, err := hex.DecodeString(giveWFID) - require.NoError(t, err) - wfID := make([]byte, 32) - copy(wfID, b) - - active := WorkflowRegistryWorkflowRegisteredV1{ - Status: uint8(0), - WorkflowID: [32]byte(wfID), - WorkflowOwner: wfOwner, - WorkflowName: "workflow-name", - BinaryURL: binaryURL, - ConfigURL: configURL, - SecretsURL: secretsURL, - } - - er := newEngineRegistry() - store := wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()) - registry := capabilities.NewRegistry(lggr) - registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) - h := &eventHandler{ - lggr: lggr, - orm: orm, - fetcher: fetcher, - emitter: emitter, - engineRegistry: er, - capRegistry: registry, - workflowStore: store, - } - err = h.workflowRegisteredEvent(ctx, active) - require.NoError(t, err) - - // Verify the record is updated in the database - dbSpec, err := orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") - require.NoError(t, err) - require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner) - require.Equal(t, "workflow-name", dbSpec.WorkflowName) - require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status) - - // Verify the engine is started - engine, err := h.engineRegistry.Get(giveWFID) - require.NoError(t, err) - err = engine.Ready() - require.NoError(t, err) - }) -} - -func Test_workflowDeletedHandler(t *testing.T) { - t.Run("success deleting existing engine and spec", func(t *testing.T) { - var ( - ctx = testutils.Context(t) - lggr = logger.TestLogger(t) - db = pgtest.NewSqlxDB(t) - orm = NewWorkflowRegistryDS(db, lggr) - emitter = custmsg.NewLabeler() - - binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t) - config = []byte("") - secretsURL = "http://example.com" - binaryURL = "http://example.com/binary" - configURL = "http://example.com/config" - wfOwner = []byte("0xOwner") - - fetcher = newMockFetcher(map[string]mockFetchResp{ - binaryURL: {Body: binary, Err: nil}, - configURL: {Body: config, Err: nil}, - secretsURL: {Body: []byte("secrets"), Err: nil}, - }) - ) - - giveWFID := workflowID(binary, config, []byte(secretsURL)) - - b, err := hex.DecodeString(giveWFID) - require.NoError(t, err) - wfID := make([]byte, 32) - copy(wfID, b) - - active := WorkflowRegistryWorkflowRegisteredV1{ - Status: uint8(0), - WorkflowID: [32]byte(wfID), - WorkflowOwner: wfOwner, - WorkflowName: "workflow-name", - BinaryURL: binaryURL, - ConfigURL: configURL, - SecretsURL: secretsURL, - } - - er := newEngineRegistry() - store := wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()) - registry := capabilities.NewRegistry(lggr) - registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) - h := &eventHandler{ - lggr: lggr, - orm: orm, - fetcher: fetcher, - emitter: emitter, - engineRegistry: er, - capRegistry: registry, - workflowStore: store, - } - err = h.workflowRegisteredEvent(ctx, active) - require.NoError(t, err) - - // Verify the record is updated in the database - dbSpec, err := orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") - require.NoError(t, err) - require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner) - require.Equal(t, "workflow-name", dbSpec.WorkflowName) - require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status) - - // Verify the engine is started - engine, err := h.engineRegistry.Get(giveWFID) - require.NoError(t, err) - err = engine.Ready() - require.NoError(t, err) - - deleteEvent := WorkflowRegistryWorkflowDeletedV1{ - WorkflowID: [32]byte(wfID), - WorkflowOwner: wfOwner, - WorkflowName: "workflow-name", - DonID: 1, - } - err = h.workflowDeletedEvent(ctx, deleteEvent) - require.NoError(t, err) - - // Verify the record is deleted in the database - _, err = orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") - require.Error(t, err) - - // Verify the engine is deleted - _, err = h.engineRegistry.Get(giveWFID) - require.Error(t, err) - }) -} - -func Test_workflowPausedActivatedUpdatedHandler(t *testing.T) { - t.Run("success pausing activating and updating existing engine and spec", func(t *testing.T) { - var ( - ctx = testutils.Context(t) - lggr = logger.TestLogger(t) - db = pgtest.NewSqlxDB(t) - orm = NewWorkflowRegistryDS(db, lggr) - emitter = custmsg.NewLabeler() - - binary = wasmtest.CreateTestBinary(binaryCmd, binaryLocation, true, t) - config = []byte("") - updateConfig = []byte("updated") - secretsURL = "http://example.com" - binaryURL = "http://example.com/binary" - configURL = "http://example.com/config" - newConfigURL = "http://example.com/new-config" - wfOwner = []byte("0xOwner") - - fetcher = newMockFetcher(map[string]mockFetchResp{ - binaryURL: {Body: binary, Err: nil}, - configURL: {Body: config, Err: nil}, - newConfigURL: {Body: updateConfig, Err: nil}, - secretsURL: {Body: []byte("secrets"), Err: nil}, - }) - ) - - giveWFID := workflowID(binary, config, []byte(secretsURL)) - updatedWFID := workflowID(binary, updateConfig, []byte(secretsURL)) - - b, err := hex.DecodeString(giveWFID) - require.NoError(t, err) - wfID := make([]byte, 32) - copy(wfID, b) - - b, err = hex.DecodeString(updatedWFID) - require.NoError(t, err) - newWFID := make([]byte, 32) - copy(newWFID, b) - - active := WorkflowRegistryWorkflowRegisteredV1{ - Status: uint8(0), - WorkflowID: [32]byte(wfID), - WorkflowOwner: wfOwner, - WorkflowName: "workflow-name", - BinaryURL: binaryURL, - ConfigURL: configURL, - SecretsURL: secretsURL, - } - - er := newEngineRegistry() - store := wfstore.NewDBStore(db, lggr, clockwork.NewFakeClock()) - registry := capabilities.NewRegistry(lggr) - registry.SetLocalRegistry(&capabilities.TestMetadataRegistry{}) - h := &eventHandler{ - lggr: lggr, - orm: orm, - fetcher: fetcher, - emitter: emitter, - engineRegistry: er, - capRegistry: registry, - workflowStore: store, - } - err = h.workflowRegisteredEvent(ctx, active) - require.NoError(t, err) - - // Verify the record is updated in the database - dbSpec, err := orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") - require.NoError(t, err) - require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner) - require.Equal(t, "workflow-name", dbSpec.WorkflowName) - require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status) - - // Verify the engine is started - engine, err := h.engineRegistry.Get(giveWFID) - require.NoError(t, err) - err = engine.Ready() - require.NoError(t, err) - - // create a paused event - pauseEvent := WorkflowRegistryWorkflowPausedV1{ - WorkflowID: [32]byte(wfID), - WorkflowOwner: wfOwner, - WorkflowName: "workflow-name", - DonID: 1, - } - err = h.workflowPausedEvent(ctx, pauseEvent) - require.NoError(t, err) - - // Verify the record is updated in the database - dbSpec, err = orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") - require.NoError(t, err) - require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner) - require.Equal(t, "workflow-name", dbSpec.WorkflowName) - require.Equal(t, job.WorkflowSpecStatusPaused, dbSpec.Status) - - // Verify the engine is removed - _, err = h.engineRegistry.Get(giveWFID) - require.Error(t, err) - - // create an activated workflow event - activatedEvent := WorkflowRegistryWorkflowActivatedV1{ - WorkflowID: [32]byte(wfID), - WorkflowOwner: wfOwner, - WorkflowName: "workflow-name", - DonID: 1, - } - - err = h.workflowActivatedEvent(ctx, activatedEvent) - require.NoError(t, err) - - // Verify the record is updated in the database - dbSpec, err = orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") - require.NoError(t, err) - require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner) - require.Equal(t, "workflow-name", dbSpec.WorkflowName) - require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status) - - // Verify the engine is started - engine, err = h.engineRegistry.Get(giveWFID) - require.NoError(t, err) - err = engine.Ready() - require.NoError(t, err) - - // create an updated event - updatedEvent := WorkflowRegistryWorkflowUpdatedV1{ - OldWorkflowID: [32]byte(wfID), - NewWorkflowID: [32]byte(newWFID), - WorkflowOwner: wfOwner, - WorkflowName: "workflow-name", - BinaryURL: binaryURL, - ConfigURL: newConfigURL, - SecretsURL: secretsURL, - DonID: 1, - } - err = h.workflowUpdatedEvent(ctx, updatedEvent) - require.NoError(t, err) - - // Verify the record is updated in the database - dbSpec, err = orm.GetWorkflowSpec(ctx, hex.EncodeToString(wfOwner), "workflow-name") - require.NoError(t, err) - require.Equal(t, hex.EncodeToString(wfOwner), dbSpec.WorkflowOwner) - require.Equal(t, "workflow-name", dbSpec.WorkflowName) - require.Equal(t, job.WorkflowSpecStatusActive, dbSpec.Status) - require.Equal(t, hex.EncodeToString(newWFID), dbSpec.WorkflowID) - require.Equal(t, newConfigURL, dbSpec.ConfigURL) - require.Equal(t, string(updateConfig), dbSpec.Config) - - // old engine is no longer running - _, err = h.engineRegistry.Get(giveWFID) - require.Error(t, err) - - // new engine is started - engine, err = h.engineRegistry.Get(updatedWFID) - require.NoError(t, err) - err = engine.Ready() - require.NoError(t, err) - }) -} diff --git a/core/services/workflows/syncer/heap.go b/core/services/workflows/syncer/heap.go deleted file mode 100644 index 061293928a3..00000000000 --- a/core/services/workflows/syncer/heap.go +++ /dev/null @@ -1,63 +0,0 @@ -package syncer - -import "container/heap" - -type Heap interface { - // Push adds a new item to the heap. - Push(x WorkflowRegistryEventResponse) - - // Pop removes the smallest item from the heap and returns it. - Pop() WorkflowRegistryEventResponse - - // Len returns the number of items in the heap. - Len() int -} - -// publicHeap is a wrapper around the heap.Interface that exposes the Push and Pop methods. -type publicHeap[T any] struct { - heap heap.Interface -} - -func (h *publicHeap[T]) Push(x T) { - heap.Push(h.heap, x) -} - -func (h *publicHeap[T]) Pop() T { - return heap.Pop(h.heap).(T) -} - -func (h *publicHeap[T]) Len() int { - return h.heap.Len() -} - -// blockHeightHeap is a heap.Interface that sorts WorkflowRegistryEventResponses by block height. -type blockHeightHeap []WorkflowRegistryEventResponse - -// newBlockHeightHeap returns an initialized heap that sorts WorkflowRegistryEventResponses by block height. -func newBlockHeightHeap() Heap { - h := blockHeightHeap(make([]WorkflowRegistryEventResponse, 0)) - heap.Init(&h) - return &publicHeap[WorkflowRegistryEventResponse]{heap: &h} -} - -func (h *blockHeightHeap) Len() int { return len(*h) } - -func (h *blockHeightHeap) Less(i, j int) bool { - return (*h)[i].Event.Head.Height < (*h)[j].Event.Head.Height -} - -func (h *blockHeightHeap) Swap(i, j int) { - (*h)[i], (*h)[j] = (*h)[j], (*h)[i] -} - -func (h *blockHeightHeap) Push(x any) { - *h = append(*h, x.(WorkflowRegistryEventResponse)) -} - -func (h *blockHeightHeap) Pop() any { - old := *h - n := len(old) - x := old[n-1] - *h = old[0 : n-1] - return x -} diff --git a/core/services/workflows/syncer/mocks/orm.go b/core/services/workflows/syncer/mocks/orm.go deleted file mode 100644 index 128100ea907..00000000000 --- a/core/services/workflows/syncer/mocks/orm.go +++ /dev/null @@ -1,666 +0,0 @@ -// Code generated by mockery v2.46.3. DO NOT EDIT. - -package mocks - -import ( - context "context" - - job "github.com/smartcontractkit/chainlink/v2/core/services/job" - mock "github.com/stretchr/testify/mock" -) - -// ORM is an autogenerated mock type for the ORM type -type ORM struct { - mock.Mock -} - -type ORM_Expecter struct { - mock *mock.Mock -} - -func (_m *ORM) EXPECT() *ORM_Expecter { - return &ORM_Expecter{mock: &_m.Mock} -} - -// Create provides a mock function with given fields: ctx, secretsURL, hash, contents -func (_m *ORM) Create(ctx context.Context, secretsURL string, hash string, contents string) (int64, error) { - ret := _m.Called(ctx, secretsURL, hash, contents) - - if len(ret) == 0 { - panic("no return value specified for Create") - } - - var r0 int64 - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, string, string) (int64, error)); ok { - return rf(ctx, secretsURL, hash, contents) - } - if rf, ok := ret.Get(0).(func(context.Context, string, string, string) int64); ok { - r0 = rf(ctx, secretsURL, hash, contents) - } else { - r0 = ret.Get(0).(int64) - } - - if rf, ok := ret.Get(1).(func(context.Context, string, string, string) error); ok { - r1 = rf(ctx, secretsURL, hash, contents) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_Create_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Create' -type ORM_Create_Call struct { - *mock.Call -} - -// Create is a helper method to define mock.On call -// - ctx context.Context -// - secretsURL string -// - hash string -// - contents string -func (_e *ORM_Expecter) Create(ctx interface{}, secretsURL interface{}, hash interface{}, contents interface{}) *ORM_Create_Call { - return &ORM_Create_Call{Call: _e.mock.On("Create", ctx, secretsURL, hash, contents)} -} - -func (_c *ORM_Create_Call) Run(run func(ctx context.Context, secretsURL string, hash string, contents string)) *ORM_Create_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(string)) - }) - return _c -} - -func (_c *ORM_Create_Call) Return(_a0 int64, _a1 error) *ORM_Create_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_Create_Call) RunAndReturn(run func(context.Context, string, string, string) (int64, error)) *ORM_Create_Call { - _c.Call.Return(run) - return _c -} - -// DeleteWorkflowSpec provides a mock function with given fields: ctx, owner, name -func (_m *ORM) DeleteWorkflowSpec(ctx context.Context, owner string, name string) error { - ret := _m.Called(ctx, owner, name) - - if len(ret) == 0 { - panic("no return value specified for DeleteWorkflowSpec") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, string, string) error); ok { - r0 = rf(ctx, owner, name) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// ORM_DeleteWorkflowSpec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteWorkflowSpec' -type ORM_DeleteWorkflowSpec_Call struct { - *mock.Call -} - -// DeleteWorkflowSpec is a helper method to define mock.On call -// - ctx context.Context -// - owner string -// - name string -func (_e *ORM_Expecter) DeleteWorkflowSpec(ctx interface{}, owner interface{}, name interface{}) *ORM_DeleteWorkflowSpec_Call { - return &ORM_DeleteWorkflowSpec_Call{Call: _e.mock.On("DeleteWorkflowSpec", ctx, owner, name)} -} - -func (_c *ORM_DeleteWorkflowSpec_Call) Run(run func(ctx context.Context, owner string, name string)) *ORM_DeleteWorkflowSpec_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string)) - }) - return _c -} - -func (_c *ORM_DeleteWorkflowSpec_Call) Return(_a0 error) *ORM_DeleteWorkflowSpec_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *ORM_DeleteWorkflowSpec_Call) RunAndReturn(run func(context.Context, string, string) error) *ORM_DeleteWorkflowSpec_Call { - _c.Call.Return(run) - return _c -} - -// GetContents provides a mock function with given fields: ctx, url -func (_m *ORM) GetContents(ctx context.Context, url string) (string, error) { - ret := _m.Called(ctx, url) - - if len(ret) == 0 { - panic("no return value specified for GetContents") - } - - var r0 string - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string) (string, error)); ok { - return rf(ctx, url) - } - if rf, ok := ret.Get(0).(func(context.Context, string) string); ok { - r0 = rf(ctx, url) - } else { - r0 = ret.Get(0).(string) - } - - if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { - r1 = rf(ctx, url) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_GetContents_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetContents' -type ORM_GetContents_Call struct { - *mock.Call -} - -// GetContents is a helper method to define mock.On call -// - ctx context.Context -// - url string -func (_e *ORM_Expecter) GetContents(ctx interface{}, url interface{}) *ORM_GetContents_Call { - return &ORM_GetContents_Call{Call: _e.mock.On("GetContents", ctx, url)} -} - -func (_c *ORM_GetContents_Call) Run(run func(ctx context.Context, url string)) *ORM_GetContents_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *ORM_GetContents_Call) Return(_a0 string, _a1 error) *ORM_GetContents_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_GetContents_Call) RunAndReturn(run func(context.Context, string) (string, error)) *ORM_GetContents_Call { - _c.Call.Return(run) - return _c -} - -// GetContentsByHash provides a mock function with given fields: ctx, hash -func (_m *ORM) GetContentsByHash(ctx context.Context, hash string) (string, error) { - ret := _m.Called(ctx, hash) - - if len(ret) == 0 { - panic("no return value specified for GetContentsByHash") - } - - var r0 string - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string) (string, error)); ok { - return rf(ctx, hash) - } - if rf, ok := ret.Get(0).(func(context.Context, string) string); ok { - r0 = rf(ctx, hash) - } else { - r0 = ret.Get(0).(string) - } - - if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { - r1 = rf(ctx, hash) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_GetContentsByHash_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetContentsByHash' -type ORM_GetContentsByHash_Call struct { - *mock.Call -} - -// GetContentsByHash is a helper method to define mock.On call -// - ctx context.Context -// - hash string -func (_e *ORM_Expecter) GetContentsByHash(ctx interface{}, hash interface{}) *ORM_GetContentsByHash_Call { - return &ORM_GetContentsByHash_Call{Call: _e.mock.On("GetContentsByHash", ctx, hash)} -} - -func (_c *ORM_GetContentsByHash_Call) Run(run func(ctx context.Context, hash string)) *ORM_GetContentsByHash_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *ORM_GetContentsByHash_Call) Return(_a0 string, _a1 error) *ORM_GetContentsByHash_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_GetContentsByHash_Call) RunAndReturn(run func(context.Context, string) (string, error)) *ORM_GetContentsByHash_Call { - _c.Call.Return(run) - return _c -} - -// GetSecretsURLByHash provides a mock function with given fields: ctx, hash -func (_m *ORM) GetSecretsURLByHash(ctx context.Context, hash string) (string, error) { - ret := _m.Called(ctx, hash) - - if len(ret) == 0 { - panic("no return value specified for GetSecretsURLByHash") - } - - var r0 string - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string) (string, error)); ok { - return rf(ctx, hash) - } - if rf, ok := ret.Get(0).(func(context.Context, string) string); ok { - r0 = rf(ctx, hash) - } else { - r0 = ret.Get(0).(string) - } - - if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { - r1 = rf(ctx, hash) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_GetSecretsURLByHash_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSecretsURLByHash' -type ORM_GetSecretsURLByHash_Call struct { - *mock.Call -} - -// GetSecretsURLByHash is a helper method to define mock.On call -// - ctx context.Context -// - hash string -func (_e *ORM_Expecter) GetSecretsURLByHash(ctx interface{}, hash interface{}) *ORM_GetSecretsURLByHash_Call { - return &ORM_GetSecretsURLByHash_Call{Call: _e.mock.On("GetSecretsURLByHash", ctx, hash)} -} - -func (_c *ORM_GetSecretsURLByHash_Call) Run(run func(ctx context.Context, hash string)) *ORM_GetSecretsURLByHash_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *ORM_GetSecretsURLByHash_Call) Return(_a0 string, _a1 error) *ORM_GetSecretsURLByHash_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_GetSecretsURLByHash_Call) RunAndReturn(run func(context.Context, string) (string, error)) *ORM_GetSecretsURLByHash_Call { - _c.Call.Return(run) - return _c -} - -// GetSecretsURLByID provides a mock function with given fields: ctx, id -func (_m *ORM) GetSecretsURLByID(ctx context.Context, id int64) (string, error) { - ret := _m.Called(ctx, id) - - if len(ret) == 0 { - panic("no return value specified for GetSecretsURLByID") - } - - var r0 string - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64) (string, error)); ok { - return rf(ctx, id) - } - if rf, ok := ret.Get(0).(func(context.Context, int64) string); ok { - r0 = rf(ctx, id) - } else { - r0 = ret.Get(0).(string) - } - - if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { - r1 = rf(ctx, id) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_GetSecretsURLByID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSecretsURLByID' -type ORM_GetSecretsURLByID_Call struct { - *mock.Call -} - -// GetSecretsURLByID is a helper method to define mock.On call -// - ctx context.Context -// - id int64 -func (_e *ORM_Expecter) GetSecretsURLByID(ctx interface{}, id interface{}) *ORM_GetSecretsURLByID_Call { - return &ORM_GetSecretsURLByID_Call{Call: _e.mock.On("GetSecretsURLByID", ctx, id)} -} - -func (_c *ORM_GetSecretsURLByID_Call) Run(run func(ctx context.Context, id int64)) *ORM_GetSecretsURLByID_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64)) - }) - return _c -} - -func (_c *ORM_GetSecretsURLByID_Call) Return(_a0 string, _a1 error) *ORM_GetSecretsURLByID_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_GetSecretsURLByID_Call) RunAndReturn(run func(context.Context, int64) (string, error)) *ORM_GetSecretsURLByID_Call { - _c.Call.Return(run) - return _c -} - -// GetSecretsURLHash provides a mock function with given fields: owner, secretsURL -func (_m *ORM) GetSecretsURLHash(owner []byte, secretsURL []byte) ([]byte, error) { - ret := _m.Called(owner, secretsURL) - - if len(ret) == 0 { - panic("no return value specified for GetSecretsURLHash") - } - - var r0 []byte - var r1 error - if rf, ok := ret.Get(0).(func([]byte, []byte) ([]byte, error)); ok { - return rf(owner, secretsURL) - } - if rf, ok := ret.Get(0).(func([]byte, []byte) []byte); ok { - r0 = rf(owner, secretsURL) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]byte) - } - } - - if rf, ok := ret.Get(1).(func([]byte, []byte) error); ok { - r1 = rf(owner, secretsURL) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_GetSecretsURLHash_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSecretsURLHash' -type ORM_GetSecretsURLHash_Call struct { - *mock.Call -} - -// GetSecretsURLHash is a helper method to define mock.On call -// - owner []byte -// - secretsURL []byte -func (_e *ORM_Expecter) GetSecretsURLHash(owner interface{}, secretsURL interface{}) *ORM_GetSecretsURLHash_Call { - return &ORM_GetSecretsURLHash_Call{Call: _e.mock.On("GetSecretsURLHash", owner, secretsURL)} -} - -func (_c *ORM_GetSecretsURLHash_Call) Run(run func(owner []byte, secretsURL []byte)) *ORM_GetSecretsURLHash_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].([]byte), args[1].([]byte)) - }) - return _c -} - -func (_c *ORM_GetSecretsURLHash_Call) Return(_a0 []byte, _a1 error) *ORM_GetSecretsURLHash_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_GetSecretsURLHash_Call) RunAndReturn(run func([]byte, []byte) ([]byte, error)) *ORM_GetSecretsURLHash_Call { - _c.Call.Return(run) - return _c -} - -// GetWorkflowSpec provides a mock function with given fields: ctx, owner, name -func (_m *ORM) GetWorkflowSpec(ctx context.Context, owner string, name string) (*job.WorkflowSpec, error) { - ret := _m.Called(ctx, owner, name) - - if len(ret) == 0 { - panic("no return value specified for GetWorkflowSpec") - } - - var r0 *job.WorkflowSpec - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, string) (*job.WorkflowSpec, error)); ok { - return rf(ctx, owner, name) - } - if rf, ok := ret.Get(0).(func(context.Context, string, string) *job.WorkflowSpec); ok { - r0 = rf(ctx, owner, name) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*job.WorkflowSpec) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { - r1 = rf(ctx, owner, name) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_GetWorkflowSpec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetWorkflowSpec' -type ORM_GetWorkflowSpec_Call struct { - *mock.Call -} - -// GetWorkflowSpec is a helper method to define mock.On call -// - ctx context.Context -// - owner string -// - name string -func (_e *ORM_Expecter) GetWorkflowSpec(ctx interface{}, owner interface{}, name interface{}) *ORM_GetWorkflowSpec_Call { - return &ORM_GetWorkflowSpec_Call{Call: _e.mock.On("GetWorkflowSpec", ctx, owner, name)} -} - -func (_c *ORM_GetWorkflowSpec_Call) Run(run func(ctx context.Context, owner string, name string)) *ORM_GetWorkflowSpec_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string)) - }) - return _c -} - -func (_c *ORM_GetWorkflowSpec_Call) Return(_a0 *job.WorkflowSpec, _a1 error) *ORM_GetWorkflowSpec_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_GetWorkflowSpec_Call) RunAndReturn(run func(context.Context, string, string) (*job.WorkflowSpec, error)) *ORM_GetWorkflowSpec_Call { - _c.Call.Return(run) - return _c -} - -// Update provides a mock function with given fields: ctx, secretsURL, contents -func (_m *ORM) Update(ctx context.Context, secretsURL string, contents string) (int64, error) { - ret := _m.Called(ctx, secretsURL, contents) - - if len(ret) == 0 { - panic("no return value specified for Update") - } - - var r0 int64 - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, string) (int64, error)); ok { - return rf(ctx, secretsURL, contents) - } - if rf, ok := ret.Get(0).(func(context.Context, string, string) int64); ok { - r0 = rf(ctx, secretsURL, contents) - } else { - r0 = ret.Get(0).(int64) - } - - if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { - r1 = rf(ctx, secretsURL, contents) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_Update_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Update' -type ORM_Update_Call struct { - *mock.Call -} - -// Update is a helper method to define mock.On call -// - ctx context.Context -// - secretsURL string -// - contents string -func (_e *ORM_Expecter) Update(ctx interface{}, secretsURL interface{}, contents interface{}) *ORM_Update_Call { - return &ORM_Update_Call{Call: _e.mock.On("Update", ctx, secretsURL, contents)} -} - -func (_c *ORM_Update_Call) Run(run func(ctx context.Context, secretsURL string, contents string)) *ORM_Update_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(string)) - }) - return _c -} - -func (_c *ORM_Update_Call) Return(_a0 int64, _a1 error) *ORM_Update_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_Update_Call) RunAndReturn(run func(context.Context, string, string) (int64, error)) *ORM_Update_Call { - _c.Call.Return(run) - return _c -} - -// UpsertWorkflowSpec provides a mock function with given fields: ctx, spec -func (_m *ORM) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) { - ret := _m.Called(ctx, spec) - - if len(ret) == 0 { - panic("no return value specified for UpsertWorkflowSpec") - } - - var r0 int64 - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *job.WorkflowSpec) (int64, error)); ok { - return rf(ctx, spec) - } - if rf, ok := ret.Get(0).(func(context.Context, *job.WorkflowSpec) int64); ok { - r0 = rf(ctx, spec) - } else { - r0 = ret.Get(0).(int64) - } - - if rf, ok := ret.Get(1).(func(context.Context, *job.WorkflowSpec) error); ok { - r1 = rf(ctx, spec) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_UpsertWorkflowSpec_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpsertWorkflowSpec' -type ORM_UpsertWorkflowSpec_Call struct { - *mock.Call -} - -// UpsertWorkflowSpec is a helper method to define mock.On call -// - ctx context.Context -// - spec *job.WorkflowSpec -func (_e *ORM_Expecter) UpsertWorkflowSpec(ctx interface{}, spec interface{}) *ORM_UpsertWorkflowSpec_Call { - return &ORM_UpsertWorkflowSpec_Call{Call: _e.mock.On("UpsertWorkflowSpec", ctx, spec)} -} - -func (_c *ORM_UpsertWorkflowSpec_Call) Run(run func(ctx context.Context, spec *job.WorkflowSpec)) *ORM_UpsertWorkflowSpec_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*job.WorkflowSpec)) - }) - return _c -} - -func (_c *ORM_UpsertWorkflowSpec_Call) Return(_a0 int64, _a1 error) *ORM_UpsertWorkflowSpec_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_UpsertWorkflowSpec_Call) RunAndReturn(run func(context.Context, *job.WorkflowSpec) (int64, error)) *ORM_UpsertWorkflowSpec_Call { - _c.Call.Return(run) - return _c -} - -// UpsertWorkflowSpecWithSecrets provides a mock function with given fields: ctx, spec, url, hash, contents -func (_m *ORM) UpsertWorkflowSpecWithSecrets(ctx context.Context, spec *job.WorkflowSpec, url string, hash string, contents string) (int64, error) { - ret := _m.Called(ctx, spec, url, hash, contents) - - if len(ret) == 0 { - panic("no return value specified for UpsertWorkflowSpecWithSecrets") - } - - var r0 int64 - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *job.WorkflowSpec, string, string, string) (int64, error)); ok { - return rf(ctx, spec, url, hash, contents) - } - if rf, ok := ret.Get(0).(func(context.Context, *job.WorkflowSpec, string, string, string) int64); ok { - r0 = rf(ctx, spec, url, hash, contents) - } else { - r0 = ret.Get(0).(int64) - } - - if rf, ok := ret.Get(1).(func(context.Context, *job.WorkflowSpec, string, string, string) error); ok { - r1 = rf(ctx, spec, url, hash, contents) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// ORM_UpsertWorkflowSpecWithSecrets_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpsertWorkflowSpecWithSecrets' -type ORM_UpsertWorkflowSpecWithSecrets_Call struct { - *mock.Call -} - -// UpsertWorkflowSpecWithSecrets is a helper method to define mock.On call -// - ctx context.Context -// - spec *job.WorkflowSpec -// - url string -// - hash string -// - contents string -func (_e *ORM_Expecter) UpsertWorkflowSpecWithSecrets(ctx interface{}, spec interface{}, url interface{}, hash interface{}, contents interface{}) *ORM_UpsertWorkflowSpecWithSecrets_Call { - return &ORM_UpsertWorkflowSpecWithSecrets_Call{Call: _e.mock.On("UpsertWorkflowSpecWithSecrets", ctx, spec, url, hash, contents)} -} - -func (_c *ORM_UpsertWorkflowSpecWithSecrets_Call) Run(run func(ctx context.Context, spec *job.WorkflowSpec, url string, hash string, contents string)) *ORM_UpsertWorkflowSpecWithSecrets_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(*job.WorkflowSpec), args[2].(string), args[3].(string), args[4].(string)) - }) - return _c -} - -func (_c *ORM_UpsertWorkflowSpecWithSecrets_Call) Return(_a0 int64, _a1 error) *ORM_UpsertWorkflowSpecWithSecrets_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *ORM_UpsertWorkflowSpecWithSecrets_Call) RunAndReturn(run func(context.Context, *job.WorkflowSpec, string, string, string) (int64, error)) *ORM_UpsertWorkflowSpecWithSecrets_Call { - _c.Call.Return(run) - return _c -} - -// NewORM creates a new instance of ORM. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewORM(t interface { - mock.TestingT - Cleanup(func()) -}) *ORM { - mock := &ORM{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/core/services/workflows/syncer/orm.go b/core/services/workflows/syncer/orm.go deleted file mode 100644 index d1f2d55a3a1..00000000000 --- a/core/services/workflows/syncer/orm.go +++ /dev/null @@ -1,346 +0,0 @@ -package syncer - -import ( - "context" - "database/sql" - "fmt" - "time" - - "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/job" - "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" -) - -type WorkflowSecretsDS interface { - // GetSecretsURLByID returns the secrets URL for the given ID. - GetSecretsURLByID(ctx context.Context, id int64) (string, error) - - // GetSecretsURLByID returns the secrets URL for the given ID. - GetSecretsURLByHash(ctx context.Context, hash string) (string, error) - - // GetContents returns the contents of the secret at the given plain URL. - GetContents(ctx context.Context, url string) (string, error) - - // GetContentsByHash returns the contents of the secret at the given hashed URL. - GetContentsByHash(ctx context.Context, hash string) (string, error) - - // GetSecretsURLHash returns the keccak256 hash of the owner and secrets URL. - GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error) - - // Update updates the contents of the secrets at the given plain URL or inserts a new record if not found. - Update(ctx context.Context, secretsURL, contents string) (int64, error) - - Create(ctx context.Context, secretsURL, hash, contents string) (int64, error) -} - -type WorkflowSpecsDS interface { - // UpsertWorkflowSpec inserts or updates a workflow spec. Updates on conflict of workflow name - // and owner - UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) - - // UpsertWorkflowSpecWithSecrets inserts or updates a workflow spec with secrets in a transaction. - // Updates on conflict of workflow name and owner. - UpsertWorkflowSpecWithSecrets(ctx context.Context, spec *job.WorkflowSpec, url, hash, contents string) (int64, error) - - // GetWorkflowSpec returns the workflow spec for the given owner and name. - GetWorkflowSpec(ctx context.Context, owner, name string) (*job.WorkflowSpec, error) - - // DeleteWorkflowSpec deletes the workflow spec for the given owner and name. - DeleteWorkflowSpec(ctx context.Context, owner, name string) error -} - -type ORM interface { - WorkflowSecretsDS - WorkflowSpecsDS -} - -type WorkflowRegistryDS = ORM - -type orm struct { - ds sqlutil.DataSource - lggr logger.Logger -} - -var _ WorkflowRegistryDS = (*orm)(nil) - -func NewWorkflowRegistryDS(ds sqlutil.DataSource, lggr logger.Logger) *orm { - return &orm{ - ds: ds, - lggr: lggr, - } -} - -func (orm *orm) GetSecretsURLByID(ctx context.Context, id int64) (string, error) { - var secretsURL string - err := orm.ds.GetContext(ctx, &secretsURL, - `SELECT secrets_url FROM workflow_secrets WHERE workflow_secrets.id = $1`, - id, - ) - - return secretsURL, err -} - -func (orm *orm) GetSecretsURLByHash(ctx context.Context, hash string) (string, error) { - var secretsURL string - err := orm.ds.GetContext(ctx, &secretsURL, - `SELECT secrets_url FROM workflow_secrets WHERE workflow_secrets.secrets_url_hash = $1`, - hash, - ) - - return secretsURL, err -} - -func (orm *orm) GetContentsByHash(ctx context.Context, hash string) (string, error) { - var contents string - err := orm.ds.GetContext(ctx, &contents, - `SELECT contents - FROM workflow_secrets - WHERE secrets_url_hash = $1`, - hash, - ) - - if err != nil { - return "", err // Return an empty Artifact struct and the error - } - - return contents, nil // Return the populated Artifact struct -} - -func (orm *orm) GetContents(ctx context.Context, url string) (string, error) { - var contents string - err := orm.ds.GetContext(ctx, &contents, - `SELECT contents - FROM workflow_secrets - WHERE secrets_url = $1`, - url, - ) - - if err != nil { - return "", err // Return an empty Artifact struct and the error - } - - return contents, nil // Return the populated Artifact struct -} - -// Update updates the secrets content at the given hash or inserts a new record if not found. -func (orm *orm) Update(ctx context.Context, hash, contents string) (int64, error) { - var id int64 - err := orm.ds.QueryRowxContext(ctx, - `INSERT INTO workflow_secrets (secrets_url_hash, contents) - VALUES ($1, $2) - ON CONFLICT (secrets_url_hash) DO UPDATE - SET secrets_url_hash = EXCLUDED.secrets_url_hash, contents = EXCLUDED.contents - RETURNING id`, - hash, contents, - ).Scan(&id) - - if err != nil { - return 0, err - } - - return id, nil -} - -// Update updates the secrets content at the given hash or inserts a new record if not found. -func (orm *orm) Create(ctx context.Context, url, hash, contents string) (int64, error) { - var id int64 - err := orm.ds.QueryRowxContext(ctx, - `INSERT INTO workflow_secrets (secrets_url, secrets_url_hash, contents) - VALUES ($1, $2, $3) - RETURNING id`, - url, hash, contents, - ).Scan(&id) - - if err != nil { - return 0, err - } - - return id, nil -} - -func (orm *orm) GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error) { - return crypto.Keccak256(append(owner, secretsURL...)) -} - -func (orm *orm) UpsertWorkflowSpec(ctx context.Context, spec *job.WorkflowSpec) (int64, error) { - var id int64 - - query := ` - INSERT INTO workflow_specs ( - workflow, - config, - workflow_id, - workflow_owner, - workflow_name, - status, - binary_url, - config_url, - secrets_id, - created_at, - updated_at, - spec_type - ) VALUES ( - :workflow, - :config, - :workflow_id, - :workflow_owner, - :workflow_name, - :status, - :binary_url, - :config_url, - :secrets_id, - :created_at, - :updated_at, - :spec_type - ) ON CONFLICT (workflow_owner, workflow_name) DO UPDATE - SET - workflow = EXCLUDED.workflow, - config = EXCLUDED.config, - workflow_id = EXCLUDED.workflow_id, - workflow_owner = EXCLUDED.workflow_owner, - workflow_name = EXCLUDED.workflow_name, - status = EXCLUDED.status, - binary_url = EXCLUDED.binary_url, - config_url = EXCLUDED.config_url, - secrets_id = EXCLUDED.secrets_id, - created_at = EXCLUDED.created_at, - updated_at = EXCLUDED.updated_at, - spec_type = EXCLUDED.spec_type - RETURNING id - ` - - stmt, err := orm.ds.PrepareNamedContext(ctx, query) - if err != nil { - return 0, err - } - defer stmt.Close() - - spec.UpdatedAt = time.Now() - err = stmt.QueryRowxContext(ctx, spec).Scan(&id) - - if err != nil { - return 0, err - } - - return id, nil -} - -func (orm *orm) UpsertWorkflowSpecWithSecrets( - ctx context.Context, - spec *job.WorkflowSpec, url, hash, contents string) (int64, error) { - var id int64 - err := sqlutil.TransactDataSource(ctx, orm.ds, nil, func(tx sqlutil.DataSource) error { - var sid int64 - txErr := tx.QueryRowxContext(ctx, - `INSERT INTO workflow_secrets (secrets_url, secrets_url_hash, contents) - VALUES ($1, $2, $3) - ON CONFLICT (secrets_url_hash) DO UPDATE - SET - secrets_url_hash = EXCLUDED.secrets_url_hash, - contents = EXCLUDED.contents, - secrets_url = EXCLUDED.secrets_url - RETURNING id`, - url, hash, contents, - ).Scan(&sid) - - if txErr != nil { - return fmt.Errorf("failed to create workflow secrets: %w", txErr) - } - - spec.SecretsID = sql.NullInt64{Int64: sid, Valid: true} - - query := ` - INSERT INTO workflow_specs ( - workflow, - config, - workflow_id, - workflow_owner, - workflow_name, - status, - binary_url, - config_url, - secrets_id, - created_at, - updated_at, - spec_type - ) VALUES ( - :workflow, - :config, - :workflow_id, - :workflow_owner, - :workflow_name, - :status, - :binary_url, - :config_url, - :secrets_id, - :created_at, - :updated_at, - :spec_type - ) ON CONFLICT (workflow_owner, workflow_name) DO UPDATE - SET - workflow = EXCLUDED.workflow, - config = EXCLUDED.config, - workflow_id = EXCLUDED.workflow_id, - workflow_owner = EXCLUDED.workflow_owner, - workflow_name = EXCLUDED.workflow_name, - status = EXCLUDED.status, - binary_url = EXCLUDED.binary_url, - config_url = EXCLUDED.config_url, - secrets_id = EXCLUDED.secrets_id, - created_at = EXCLUDED.created_at, - updated_at = EXCLUDED.updated_at, - spec_type = EXCLUDED.spec_type - RETURNING id - ` - - stmt, txErr := tx.PrepareNamedContext(ctx, query) - if txErr != nil { - return txErr - } - defer stmt.Close() - - spec.UpdatedAt = time.Now() - return stmt.QueryRowxContext(ctx, spec).Scan(&id) - }) - return id, err -} - -func (orm *orm) GetWorkflowSpec(ctx context.Context, owner, name string) (*job.WorkflowSpec, error) { - query := ` - SELECT * - FROM workflow_specs - WHERE workflow_owner = $1 AND workflow_name = $2 - ` - - var spec job.WorkflowSpec - err := orm.ds.GetContext(ctx, &spec, query, owner, name) - if err != nil { - return nil, err - } - - return &spec, nil -} - -func (orm *orm) DeleteWorkflowSpec(ctx context.Context, owner, name string) error { - query := ` - DELETE FROM workflow_specs - WHERE workflow_owner = $1 AND workflow_name = $2 - ` - - result, err := orm.ds.ExecContext(ctx, query, owner, name) - if err != nil { - return err - } - - rowsAffected, err := result.RowsAffected() - if err != nil { - return err - } - - if rowsAffected == 0 { - return sql.ErrNoRows // No spec deleted - } - - return nil -} diff --git a/core/services/workflows/syncer/orm_test.go b/core/services/workflows/syncer/orm_test.go deleted file mode 100644 index 1be4e54f472..00000000000 --- a/core/services/workflows/syncer/orm_test.go +++ /dev/null @@ -1,198 +0,0 @@ -package syncer - -import ( - "database/sql" - "encoding/hex" - "testing" - "time" - - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/job" - "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func TestWorkflowArtifactsORM_GetAndUpdate(t *testing.T) { - db := pgtest.NewSqlxDB(t) - ctx := testutils.Context(t) - lggr := logger.TestLogger(t) - orm := &orm{ds: db, lggr: lggr} - - giveURL := "https://example.com" - giveBytes, err := crypto.Keccak256([]byte(giveURL)) - require.NoError(t, err) - giveHash := hex.EncodeToString(giveBytes) - giveContent := "some contents" - - gotID, err := orm.Create(ctx, giveURL, giveHash, giveContent) - require.NoError(t, err) - - url, err := orm.GetSecretsURLByID(ctx, gotID) - require.NoError(t, err) - assert.Equal(t, giveURL, url) - - contents, err := orm.GetContents(ctx, giveURL) - require.NoError(t, err) - assert.Equal(t, "some contents", contents) - - contents, err = orm.GetContentsByHash(ctx, giveHash) - require.NoError(t, err) - assert.Equal(t, "some contents", contents) - - _, err = orm.Update(ctx, giveHash, "new contents") - require.NoError(t, err) - - contents, err = orm.GetContents(ctx, giveURL) - require.NoError(t, err) - assert.Equal(t, "new contents", contents) - - contents, err = orm.GetContentsByHash(ctx, giveHash) - require.NoError(t, err) - assert.Equal(t, "new contents", contents) -} - -func Test_UpsertWorkflowSpec(t *testing.T) { - db := pgtest.NewSqlxDB(t) - ctx := testutils.Context(t) - lggr := logger.TestLogger(t) - orm := &orm{ds: db, lggr: lggr} - - t.Run("inserts new spec", func(t *testing.T) { - spec := &job.WorkflowSpec{ - Workflow: "test_workflow", - Config: "test_config", - WorkflowID: "cid-123", - WorkflowOwner: "owner-123", - WorkflowName: "Test Workflow", - Status: job.WorkflowSpecStatusActive, - BinaryURL: "http://example.com/binary", - ConfigURL: "http://example.com/config", - CreatedAt: time.Now(), - SpecType: job.WASMFile, - } - - _, err := orm.UpsertWorkflowSpec(ctx, spec) - require.NoError(t, err) - - // Verify the record exists in the database - var dbSpec job.WorkflowSpec - err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE workflow_owner = $1 AND workflow_name = $2`, spec.WorkflowOwner, spec.WorkflowName) - require.NoError(t, err) - require.Equal(t, spec.Workflow, dbSpec.Workflow) - }) - - t.Run("updates existing spec", func(t *testing.T) { - spec := &job.WorkflowSpec{ - Workflow: "test_workflow", - Config: "test_config", - WorkflowID: "cid-123", - WorkflowOwner: "owner-123", - WorkflowName: "Test Workflow", - Status: job.WorkflowSpecStatusActive, - BinaryURL: "http://example.com/binary", - ConfigURL: "http://example.com/config", - CreatedAt: time.Now(), - SpecType: job.WASMFile, - } - - _, err := orm.UpsertWorkflowSpec(ctx, spec) - require.NoError(t, err) - - // Update the status - spec.Status = job.WorkflowSpecStatusPaused - - _, err = orm.UpsertWorkflowSpec(ctx, spec) - require.NoError(t, err) - - // Verify the record is updated in the database - var dbSpec job.WorkflowSpec - err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE workflow_owner = $1 AND workflow_name = $2`, spec.WorkflowOwner, spec.WorkflowName) - require.NoError(t, err) - require.Equal(t, spec.Config, dbSpec.Config) - require.Equal(t, spec.Status, dbSpec.Status) - }) -} - -func Test_DeleteWorkflowSpec(t *testing.T) { - db := pgtest.NewSqlxDB(t) - ctx := testutils.Context(t) - lggr := logger.TestLogger(t) - orm := &orm{ds: db, lggr: lggr} - - t.Run("deletes a workflow spec", func(t *testing.T) { - spec := &job.WorkflowSpec{ - Workflow: "test_workflow", - Config: "test_config", - WorkflowID: "cid-123", - WorkflowOwner: "owner-123", - WorkflowName: "Test Workflow", - Status: job.WorkflowSpecStatusActive, - BinaryURL: "http://example.com/binary", - ConfigURL: "http://example.com/config", - CreatedAt: time.Now(), - SpecType: job.WASMFile, - } - - id, err := orm.UpsertWorkflowSpec(ctx, spec) - require.NoError(t, err) - require.NotZero(t, id) - - err = orm.DeleteWorkflowSpec(ctx, spec.WorkflowOwner, spec.WorkflowName) - require.NoError(t, err) - - // Verify the record is deleted from the database - var dbSpec job.WorkflowSpec - err = db.Get(&dbSpec, `SELECT * FROM workflow_specs WHERE id = $1`, id) - require.Error(t, err) - require.Equal(t, sql.ErrNoRows, err) - }) - - t.Run("fails if no workflow spec exists", func(t *testing.T) { - err := orm.DeleteWorkflowSpec(ctx, "owner-123", "Test Workflow") - require.Error(t, err) - require.Equal(t, sql.ErrNoRows, err) - }) -} - -func Test_GetWorkflowSpec(t *testing.T) { - db := pgtest.NewSqlxDB(t) - ctx := testutils.Context(t) - lggr := logger.TestLogger(t) - orm := &orm{ds: db, lggr: lggr} - - t.Run("gets a workflow spec", func(t *testing.T) { - spec := &job.WorkflowSpec{ - Workflow: "test_workflow", - Config: "test_config", - WorkflowID: "cid-123", - WorkflowOwner: "owner-123", - WorkflowName: "Test Workflow", - Status: job.WorkflowSpecStatusActive, - BinaryURL: "http://example.com/binary", - ConfigURL: "http://example.com/config", - CreatedAt: time.Now(), - SpecType: job.WASMFile, - } - - id, err := orm.UpsertWorkflowSpec(ctx, spec) - require.NoError(t, err) - require.NotZero(t, id) - - dbSpec, err := orm.GetWorkflowSpec(ctx, spec.WorkflowOwner, spec.WorkflowName) - require.NoError(t, err) - require.Equal(t, spec.Workflow, dbSpec.Workflow) - - err = orm.DeleteWorkflowSpec(ctx, spec.WorkflowOwner, spec.WorkflowName) - require.NoError(t, err) - }) - - t.Run("fails if no workflow spec exists", func(t *testing.T) { - dbSpec, err := orm.GetWorkflowSpec(ctx, "owner-123", "Test Workflow") - require.Error(t, err) - require.Nil(t, dbSpec) - }) -} diff --git a/core/services/workflows/syncer/por-read-chain.wasm.br b/core/services/workflows/syncer/por-read-chain.wasm.br new file mode 100644 index 00000000000..8882d7c7381 Binary files /dev/null and b/core/services/workflows/syncer/por-read-chain.wasm.br differ diff --git a/core/services/workflows/syncer/workflow.wasm b/core/services/workflows/syncer/workflow.wasm new file mode 100755 index 00000000000..afe597a71bc Binary files /dev/null and b/core/services/workflows/syncer/workflow.wasm differ diff --git a/core/services/workflows/syncer/workflow.wasm.br b/core/services/workflows/syncer/workflow.wasm.br new file mode 100755 index 00000000000..1783b108114 Binary files /dev/null and b/core/services/workflows/syncer/workflow.wasm.br differ diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index cdd0c71acc0..985913776ce 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -2,590 +2,168 @@ package syncer import ( "context" - "encoding/hex" - "encoding/json" - "errors" - "fmt" - "strconv" + _ "embed" "sync" "time" - "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/services" - types "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink-common/pkg/types/core" - query "github.com/smartcontractkit/chainlink-common/pkg/types/query" - "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" - "github.com/smartcontractkit/chainlink-common/pkg/values" - "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" + "github.com/smartcontractkit/chainlink/v2/core/logger" - evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" ) -const name = "WorkflowRegistrySyncer" +const ( + // Compute Fetch Workflow + workflowID = "924eef66516e5387b6e8ab8cc544685dfe50dfc837886f22beecebced5063968" + workflowOwner = "00000000000000000000000000000000000000ab" + workflowName = "trueusdpor" -var ( - defaultTickInterval = 12 * time.Second - ContractName = "WorkflowRegistry" + // Chain Read Workflow + workflow2ID = "00000066516e5387b6e8ab8cc544685dfe50dfc837886f22beecebced5063968" + workflow2Owner = "00000000000000000000000000000000000000ab" + workflow2Name = "ethsepopor" ) -type Head struct { - Hash string - Height string - Timestamp uint64 -} - -// WorkflowRegistryEvent is an event emitted by the WorkflowRegistry. Each event is typed -// so that the consumer can determine how to handle the event. -type WorkflowRegistryEvent struct { - Cursor string - Data any - EventType WorkflowRegistryEventType - Head Head -} - -// WorkflowRegistryEventResponse is a response to either parsing a queried event or handling the event. -type WorkflowRegistryEventResponse struct { - Err error - Event *WorkflowRegistryEvent -} - -// ContractEventPollerConfig is the configuration needed to poll for events on a contract. Currently -// requires the ContractEventName. -// -// TODO(mstreet3): Use LookbackBlocks instead of StartBlockNum -type ContractEventPollerConfig struct { - ContractName string - ContractAddress string - StartBlockNum uint64 - QueryCount uint64 -} - -// FetcherFunc is an abstraction for fetching the contents stored at a URL. -type FetcherFunc func(ctx context.Context, url string) ([]byte, error) +var ( + // Compute Fetch Workflow + //go:embed config.yaml + config []byte -type ContractReaderFactory interface { - NewContractReader(context.Context, []byte) (types.ContractReader, error) -} + //go:embed workflow.wasm.br + workflow []byte -// ContractReader is a subset of types.ContractReader defined locally to enable mocking. -type ContractReader interface { - Bind(context.Context, []types.BoundContract) error - QueryKey(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) ([]types.Sequence, error) -} + // Chain Read Workflow + //go:embed config.yaml + config2 []byte -// WorkflowRegistrySyncer is the public interface of the package. -type WorkflowRegistrySyncer interface { - services.Service -} - -var _ WorkflowRegistrySyncer = (*workflowRegistry)(nil) + //go:embed por-read-chain.wasm.br + workflow2 []byte +) -// workflowRegistry is the implementation of the WorkflowRegistrySyncer interface. -type workflowRegistry struct { +type WorkflowRegistry struct { services.StateMachine - - // close stopCh to stop the workflowRegistry. - stopCh services.StopChan - - // all goroutines are waited on with wg. - wg sync.WaitGroup - - // ticker is the interval at which the workflowRegistry will poll the contract for events. - ticker <-chan time.Time - - lggr logger.Logger - emitter custmsg.Labeler - orm WorkflowRegistryDS - reader ContractReader - gateway FetcherFunc - - // initReader allows the workflowRegistry to initialize a contract reader if one is not provided - // and separates the contract reader initialization from the workflowRegistry start up. - initReader func(context.Context, logger.Logger, ContractReaderFactory, types.BoundContract) (types.ContractReader, error) - relayer ContractReaderFactory - - cfg ContractEventPollerConfig - eventTypes []WorkflowRegistryEventType - - // eventsCh is read by the handler and each event is handled once received. - eventsCh chan WorkflowRegistryEventResponse - handler *eventHandler - - // batchCh is a channel that receives batches of events from the contract query goroutines. - batchCh chan []WorkflowRegistryEventResponse - - // heap is a min heap that merges batches of events from the contract query goroutines. The - // default min heap is sorted by block height. - heap Heap - - workflowStore store.Store - capRegistry core.CapabilitiesRegistry - engineRegistry *engineRegistry + wg sync.WaitGroup + Logger logger.Logger + Registry core.CapabilitiesRegistry + Store store.Store + DS sqlutil.DataSource + subServices []job.ServiceCtx } -// WithTicker allows external callers to provide a ticker to the workflowRegistry. This is useful -// for overriding the default tick interval. -func WithTicker(ticker <-chan time.Time) func(*workflowRegistry) { - return func(wr *workflowRegistry) { - wr.ticker = ticker - } -} - -func WithReader(reader types.ContractReader) func(*workflowRegistry) { - return func(wr *workflowRegistry) { - wr.reader = reader - } -} - -// NewWorkflowRegistry returns a new workflowRegistry. -// Only queries for WorkflowRegistryForceUpdateSecretsRequestedV1 events. -func NewWorkflowRegistry[T ContractReader]( - lggr logger.Logger, - orm WorkflowRegistryDS, - reader T, - gateway FetcherFunc, - addr string, - workflowStore store.Store, - capRegistry core.CapabilitiesRegistry, - emitter custmsg.Labeler, - opts ...func(*workflowRegistry), -) *workflowRegistry { - ets := []WorkflowRegistryEventType{ForceUpdateSecretsEvent} - wr := &workflowRegistry{ - lggr: lggr.Named(name), - emitter: emitter, - orm: orm, - reader: reader, - gateway: gateway, - workflowStore: workflowStore, - capRegistry: capRegistry, - engineRegistry: newEngineRegistry(), - cfg: ContractEventPollerConfig{ - ContractName: ContractName, - ContractAddress: addr, - QueryCount: 20, - StartBlockNum: 0, - }, - initReader: newReader, - heap: newBlockHeightHeap(), - stopCh: make(services.StopChan), - eventTypes: ets, - eventsCh: make(chan WorkflowRegistryEventResponse), - batchCh: make(chan []WorkflowRegistryEventResponse, len(ets)), - } - wr.handler = newEventHandler(wr.lggr, wr.orm, wr.gateway, wr.workflowStore, wr.capRegistry, - wr.engineRegistry, wr.emitter, secretsFetcherFunc(wr.SecretsFor), - ) - for _, opt := range opts { - opt(wr) - } - return wr -} +func (w *WorkflowRegistry) Start(ctx context.Context) error { + go func() { + timeout := time.After(5 * time.Minute) + ticker := time.NewTicker(10 * time.Second) -// Start starts the workflowRegistry. It starts two goroutines, one for querying the contract -// and one for handling the events. -func (w *workflowRegistry) Start(_ context.Context) error { - return w.StartOnce(w.Name(), func() error { - ctx, cancel := w.stopCh.NewCtx() - - w.wg.Add(1) - go func() { - defer w.wg.Done() - defer cancel() - - w.syncEventsLoop(ctx) - }() - - w.wg.Add(1) - go func() { - defer w.wg.Done() - defer cancel() - - w.handlerLoop(ctx) - }() - - return nil - }) -} - -func (w *workflowRegistry) Close() error { - return w.StopOnce(w.Name(), func() error { - close(w.stopCh) - w.wg.Wait() - return nil - }) -} - -func (w *workflowRegistry) Ready() error { - return nil -} - -func (w *workflowRegistry) HealthReport() map[string]error { - return nil -} - -func (w *workflowRegistry) Name() string { - return name -} - -func (w *workflowRegistry) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) { - return nil, errors.New("not implemented") -} - -// handlerLoop handles the events that are emitted by the contract. -func (w *workflowRegistry) handlerLoop(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case resp, open := <-w.eventsCh: - if !open { - return - } - - if resp.Err != nil || resp.Event == nil { - w.lggr.Errorf("failed to handle event: %+v", resp.Err) - continue - } - - event := resp.Event - w.lggr.Debugf("handling event: %+v", event) - if err := w.handler.Handle(ctx, *event); err != nil { - w.lggr.Errorf("failed to handle event: %+v", event) - continue - } - } - } -} - -// syncEventsLoop polls the contract for events and passes them to a channel for handling. -func (w *workflowRegistry) syncEventsLoop(ctx context.Context) { - var ( - // sendLog is a helper that sends a WorkflowRegistryEventResponse to the eventsCh in a - // blocking way that will send the response or be canceled. - sendLog = func(resp WorkflowRegistryEventResponse) { + for { select { - case w.eventsCh <- resp: - case <-ctx.Done(): - } - } - - ticker = w.getTicker() - - signals = make(map[WorkflowRegistryEventType]chan struct{}, 0) - ) - - // critical failure if there is no reader, the loop will exit and the parent context will be - // canceled. - reader, err := w.getContractReader(ctx) - if err != nil { - w.lggr.Criticalf("contract reader unavailable : %s", err) - return - } - - // fan out and query for each event type - for i := 0; i < len(w.eventTypes); i++ { - signal := make(chan struct{}, 1) - signals[w.eventTypes[i]] = signal - w.wg.Add(1) - go func() { - defer w.wg.Done() - - queryEvent( - ctx, - signal, - w.lggr, - reader, - w.cfg, - w.eventTypes[i], - w.batchCh, - ) - }() - } - - // Periodically send a signal to all the queryEvent goroutines to query the contract - for { - select { - case <-ctx.Done(): - return - case <-ticker: - // for each event type, send a signal for it to execute a query and produce a new - // batch of event logs - for i := 0; i < len(w.eventTypes); i++ { - signal := signals[w.eventTypes[i]] - select { - case signal <- struct{}{}: - case <-ctx.Done(): + case <-timeout: + w.Logger.Info("timed out setting up hardcoded workflow") + return + case <-ticker.C: + success1 := w.trySetup(workflowID, workflowName, workflowOwner, workflow, config) + success2 := w.trySetup(workflow2ID, workflow2Name, workflow2Owner, workflow2, config2) + if success1 && success2 { return } } - - // block on fan-in until all fetched event logs are sent to the handlers - w.orderAndSend( - ctx, - len(w.eventTypes), - w.batchCh, - sendLog, - ) - } - } -} - -// orderAndSend reads n batches from the batch channel, heapifies all the batches then dequeues -// the min heap via the sendLog function. -func (w *workflowRegistry) orderAndSend( - ctx context.Context, - batchCount int, - batchCh <-chan []WorkflowRegistryEventResponse, - sendLog func(WorkflowRegistryEventResponse), -) { - for { - select { - case <-ctx.Done(): - return - case batch := <-batchCh: - for _, response := range batch { - w.heap.Push(response) - } - batchCount-- - - // If we have received responses for all the events, then we can drain the heap. - if batchCount == 0 { - for w.heap.Len() > 0 { - sendLog(w.heap.Pop()) - } - return - } } - } -} - -// getTicker returns the ticker that the workflowRegistry will use to poll for events. If the ticker -// is nil, then a default ticker is returned. -func (w *workflowRegistry) getTicker() <-chan time.Time { - if w.ticker == nil { - return time.NewTicker(defaultTickInterval).C - } - - return w.ticker -} - -// getContractReader initializes a contract reader if needed, otherwise returns the existing -// reader. -func (w *workflowRegistry) getContractReader(ctx context.Context) (ContractReader, error) { - c := types.BoundContract{ - Name: w.cfg.ContractName, - Address: w.cfg.ContractAddress, - } - - if w.reader == nil { - reader, err := w.initReader(ctx, w.lggr, w.relayer, c) - if err != nil { - return nil, err - } - - w.reader = reader - } - - return w.reader, nil + }() + return nil } -// queryEvent queries the contract for events of the given type on each tick from the ticker. -// Sends a batch of event logs to the batch channel. The batch represents all the -// event logs read since the last query. Loops until the context is canceled. -func queryEvent( - ctx context.Context, - ticker <-chan struct{}, - lggr logger.Logger, - reader ContractReader, - cfg ContractEventPollerConfig, - et WorkflowRegistryEventType, - batchCh chan<- []WorkflowRegistryEventResponse, -) { - // create query - var ( - responseBatch []WorkflowRegistryEventResponse - logData values.Value - cursor = "" - limitAndSort = query.LimitAndSort{ - SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, - Limit: query.Limit{Count: cfg.QueryCount}, - } - bc = types.BoundContract{ - Name: cfg.ContractName, - Address: cfg.ContractAddress, - } - ) - - // Loop until canceled - for { - select { - case <-ctx.Done(): - return - case <-ticker: - if cursor != "" { - limitAndSort.Limit = query.CursorLimit(cursor, query.CursorFollowing, cfg.QueryCount) - } - - logs, err := reader.QueryKey( - ctx, - bc, - query.KeyFilter{ - Key: string(et), - Expressions: []query.Expression{ - query.Confidence(primitives.Finalized), - query.Block(strconv.FormatUint(cfg.StartBlockNum, 10), primitives.Gte), - }, - }, - limitAndSort, - &logData, - ) - - if err != nil { - lggr.Errorw("QueryKey failure", "err", err) - continue - } - - // ChainReader QueryKey API provides logs including the cursor value and not - // after the cursor value. If the response only consists of the log corresponding - // to the cursor and no log after it, then we understand that there are no new - // logs - if len(logs) == 1 && logs[0].Cursor == cursor { - lggr.Infow("No new logs since", "cursor", cursor) - continue - } - - for _, log := range logs { - if log.Cursor == cursor { - continue - } +func (w *WorkflowRegistry) trySetup(id, name, owner string, binary, config []byte) bool { + ctx := context.Background() + w.Logger.Info("starting hardcoded workflow...") - responseBatch = append(responseBatch, toWorkflowRegistryEventResponse(log, et, lggr)) - cursor = log.Cursor - } - batchCh <- responseBatch - } + // HACK: don't load the workflow if we aren't a workflow node. + _, err := w.Registry.Get(ctx, "offchain_reporting@1.0.0") + if err != nil { + w.Logger.Info("not a workflow node, skipping hardcoded workflow") + return false } -} -func newReader( - ctx context.Context, - lggr logger.Logger, - factory ContractReaderFactory, - bc types.BoundContract, -) (types.ContractReader, error) { - contractReaderCfg := evmtypes.ChainReaderConfig{ - Contracts: map[string]evmtypes.ChainContractReader{ - ContractName: { - ContractPollingFilter: evmtypes.ContractPollingFilter{ - GenericEventNames: []string{string(ForceUpdateSecretsEvent)}, - }, - ContractABI: workflow_registry_wrapper.WorkflowRegistryABI, - Configs: map[string]*evmtypes.ChainReaderDefinition{ - string(ForceUpdateSecretsEvent): { - ChainSpecificName: string(ForceUpdateSecretsEvent), - ReadType: evmtypes.Event, - }, - }, - }, - }, + jb := job.WorkflowSpec{ + Workflow: "a string", + Config: "a config", + WorkflowID: id, + WorkflowName: name, + WorkflowOwner: owner, } - - marshalledCfg, err := json.Marshal(contractReaderCfg) + sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, created_at, updated_at, spec_type, config) + VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, NOW(), NOW(), :spec_type, :config) + RETURNING id;` + _, err = w.DS.NamedExecContext(ctx, sql, jb) if err != nil { - return nil, err + w.Logger.Info("failed to create entry: %w", err) } - reader, err := factory.NewContractReader(ctx, marshalledCfg) + moduleConfig := &host.ModuleConfig{Logger: logger.NullLogger} + spec, err := host.GetWorkflowSpec(ctx, moduleConfig, binary, config) if err != nil { - return nil, err - } - - // bind contract to contract reader - if err := reader.Bind(ctx, []types.BoundContract{bc}); err != nil { - return nil, err - } - - return reader, nil -} - -// toWorkflowRegistryEventResponse converts a types.Sequence to a WorkflowRegistryEventResponse. -func toWorkflowRegistryEventResponse( - log types.Sequence, - evt WorkflowRegistryEventType, - lggr logger.Logger, -) WorkflowRegistryEventResponse { - resp := WorkflowRegistryEventResponse{ - Event: &WorkflowRegistryEvent{ - Cursor: log.Cursor, - EventType: evt, - Head: Head{ - Hash: hex.EncodeToString(log.Hash), - Height: log.Height, - Timestamp: log.Timestamp, - }, - }, + w.Logger.Errorf("failed to get workflow spec", err) + return false + } + + cfg := workflows.Config{ + Lggr: w.Logger, + Workflow: *spec, + WorkflowID: id, + WorkflowOwner: owner, + WorkflowName: name, + Registry: w.Registry, + Store: w.Store, + Config: config, + Binary: binary, + SecretsFetcher: w, + } + engine, err := workflows.NewEngine(ctx, cfg) + if err != nil { + w.Logger.Errorf("failed to create engine: %w", err) + return false } - - dataAsValuesMap, err := values.WrapMap(log.Data) + err = engine.Start(ctx) if err != nil { - return WorkflowRegistryEventResponse{ - Err: err, - } + w.Logger.Errorf("failed to start hardcoded workflow: %w", err) + return false } + w.subServices = []job.ServiceCtx{engine} + return true +} - switch evt { - case ForceUpdateSecretsEvent: - var data WorkflowRegistryForceUpdateSecretsRequestedV1 - if err := dataAsValuesMap.UnwrapTo(&data); err != nil { - lggr.Errorf("failed to unwrap data: %+v", log.Data) - resp.Event = nil - resp.Err = err - return resp +func (w *WorkflowRegistry) Close() error { + for _, s := range w.subServices { + err := s.Close() + if err != nil { + w.Logger.Errorf("could not close hardcoded engine: %w", err) } - resp.Event.Data = data - default: - lggr.Errorf("unknown event type: %s", evt) - resp.Event = nil - resp.Err = fmt.Errorf("unknown event type: %s", evt) } - return resp -} - -type nullWorkflowRegistrySyncer struct { - services.Service -} - -func NewNullWorkflowRegistrySyncer() *nullWorkflowRegistrySyncer { - return &nullWorkflowRegistrySyncer{} -} - -// Start -func (u *nullWorkflowRegistrySyncer) Start(context.Context) error { return nil } -// Close -func (u *nullWorkflowRegistrySyncer) Close() error { +func (w *WorkflowRegistry) Ready() error { return nil } -// SecretsFor -func (u *nullWorkflowRegistrySyncer) SecretsFor(context.Context, string, string) (map[string]string, error) { - return nil, nil +func (w *WorkflowRegistry) HealthReport() map[string]error { + return nil } -func (u *nullWorkflowRegistrySyncer) Ready() error { - return nil +func (w *WorkflowRegistry) Name() string { + return "WorkflowRegistrySyncer" } -func (u *nullWorkflowRegistrySyncer) HealthReport() map[string]error { - return nil +func (w *WorkflowRegistry) SecretsFor(workflowOwner, workflowName string) (map[string]string, error) { + // TODO: actually get this from the right place. + return map[string]string{}, nil } -func (u *nullWorkflowRegistrySyncer) Name() string { - return "Null" + name +func NewWorkflowRegistry() *WorkflowRegistry { + return &WorkflowRegistry{} } diff --git a/core/services/workflows/syncer/workflow_registry_test.go b/core/services/workflows/syncer/workflow_registry_test.go deleted file mode 100644 index 58dcbed1022..00000000000 --- a/core/services/workflows/syncer/workflow_registry_test.go +++ /dev/null @@ -1,103 +0,0 @@ -package syncer - -import ( - "context" - "encoding/hex" - "strconv" - "testing" - "time" - - "github.com/smartcontractkit/chainlink-common/pkg/custmsg" - "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" - types "github.com/smartcontractkit/chainlink-common/pkg/types" - query "github.com/smartcontractkit/chainlink-common/pkg/types/query" - "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" - "github.com/smartcontractkit/chainlink-common/pkg/values" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" - "github.com/smartcontractkit/chainlink/v2/core/utils/matches" - - "github.com/stretchr/testify/require" -) - -func Test_Workflow_Registry_Syncer(t *testing.T) { - var ( - giveContents = "contents" - wantContents = "updated contents" - giveCfg = ContractEventPollerConfig{ - ContractName: ContractName, - ContractAddress: "0xdeadbeef", - StartBlockNum: 0, - QueryCount: 20, - } - giveURL = "http://example.com" - giveHash, err = crypto.Keccak256([]byte(giveURL)) - - giveLog = types.Sequence{ - Data: map[string]any{ - "SecretsURLHash": giveHash, - "Owner": "0xowneraddr", - }, - Cursor: "cursor", - } - ) - - require.NoError(t, err) - - var ( - lggr = logger.TestLogger(t) - db = pgtest.NewSqlxDB(t) - orm = &orm{ds: db, lggr: lggr} - ctx, cancel = context.WithCancel(testutils.Context(t)) - reader = NewMockContractReader(t) - emitter = custmsg.NewLabeler() - gateway = func(_ context.Context, _ string) ([]byte, error) { - return []byte(wantContents), nil - } - ticker = make(chan time.Time) - worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, nil, nil, emitter, WithTicker(ticker)) - ) - - // Cleanup the worker - defer cancel() - - // Seed the DB with an original entry - _, err = orm.Create(ctx, giveURL, hex.EncodeToString(giveHash), giveContents) - require.NoError(t, err) - - // Mock out the contract reader query - reader.EXPECT().QueryKey( - matches.AnyContext, - types.BoundContract{ - Name: giveCfg.ContractName, - Address: giveCfg.ContractAddress, - }, - query.KeyFilter{ - Key: string(ForceUpdateSecretsEvent), - Expressions: []query.Expression{ - query.Confidence(primitives.Finalized), - query.Block(strconv.FormatUint(giveCfg.StartBlockNum, 10), primitives.Gte), - }, - }, - query.LimitAndSort{ - SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, - Limit: query.Limit{Count: giveCfg.QueryCount}, - }, - new(values.Value), - ).Return([]types.Sequence{giveLog}, nil) - - // Go run the worker - servicetest.Run(t, worker) - - // Send a tick to start a query - ticker <- time.Now() - - // Require the secrets contents to eventually be updated - require.Eventually(t, func() bool { - secrets, err := orm.GetContents(ctx, giveURL) - require.NoError(t, err) - return secrets == wantContents - }, 5*time.Second, time.Second) -} diff --git a/core/store/migrate/migrations/0259_add_workflow_secrets.sql b/core/store/migrate/migrations/0259_add_workflow_secrets.sql deleted file mode 100644 index fb76d945571..00000000000 --- a/core/store/migrate/migrations/0259_add_workflow_secrets.sql +++ /dev/null @@ -1,41 +0,0 @@ --- +goose Up --- +goose StatementBegin --- Create the workflow_artifacts table -CREATE TABLE workflow_secrets ( - id SERIAL PRIMARY KEY, - secrets_url TEXT, - secrets_url_hash TEXT UNIQUE, - contents TEXT -); - --- Create an index on the secrets_url_hash column -CREATE INDEX idx_secrets_url ON workflow_secrets(secrets_url); - --- Alter the workflow_specs table -ALTER TABLE workflow_specs -ADD COLUMN binary_url TEXT DEFAULT '', -ADD COLUMN config_url TEXT DEFAULT '', -ADD COLUMN secrets_id INT UNIQUE REFERENCES workflow_secrets(id) ON DELETE CASCADE; - --- Alter the config column type -ALTER TABLE workflow_specs -ALTER COLUMN config TYPE TEXT; --- +goose StatementEnd - --- +goose Down --- +goose StatementBegin -ALTER TABLE workflow_specs -DROP COLUMN IF EXISTS secrets_id, -DROP COLUMN IF EXISTS config_url, -DROP COLUMN IF EXISTS binary_url; - --- Change the config column back to character varying(255) -ALTER TABLE workflow_specs -ALTER COLUMN config TYPE CHARACTER VARYING(255); - --- Drop the index on the secrets_url_hash column -DROP INDEX IF EXISTS idx_secrets_url_hash; - --- Drop the workflow_artifacts table -DROP TABLE IF EXISTS workflow_secrets; --- +goose StatementEnd \ No newline at end of file diff --git a/core/store/migrate/migrations/0260_add_status_workflow_spec.sql b/core/store/migrate/migrations/0260_add_status_workflow_spec.sql deleted file mode 100644 index 66c38eef2f7..00000000000 --- a/core/store/migrate/migrations/0260_add_status_workflow_spec.sql +++ /dev/null @@ -1,9 +0,0 @@ --- +goose Up --- Add a `status` column to the `workflow_specs` table. -ALTER TABLE workflow_specs -ADD COLUMN status TEXT DEFAULT '' NOT NULL; - --- +goose Down --- Remove the `status` column from the `workflow_specs` table. -ALTER TABLE workflow_specs -DROP COLUMN status; \ No newline at end of file diff --git a/core/utils/crypto/keccak_256.go b/core/utils/crypto/keccak_256.go deleted file mode 100644 index b6218d72cf0..00000000000 --- a/core/utils/crypto/keccak_256.go +++ /dev/null @@ -1,16 +0,0 @@ -package crypto - -import ( - "golang.org/x/crypto/sha3" -) - -func Keccak256(input []byte) ([]byte, error) { - // Create a Keccak-256 hash - hash := sha3.NewLegacyKeccak256() - _, err := hash.Write(input) - if err != nil { - return nil, err - } - - return hash.Sum(nil), nil -} diff --git a/core/utils/matches/matches.go b/core/utils/matches/matches.go deleted file mode 100644 index 90606af57e2..00000000000 --- a/core/utils/matches/matches.go +++ /dev/null @@ -1,21 +0,0 @@ -package matches - -import ( - "context" - - "github.com/stretchr/testify/mock" -) - -func anyContext(_ context.Context) bool { - return true -} - -func anyString(_ string) bool { - return true -} - -// AnyContext is an argument matcher that matches any argument of type context.Context. -var AnyContext = mock.MatchedBy(anyContext) - -// AnyString is an argument matcher that matches any argument of type string. -var AnyString = mock.MatchedBy(anyString) diff --git a/plugins/stdcap/amd64/cron b/plugins/stdcap/amd64/cron new file mode 100755 index 00000000000..50d0fa6e0ac Binary files /dev/null and b/plugins/stdcap/amd64/cron differ diff --git a/plugins/stdcap/amd64/kvstore b/plugins/stdcap/amd64/kvstore new file mode 100755 index 00000000000..35999c95f35 Binary files /dev/null and b/plugins/stdcap/amd64/kvstore differ diff --git a/plugins/stdcap/amd64/readcontract b/plugins/stdcap/amd64/readcontract new file mode 100755 index 00000000000..0c275ed7279 Binary files /dev/null and b/plugins/stdcap/amd64/readcontract differ diff --git a/plugins/stdcap/arm64/cron b/plugins/stdcap/arm64/cron new file mode 100755 index 00000000000..837699ff2ab Binary files /dev/null and b/plugins/stdcap/arm64/cron differ diff --git a/plugins/stdcap/arm64/kvstore b/plugins/stdcap/arm64/kvstore new file mode 100755 index 00000000000..ed6e7371cce Binary files /dev/null and b/plugins/stdcap/arm64/kvstore differ diff --git a/plugins/stdcap/arm64/readcontract b/plugins/stdcap/arm64/readcontract new file mode 100755 index 00000000000..4fc489a729e Binary files /dev/null and b/plugins/stdcap/arm64/readcontract differ diff --git a/plugins/stdcap/cll b/plugins/stdcap/cll new file mode 100755 index 00000000000..9f295d316f0 Binary files /dev/null and b/plugins/stdcap/cll differ diff --git a/tools/bin/goreleaser_utils b/tools/bin/goreleaser_utils index 52e37cefd51..e897378a1b1 100755 --- a/tools/bin/goreleaser_utils +++ b/tools/bin/goreleaser_utils @@ -14,6 +14,8 @@ before_hook() { install_remote_plugins mkdir -p "$lib_path/plugins" + mkdir -p "$lib_path/workflows" + # Retrieve GOPATH GOPATH=$(go env GOPATH) GOARCH=$(go env GOARCH) @@ -45,6 +47,14 @@ before_hook() { done fi + cp "$(go env GOPATH)"/bin/chainlink* "$lib_path/plugins" + + # Copy standard capability binaries + cp "$(pwd)"/plugins/stdcap/"$(go env GOARCH)"/* "$lib_path/plugins" + + # Copy workflow compute WASM artifacts + cp "$(pwd)"/workflow.wasm "$lib_path/workflows" + cp "$(pwd)"/config.yaml "$lib_path/workflows" } install_local_plugins() { @@ -84,8 +94,10 @@ build_post_hook() { local -r dist_path=$1 local -r plugin_src_path=./tmp/plugins local -r wasmvm_lib_path=./tmp/libs + local -r workflows_path=./tmp/workflows local -r lib_dest_path=$dist_path/libs local -r plugin_dest_path=$dist_path/plugins + local -r workflows_dest_path=$dist_path/workflows # COPY NATIVE LIBRARIES HERE mkdir -p "$lib_dest_path" @@ -94,6 +106,10 @@ build_post_hook() { # COPY PLUGINS HERE mkdir -p "$plugin_dest_path" cp -r "$plugin_src_path/." "$plugin_dest_path" + + # COPY WORKFLOW ARTIFACTS HERE + mkdir -p "$workflows_dest_path" + cp -r "$workflows_path/." "$workflows_dest_path" } "$@" diff --git a/workflow.wasm b/workflow.wasm new file mode 100755 index 00000000000..bb1c8c2a8ae Binary files /dev/null and b/workflow.wasm differ