From c6c8ce2be1a3390efc31a48fc733bf35026b848b Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Tue, 10 Dec 2024 12:29:14 +0000 Subject: [PATCH 01/13] temp disable test --- core/services/workflows/syncer/workflow_registry.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index 75fcc9735ad..88e82a02f96 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -10,8 +10,8 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/services" - types "github.com/smartcontractkit/chainlink-common/pkg/types" - query "github.com/smartcontractkit/chainlink-common/pkg/types/query" + "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper" From 8220556ef136dcf8ea0367c93fb6aac1394b2adc Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Tue, 10 Dec 2024 13:55:59 +0000 Subject: [PATCH 02/13] wip --- core/services/workflows/syncer/workflow_registry_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/services/workflows/syncer/workflow_registry_test.go b/core/services/workflows/syncer/workflow_registry_test.go index 621d3d123d5..073de2cbdbc 100644 --- a/core/services/workflows/syncer/workflow_registry_test.go +++ b/core/services/workflows/syncer/workflow_registry_test.go @@ -13,8 +13,8 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" - types "github.com/smartcontractkit/chainlink-common/pkg/types" - query "github.com/smartcontractkit/chainlink-common/pkg/types/query" + "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" From 350db876ac0945a30a64b9b361c9811d8ce5d542 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Tue, 10 Dec 2024 15:58:09 +0000 Subject: [PATCH 03/13] fix duplicate event bug - change contract reader poll query from Gte to Gt --- .../workflows/syncer/workflow_syncer_test.go | 119 +++++++++++++++++- .../workflows/syncer/workflow_registry.go | 4 +- 2 files changed, 120 insertions(+), 3 deletions(-) diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index 3c6ee8a1d04..38ad9e52f3c 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -7,6 +7,7 @@ import ( "encoding/hex" "fmt" "strings" + "sync" "testing" "time" @@ -35,13 +36,26 @@ import ( type testEvtHandler struct { events []syncer.Event + mux sync.Mutex } func (m *testEvtHandler) Handle(ctx context.Context, event syncer.Event) error { + m.mux.Lock() + defer m.mux.Unlock() m.events = append(m.events, event) return nil } +func (m *testEvtHandler) GetEvents() []syncer.Event { + m.mux.Lock() + defer m.mux.Unlock() + + eventsCopy := make([]syncer.Event, len(m.events)) + copy(eventsCopy, m.events) + + return eventsCopy +} + func newTestEvtHandler() *testEvtHandler { return &testEvtHandler{ events: make([]syncer.Event, 0), @@ -68,6 +82,107 @@ func (m *testWorkflowRegistryContractLoader) LoadWorkflows(ctx context.Context, }, nil } +func Test_EventHandlerStateSync(t *testing.T) { + lggr := logger.TestLogger(t) + backendTH := testutils.NewEVMBackendTH(t) + donID := uint32(1) + + eventPollTicker := time.NewTicker(500 * time.Millisecond) + defer eventPollTicker.Stop() + + // Deploy a test workflow_registry + wfRegistryAddr, _, wfRegistryC, err := workflow_registry_wrapper.DeployWorkflowRegistry(backendTH.ContractsOwner, backendTH.Backend.Client()) + backendTH.Backend.Commit() + require.NoError(t, err) + + // setup contract state to allow the secrets to be updated + updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true) + updateAuthorizedAddress(t, backendTH, wfRegistryC, []common.Address{backendTH.ContractsOwner.From}, true) + + // The number of workflows should be greater than the workflow registry contracts pagination limit to ensure + // that the syncer will query the contract multiple times to get the full list of workflows + numberWorkflows := 250 + for i := 0; i < numberWorkflows; i++ { + var workflowID [32]byte + _, err = rand.Read((workflowID)[:]) + require.NoError(t, err) + workflow := RegisterWorkflowCMD{ + Name: fmt.Sprintf("test-wf-%d", i), + DonID: donID, + Status: uint8(1), + SecretsURL: "someurl", + } + workflow.ID = workflowID + registerWorkflow(t, backendTH, wfRegistryC, workflow) + } + + testEventHandler := newTestEvtHandler() + loader := syncer.NewWorkflowRegistryContractLoader(lggr, wfRegistryAddr.Hex(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { + return backendTH.NewContractReader(ctx, t, bytes) + }, testEventHandler) + + // Create the registry + registry := syncer.NewWorkflowRegistry( + lggr, + func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) { + return backendTH.NewContractReader(ctx, t, bytes) + }, + wfRegistryAddr.Hex(), + syncer.WorkflowEventPollerConfig{ + QueryCount: 20, + }, + testEventHandler, + loader, + &testDonNotifier{ + don: capabilities.DON{ + ID: donID, + }, + err: nil, + }, + syncer.WithTicker(eventPollTicker.C), + ) + + servicetest.Run(t, registry) + + require.Eventually(t, func() bool { + numEvents := len(testEventHandler.GetEvents()) + fmt.Printf("FIRST NUMEVENTS: %d\n", numEvents) + + if numEvents == 251 { + fmt.Println("NUMEVENTS: ", numEvents) + } + + return numEvents == numberWorkflows + }, 5*time.Second, time.Second) + + for _, event := range testEventHandler.GetEvents() { + assert.Equal(t, syncer.WorkflowRegisteredEvent, event.GetEventType()) + } + + // Create a new workflow and check that the event handler picks it up + var workflowID [32]byte + _, err = rand.Read((workflowID)[:]) + require.NoError(t, err) + workflow := RegisterWorkflowCMD{ + Name: "test-wf-register-event", + DonID: donID, + Status: uint8(1), + SecretsURL: "someurl", + } + workflow.ID = workflowID + registerWorkflow(t, backendTH, wfRegistryC, workflow) + + require.Eventually(t, func() bool { + numEvents := len(testEventHandler.GetEvents()) + fmt.Printf("SECOND NUMEVENTS: %d\n", numEvents) + + expectedNumEvents := numberWorkflows + 1 + + return numEvents == expectedNumEvents + }, 5*time.Second, time.Second) + +} + func Test_InitialStateSync(t *testing.T) { lggr := logger.TestLogger(t) backendTH := testutils.NewEVMBackendTH(t) @@ -128,10 +243,10 @@ func Test_InitialStateSync(t *testing.T) { servicetest.Run(t, worker) require.Eventually(t, func() bool { - return len(testEventHandler.events) == numberWorkflows + return len(testEventHandler.GetEvents()) == numberWorkflows }, 5*time.Second, time.Second) - for _, event := range testEventHandler.events { + for _, event := range testEventHandler.GetEvents() { assert.Equal(t, syncer.WorkflowRegisteredEvent, event.GetEventType()) } } diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index 88e82a02f96..e1e2ac75e74 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -400,6 +400,7 @@ func (w *workflowRegistry) orderAndSend( if batchCount == 0 { for w.heap.Len() > 0 { sendLog(w.heap.Pop()) + fmt.Println("SENDING EVENT") } return } @@ -489,12 +490,13 @@ func queryEvent( Key: string(et), Expressions: []query.Expression{ query.Confidence(primitives.Finalized), - query.Block(lastReadBlockNumber, primitives.Gte), + query.Block(lastReadBlockNumber, primitives.Gt), }, }, limitAndSort, &logData, ) + lcursor := cursor if lcursor == "" { lcursor = "empty" From 0df14965b5cbe902a4cdf65341e8539f8b7eb14a Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Tue, 10 Dec 2024 16:41:01 +0000 Subject: [PATCH 04/13] event state sync test added and passing with fixes - single event --- .../workflows/syncer/workflow_syncer_test.go | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index 38ad9e52f3c..88b49a9f904 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -146,12 +146,6 @@ func Test_EventHandlerStateSync(t *testing.T) { require.Eventually(t, func() bool { numEvents := len(testEventHandler.GetEvents()) - fmt.Printf("FIRST NUMEVENTS: %d\n", numEvents) - - if numEvents == 251 { - fmt.Println("NUMEVENTS: ", numEvents) - } - return numEvents == numberWorkflows }, 5*time.Second, time.Second) @@ -170,14 +164,12 @@ func Test_EventHandlerStateSync(t *testing.T) { SecretsURL: "someurl", } workflow.ID = workflowID + registerWorkflow(t, backendTH, wfRegistryC, workflow) require.Eventually(t, func() bool { numEvents := len(testEventHandler.GetEvents()) - fmt.Printf("SECOND NUMEVENTS: %d\n", numEvents) - expectedNumEvents := numberWorkflows + 1 - return numEvents == expectedNumEvents }, 5*time.Second, time.Second) From 99b19b26e66ded7c2e9426c56b5552bb72adfc7a Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Tue, 10 Dec 2024 17:04:20 +0000 Subject: [PATCH 05/13] added methods to create different event types --- .../workflows/syncer/workflow_syncer_test.go | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index 88b49a9f904..8f84b5ed0ab 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -514,6 +514,14 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyActivated(t *testing.T) { // generate a log event registerWorkflow(t, backendTH, wfRegistryC, giveWorkflow) + /* + string(ForceUpdateSecretsEvent), + string(WorkflowActivatedEvent), + string(WorkflowDeletedEvent), + string(WorkflowPausedEvent), + string(WorkflowRegisteredEvent), + string(WorkflowUpdatedEvent), */ + // Require the secrets contents to eventually be updated require.Eventually(t, func() bool { _, err := er.Get("test-wf") @@ -604,3 +612,59 @@ func requestForceUpdateSecrets( th.Backend.Commit() th.Backend.Commit() } + +func activateWorkflow( + t *testing.T, + th *testutils.EVMBackendTH, + wfRegC *workflow_registry_wrapper.WorkflowRegistry, + workflowKey [32]byte, +) { + t.Helper() + _, err := wfRegC.ActivateWorkflow(th.ContractsOwner, workflowKey) + require.NoError(t, err, "failed to activate workflow") + th.Backend.Commit() + th.Backend.Commit() + th.Backend.Commit() +} + +func pauseWorkflow( + t *testing.T, + th *testutils.EVMBackendTH, + wfRegC *workflow_registry_wrapper.WorkflowRegistry, + workflowKey [32]byte, +) { + t.Helper() + _, err := wfRegC.PauseWorkflow(th.ContractsOwner, workflowKey) + require.NoError(t, err, "failed to pause workflow") + th.Backend.Commit() + th.Backend.Commit() + th.Backend.Commit() +} + +func deleteWorkflow( + t *testing.T, + th *testutils.EVMBackendTH, + wfRegC *workflow_registry_wrapper.WorkflowRegistry, + workflowKey [32]byte, +) { + t.Helper() + _, err := wfRegC.DeleteWorkflow(th.ContractsOwner, workflowKey) + require.NoError(t, err, "failed to delete workflow") + th.Backend.Commit() + th.Backend.Commit() + th.Backend.Commit() +} + +func updateWorkflow( + t *testing.T, + th *testutils.EVMBackendTH, + wfRegC *workflow_registry_wrapper.WorkflowRegistry, + workflowKey [32]byte, newWorkflowID [32]byte, binaryURL string, configURL string, secretsURL string, +) { + t.Helper() + _, err := wfRegC.UpdateWorkflow(th.ContractsOwner, workflowKey, newWorkflowID, binaryURL, configURL, secretsURL) + require.NoError(t, err, "failed to update workflow") + th.Backend.Commit() + th.Backend.Commit() + th.Backend.Commit() +} From 91de156abcb5bc22cfc7f5d5698c3ce1471c01b7 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Wed, 11 Dec 2024 12:09:15 +0000 Subject: [PATCH 06/13] pre-refactor checkpoint - up to this commit no changes have been made to wf registry - test has been added to confirm (and actually has found bugs) in current wf registry behaviour --- .../workflows/syncer/workflow_syncer_test.go | 88 +++++++++++++++---- .../workflows/syncer/workflow_registry.go | 1 - 2 files changed, 70 insertions(+), 19 deletions(-) diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index 8f84b5ed0ab..059467b340c 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -6,6 +6,7 @@ import ( "encoding/base64" "encoding/hex" "fmt" + rand2 "math/rand/v2" "strings" "sync" "testing" @@ -32,6 +33,8 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" "github.com/stretchr/testify/require" + + crypto2 "github.com/ethereum/go-ethereum/crypto" ) type testEvtHandler struct { @@ -46,6 +49,12 @@ func (m *testEvtHandler) Handle(ctx context.Context, event syncer.Event) error { return nil } +func (m *testEvtHandler) ClearEvents() { + m.mux.Lock() + defer m.mux.Unlock() + m.events = make([]syncer.Event, 0) +} + func (m *testEvtHandler) GetEvents() []syncer.Event { m.mux.Lock() defer m.mux.Unlock() @@ -87,7 +96,7 @@ func Test_EventHandlerStateSync(t *testing.T) { backendTH := testutils.NewEVMBackendTH(t) donID := uint32(1) - eventPollTicker := time.NewTicker(500 * time.Millisecond) + eventPollTicker := time.NewTicker(50 * time.Millisecond) defer eventPollTicker.Stop() // Deploy a test workflow_registry @@ -101,7 +110,7 @@ func Test_EventHandlerStateSync(t *testing.T) { // The number of workflows should be greater than the workflow registry contracts pagination limit to ensure // that the syncer will query the contract multiple times to get the full list of workflows - numberWorkflows := 250 + numberWorkflows := 20 for i := 0; i < numberWorkflows; i++ { var workflowID [32]byte _, err = rand.Read((workflowID)[:]) @@ -153,26 +162,69 @@ func Test_EventHandlerStateSync(t *testing.T) { assert.Equal(t, syncer.WorkflowRegisteredEvent, event.GetEventType()) } - // Create a new workflow and check that the event handler picks it up - var workflowID [32]byte - _, err = rand.Read((workflowID)[:]) - require.NoError(t, err) - workflow := RegisterWorkflowCMD{ - Name: "test-wf-register-event", - DonID: donID, - Status: uint8(1), - SecretsURL: "someurl", - } - workflow.ID = workflowID + testEventHandler.ClearEvents() + + // Create events for a number of workflows and confirm that the event handler processes them + + numberOfEventCycles := 50 + for i := 0; i < numberOfEventCycles; i++ { + + var workflowID [32]byte + _, err = rand.Read((workflowID)[:]) + require.NoError(t, err) + workflow := RegisterWorkflowCMD{ + Name: "test-wf-register-event", + DonID: donID, + Status: uint8(1), + SecretsURL: "", + } + workflow.ID = workflowID - registerWorkflow(t, backendTH, wfRegistryC, workflow) + // Generate events of different types with some jitter + registerWorkflow(t, backendTH, wfRegistryC, workflow) + time.Sleep(time.Millisecond * time.Duration(rand2.IntN(10))) + data := append(backendTH.ContractsOwner.From.Bytes(), []byte(workflow.Name)...) + workflowKey := crypto2.Keccak256Hash(data) + activateWorkflow(t, backendTH, wfRegistryC, workflowKey) + time.Sleep(time.Millisecond * time.Duration(rand2.IntN(10))) + pauseWorkflow(t, backendTH, wfRegistryC, workflowKey) + time.Sleep(time.Millisecond * time.Duration(rand2.IntN(10))) + var newWorkflowID [32]byte + _, err = rand.Read((newWorkflowID)[:]) + require.NoError(t, err) + updateWorkflow(t, backendTH, wfRegistryC, workflowKey, newWorkflowID, workflow.BinaryURL+"2", workflow.ConfigURL, workflow.SecretsURL) + time.Sleep(time.Millisecond * time.Duration(rand2.IntN(10))) + deleteWorkflow(t, backendTH, wfRegistryC, workflowKey) + } + // Confirm the expected number of events are received in the correct order require.Eventually(t, func() bool { - numEvents := len(testEventHandler.GetEvents()) - expectedNumEvents := numberWorkflows + 1 - return numEvents == expectedNumEvents - }, 5*time.Second, time.Second) + events := testEventHandler.GetEvents() + numEvents := len(events) + expectedNumEvents := 5 * numberOfEventCycles + + if numEvents == expectedNumEvents { + // verify the events are the expected types + /*. Note the below test does not work with the unrefactored workflow registry, event order is essentially random + for idx, event := range events { + switch idx % 5 { + case 0: + assert.Equal(t, syncer.WorkflowRegisteredEvent, event.GetEventType()) + case 1: + assert.Equal(t, syncer.WorkflowActivatedEvent, event.GetEventType()) + case 2: + assert.Equal(t, syncer.WorkflowPausedEvent, event.GetEventType()) + case 3: + assert.Equal(t, syncer.WorkflowUpdatedEvent, event.GetEventType()) + case 4: + assert.Equal(t, syncer.WorkflowDeletedEvent, event.GetEventType()) + } + } */ + return true + } + return false + }, 5*time.Second, time.Second) } func Test_InitialStateSync(t *testing.T) { diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index e1e2ac75e74..84dd1b07566 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -400,7 +400,6 @@ func (w *workflowRegistry) orderAndSend( if batchCount == 0 { for w.heap.Len() > 0 { sendLog(w.heap.Pop()) - fmt.Println("SENDING EVENT") } return } From f118eed691e079fb9429142e605b10d370828245 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Wed, 11 Dec 2024 12:50:18 +0000 Subject: [PATCH 07/13] refactor part 1: replaced querykey with querykeys - all tests passing --- .../workflows/syncer/workflow_syncer_test.go | 2 +- .../workflows/syncer/workflow_registry.go | 32 +++++++++++++------ 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index 059467b340c..953a778d504 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -204,7 +204,7 @@ func Test_EventHandlerStateSync(t *testing.T) { expectedNumEvents := 5 * numberOfEventCycles if numEvents == expectedNumEvents { - // verify the events are the expected types + // verify the events are the expected types in the expected order /*. Note the below test does not work with the unrefactored workflow registry, event order is essentially random for idx, event := range events { switch idx % 5 { diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index 84dd1b07566..f7dd22cbdc0 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "encoding/json" "fmt" + "iter" "sync" "time" @@ -99,7 +100,7 @@ type ContractReader interface { Start(ctx context.Context) error Close() error Bind(context.Context, []types.BoundContract) error - QueryKey(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) ([]types.Sequence, error) + QueryKeys(ctx context.Context, keyQueries []types.ContractKeyFilter, limitAndSort query.LimitAndSort) (iter.Seq2[string, types.Sequence], error) GetLatestValueWithHeadData(ctx context.Context, readName string, confidenceLevel primitives.ConfidenceLevel, params any, returnVal any) (head *types.Head, err error) } @@ -482,20 +483,31 @@ func queryEvent( limitAndSort.Limit = query.CursorLimit(cursor, query.CursorFollowing, cfg.QueryCount) } - logs, err := reader.QueryKey( - ctx, - bc, - query.KeyFilter{ - Key: string(et), - Expressions: []query.Expression{ - query.Confidence(primitives.Finalized), - query.Block(lastReadBlockNumber, primitives.Gt), + keyQueries := []types.ContractKeyFilter{ + { + KeyFilter: query.KeyFilter{ + Key: string(et), + Expressions: []query.Expression{ + query.Confidence(primitives.Finalized), + query.Block(lastReadBlockNumber, primitives.Gt), + }, }, + Contract: bc, + SequenceDataType: &logData, }, + } + + logsIter, err := reader.QueryKeys( + ctx, + keyQueries, limitAndSort, - &logData, ) + var logs []types.Sequence + for _, log := range logsIter { + logs = append(logs, log) + } + lcursor := cursor if lcursor == "" { lcursor = "empty" From 4d82e96469b96a234f7b4c714d99dc3659fc48b0 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Wed, 11 Dec 2024 14:45:25 +0000 Subject: [PATCH 08/13] refactor part 2: collapse individual event query into single events query and reduce goroutine and channel usage --- .../workflows/syncer/workflow_syncer_test.go | 2 +- .../workflows/syncer/workflow_registry.go | 183 ++++++++---------- 2 files changed, 82 insertions(+), 103 deletions(-) diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index 953a778d504..d51e0961e4b 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -224,7 +224,7 @@ func Test_EventHandlerStateSync(t *testing.T) { } return false - }, 5*time.Second, time.Second) + }, 50*time.Second, time.Second) } func Test_InitialStateSync(t *testing.T) { diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index f7dd22cbdc0..259fea048ba 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -202,7 +202,7 @@ func NewWorkflowRegistry( stopCh: make(services.StopChan), eventTypes: ets, eventsCh: make(chan WorkflowRegistryEventResponse), - batchCh: make(chan []WorkflowRegistryEventResponse, len(ets)), + batchCh: make(chan []WorkflowRegistryEventResponse, 50000), handler: handler, initialWorkflowsStateLoader: initialWorkflowsStateLoader, workflowDonNotifier: workflowDonNotifier, @@ -313,8 +313,6 @@ func (w *workflowRegistry) syncEventsLoop(ctx context.Context, lastReadBlockNumb } ticker = w.getTicker() - - signals = make(map[WorkflowRegistryEventType]chan struct{}, 0) ) // critical failure if there is no reader, the loop will exit and the parent context will be @@ -324,18 +322,15 @@ func (w *workflowRegistry) syncEventsLoop(ctx context.Context, lastReadBlockNumb w.lggr.Criticalf("contract reader unavailable : %s", err) return } - - // fan out and query for each event type - for i := 0; i < len(w.eventTypes); i++ { - signal := make(chan struct{}, 1) - signals[w.eventTypes[i]] = signal - w.wg.Add(1) - go func() { - defer w.wg.Done() - - queryEvent( + + cursor := "" + for { + select { + case <-ctx.Done(): + return + case <-ticker: + events, newCursor, err := queryEvent( ctx, - signal, w.lggr, reader, lastReadBlockNumber, @@ -344,37 +339,22 @@ func (w *workflowRegistry) syncEventsLoop(ctx context.Context, lastReadBlockNumb ContractAddress: w.workflowRegistryAddress, WorkflowEventPollerConfig: w.eventPollerCfg, }, - w.eventTypes[i], - w.batchCh, + w.eventTypes, + cursor, ) - }() - } - // Periodically send a signal to all the queryEvent goroutines to query the contract - for { - select { - case <-ctx.Done(): - return - case <-ticker: + cursor = newCursor + if err != nil { + w.lggr.Errorw("failed to query events", "err", err) + continue + } + w.lggr.Debugw("Syncing with WorkflowRegistry") - // for each event type, send a signal for it to execute a query and produce a new - // batch of event logs - for i := 0; i < len(w.eventTypes); i++ { - signal := signals[w.eventTypes[i]] - select { - case signal <- struct{}{}: - case <-ctx.Done(): - return - } + + for _, event := range events { + sendLog(event) } - // block on fan-in until all fetched event logs are sent to the handlers - w.orderAndSend( - ctx, - len(w.eventTypes), - w.batchCh, - sendLog, - ) } } } @@ -444,23 +424,26 @@ type queryEventConfig struct { WorkflowEventPollerConfig } +type sequenceWithEventType struct { + Sequence types.Sequence + EventType WorkflowRegistryEventType +} + // queryEvent queries the contract for events of the given type on each tick from the ticker. // Sends a batch of event logs to the batch channel. The batch represents all the // event logs read since the last query. Loops until the context is canceled. func queryEvent( ctx context.Context, - ticker <-chan struct{}, lggr logger.Logger, reader ContractReader, lastReadBlockNumber string, cfg queryEventConfig, - et WorkflowRegistryEventType, - batchCh chan<- []WorkflowRegistryEventResponse, -) { + eventTypes []WorkflowRegistryEventType, + cursor string, +) ([]WorkflowRegistryEventResponse, string, error) { // create query var ( logData values.Value - cursor = "" limitAndSort = query.LimitAndSort{ SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, Limit: query.Limit{Count: cfg.QueryCount}, @@ -471,74 +454,70 @@ func queryEvent( } ) - // Loop until canceled - for { - select { - case <-ctx.Done(): - return - case <-ticker: - responseBatch := []WorkflowRegistryEventResponse{} + responseBatch := []WorkflowRegistryEventResponse{} - if cursor != "" { - limitAndSort.Limit = query.CursorLimit(cursor, query.CursorFollowing, cfg.QueryCount) - } + if cursor != "" { + limitAndSort.Limit = query.CursorLimit(cursor, query.CursorFollowing, cfg.QueryCount) + } - keyQueries := []types.ContractKeyFilter{ - { - KeyFilter: query.KeyFilter{ - Key: string(et), - Expressions: []query.Expression{ - query.Confidence(primitives.Finalized), - query.Block(lastReadBlockNumber, primitives.Gt), - }, - }, - Contract: bc, - SequenceDataType: &logData, + var keyQueries []types.ContractKeyFilter + for _, et := range eventTypes { + keyQueries = append(keyQueries, types.ContractKeyFilter{ + KeyFilter: query.KeyFilter{ + Key: string(et), + Expressions: []query.Expression{ + query.Confidence(primitives.Finalized), + query.Block(lastReadBlockNumber, primitives.Gt), }, - } - - logsIter, err := reader.QueryKeys( - ctx, - keyQueries, - limitAndSort, - ) + }, + Contract: bc, + SequenceDataType: &logData, + }) + } - var logs []types.Sequence - for _, log := range logsIter { - logs = append(logs, log) - } + logsIter, err := reader.QueryKeys( + ctx, + keyQueries, + limitAndSort, + ) - lcursor := cursor - if lcursor == "" { - lcursor = "empty" - } - lggr.Debugw("QueryKeys called", "logs", len(logs), "eventType", et, "lastReadBlockNumber", lastReadBlockNumber, "logCursor", lcursor) + var logs []sequenceWithEventType + for eventType, log := range logsIter { + logs = append(logs, sequenceWithEventType{ + Sequence: log, + EventType: WorkflowRegistryEventType(eventType), + }) + } - if err != nil { - lggr.Errorw("QueryKey failure", "err", err) - continue - } + lcursor := cursor + if lcursor == "" { + lcursor = "empty" + } + lggr.Debugw("QueryKeys called", "logs", len(logs), "eventTypes", eventTypes, "lastReadBlockNumber", lastReadBlockNumber, "logCursor", lcursor) - // ChainReader QueryKey API provides logs including the cursor value and not - // after the cursor value. If the response only consists of the log corresponding - // to the cursor and no log after it, then we understand that there are no new - // logs - if len(logs) == 1 && logs[0].Cursor == cursor { - lggr.Infow("No new logs since", "cursor", cursor) - continue - } + if err != nil { + return nil, cursor, fmt.Errorf("failed to query keys: %w", err) + } - for _, log := range logs { - if log.Cursor == cursor { - continue - } + // ChainReader QueryKey API provides logs including the cursor value and not + // after the cursor value. If the response only consists of the log corresponding + // to the cursor and no log after it, then we understand that there are no new + // logs + if len(logs) == 1 && logs[0].Sequence.Cursor == cursor { + lggr.Infow("No new logs since", "cursor", cursor) + return nil, cursor, nil + } - responseBatch = append(responseBatch, toWorkflowRegistryEventResponse(log, et, lggr)) - cursor = log.Cursor - } - batchCh <- responseBatch + for _, log := range logs { + if log.Sequence.Cursor == cursor { + continue } + + responseBatch = append(responseBatch, toWorkflowRegistryEventResponse(log.Sequence, log.EventType, lggr)) + cursor = log.Sequence.Cursor } + + return responseBatch, cursor, nil } func getWorkflowRegistryEventReader( From 527f8043efa6981679b2dfca532e6c286465b2b4 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Wed, 11 Dec 2024 15:01:12 +0000 Subject: [PATCH 09/13] refactor part 3 - down to single ticker thread --- .../workflows/syncer/workflow_registry.go | 208 ++++++------------ 1 file changed, 67 insertions(+), 141 deletions(-) diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index 259fea048ba..ea9eb57250b 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -313,6 +313,16 @@ func (w *workflowRegistry) syncEventsLoop(ctx context.Context, lastReadBlockNumb } ticker = w.getTicker() + + logData values.Value + limitAndSort = query.LimitAndSort{ + SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, + Limit: query.Limit{Count: w.eventPollerCfg.QueryCount}, + } + bc = types.BoundContract{ + Name: WorkflowRegistryContractName, + Address: w.workflowRegistryAddress, + } ) // critical failure if there is no reader, the loop will exit and the parent context will be @@ -322,68 +332,81 @@ func (w *workflowRegistry) syncEventsLoop(ctx context.Context, lastReadBlockNumb w.lggr.Criticalf("contract reader unavailable : %s", err) return } - + cursor := "" for { select { case <-ctx.Done(): return case <-ticker: - events, newCursor, err := queryEvent( + + responseBatch := []WorkflowRegistryEventResponse{} + + if cursor != "" { + limitAndSort.Limit = query.CursorLimit(cursor, query.CursorFollowing, w.eventPollerCfg.QueryCount) + } + + var keyQueries []types.ContractKeyFilter + for _, et := range w.eventTypes { + keyQueries = append(keyQueries, types.ContractKeyFilter{ + KeyFilter: query.KeyFilter{ + Key: string(et), + Expressions: []query.Expression{ + query.Confidence(primitives.Finalized), + query.Block(lastReadBlockNumber, primitives.Gt), + }, + }, + Contract: bc, + SequenceDataType: &logData, + }) + } + + logsIter, err := reader.QueryKeys( ctx, - w.lggr, - reader, - lastReadBlockNumber, - queryEventConfig{ - ContractName: WorkflowRegistryContractName, - ContractAddress: w.workflowRegistryAddress, - WorkflowEventPollerConfig: w.eventPollerCfg, - }, - w.eventTypes, - cursor, + keyQueries, + limitAndSort, ) - cursor = newCursor - if err != nil { - w.lggr.Errorw("failed to query events", "err", err) - continue + var logs []sequenceWithEventType + for eventType, log := range logsIter { + logs = append(logs, sequenceWithEventType{ + Sequence: log, + EventType: WorkflowRegistryEventType(eventType), + }) } - w.lggr.Debugw("Syncing with WorkflowRegistry") - - for _, event := range events { - sendLog(event) + lcursor := cursor + if lcursor == "" { + lcursor = "empty" } + w.lggr.Debugw("QueryKeys called", "logs", len(logs), "eventTypes", w.eventTypes, "lastReadBlockNumber", lastReadBlockNumber, "logCursor", lcursor) - } - } -} + if err != nil { + w.lggr.Errorw("failed to query keys: %w", err) + } -// orderAndSend reads n batches from the batch channel, heapifies all the batches then dequeues -// the min heap via the sendLog function. -func (w *workflowRegistry) orderAndSend( - ctx context.Context, - batchCount int, - batchCh <-chan []WorkflowRegistryEventResponse, - sendLog func(WorkflowRegistryEventResponse), -) { - for { - select { - case <-ctx.Done(): - return - case batch := <-batchCh: - for _, response := range batch { - w.heap.Push(response) + // ChainReader QueryKey API provides logs including the cursor value and not + // after the cursor value. If the response only consists of the log corresponding + // to the cursor and no log after it, then we understand that there are no new + // logs + if len(logs) == 1 && logs[0].Sequence.Cursor == cursor { + w.lggr.Infow("No new logs since", "cursor", cursor) + continue } - batchCount-- - // If we have received responses for all the events, then we can drain the heap. - if batchCount == 0 { - for w.heap.Len() > 0 { - sendLog(w.heap.Pop()) + for _, log := range logs { + if log.Sequence.Cursor == cursor { + continue } - return + + responseBatch = append(responseBatch, toWorkflowRegistryEventResponse(log.Sequence, log.EventType, w.lggr)) + cursor = log.Sequence.Cursor } + + for _, event := range responseBatch { + sendLog(event) + } + } } } @@ -418,108 +441,11 @@ func (w *workflowRegistry) getContractReader(ctx context.Context) (ContractReade return w.reader, nil } -type queryEventConfig struct { - ContractName string - ContractAddress string - WorkflowEventPollerConfig -} - type sequenceWithEventType struct { Sequence types.Sequence EventType WorkflowRegistryEventType } -// queryEvent queries the contract for events of the given type on each tick from the ticker. -// Sends a batch of event logs to the batch channel. The batch represents all the -// event logs read since the last query. Loops until the context is canceled. -func queryEvent( - ctx context.Context, - lggr logger.Logger, - reader ContractReader, - lastReadBlockNumber string, - cfg queryEventConfig, - eventTypes []WorkflowRegistryEventType, - cursor string, -) ([]WorkflowRegistryEventResponse, string, error) { - // create query - var ( - logData values.Value - limitAndSort = query.LimitAndSort{ - SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, - Limit: query.Limit{Count: cfg.QueryCount}, - } - bc = types.BoundContract{ - Name: cfg.ContractName, - Address: cfg.ContractAddress, - } - ) - - responseBatch := []WorkflowRegistryEventResponse{} - - if cursor != "" { - limitAndSort.Limit = query.CursorLimit(cursor, query.CursorFollowing, cfg.QueryCount) - } - - var keyQueries []types.ContractKeyFilter - for _, et := range eventTypes { - keyQueries = append(keyQueries, types.ContractKeyFilter{ - KeyFilter: query.KeyFilter{ - Key: string(et), - Expressions: []query.Expression{ - query.Confidence(primitives.Finalized), - query.Block(lastReadBlockNumber, primitives.Gt), - }, - }, - Contract: bc, - SequenceDataType: &logData, - }) - } - - logsIter, err := reader.QueryKeys( - ctx, - keyQueries, - limitAndSort, - ) - - var logs []sequenceWithEventType - for eventType, log := range logsIter { - logs = append(logs, sequenceWithEventType{ - Sequence: log, - EventType: WorkflowRegistryEventType(eventType), - }) - } - - lcursor := cursor - if lcursor == "" { - lcursor = "empty" - } - lggr.Debugw("QueryKeys called", "logs", len(logs), "eventTypes", eventTypes, "lastReadBlockNumber", lastReadBlockNumber, "logCursor", lcursor) - - if err != nil { - return nil, cursor, fmt.Errorf("failed to query keys: %w", err) - } - - // ChainReader QueryKey API provides logs including the cursor value and not - // after the cursor value. If the response only consists of the log corresponding - // to the cursor and no log after it, then we understand that there are no new - // logs - if len(logs) == 1 && logs[0].Sequence.Cursor == cursor { - lggr.Infow("No new logs since", "cursor", cursor) - return nil, cursor, nil - } - - for _, log := range logs { - if log.Sequence.Cursor == cursor { - continue - } - - responseBatch = append(responseBatch, toWorkflowRegistryEventResponse(log.Sequence, log.EventType, lggr)) - cursor = log.Sequence.Cursor - } - - return responseBatch, cursor, nil -} - func getWorkflowRegistryEventReader( ctx context.Context, newReaderFn newContractReaderFn, From 7f71176fa53cb20682eac1833fc8b0aa3a2dcca7 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Wed, 11 Dec 2024 15:14:22 +0000 Subject: [PATCH 10/13] tidy --- .../workflows/syncer/workflow_registry.go | 107 ++++++++---------- 1 file changed, 46 insertions(+), 61 deletions(-) diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index ea9eb57250b..5efb0a9e34e 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -239,7 +239,13 @@ func (w *workflowRegistry) Start(_ context.Context) error { return } - w.syncEventsLoop(ctx, loadWorkflowsHead.Height) + reader, err := w.getContractReader(ctx) + if err != nil { + w.lggr.Criticalf("contract reader unavailable : %s", err) + return + } + + w.syncEventsLoop(ctx, reader, loadWorkflowsHead.Height) }() w.wg.Add(1) @@ -301,36 +307,26 @@ func (w *workflowRegistry) handlerLoop(ctx context.Context) { } // syncEventsLoop polls the contract for events and passes them to a channel for handling. -func (w *workflowRegistry) syncEventsLoop(ctx context.Context, lastReadBlockNumber string) { - var ( - // sendLog is a helper that sends a WorkflowRegistryEventResponse to the eventsCh in a - // blocking way that will send the response or be canceled. - sendLog = func(resp WorkflowRegistryEventResponse) { - select { - case w.eventsCh <- resp: - case <-ctx.Done(): - } - } - - ticker = w.getTicker() - - logData values.Value - limitAndSort = query.LimitAndSort{ - SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, - Limit: query.Limit{Count: w.eventPollerCfg.QueryCount}, - } - bc = types.BoundContract{ - Name: WorkflowRegistryContractName, - Address: w.workflowRegistryAddress, - } - ) - - // critical failure if there is no reader, the loop will exit and the parent context will be - // canceled. - reader, err := w.getContractReader(ctx) - if err != nil { - w.lggr.Criticalf("contract reader unavailable : %s", err) - return +func (w *workflowRegistry) syncEventsLoop(ctx context.Context, reader ContractReader, lastReadBlockNumber string) { + ticker := w.getTicker() + + var keyQueries []types.ContractKeyFilter + for _, et := range w.eventTypes { + var logData values.Value + keyQueries = append(keyQueries, types.ContractKeyFilter{ + KeyFilter: query.KeyFilter{ + Key: string(et), + Expressions: []query.Expression{ + query.Confidence(primitives.Finalized), + query.Block(lastReadBlockNumber, primitives.Gt), + }, + }, + Contract: types.BoundContract{ + Name: WorkflowRegistryContractName, + Address: w.workflowRegistryAddress, + }, + SequenceDataType: &logData, + }) } cursor := "" @@ -339,34 +335,20 @@ func (w *workflowRegistry) syncEventsLoop(ctx context.Context, lastReadBlockNumb case <-ctx.Done(): return case <-ticker: - - responseBatch := []WorkflowRegistryEventResponse{} - + limitAndSort := query.LimitAndSort{ + SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, + Limit: query.Limit{Count: w.eventPollerCfg.QueryCount}, + } if cursor != "" { limitAndSort.Limit = query.CursorLimit(cursor, query.CursorFollowing, w.eventPollerCfg.QueryCount) } - var keyQueries []types.ContractKeyFilter - for _, et := range w.eventTypes { - keyQueries = append(keyQueries, types.ContractKeyFilter{ - KeyFilter: query.KeyFilter{ - Key: string(et), - Expressions: []query.Expression{ - query.Confidence(primitives.Finalized), - query.Block(lastReadBlockNumber, primitives.Gt), - }, - }, - Contract: bc, - SequenceDataType: &logData, - }) + logsIter, err := reader.QueryKeys(ctx, keyQueries, limitAndSort) + if err != nil { + w.lggr.Errorw("failed to query keys", "err", err) + continue } - logsIter, err := reader.QueryKeys( - ctx, - keyQueries, - limitAndSort, - ) - var logs []sequenceWithEventType for eventType, log := range logsIter { logs = append(logs, sequenceWithEventType{ @@ -381,10 +363,6 @@ func (w *workflowRegistry) syncEventsLoop(ctx context.Context, lastReadBlockNumb } w.lggr.Debugw("QueryKeys called", "logs", len(logs), "eventTypes", w.eventTypes, "lastReadBlockNumber", lastReadBlockNumber, "logCursor", lcursor) - if err != nil { - w.lggr.Errorw("failed to query keys: %w", err) - } - // ChainReader QueryKey API provides logs including the cursor value and not // after the cursor value. If the response only consists of the log corresponding // to the cursor and no log after it, then we understand that there are no new @@ -394,19 +372,26 @@ func (w *workflowRegistry) syncEventsLoop(ctx context.Context, lastReadBlockNumb continue } + var events []WorkflowRegistryEventResponse for _, log := range logs { if log.Sequence.Cursor == cursor { continue } - responseBatch = append(responseBatch, toWorkflowRegistryEventResponse(log.Sequence, log.EventType, w.lggr)) + events = append(events, toWorkflowRegistryEventResponse(log.Sequence, log.EventType, w.lggr)) cursor = log.Sequence.Cursor } - for _, event := range responseBatch { - sendLog(event) - } + w.sendEvents(ctx, events) + } + } +} +func (w *workflowRegistry) sendEvents(ctx context.Context, events []WorkflowRegistryEventResponse) { + for _, event := range events { + select { + case <-ctx.Done(): + case w.eventsCh <- event: } } } From 02acd3095af1dd9af20adf107f0062f6e9650c05 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Wed, 11 Dec 2024 15:17:37 +0000 Subject: [PATCH 11/13] enable the event ordering test that was failing in the old workflow syncer --- .../evm/capabilities/workflows/syncer/workflow_syncer_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index d51e0961e4b..2ceb59023ec 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -205,7 +205,7 @@ func Test_EventHandlerStateSync(t *testing.T) { if numEvents == expectedNumEvents { // verify the events are the expected types in the expected order - /*. Note the below test does not work with the unrefactored workflow registry, event order is essentially random + // Note the below test does not work with the unrefactored workflow registry, event order is essentially random for idx, event := range events { switch idx % 5 { case 0: @@ -219,7 +219,7 @@ func Test_EventHandlerStateSync(t *testing.T) { case 4: assert.Equal(t, syncer.WorkflowDeletedEvent, event.GetEventType()) } - } */ + } return true } From 16b1e575cfe3ae635c362e9861ea3a4f4411a939 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Wed, 11 Dec 2024 15:23:50 +0000 Subject: [PATCH 12/13] lint remove change to try and resolve flaky eth smoke tests --- .mockery.yaml | 6 - .../workflows/syncer/workflow_syncer_test.go | 16 +- .../workflows/syncer/contract_reader_mock.go | 302 ------------------ core/services/workflows/syncer/heap.go | 63 ---- .../workflows/syncer/workflow_registry.go | 90 +----- .../syncer/workflow_registry_test.go | 234 -------------- 6 files changed, 19 insertions(+), 692 deletions(-) delete mode 100644 core/services/workflows/syncer/contract_reader_mock.go delete mode 100644 core/services/workflows/syncer/heap.go delete mode 100644 core/services/workflows/syncer/workflow_registry_test.go diff --git a/.mockery.yaml b/.mockery.yaml index dd9024cc066..5777ca1da92 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -583,12 +583,6 @@ packages: github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer: interfaces: ORM: - ContractReader: - config: - mockname: "Mock{{ .InterfaceName }}" - filename: contract_reader_mock.go - inpackage: true - dir: "{{ .InterfaceDir }}" Handler: config: mockname: "Mock{{ .InterfaceName }}" diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index 2ceb59023ec..066e85e839f 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -108,8 +108,7 @@ func Test_EventHandlerStateSync(t *testing.T) { updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true) updateAuthorizedAddress(t, backendTH, wfRegistryC, []common.Address{backendTH.ContractsOwner.From}, true) - // The number of workflows should be greater than the workflow registry contracts pagination limit to ensure - // that the syncer will query the contract multiple times to get the full list of workflows + // Create some initial static state numberWorkflows := 20 for i := 0; i < numberWorkflows; i++ { var workflowID [32]byte @@ -164,11 +163,9 @@ func Test_EventHandlerStateSync(t *testing.T) { testEventHandler.ClearEvents() - // Create events for a number of workflows and confirm that the event handler processes them - + // Create different event types for a number of workflows and confirm that the event handler processes them in order numberOfEventCycles := 50 for i := 0; i < numberOfEventCycles; i++ { - var workflowID [32]byte _, err = rand.Read((workflowID)[:]) require.NoError(t, err) @@ -205,7 +202,6 @@ func Test_EventHandlerStateSync(t *testing.T) { if numEvents == expectedNumEvents { // verify the events are the expected types in the expected order - // Note the below test does not work with the unrefactored workflow registry, event order is essentially random for idx, event := range events { switch idx % 5 { case 0: @@ -566,14 +562,6 @@ func Test_RegistrySyncer_WorkflowRegistered_InitiallyActivated(t *testing.T) { // generate a log event registerWorkflow(t, backendTH, wfRegistryC, giveWorkflow) - /* - string(ForceUpdateSecretsEvent), - string(WorkflowActivatedEvent), - string(WorkflowDeletedEvent), - string(WorkflowPausedEvent), - string(WorkflowRegisteredEvent), - string(WorkflowUpdatedEvent), */ - // Require the secrets contents to eventually be updated require.Eventually(t, func() bool { _, err := er.Get("test-wf") diff --git a/core/services/workflows/syncer/contract_reader_mock.go b/core/services/workflows/syncer/contract_reader_mock.go deleted file mode 100644 index e6e7c8385f5..00000000000 --- a/core/services/workflows/syncer/contract_reader_mock.go +++ /dev/null @@ -1,302 +0,0 @@ -// Code generated by mockery v2.46.3. DO NOT EDIT. - -package syncer - -import ( - context "context" - - query "github.com/smartcontractkit/chainlink-common/pkg/types/query" - primitives "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" - mock "github.com/stretchr/testify/mock" - - types "github.com/smartcontractkit/chainlink-common/pkg/types" -) - -// MockContractReader is an autogenerated mock type for the ContractReader type -type MockContractReader struct { - mock.Mock -} - -type MockContractReader_Expecter struct { - mock *mock.Mock -} - -func (_m *MockContractReader) EXPECT() *MockContractReader_Expecter { - return &MockContractReader_Expecter{mock: &_m.Mock} -} - -// Bind provides a mock function with given fields: _a0, _a1 -func (_m *MockContractReader) Bind(_a0 context.Context, _a1 []types.BoundContract) error { - ret := _m.Called(_a0, _a1) - - if len(ret) == 0 { - panic("no return value specified for Bind") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context, []types.BoundContract) error); ok { - r0 = rf(_a0, _a1) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// MockContractReader_Bind_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Bind' -type MockContractReader_Bind_Call struct { - *mock.Call -} - -// Bind is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 []types.BoundContract -func (_e *MockContractReader_Expecter) Bind(_a0 interface{}, _a1 interface{}) *MockContractReader_Bind_Call { - return &MockContractReader_Bind_Call{Call: _e.mock.On("Bind", _a0, _a1)} -} - -func (_c *MockContractReader_Bind_Call) Run(run func(_a0 context.Context, _a1 []types.BoundContract)) *MockContractReader_Bind_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].([]types.BoundContract)) - }) - return _c -} - -func (_c *MockContractReader_Bind_Call) Return(_a0 error) *MockContractReader_Bind_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockContractReader_Bind_Call) RunAndReturn(run func(context.Context, []types.BoundContract) error) *MockContractReader_Bind_Call { - _c.Call.Return(run) - return _c -} - -// Close provides a mock function with given fields: -func (_m *MockContractReader) Close() error { - ret := _m.Called() - - if len(ret) == 0 { - panic("no return value specified for Close") - } - - var r0 error - if rf, ok := ret.Get(0).(func() error); ok { - r0 = rf() - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// MockContractReader_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' -type MockContractReader_Close_Call struct { - *mock.Call -} - -// Close is a helper method to define mock.On call -func (_e *MockContractReader_Expecter) Close() *MockContractReader_Close_Call { - return &MockContractReader_Close_Call{Call: _e.mock.On("Close")} -} - -func (_c *MockContractReader_Close_Call) Run(run func()) *MockContractReader_Close_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MockContractReader_Close_Call) Return(_a0 error) *MockContractReader_Close_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockContractReader_Close_Call) RunAndReturn(run func() error) *MockContractReader_Close_Call { - _c.Call.Return(run) - return _c -} - -// GetLatestValueWithHeadData provides a mock function with given fields: ctx, readName, confidenceLevel, params, returnVal -func (_m *MockContractReader) GetLatestValueWithHeadData(ctx context.Context, readName string, confidenceLevel primitives.ConfidenceLevel, params any, returnVal any) (*types.Head, error) { - ret := _m.Called(ctx, readName, confidenceLevel, params, returnVal) - - if len(ret) == 0 { - panic("no return value specified for GetLatestValueWithHeadData") - } - - var r0 *types.Head - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, string, primitives.ConfidenceLevel, any, any) (*types.Head, error)); ok { - return rf(ctx, readName, confidenceLevel, params, returnVal) - } - if rf, ok := ret.Get(0).(func(context.Context, string, primitives.ConfidenceLevel, any, any) *types.Head); ok { - r0 = rf(ctx, readName, confidenceLevel, params, returnVal) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*types.Head) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, string, primitives.ConfidenceLevel, any, any) error); ok { - r1 = rf(ctx, readName, confidenceLevel, params, returnVal) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// MockContractReader_GetLatestValueWithHeadData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLatestValueWithHeadData' -type MockContractReader_GetLatestValueWithHeadData_Call struct { - *mock.Call -} - -// GetLatestValueWithHeadData is a helper method to define mock.On call -// - ctx context.Context -// - readName string -// - confidenceLevel primitives.ConfidenceLevel -// - params any -// - returnVal any -func (_e *MockContractReader_Expecter) GetLatestValueWithHeadData(ctx interface{}, readName interface{}, confidenceLevel interface{}, params interface{}, returnVal interface{}) *MockContractReader_GetLatestValueWithHeadData_Call { - return &MockContractReader_GetLatestValueWithHeadData_Call{Call: _e.mock.On("GetLatestValueWithHeadData", ctx, readName, confidenceLevel, params, returnVal)} -} - -func (_c *MockContractReader_GetLatestValueWithHeadData_Call) Run(run func(ctx context.Context, readName string, confidenceLevel primitives.ConfidenceLevel, params any, returnVal any)) *MockContractReader_GetLatestValueWithHeadData_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string), args[2].(primitives.ConfidenceLevel), args[3].(any), args[4].(any)) - }) - return _c -} - -func (_c *MockContractReader_GetLatestValueWithHeadData_Call) Return(head *types.Head, err error) *MockContractReader_GetLatestValueWithHeadData_Call { - _c.Call.Return(head, err) - return _c -} - -func (_c *MockContractReader_GetLatestValueWithHeadData_Call) RunAndReturn(run func(context.Context, string, primitives.ConfidenceLevel, any, any) (*types.Head, error)) *MockContractReader_GetLatestValueWithHeadData_Call { - _c.Call.Return(run) - return _c -} - -// QueryKey provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4 -func (_m *MockContractReader) QueryKey(_a0 context.Context, _a1 types.BoundContract, _a2 query.KeyFilter, _a3 query.LimitAndSort, _a4 any) ([]types.Sequence, error) { - ret := _m.Called(_a0, _a1, _a2, _a3, _a4) - - if len(ret) == 0 { - panic("no return value specified for QueryKey") - } - - var r0 []types.Sequence - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) ([]types.Sequence, error)); ok { - return rf(_a0, _a1, _a2, _a3, _a4) - } - if rf, ok := ret.Get(0).(func(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) []types.Sequence); ok { - r0 = rf(_a0, _a1, _a2, _a3, _a4) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]types.Sequence) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) error); ok { - r1 = rf(_a0, _a1, _a2, _a3, _a4) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// MockContractReader_QueryKey_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'QueryKey' -type MockContractReader_QueryKey_Call struct { - *mock.Call -} - -// QueryKey is a helper method to define mock.On call -// - _a0 context.Context -// - _a1 types.BoundContract -// - _a2 query.KeyFilter -// - _a3 query.LimitAndSort -// - _a4 any -func (_e *MockContractReader_Expecter) QueryKey(_a0 interface{}, _a1 interface{}, _a2 interface{}, _a3 interface{}, _a4 interface{}) *MockContractReader_QueryKey_Call { - return &MockContractReader_QueryKey_Call{Call: _e.mock.On("QueryKey", _a0, _a1, _a2, _a3, _a4)} -} - -func (_c *MockContractReader_QueryKey_Call) Run(run func(_a0 context.Context, _a1 types.BoundContract, _a2 query.KeyFilter, _a3 query.LimitAndSort, _a4 any)) *MockContractReader_QueryKey_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(types.BoundContract), args[2].(query.KeyFilter), args[3].(query.LimitAndSort), args[4].(any)) - }) - return _c -} - -func (_c *MockContractReader_QueryKey_Call) Return(_a0 []types.Sequence, _a1 error) *MockContractReader_QueryKey_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *MockContractReader_QueryKey_Call) RunAndReturn(run func(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) ([]types.Sequence, error)) *MockContractReader_QueryKey_Call { - _c.Call.Return(run) - return _c -} - -// Start provides a mock function with given fields: ctx -func (_m *MockContractReader) Start(ctx context.Context) error { - ret := _m.Called(ctx) - - if len(ret) == 0 { - panic("no return value specified for Start") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context) error); ok { - r0 = rf(ctx) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// MockContractReader_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start' -type MockContractReader_Start_Call struct { - *mock.Call -} - -// Start is a helper method to define mock.On call -// - ctx context.Context -func (_e *MockContractReader_Expecter) Start(ctx interface{}) *MockContractReader_Start_Call { - return &MockContractReader_Start_Call{Call: _e.mock.On("Start", ctx)} -} - -func (_c *MockContractReader_Start_Call) Run(run func(ctx context.Context)) *MockContractReader_Start_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *MockContractReader_Start_Call) Return(_a0 error) *MockContractReader_Start_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockContractReader_Start_Call) RunAndReturn(run func(context.Context) error) *MockContractReader_Start_Call { - _c.Call.Return(run) - return _c -} - -// NewMockContractReader creates a new instance of MockContractReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func NewMockContractReader(t interface { - mock.TestingT - Cleanup(func()) -}) *MockContractReader { - mock := &MockContractReader{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} diff --git a/core/services/workflows/syncer/heap.go b/core/services/workflows/syncer/heap.go deleted file mode 100644 index 061293928a3..00000000000 --- a/core/services/workflows/syncer/heap.go +++ /dev/null @@ -1,63 +0,0 @@ -package syncer - -import "container/heap" - -type Heap interface { - // Push adds a new item to the heap. - Push(x WorkflowRegistryEventResponse) - - // Pop removes the smallest item from the heap and returns it. - Pop() WorkflowRegistryEventResponse - - // Len returns the number of items in the heap. - Len() int -} - -// publicHeap is a wrapper around the heap.Interface that exposes the Push and Pop methods. -type publicHeap[T any] struct { - heap heap.Interface -} - -func (h *publicHeap[T]) Push(x T) { - heap.Push(h.heap, x) -} - -func (h *publicHeap[T]) Pop() T { - return heap.Pop(h.heap).(T) -} - -func (h *publicHeap[T]) Len() int { - return h.heap.Len() -} - -// blockHeightHeap is a heap.Interface that sorts WorkflowRegistryEventResponses by block height. -type blockHeightHeap []WorkflowRegistryEventResponse - -// newBlockHeightHeap returns an initialized heap that sorts WorkflowRegistryEventResponses by block height. -func newBlockHeightHeap() Heap { - h := blockHeightHeap(make([]WorkflowRegistryEventResponse, 0)) - heap.Init(&h) - return &publicHeap[WorkflowRegistryEventResponse]{heap: &h} -} - -func (h *blockHeightHeap) Len() int { return len(*h) } - -func (h *blockHeightHeap) Less(i, j int) bool { - return (*h)[i].Event.Head.Height < (*h)[j].Event.Head.Height -} - -func (h *blockHeightHeap) Swap(i, j int) { - (*h)[i], (*h)[j] = (*h)[j], (*h)[i] -} - -func (h *blockHeightHeap) Push(x any) { - *h = append(*h, x.(WorkflowRegistryEventResponse)) -} - -func (h *blockHeightHeap) Pop() any { - old := *h - n := len(old) - x := old[n-1] - *h = old[0 : n-1] - return x -} diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index 5efb0a9e34e..4db6d26394e 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -91,10 +91,6 @@ type WorkflowLoadConfig struct { // FetcherFunc is an abstraction for fetching the contents stored at a URL. type FetcherFunc func(ctx context.Context, url string) ([]byte, error) -type ContractReaderFactory interface { - NewContractReader(context.Context, []byte) (types.ContractReader, error) -} - // ContractReader is a subset of types.ContractReader defined locally to enable mocking. type ContractReader interface { Start(ctx context.Context) error @@ -104,6 +100,10 @@ type ContractReader interface { GetLatestValueWithHeadData(ctx context.Context, readName string, confidenceLevel primitives.ConfidenceLevel, params any, returnVal any) (head *types.Head, err error) } +type ContractReaderFactory interface { + NewContractReader(context.Context, []byte) (types.ContractReader, error) +} + // WorkflowRegistrySyncer is the public interface of the package. type WorkflowRegistrySyncer interface { services.Service @@ -129,21 +129,11 @@ type workflowRegistry struct { newContractReaderFn newContractReaderFn - eventPollerCfg WorkflowEventPollerConfig - eventTypes []WorkflowRegistryEventType - - // eventsCh is read by the handler and each event is handled once received. - eventsCh chan WorkflowRegistryEventResponse + eventPollerCfg WorkflowEventPollerConfig + eventTypes []WorkflowRegistryEventType handler evtHandler initialWorkflowsStateLoader initialWorkflowsStateLoader - // batchCh is a channel that receives batches of events from the contract query goroutines. - batchCh chan []WorkflowRegistryEventResponse - - // heap is a min heap that merges batches of events from the contract query goroutines. The - // default min heap is sorted by block height. - heap Heap - workflowDonNotifier donNotifier reader ContractReader @@ -198,11 +188,8 @@ func NewWorkflowRegistry( newContractReaderFn: newContractReaderFn, workflowRegistryAddress: addr, eventPollerCfg: eventPollerConfig, - heap: newBlockHeightHeap(), stopCh: make(services.StopChan), eventTypes: ets, - eventsCh: make(chan WorkflowRegistryEventResponse), - batchCh: make(chan []WorkflowRegistryEventResponse, 50000), handler: handler, initialWorkflowsStateLoader: initialWorkflowsStateLoader, workflowDonNotifier: workflowDonNotifier, @@ -245,15 +232,7 @@ func (w *workflowRegistry) Start(_ context.Context) error { return } - w.syncEventsLoop(ctx, reader, loadWorkflowsHead.Height) - }() - - w.wg.Add(1) - go func() { - defer w.wg.Done() - defer cancel() - - w.handlerLoop(ctx) + w.readRegistryEvents(ctx, reader, loadWorkflowsHead.Height) }() return nil @@ -280,37 +259,11 @@ func (w *workflowRegistry) Name() string { return name } -// handlerLoop handles the events that are emitted by the contract. -func (w *workflowRegistry) handlerLoop(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case resp, open := <-w.eventsCh: - if !open { - return - } - - if resp.Err != nil || resp.Event == nil { - w.lggr.Errorw("failed to handle event", "err", resp.Err) - continue - } - - event := resp.Event - w.lggr.Debugf("handling event: %+v", event) - if err := w.handler.Handle(ctx, *event); err != nil { - w.lggr.Errorw("failed to handle event", "event", event, "err", err) - continue - } - } - } -} - -// syncEventsLoop polls the contract for events and passes them to a channel for handling. -func (w *workflowRegistry) syncEventsLoop(ctx context.Context, reader ContractReader, lastReadBlockNumber string) { +// readRegistryEvents polls the contract for events and send them to the events channel. +func (w *workflowRegistry) readRegistryEvents(ctx context.Context, reader ContractReader, lastReadBlockNumber string) { ticker := w.getTicker() - var keyQueries []types.ContractKeyFilter + var keyQueries = make([]types.ContractKeyFilter, 0, len(w.eventTypes)) for _, et := range w.eventTypes { var logData values.Value keyQueries = append(keyQueries, types.ContractKeyFilter{ @@ -356,12 +309,7 @@ func (w *workflowRegistry) syncEventsLoop(ctx context.Context, reader ContractRe EventType: WorkflowRegistryEventType(eventType), }) } - - lcursor := cursor - if lcursor == "" { - lcursor = "empty" - } - w.lggr.Debugw("QueryKeys called", "logs", len(logs), "eventTypes", w.eventTypes, "lastReadBlockNumber", lastReadBlockNumber, "logCursor", lcursor) + w.lggr.Debugw("QueryKeys called", "logs", len(logs), "eventTypes", w.eventTypes, "lastReadBlockNumber", lastReadBlockNumber, "logCursor", cursor) // ChainReader QueryKey API provides logs including the cursor value and not // after the cursor value. If the response only consists of the log corresponding @@ -382,16 +330,12 @@ func (w *workflowRegistry) syncEventsLoop(ctx context.Context, reader ContractRe cursor = log.Sequence.Cursor } - w.sendEvents(ctx, events) - } - } -} - -func (w *workflowRegistry) sendEvents(ctx context.Context, events []WorkflowRegistryEventResponse) { - for _, event := range events { - select { - case <-ctx.Done(): - case w.eventsCh <- event: + for _, event := range events { + err := w.handler.Handle(ctx, event.Event) + if err != nil { + w.lggr.Errorw("failed to handle event", "err", err) + } + } } } } diff --git a/core/services/workflows/syncer/workflow_registry_test.go b/core/services/workflows/syncer/workflow_registry_test.go deleted file mode 100644 index 073de2cbdbc..00000000000 --- a/core/services/workflows/syncer/workflow_registry_test.go +++ /dev/null @@ -1,234 +0,0 @@ -package syncer - -import ( - "context" - "encoding/hex" - "testing" - "time" - - "github.com/stretchr/testify/mock" - - "github.com/jonboulle/clockwork" - - "github.com/smartcontractkit/chainlink-common/pkg/capabilities" - "github.com/smartcontractkit/chainlink-common/pkg/custmsg" - "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" - "github.com/smartcontractkit/chainlink-common/pkg/types" - "github.com/smartcontractkit/chainlink-common/pkg/types/query" - "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" - "github.com/smartcontractkit/chainlink-common/pkg/values" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" - "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" - "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey" - "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" - "github.com/smartcontractkit/chainlink/v2/core/utils/matches" - - "github.com/stretchr/testify/require" -) - -type testDonNotifier struct { - don capabilities.DON - err error -} - -func (t *testDonNotifier) WaitForDon(ctx context.Context) (capabilities.DON, error) { - return t.don, t.err -} - -func Test_Workflow_Registry_Syncer(t *testing.T) { - var ( - giveContents = "contents" - wantContents = "updated contents" - contractAddress = "0xdeadbeef" - giveCfg = WorkflowEventPollerConfig{ - QueryCount: 20, - } - giveURL = "http://example.com" - giveHash, err = crypto.Keccak256([]byte(giveURL)) - - giveLog = types.Sequence{ - Data: map[string]any{ - "SecretsURLHash": giveHash, - "Owner": "0xowneraddr", - }, - Cursor: "cursor", - } - ) - - require.NoError(t, err) - - var ( - lggr = logger.TestLogger(t) - db = pgtest.NewSqlxDB(t) - orm = &orm{ds: db, lggr: lggr} - ctx, cancel = context.WithCancel(testutils.Context(t)) - reader = NewMockContractReader(t) - emitter = custmsg.NewLabeler() - gateway = func(_ context.Context, _ string) ([]byte, error) { - return []byte(wantContents), nil - } - ticker = make(chan time.Time) - - handler = NewEventHandler(lggr, orm, gateway, nil, nil, - emitter, clockwork.NewFakeClock(), workflowkey.Key{}) - loader = NewWorkflowRegistryContractLoader(lggr, contractAddress, func(ctx context.Context, bytes []byte) (ContractReader, error) { - return reader, nil - }, handler) - - worker = NewWorkflowRegistry(lggr, func(ctx context.Context, bytes []byte) (ContractReader, error) { - return reader, nil - }, contractAddress, - WorkflowEventPollerConfig{ - QueryCount: 20, - }, handler, loader, - &testDonNotifier{ - don: capabilities.DON{ - ID: 1, - }, - err: nil, - }, - WithTicker(ticker)) - ) - - // Cleanup the worker - defer cancel() - - // Seed the DB with an original entry - _, err = orm.Create(ctx, giveURL, hex.EncodeToString(giveHash), giveContents) - require.NoError(t, err) - - // Mock out the contract reader query - reader.EXPECT().QueryKey( - matches.AnyContext, - types.BoundContract{ - Name: WorkflowRegistryContractName, - Address: contractAddress, - }, - query.KeyFilter{ - Key: string(ForceUpdateSecretsEvent), - Expressions: []query.Expression{ - query.Confidence(primitives.Finalized), - query.Block("0", primitives.Gte), - }, - }, - query.LimitAndSort{ - SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, - Limit: query.Limit{Count: giveCfg.QueryCount}, - }, - new(values.Value), - ).Return([]types.Sequence{giveLog}, nil) - reader.EXPECT().QueryKey( - matches.AnyContext, - types.BoundContract{ - Name: WorkflowRegistryContractName, - Address: contractAddress, - }, - query.KeyFilter{ - Key: string(WorkflowPausedEvent), - Expressions: []query.Expression{ - query.Confidence(primitives.Finalized), - query.Block("0", primitives.Gte), - }, - }, - query.LimitAndSort{ - SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, - Limit: query.Limit{Count: giveCfg.QueryCount}, - }, - new(values.Value), - ).Return([]types.Sequence{}, nil) - reader.EXPECT().QueryKey( - matches.AnyContext, - types.BoundContract{ - Name: WorkflowRegistryContractName, - Address: contractAddress, - }, - query.KeyFilter{ - Key: string(WorkflowDeletedEvent), - Expressions: []query.Expression{ - query.Confidence(primitives.Finalized), - query.Block("0", primitives.Gte), - }, - }, - query.LimitAndSort{ - SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, - Limit: query.Limit{Count: giveCfg.QueryCount}, - }, - new(values.Value), - ).Return([]types.Sequence{}, nil) - reader.EXPECT().QueryKey( - matches.AnyContext, - types.BoundContract{ - Name: WorkflowRegistryContractName, - Address: contractAddress, - }, - query.KeyFilter{ - Key: string(WorkflowActivatedEvent), - Expressions: []query.Expression{ - query.Confidence(primitives.Finalized), - query.Block("0", primitives.Gte), - }, - }, - query.LimitAndSort{ - SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, - Limit: query.Limit{Count: giveCfg.QueryCount}, - }, - new(values.Value), - ).Return([]types.Sequence{}, nil) - reader.EXPECT().QueryKey( - matches.AnyContext, - types.BoundContract{ - Name: WorkflowRegistryContractName, - Address: contractAddress, - }, - query.KeyFilter{ - Key: string(WorkflowUpdatedEvent), - Expressions: []query.Expression{ - query.Confidence(primitives.Finalized), - query.Block("0", primitives.Gte), - }, - }, - query.LimitAndSort{ - SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, - Limit: query.Limit{Count: giveCfg.QueryCount}, - }, - new(values.Value), - ).Return([]types.Sequence{}, nil) - reader.EXPECT().QueryKey( - matches.AnyContext, - types.BoundContract{ - Name: WorkflowRegistryContractName, - Address: contractAddress, - }, - query.KeyFilter{ - Key: string(WorkflowRegisteredEvent), - Expressions: []query.Expression{ - query.Confidence(primitives.Finalized), - query.Block("0", primitives.Gte), - }, - }, - query.LimitAndSort{ - SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, - Limit: query.Limit{Count: giveCfg.QueryCount}, - }, - new(values.Value), - ).Return([]types.Sequence{}, nil) - reader.EXPECT().GetLatestValueWithHeadData(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&types.Head{ - Height: "0", - }, nil) - reader.EXPECT().Start(mock.Anything).Return(nil) - reader.EXPECT().Bind(mock.Anything, mock.Anything).Return(nil) - - // Go run the worker - servicetest.Run(t, worker) - - // Send a tick to start a query - ticker <- time.Now() - - // Require the secrets contents to eventually be updated - require.Eventually(t, func() bool { - secrets, err := orm.GetContents(ctx, giveURL) - require.NoError(t, err) - return secrets == wantContents - }, 5*time.Second, time.Second) -} From 75da0b3546e545ea5216cd5d73ed662fde15daf5 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Thu, 12 Dec 2024 15:07:19 +0000 Subject: [PATCH 13/13] log fix --- core/services/workflows/syncer/workflow_registry.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index 4db6d26394e..223fbe8e758 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -528,7 +528,7 @@ func (l *workflowRegistryContractLoader) LoadWorkflows(ctx context.Context, don var workflows GetWorkflowMetadataListByDONReturnVal headAtLastRead, err = contractReader.GetLatestValueWithHeadData(ctx, readIdentifier, primitives.Finalized, params, &workflows) if err != nil { - return nil, fmt.Errorf("failed to get workflow metadata for don %w", err) + return nil, fmt.Errorf("failed to get lastest value with head data %w", err) } l.lggr.Debugw("Rehydrating existing workflows", "len", len(workflows.WorkflowMetadataList))