diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index ba29e98526e..570d6d0ad91 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -5,11 +5,13 @@ import ( "crypto/rand" "encoding/hex" "encoding/json" + "fmt" "testing" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/assert" "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" @@ -26,6 +28,111 @@ import ( "github.com/stretchr/testify/require" ) +type testEvtHandler struct { + events []syncer.Event +} + +func (m *testEvtHandler) Handle(ctx context.Context, event syncer.Event) error { + m.events = append(m.events, event) + return nil +} + +func newTestEvtHandler() *testEvtHandler { + return &testEvtHandler{ + events: make([]syncer.Event, 0), + } +} + +type testWorkflowRegistryContractLoader struct { +} + +func (m *testWorkflowRegistryContractLoader) LoadWorkflows(ctx context.Context) (*types.Head, error) { + return &types.Head{ + Height: "0", + Hash: nil, + Timestamp: 0, + }, nil +} + +func Test_InitialStateSync(t *testing.T) { + ctx := coretestutils.Context(t) + lggr := logger.TestLogger(t) + backendTH := testutils.NewEVMBackendTH(t) + donID := uint32(1) + + // Deploy a test workflow_registry + wfRegistryAddr, _, wfRegistryC, err := workflow_registry_wrapper.DeployWorkflowRegistry(backendTH.ContractsOwner, backendTH.Backend.Client()) + backendTH.Backend.Commit() + require.NoError(t, err) + + // Build the ContractReader config + contractReaderCfg := evmtypes.ChainReaderConfig{ + Contracts: map[string]evmtypes.ChainContractReader{ + syncer.WorkflowRegistryContractName: { + ContractABI: workflow_registry_wrapper.WorkflowRegistryABI, + Configs: map[string]*evmtypes.ChainReaderDefinition{ + syncer.GetWorkflowMetadataListByDONMethodName: { + ChainSpecificName: syncer.GetWorkflowMetadataListByDONMethodName, + }, + }, + }, + }, + } + + contractReaderCfgBytes, err := json.Marshal(contractReaderCfg) + require.NoError(t, err) + + contractReader, err := backendTH.NewContractReader(ctx, t, contractReaderCfgBytes) + require.NoError(t, err) + + err = contractReader.Bind(ctx, []types.BoundContract{{Name: syncer.WorkflowRegistryContractName, Address: wfRegistryAddr.Hex()}}) + require.NoError(t, err) + + // setup contract state to allow the secrets to be updated + updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true) + updateAuthorizedAddress(t, backendTH, wfRegistryC, []common.Address{backendTH.ContractsOwner.From}, true) + + // The number of workflows should be greater than the workflow registry contracts pagination limit to ensure + // that the syncer will query the contract multiple times to get the full list of workflows + numberWorkflows := 250 + for i := 0; i < numberWorkflows; i++ { + var workflowID [32]byte + _, err = rand.Read((workflowID)[:]) + require.NoError(t, err) + workflow := RegisterWorkflowCMD{ + Name: fmt.Sprintf("test-wf-%d", i), + DonID: donID, + Status: uint8(1), + SecretsURL: "someurl", + } + workflow.ID = workflowID + registerWorkflow(t, backendTH, wfRegistryC, workflow) + } + + testEventHandler := newTestEvtHandler() + loader := syncer.NewWorkflowRegistryContractLoader(wfRegistryAddr.Hex(), donID, contractReader, testEventHandler) + + // Create the worker + worker := syncer.NewWorkflowRegistry( + lggr, + contractReader, + wfRegistryAddr.Hex(), + syncer.WorkflowEventPollerConfig{ + QueryCount: 20, + }, + testEventHandler, + loader, + syncer.WithTicker(make(chan time.Time)), + ) + + servicetest.Run(t, worker) + + assert.Len(t, testEventHandler.events, numberWorkflows) + for _, event := range testEventHandler.events { + assert.Equal(t, syncer.WorkflowRegisteredEvent, event.GetEventType()) + } +} + func Test_SecretsWorker(t *testing.T) { var ( ctx = coretestutils.Context(t) @@ -49,7 +156,7 @@ func Test_SecretsWorker(t *testing.T) { fetcherFn = func(_ context.Context, _ string) ([]byte, error) { return []byte(wantContents), nil } - contractName = syncer.ContractName + contractName = syncer.WorkflowRegistryContractName forceUpdateSecretsEvent = string(syncer.ForceUpdateSecretsEvent) ) @@ -81,6 +188,9 @@ func Test_SecretsWorker(t *testing.T) { ChainSpecificName: forceUpdateSecretsEvent, ReadType: evmtypes.Event, }, + syncer.GetWorkflowMetadataListByDONMethodName: { + ChainSpecificName: syncer.GetWorkflowMetadataListByDONMethodName, + }, }, }, }, @@ -112,26 +222,21 @@ func Test_SecretsWorker(t *testing.T) { require.NoError(t, err) require.Equal(t, contents, giveContents) - // Create the worker - worker := syncer.NewWorkflowRegistry( - lggr, - orm, - contractReader, - fetcherFn, - wfRegistryAddr.Hex(), - nil, - nil, - emitter, - syncer.WithTicker(giveTicker.C), - ) + handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil, + emitter, nil) - servicetest.Run(t, worker) + worker := syncer.NewWorkflowRegistry(lggr, contractReader, wfRegistryAddr.Hex(), + syncer.WorkflowEventPollerConfig{ + QueryCount: 20, + }, handler, &testWorkflowRegistryContractLoader{}, syncer.WithTicker(giveTicker.C)) // setup contract state to allow the secrets to be updated updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true) updateAuthorizedAddress(t, backendTH, wfRegistryC, []common.Address{backendTH.ContractsOwner.From}, true) registerWorkflow(t, backendTH, wfRegistryC, giveWorkflow) + servicetest.Run(t, worker) + // generate a log event requestForceUpdateSecrets(t, backendTH, wfRegistryC, giveSecretsURL) diff --git a/core/services/workflows/syncer/contract_reader_mock.go b/core/services/workflows/syncer/contract_reader_mock.go index 61f59fa4e69..391ba5eacdb 100644 --- a/core/services/workflows/syncer/contract_reader_mock.go +++ b/core/services/workflows/syncer/contract_reader_mock.go @@ -6,6 +6,7 @@ import ( context "context" query "github.com/smartcontractkit/chainlink-common/pkg/types/query" + primitives "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" mock "github.com/stretchr/testify/mock" types "github.com/smartcontractkit/chainlink-common/pkg/types" @@ -71,6 +72,68 @@ func (_c *MockContractReader_Bind_Call) RunAndReturn(run func(context.Context, [ return _c } +// GetLatestValueWithHeadData provides a mock function with given fields: ctx, readName, confidenceLevel, params, returnVal +func (_m *MockContractReader) GetLatestValueWithHeadData(ctx context.Context, readName string, confidenceLevel primitives.ConfidenceLevel, params any, returnVal any) (*types.Head, error) { + ret := _m.Called(ctx, readName, confidenceLevel, params, returnVal) + + if len(ret) == 0 { + panic("no return value specified for GetLatestValueWithHeadData") + } + + var r0 *types.Head + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, primitives.ConfidenceLevel, any, any) (*types.Head, error)); ok { + return rf(ctx, readName, confidenceLevel, params, returnVal) + } + if rf, ok := ret.Get(0).(func(context.Context, string, primitives.ConfidenceLevel, any, any) *types.Head); ok { + r0 = rf(ctx, readName, confidenceLevel, params, returnVal) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*types.Head) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, primitives.ConfidenceLevel, any, any) error); ok { + r1 = rf(ctx, readName, confidenceLevel, params, returnVal) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockContractReader_GetLatestValueWithHeadData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestValueWithHeadData' +type MockContractReader_GetLatestValueWithHeadData_Call struct { + *mock.Call +} + +// GetLatestValueWithHeadData is a helper method to define mock.On call +// - ctx context.Context +// - readName string +// - confidenceLevel primitives.ConfidenceLevel +// - params any +// - returnVal any +func (_e *MockContractReader_Expecter) GetLatestValueWithHeadData(ctx interface{}, readName interface{}, confidenceLevel interface{}, params interface{}, returnVal interface{}) *MockContractReader_GetLatestValueWithHeadData_Call { + return &MockContractReader_GetLatestValueWithHeadData_Call{Call: _e.mock.On("GetLatestValueWithHeadData", ctx, readName, confidenceLevel, params, returnVal)} +} + +func (_c *MockContractReader_GetLatestValueWithHeadData_Call) Run(run func(ctx context.Context, readName string, confidenceLevel primitives.ConfidenceLevel, params any, returnVal any)) *MockContractReader_GetLatestValueWithHeadData_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(primitives.ConfidenceLevel), args[3].(any), args[4].(any)) + }) + return _c +} + +func (_c *MockContractReader_GetLatestValueWithHeadData_Call) Return(head *types.Head, err error) *MockContractReader_GetLatestValueWithHeadData_Call { + _c.Call.Return(head, err) + return _c +} + +func (_c *MockContractReader_GetLatestValueWithHeadData_Call) RunAndReturn(run func(context.Context, string, primitives.ConfidenceLevel, any, any) (*types.Head, error)) *MockContractReader_GetLatestValueWithHeadData_Call { + _c.Call.Return(run) + return _c +} + // QueryKey provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4 func (_m *MockContractReader) QueryKey(_a0 context.Context, _a1 types.BoundContract, _a2 query.KeyFilter, _a3 query.LimitAndSort, _a4 any) ([]types.Sequence, error) { ret := _m.Called(_a0, _a1, _a2, _a3, _a4) diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 7004c740c97..9c5684cb090 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -51,14 +51,14 @@ type WorkflowRegistryForceUpdateSecretsRequestedV1 struct { } type WorkflowRegistryWorkflowRegisteredV1 struct { - WorkflowID [32]byte - WorkflowOwner []byte - DonID uint32 - Status uint8 - WorkflowName string - BinaryURL string - ConfigURL string - SecretsURL string + WorkflowID [32]byte + Owner []byte + DonID uint32 + Status uint8 + WorkflowName string + BinaryURL string + ConfigURL string + SecretsURL string } type WorkflowRegistryWorkflowUpdatedV1 struct { @@ -97,13 +97,6 @@ type secretsFetcher interface { SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) } -// secretsFetcherFunc implements the secretsFetcher interface for a function. -type secretsFetcherFunc func(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) - -func (f secretsFetcherFunc) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) { - return f(ctx, workflowOwner, workflowName) -} - // eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding // method that handles the event. type eventHandler struct { @@ -117,14 +110,18 @@ type eventHandler struct { secretsFetcher secretsFetcher } -// newEventHandler returns a new eventHandler instance. -func newEventHandler( +type Event interface { + GetEventType() WorkflowRegistryEventType + GetData() any +} + +// NewEventHandler returns a new eventHandler instance. +func NewEventHandler( lggr logger.Logger, orm ORM, gateway FetcherFunc, workflowStore store.Store, capRegistry core.CapabilitiesRegistry, - engineRegistry *engineRegistry, emitter custmsg.MessageEmitter, secretsFetcher secretsFetcher, ) *eventHandler { @@ -134,18 +131,18 @@ func newEventHandler( fetcher: gateway, workflowStore: workflowStore, capRegistry: capRegistry, - engineRegistry: engineRegistry, + engineRegistry: newEngineRegistry(), emitter: emitter, secretsFetcher: secretsFetcher, } } -func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) error { - switch event.EventType { +func (h *eventHandler) Handle(ctx context.Context, event Event) error { + switch event.GetEventType() { case ForceUpdateSecretsEvent: - payload, ok := event.Data.(WorkflowRegistryForceUpdateSecretsRequestedV1) + payload, ok := event.GetData().(WorkflowRegistryForceUpdateSecretsRequestedV1) if !ok { - return newHandlerTypeError(event.Data) + return newHandlerTypeError(event.GetData()) } cma := h.emitter.With( @@ -160,16 +157,16 @@ func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) return nil case WorkflowRegisteredEvent: - payload, ok := event.Data.(WorkflowRegistryWorkflowRegisteredV1) + payload, ok := event.GetData().(WorkflowRegistryWorkflowRegisteredV1) if !ok { - return newHandlerTypeError(event.Data) + return newHandlerTypeError(event.GetData()) } wfID := hex.EncodeToString(payload.WorkflowID[:]) cma := h.emitter.With( platform.KeyWorkflowID, wfID, platform.KeyWorkflowName, payload.WorkflowName, - platform.KeyWorkflowOwner, hex.EncodeToString(payload.WorkflowOwner), + platform.KeyWorkflowOwner, hex.EncodeToString(payload.Owner), ) if err := h.workflowRegisteredEvent(ctx, payload); err != nil { @@ -180,9 +177,9 @@ func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) h.lggr.Debugf("workflow 0x%x registered and started", wfID) return nil case WorkflowUpdatedEvent: - payload, ok := event.Data.(WorkflowRegistryWorkflowUpdatedV1) + payload, ok := event.GetData().(WorkflowRegistryWorkflowUpdatedV1) if !ok { - return fmt.Errorf("invalid data type %T for event", event.Data) + return fmt.Errorf("invalid data type %T for event", event.GetData()) } newWorkflowID := hex.EncodeToString(payload.NewWorkflowID[:]) @@ -199,9 +196,9 @@ func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) return nil case WorkflowPausedEvent: - payload, ok := event.Data.(WorkflowRegistryWorkflowPausedV1) + payload, ok := event.GetData().(WorkflowRegistryWorkflowPausedV1) if !ok { - return fmt.Errorf("invalid data type %T for event", event.Data) + return fmt.Errorf("invalid data type %T for event", event.GetData()) } wfID := hex.EncodeToString(payload.WorkflowID[:]) @@ -218,9 +215,9 @@ func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) } return nil case WorkflowActivatedEvent: - payload, ok := event.Data.(WorkflowRegistryWorkflowActivatedV1) + payload, ok := event.GetData().(WorkflowRegistryWorkflowActivatedV1) if !ok { - return fmt.Errorf("invalid data type %T for event", event.Data) + return fmt.Errorf("invalid data type %T for event", event.GetData()) } wfID := hex.EncodeToString(payload.WorkflowID[:]) @@ -237,9 +234,9 @@ func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) return nil case WorkflowDeletedEvent: - payload, ok := event.Data.(WorkflowRegistryWorkflowDeletedV1) + payload, ok := event.GetData().(WorkflowRegistryWorkflowDeletedV1) if !ok { - return fmt.Errorf("invalid data type %T for event", event.Data) + return fmt.Errorf("invalid data type %T for event", event.GetData()) } wfID := hex.EncodeToString(payload.WorkflowID[:]) @@ -257,7 +254,7 @@ func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) return nil default: - return fmt.Errorf("event type unsupported: %v", event.EventType) + return fmt.Errorf("event type unsupported: %v", event.GetEventType()) } } @@ -293,7 +290,7 @@ func (h *eventHandler) workflowRegisteredEvent( } // Save the workflow secrets - urlHash, err := h.orm.GetSecretsURLHash(payload.WorkflowOwner, []byte(payload.SecretsURL)) + urlHash, err := h.orm.GetSecretsURLHash(payload.Owner, []byte(payload.SecretsURL)) if err != nil { return fmt.Errorf("failed to get secrets URL hash: %w", err) } @@ -309,7 +306,7 @@ func (h *eventHandler) workflowRegisteredEvent( Config: string(config), WorkflowID: wfID, Status: status, - WorkflowOwner: hex.EncodeToString(payload.WorkflowOwner), + WorkflowOwner: hex.EncodeToString(payload.Owner), WorkflowName: payload.WorkflowName, SpecType: job.WASMFile, BinaryURL: payload.BinaryURL, @@ -334,7 +331,7 @@ func (h *eventHandler) workflowRegisteredEvent( Lggr: h.lggr, Workflow: *sdkSpec, WorkflowID: wfID, - WorkflowOwner: hex.EncodeToString(payload.WorkflowOwner), + WorkflowOwner: hex.EncodeToString(payload.Owner), WorkflowName: payload.WorkflowName, Registry: h.capRegistry, Store: h.workflowStore, @@ -352,6 +349,7 @@ func (h *eventHandler) workflowRegisteredEvent( } h.engineRegistry.Add(wfID, e) + return nil } @@ -368,14 +366,14 @@ func (h *eventHandler) workflowUpdatedEvent( } registeredEvent := WorkflowRegistryWorkflowRegisteredV1{ - WorkflowID: payload.NewWorkflowID, - WorkflowOwner: payload.WorkflowOwner, - DonID: payload.DonID, - Status: 0, - WorkflowName: payload.WorkflowName, - BinaryURL: payload.BinaryURL, - ConfigURL: payload.ConfigURL, - SecretsURL: payload.SecretsURL, + WorkflowID: payload.NewWorkflowID, + Owner: payload.WorkflowOwner, + DonID: payload.DonID, + Status: 0, + WorkflowName: payload.WorkflowName, + BinaryURL: payload.BinaryURL, + ConfigURL: payload.ConfigURL, + SecretsURL: payload.SecretsURL, } return h.workflowRegisteredEvent(ctx, registeredEvent) @@ -430,14 +428,14 @@ func (h *eventHandler) workflowActivatedEvent( // start a new workflow engine registeredEvent := WorkflowRegistryWorkflowRegisteredV1{ - WorkflowID: payload.WorkflowID, - WorkflowOwner: payload.WorkflowOwner, - DonID: payload.DonID, - Status: 0, - WorkflowName: payload.WorkflowName, - BinaryURL: spec.BinaryURL, - ConfigURL: spec.ConfigURL, - SecretsURL: secretsURL, + WorkflowID: payload.WorkflowID, + Owner: payload.WorkflowOwner, + DonID: payload.DonID, + Status: 0, + WorkflowName: payload.WorkflowName, + BinaryURL: spec.BinaryURL, + ConfigURL: spec.ConfigURL, + SecretsURL: secretsURL, } return h.workflowRegisteredEvent(ctx, registeredEvent) diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go index eb8b89ad7e1..621b6b75f28 100644 --- a/core/services/workflows/syncer/handler_test.go +++ b/core/services/workflows/syncer/handler_test.go @@ -63,7 +63,7 @@ func Test_Handler(t *testing.T) { } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(int64(1), nil) - h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil) + h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, emitter, nil) err = h.Handle(ctx, giveEvent) require.NoError(t, err) }) @@ -77,7 +77,7 @@ func Test_Handler(t *testing.T) { return []byte("contents"), nil } - h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil) + h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, emitter, nil) err := h.Handle(ctx, giveEvent) require.Error(t, err) require.Contains(t, err.Error(), "event type unsupported") @@ -86,7 +86,7 @@ func Test_Handler(t *testing.T) { t.Run("fails to get secrets url", func(t *testing.T) { mockORM := mocks.NewORM(t) ctx := testutils.Context(t) - h := newEventHandler(lggr, mockORM, nil, nil, nil, nil, emitter, nil) + h := NewEventHandler(lggr, mockORM, nil, nil, nil, emitter, nil) giveURL := "https://original-url.com" giveBytes, err := crypto.Keccak256([]byte(giveURL)) require.NoError(t, err) @@ -126,7 +126,7 @@ func Test_Handler(t *testing.T) { return nil, assert.AnError } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) - h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil) + h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, emitter, nil) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.ErrorIs(t, err, assert.AnError) @@ -153,7 +153,7 @@ func Test_Handler(t *testing.T) { } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(0, assert.AnError) - h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil) + h := NewEventHandler(lggr, mockORM, fetcher, nil, nil, emitter, nil) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.ErrorIs(t, err, assert.AnError) @@ -196,13 +196,13 @@ func Test_workflowRegisteredHandler(t *testing.T) { copy(wfID, b) paused := WorkflowRegistryWorkflowRegisteredV1{ - Status: uint8(1), - WorkflowID: [32]byte(wfID), - WorkflowOwner: wfOwner, - WorkflowName: "workflow-name", - BinaryURL: binaryURL, - ConfigURL: configURL, - SecretsURL: secretsURL, + Status: uint8(1), + WorkflowID: [32]byte(wfID), + Owner: wfOwner, + WorkflowName: "workflow-name", + BinaryURL: binaryURL, + ConfigURL: configURL, + SecretsURL: secretsURL, } h := &eventHandler{ @@ -252,13 +252,13 @@ func Test_workflowRegisteredHandler(t *testing.T) { copy(wfID, b) active := WorkflowRegistryWorkflowRegisteredV1{ - Status: uint8(0), - WorkflowID: [32]byte(wfID), - WorkflowOwner: wfOwner, - WorkflowName: "workflow-name", - BinaryURL: binaryURL, - ConfigURL: configURL, - SecretsURL: secretsURL, + Status: uint8(0), + WorkflowID: [32]byte(wfID), + Owner: wfOwner, + WorkflowName: "workflow-name", + BinaryURL: binaryURL, + ConfigURL: configURL, + SecretsURL: secretsURL, } er := newEngineRegistry() @@ -323,13 +323,13 @@ func Test_workflowDeletedHandler(t *testing.T) { copy(wfID, b) active := WorkflowRegistryWorkflowRegisteredV1{ - Status: uint8(0), - WorkflowID: [32]byte(wfID), - WorkflowOwner: wfOwner, - WorkflowName: "workflow-name", - BinaryURL: binaryURL, - ConfigURL: configURL, - SecretsURL: secretsURL, + Status: uint8(0), + WorkflowID: [32]byte(wfID), + Owner: wfOwner, + WorkflowName: "workflow-name", + BinaryURL: binaryURL, + ConfigURL: configURL, + SecretsURL: secretsURL, } er := newEngineRegistry() @@ -420,13 +420,13 @@ func Test_workflowPausedActivatedUpdatedHandler(t *testing.T) { copy(newWFID, b) active := WorkflowRegistryWorkflowRegisteredV1{ - Status: uint8(0), - WorkflowID: [32]byte(wfID), - WorkflowOwner: wfOwner, - WorkflowName: "workflow-name", - BinaryURL: binaryURL, - ConfigURL: configURL, - SecretsURL: secretsURL, + Status: uint8(0), + WorkflowID: [32]byte(wfID), + Owner: wfOwner, + WorkflowName: "workflow-name", + BinaryURL: binaryURL, + ConfigURL: configURL, + SecretsURL: secretsURL, } er := newEngineRegistry() diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index cdd0c71acc0..ed48cc5b458 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -6,28 +6,25 @@ import ( "encoding/json" "errors" "fmt" - "strconv" "sync" "time" - "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/services" types "github.com/smartcontractkit/chainlink-common/pkg/types" - "github.com/smartcontractkit/chainlink-common/pkg/types/core" query "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper" "github.com/smartcontractkit/chainlink/v2/core/logger" evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" - "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" ) const name = "WorkflowRegistrySyncer" var ( - defaultTickInterval = 12 * time.Second - ContractName = "WorkflowRegistry" + defaultTickInterval = 12 * time.Second + WorkflowRegistryContractName = "WorkflowRegistry" + GetWorkflowMetadataListByDONMethodName = "getWorkflowMetadataListByDON" ) type Head struct { @@ -36,6 +33,16 @@ type Head struct { Timestamp uint64 } +type GetWorkflowMetadataListByDONParams struct { + DonID uint32 + Start uint64 + Limit uint64 +} + +type GetWorkflowMetadataListByDONReturnVal struct { + WorkflowMetadataList []WorkflowRegistryWorkflowRegisteredV1 +} + // WorkflowRegistryEvent is an event emitted by the WorkflowRegistry. Each event is typed // so that the consumer can determine how to handle the event. type WorkflowRegistryEvent struct { @@ -45,21 +52,28 @@ type WorkflowRegistryEvent struct { Head Head } +func (we WorkflowRegistryEvent) GetEventType() WorkflowRegistryEventType { + return we.EventType +} + +func (we WorkflowRegistryEvent) GetData() any { + return we.Data +} + // WorkflowRegistryEventResponse is a response to either parsing a queried event or handling the event. type WorkflowRegistryEventResponse struct { Err error Event *WorkflowRegistryEvent } -// ContractEventPollerConfig is the configuration needed to poll for events on a contract. Currently +// WorkflowEventPollerConfig is the configuration needed to poll for events on a contract. Currently // requires the ContractEventName. -// -// TODO(mstreet3): Use LookbackBlocks instead of StartBlockNum -type ContractEventPollerConfig struct { - ContractName string - ContractAddress string - StartBlockNum uint64 - QueryCount uint64 +type WorkflowEventPollerConfig struct { + QueryCount uint64 +} + +type WorkflowLoadConfig struct { + FetchBatchSize int } // FetcherFunc is an abstraction for fetching the contents stored at a URL. @@ -73,6 +87,7 @@ type ContractReaderFactory interface { type ContractReader interface { Bind(context.Context, []types.BoundContract) error QueryKey(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) ([]types.Sequence, error) + GetLatestValueWithHeadData(ctx context.Context, readName string, confidenceLevel primitives.ConfidenceLevel, params any, returnVal any) (head *types.Head, err error) } // WorkflowRegistrySyncer is the public interface of the package. @@ -82,6 +97,14 @@ type WorkflowRegistrySyncer interface { var _ WorkflowRegistrySyncer = (*workflowRegistry)(nil) +// WithTicker allows external callers to provide a ticker to the workflowRegistry. This is useful +// for overriding the default tick interval. +func WithTicker(ticker <-chan time.Time) func(*workflowRegistry) { + return func(wr *workflowRegistry) { + wr.ticker = ticker + } +} + // workflowRegistry is the implementation of the WorkflowRegistrySyncer interface. type workflowRegistry struct { services.StateMachine @@ -95,23 +118,22 @@ type workflowRegistry struct { // ticker is the interval at which the workflowRegistry will poll the contract for events. ticker <-chan time.Time - lggr logger.Logger - emitter custmsg.Labeler - orm WorkflowRegistryDS - reader ContractReader - gateway FetcherFunc + lggr logger.Logger + workflowRegistryAddress string + reader ContractReader // initReader allows the workflowRegistry to initialize a contract reader if one is not provided // and separates the contract reader initialization from the workflowRegistry start up. initReader func(context.Context, logger.Logger, ContractReaderFactory, types.BoundContract) (types.ContractReader, error) relayer ContractReaderFactory - cfg ContractEventPollerConfig - eventTypes []WorkflowRegistryEventType + eventPollerCfg WorkflowEventPollerConfig + eventTypes []WorkflowRegistryEventType // eventsCh is read by the handler and each event is handled once received. - eventsCh chan WorkflowRegistryEventResponse - handler *eventHandler + eventsCh chan WorkflowRegistryEventResponse + handler evtHandler + initialWorkflowsStateLoader initialWorkflowsStateLoader // batchCh is a channel that receives batches of events from the contract query goroutines. batchCh chan []WorkflowRegistryEventResponse @@ -119,18 +141,6 @@ type workflowRegistry struct { // heap is a min heap that merges batches of events from the contract query goroutines. The // default min heap is sorted by block height. heap Heap - - workflowStore store.Store - capRegistry core.CapabilitiesRegistry - engineRegistry *engineRegistry -} - -// WithTicker allows external callers to provide a ticker to the workflowRegistry. This is useful -// for overriding the default tick interval. -func WithTicker(ticker <-chan time.Time) func(*workflowRegistry) { - return func(wr *workflowRegistry) { - wr.ticker = ticker - } } func WithReader(reader types.ContractReader) func(*workflowRegistry) { @@ -139,45 +149,43 @@ func WithReader(reader types.ContractReader) func(*workflowRegistry) { } } +type evtHandler interface { + Handle(ctx context.Context, event Event) error +} + +type initialWorkflowsStateLoader interface { + // LoadWorkflows loads all the workflows for the given donID from the contract. Returns the head of the chain as of the + // point in time at which the load occurred. + LoadWorkflows(ctx context.Context) (*types.Head, error) +} + // NewWorkflowRegistry returns a new workflowRegistry. // Only queries for WorkflowRegistryForceUpdateSecretsRequestedV1 events. func NewWorkflowRegistry[T ContractReader]( lggr logger.Logger, - orm WorkflowRegistryDS, reader T, - gateway FetcherFunc, addr string, - workflowStore store.Store, - capRegistry core.CapabilitiesRegistry, - emitter custmsg.Labeler, + eventPollerConfig WorkflowEventPollerConfig, + handler evtHandler, + initialWorkflowsStateLoader initialWorkflowsStateLoader, opts ...func(*workflowRegistry), ) *workflowRegistry { ets := []WorkflowRegistryEventType{ForceUpdateSecretsEvent} wr := &workflowRegistry{ - lggr: lggr.Named(name), - emitter: emitter, - orm: orm, - reader: reader, - gateway: gateway, - workflowStore: workflowStore, - capRegistry: capRegistry, - engineRegistry: newEngineRegistry(), - cfg: ContractEventPollerConfig{ - ContractName: ContractName, - ContractAddress: addr, - QueryCount: 20, - StartBlockNum: 0, - }, - initReader: newReader, - heap: newBlockHeightHeap(), - stopCh: make(services.StopChan), - eventTypes: ets, - eventsCh: make(chan WorkflowRegistryEventResponse), - batchCh: make(chan []WorkflowRegistryEventResponse, len(ets)), + lggr: lggr.Named(name), + workflowRegistryAddress: addr, + reader: reader, + eventPollerCfg: eventPollerConfig, + initReader: newReader, + heap: newBlockHeightHeap(), + stopCh: make(services.StopChan), + eventTypes: ets, + eventsCh: make(chan WorkflowRegistryEventResponse), + batchCh: make(chan []WorkflowRegistryEventResponse, len(ets)), + handler: handler, + initialWorkflowsStateLoader: initialWorkflowsStateLoader, } - wr.handler = newEventHandler(wr.lggr, wr.orm, wr.gateway, wr.workflowStore, wr.capRegistry, - wr.engineRegistry, wr.emitter, secretsFetcherFunc(wr.SecretsFor), - ) + for _, opt := range opts { opt(wr) } @@ -186,8 +194,13 @@ func NewWorkflowRegistry[T ContractReader]( // Start starts the workflowRegistry. It starts two goroutines, one for querying the contract // and one for handling the events. -func (w *workflowRegistry) Start(_ context.Context) error { +func (w *workflowRegistry) Start(ctx context.Context) error { return w.StartOnce(w.Name(), func() error { + loadWorkflowsHead, err := w.initialWorkflowsStateLoader.LoadWorkflows(ctx) + if err != nil { + return fmt.Errorf("failed to load workflows: %w", err) + } + ctx, cancel := w.stopCh.NewCtx() w.wg.Add(1) @@ -195,7 +208,7 @@ func (w *workflowRegistry) Start(_ context.Context) error { defer w.wg.Done() defer cancel() - w.syncEventsLoop(ctx) + w.syncEventsLoop(ctx, loadWorkflowsHead.Height) }() w.wg.Add(1) @@ -261,7 +274,7 @@ func (w *workflowRegistry) handlerLoop(ctx context.Context) { } // syncEventsLoop polls the contract for events and passes them to a channel for handling. -func (w *workflowRegistry) syncEventsLoop(ctx context.Context) { +func (w *workflowRegistry) syncEventsLoop(ctx context.Context, lastReadBlockNumber string) { var ( // sendLog is a helper that sends a WorkflowRegistryEventResponse to the eventsCh in a // blocking way that will send the response or be canceled. @@ -298,7 +311,12 @@ func (w *workflowRegistry) syncEventsLoop(ctx context.Context) { signal, w.lggr, reader, - w.cfg, + lastReadBlockNumber, + queryEventConfig{ + ContractName: WorkflowRegistryContractName, + ContractAddress: w.workflowRegistryAddress, + WorkflowEventPollerConfig: w.eventPollerCfg, + }, w.eventTypes[i], w.batchCh, ) @@ -376,8 +394,8 @@ func (w *workflowRegistry) getTicker() <-chan time.Time { // reader. func (w *workflowRegistry) getContractReader(ctx context.Context) (ContractReader, error) { c := types.BoundContract{ - Name: w.cfg.ContractName, - Address: w.cfg.ContractAddress, + Name: WorkflowRegistryContractName, + Address: w.workflowRegistryAddress, } if w.reader == nil { @@ -392,6 +410,12 @@ func (w *workflowRegistry) getContractReader(ctx context.Context) (ContractReade return w.reader, nil } +type queryEventConfig struct { + ContractName string + ContractAddress string + WorkflowEventPollerConfig +} + // queryEvent queries the contract for events of the given type on each tick from the ticker. // Sends a batch of event logs to the batch channel. The batch represents all the // event logs read since the last query. Loops until the context is canceled. @@ -400,7 +424,8 @@ func queryEvent( ticker <-chan struct{}, lggr logger.Logger, reader ContractReader, - cfg ContractEventPollerConfig, + lastReadBlockNumber string, + cfg queryEventConfig, et WorkflowRegistryEventType, batchCh chan<- []WorkflowRegistryEventResponse, ) { @@ -436,7 +461,7 @@ func queryEvent( Key: string(et), Expressions: []query.Expression{ query.Confidence(primitives.Finalized), - query.Block(strconv.FormatUint(cfg.StartBlockNum, 10), primitives.Gte), + query.Block(lastReadBlockNumber, primitives.Gt), }, }, limitAndSort, @@ -478,7 +503,7 @@ func newReader( ) (types.ContractReader, error) { contractReaderCfg := evmtypes.ChainReaderConfig{ Contracts: map[string]evmtypes.ChainContractReader{ - ContractName: { + WorkflowRegistryContractName: { ContractPollingFilter: evmtypes.ContractPollingFilter{ GenericEventNames: []string{string(ForceUpdateSecretsEvent)}, }, @@ -511,6 +536,81 @@ func newReader( return reader, nil } +type workflowAsEvent struct { + Data WorkflowRegistryWorkflowRegisteredV1 + EventType WorkflowRegistryEventType +} + +func (r workflowAsEvent) GetEventType() WorkflowRegistryEventType { + return r.EventType +} + +func (r workflowAsEvent) GetData() any { + return r.Data +} + +type workflowRegistryContractLoader struct { + workflowRegistryAddress string + donID uint32 + reader ContractReader + handler evtHandler +} + +func NewWorkflowRegistryContractLoader( + workflowRegistryAddress string, + donID uint32, + reader ContractReader, + handler evtHandler, +) *workflowRegistryContractLoader { + return &workflowRegistryContractLoader{ + workflowRegistryAddress: workflowRegistryAddress, + donID: donID, + reader: reader, + handler: handler, + } +} + +func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context) (*types.Head, error) { + contractBinding := types.BoundContract{ + Address: l.workflowRegistryAddress, + Name: WorkflowRegistryContractName, + } + + readIdentifier := contractBinding.ReadIdentifier(GetWorkflowMetadataListByDONMethodName) + params := GetWorkflowMetadataListByDONParams{ + DonID: l.donID, + Start: 0, + Limit: 0, // 0 tells the contract to return max pagination limit workflows on each call + } + + var headAtLastRead *types.Head + for { + var err error + var workflows GetWorkflowMetadataListByDONReturnVal + headAtLastRead, err = l.reader.GetLatestValueWithHeadData(ctx, readIdentifier, primitives.Finalized, params, &workflows) + if err != nil { + return nil, fmt.Errorf("failed to get workflow metadata for don %w", err) + } + + for _, workflow := range workflows.WorkflowMetadataList { + if err = l.handler.Handle(ctx, workflowAsEvent{ + Data: workflow, + EventType: WorkflowRegisteredEvent, + }); err != nil { + return nil, fmt.Errorf("failed to handle workflow registration: %w", err) + } + } + + if len(workflows.WorkflowMetadataList) == 0 { + break + } + + params.Start += uint64(len(workflows.WorkflowMetadataList)) + } + + return headAtLastRead, nil +} + // toWorkflowRegistryEventResponse converts a types.Sequence to a WorkflowRegistryEventResponse. func toWorkflowRegistryEventResponse( log types.Sequence, diff --git a/core/services/workflows/syncer/workflow_registry_test.go b/core/services/workflows/syncer/workflow_registry_test.go index 58dcbed1022..4746fbc919f 100644 --- a/core/services/workflows/syncer/workflow_registry_test.go +++ b/core/services/workflows/syncer/workflow_registry_test.go @@ -3,10 +3,11 @@ package syncer import ( "context" "encoding/hex" - "strconv" "testing" "time" + "github.com/stretchr/testify/mock" + "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" types "github.com/smartcontractkit/chainlink-common/pkg/types" @@ -24,13 +25,11 @@ import ( func Test_Workflow_Registry_Syncer(t *testing.T) { var ( - giveContents = "contents" - wantContents = "updated contents" - giveCfg = ContractEventPollerConfig{ - ContractName: ContractName, - ContractAddress: "0xdeadbeef", - StartBlockNum: 0, - QueryCount: 20, + giveContents = "contents" + wantContents = "updated contents" + contractAddress = "0xdeadbeef" + giveCfg = WorkflowEventPollerConfig{ + QueryCount: 20, } giveURL = "http://example.com" giveHash, err = crypto.Keccak256([]byte(giveURL)) @@ -57,7 +56,15 @@ func Test_Workflow_Registry_Syncer(t *testing.T) { return []byte(wantContents), nil } ticker = make(chan time.Time) - worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, nil, nil, emitter, WithTicker(ticker)) + + handler = NewEventHandler(lggr, orm, gateway, nil, nil, + emitter, nil) + loader = NewWorkflowRegistryContractLoader(contractAddress, 1, reader, handler) + + worker = NewWorkflowRegistry(lggr, reader, contractAddress, + WorkflowEventPollerConfig{ + QueryCount: 20, + }, handler, loader, WithTicker(ticker)) ) // Cleanup the worker @@ -71,14 +78,14 @@ func Test_Workflow_Registry_Syncer(t *testing.T) { reader.EXPECT().QueryKey( matches.AnyContext, types.BoundContract{ - Name: giveCfg.ContractName, - Address: giveCfg.ContractAddress, + Name: WorkflowRegistryContractName, + Address: contractAddress, }, query.KeyFilter{ Key: string(ForceUpdateSecretsEvent), Expressions: []query.Expression{ query.Confidence(primitives.Finalized), - query.Block(strconv.FormatUint(giveCfg.StartBlockNum, 10), primitives.Gte), + query.Block("0", primitives.Gt), }, }, query.LimitAndSort{ @@ -87,6 +94,9 @@ func Test_Workflow_Registry_Syncer(t *testing.T) { }, new(values.Value), ).Return([]types.Sequence{giveLog}, nil) + reader.EXPECT().GetLatestValueWithHeadData(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&types.Head{ + Height: "0", + }, nil) // Go run the worker servicetest.Run(t, worker)