Skip to content

Commit

Permalink
Refactor workflow registry to use querykeys api (#15638)
Browse files Browse the repository at this point in the history
* temp disable test

* wip

* fix duplicate event bug - change contract reader poll query from Gte to Gt

* event state sync test added and passing with fixes - single event

* added methods to create different event types

* pre-refactor checkpoint - up to this commit no changes have been made to wf registry - test has been added to confirm (and actually has found bugs) in current wf registry behaviour

* refactor part 1:  replaced querykey with querykeys - all tests passing

* refactor part 2: collapse individual event query into single events query and reduce goroutine and channel usage

* refactor part 3 - down to single ticker thread

* tidy

* enable the event ordering test that was failing in the old workflow syncer

* lint

remove change to try and resolve flaky eth smoke tests

* log fix
  • Loading branch information
ettec authored Dec 12, 2024
1 parent dde1751 commit 83f7413
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 838 deletions.
6 changes: 0 additions & 6 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -583,12 +583,6 @@ packages:
github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer:
interfaces:
ORM:
ContractReader:
config:
mockname: "Mock{{ .InterfaceName }}"
filename: contract_reader_mock.go
inpackage: true
dir: "{{ .InterfaceDir }}"
Handler:
config:
mockname: "Mock{{ .InterfaceName }}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"encoding/base64"
"encoding/hex"
"fmt"
rand2 "math/rand/v2"
"strings"
"sync"
"testing"
"time"

Expand All @@ -31,17 +33,38 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/utils/crypto"

"github.com/stretchr/testify/require"

crypto2 "github.com/ethereum/go-ethereum/crypto"
)

type testEvtHandler struct {
events []syncer.Event
mux sync.Mutex
}

func (m *testEvtHandler) Handle(ctx context.Context, event syncer.Event) error {
m.mux.Lock()
defer m.mux.Unlock()
m.events = append(m.events, event)
return nil
}

func (m *testEvtHandler) ClearEvents() {
m.mux.Lock()
defer m.mux.Unlock()
m.events = make([]syncer.Event, 0)
}

func (m *testEvtHandler) GetEvents() []syncer.Event {
m.mux.Lock()
defer m.mux.Unlock()

eventsCopy := make([]syncer.Event, len(m.events))
copy(eventsCopy, m.events)

return eventsCopy
}

func newTestEvtHandler() *testEvtHandler {
return &testEvtHandler{
events: make([]syncer.Event, 0),
Expand All @@ -68,6 +91,138 @@ func (m *testWorkflowRegistryContractLoader) LoadWorkflows(ctx context.Context,
}, nil
}

func Test_EventHandlerStateSync(t *testing.T) {
lggr := logger.TestLogger(t)
backendTH := testutils.NewEVMBackendTH(t)
donID := uint32(1)

eventPollTicker := time.NewTicker(50 * time.Millisecond)
defer eventPollTicker.Stop()

// Deploy a test workflow_registry
wfRegistryAddr, _, wfRegistryC, err := workflow_registry_wrapper.DeployWorkflowRegistry(backendTH.ContractsOwner, backendTH.Backend.Client())
backendTH.Backend.Commit()
require.NoError(t, err)

// setup contract state to allow the secrets to be updated
updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true)
updateAuthorizedAddress(t, backendTH, wfRegistryC, []common.Address{backendTH.ContractsOwner.From}, true)

// Create some initial static state
numberWorkflows := 20
for i := 0; i < numberWorkflows; i++ {
var workflowID [32]byte
_, err = rand.Read((workflowID)[:])
require.NoError(t, err)
workflow := RegisterWorkflowCMD{
Name: fmt.Sprintf("test-wf-%d", i),
DonID: donID,
Status: uint8(1),
SecretsURL: "someurl",
}
workflow.ID = workflowID
registerWorkflow(t, backendTH, wfRegistryC, workflow)
}

testEventHandler := newTestEvtHandler()
loader := syncer.NewWorkflowRegistryContractLoader(lggr, wfRegistryAddr.Hex(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return backendTH.NewContractReader(ctx, t, bytes)
}, testEventHandler)

// Create the registry
registry := syncer.NewWorkflowRegistry(
lggr,
func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return backendTH.NewContractReader(ctx, t, bytes)
},
wfRegistryAddr.Hex(),
syncer.WorkflowEventPollerConfig{
QueryCount: 20,
},
testEventHandler,
loader,
&testDonNotifier{
don: capabilities.DON{
ID: donID,
},
err: nil,
},
syncer.WithTicker(eventPollTicker.C),
)

servicetest.Run(t, registry)

require.Eventually(t, func() bool {
numEvents := len(testEventHandler.GetEvents())
return numEvents == numberWorkflows
}, 5*time.Second, time.Second)

for _, event := range testEventHandler.GetEvents() {
assert.Equal(t, syncer.WorkflowRegisteredEvent, event.GetEventType())
}

testEventHandler.ClearEvents()

// Create different event types for a number of workflows and confirm that the event handler processes them in order
numberOfEventCycles := 50
for i := 0; i < numberOfEventCycles; i++ {
var workflowID [32]byte
_, err = rand.Read((workflowID)[:])
require.NoError(t, err)
workflow := RegisterWorkflowCMD{
Name: "test-wf-register-event",
DonID: donID,
Status: uint8(1),
SecretsURL: "",
}
workflow.ID = workflowID

// Generate events of different types with some jitter
registerWorkflow(t, backendTH, wfRegistryC, workflow)
time.Sleep(time.Millisecond * time.Duration(rand2.IntN(10)))
data := append(backendTH.ContractsOwner.From.Bytes(), []byte(workflow.Name)...)
workflowKey := crypto2.Keccak256Hash(data)
activateWorkflow(t, backendTH, wfRegistryC, workflowKey)
time.Sleep(time.Millisecond * time.Duration(rand2.IntN(10)))
pauseWorkflow(t, backendTH, wfRegistryC, workflowKey)
time.Sleep(time.Millisecond * time.Duration(rand2.IntN(10)))
var newWorkflowID [32]byte
_, err = rand.Read((newWorkflowID)[:])
require.NoError(t, err)
updateWorkflow(t, backendTH, wfRegistryC, workflowKey, newWorkflowID, workflow.BinaryURL+"2", workflow.ConfigURL, workflow.SecretsURL)
time.Sleep(time.Millisecond * time.Duration(rand2.IntN(10)))
deleteWorkflow(t, backendTH, wfRegistryC, workflowKey)
}

// Confirm the expected number of events are received in the correct order
require.Eventually(t, func() bool {
events := testEventHandler.GetEvents()
numEvents := len(events)
expectedNumEvents := 5 * numberOfEventCycles

if numEvents == expectedNumEvents {
// verify the events are the expected types in the expected order
for idx, event := range events {
switch idx % 5 {
case 0:
assert.Equal(t, syncer.WorkflowRegisteredEvent, event.GetEventType())
case 1:
assert.Equal(t, syncer.WorkflowActivatedEvent, event.GetEventType())
case 2:
assert.Equal(t, syncer.WorkflowPausedEvent, event.GetEventType())
case 3:
assert.Equal(t, syncer.WorkflowUpdatedEvent, event.GetEventType())
case 4:
assert.Equal(t, syncer.WorkflowDeletedEvent, event.GetEventType())
}
}
return true
}

return false
}, 50*time.Second, time.Second)
}

func Test_InitialStateSync(t *testing.T) {
lggr := logger.TestLogger(t)
backendTH := testutils.NewEVMBackendTH(t)
Expand Down Expand Up @@ -128,10 +283,10 @@ func Test_InitialStateSync(t *testing.T) {
servicetest.Run(t, worker)

require.Eventually(t, func() bool {
return len(testEventHandler.events) == numberWorkflows
return len(testEventHandler.GetEvents()) == numberWorkflows
}, 5*time.Second, time.Second)

for _, event := range testEventHandler.events {
for _, event := range testEventHandler.GetEvents() {
assert.Equal(t, syncer.WorkflowRegisteredEvent, event.GetEventType())
}
}
Expand Down Expand Up @@ -497,3 +652,59 @@ func requestForceUpdateSecrets(
th.Backend.Commit()
th.Backend.Commit()
}

func activateWorkflow(
t *testing.T,
th *testutils.EVMBackendTH,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
workflowKey [32]byte,
) {
t.Helper()
_, err := wfRegC.ActivateWorkflow(th.ContractsOwner, workflowKey)
require.NoError(t, err, "failed to activate workflow")
th.Backend.Commit()
th.Backend.Commit()
th.Backend.Commit()
}

func pauseWorkflow(
t *testing.T,
th *testutils.EVMBackendTH,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
workflowKey [32]byte,
) {
t.Helper()
_, err := wfRegC.PauseWorkflow(th.ContractsOwner, workflowKey)
require.NoError(t, err, "failed to pause workflow")
th.Backend.Commit()
th.Backend.Commit()
th.Backend.Commit()
}

func deleteWorkflow(
t *testing.T,
th *testutils.EVMBackendTH,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
workflowKey [32]byte,
) {
t.Helper()
_, err := wfRegC.DeleteWorkflow(th.ContractsOwner, workflowKey)
require.NoError(t, err, "failed to delete workflow")
th.Backend.Commit()
th.Backend.Commit()
th.Backend.Commit()
}

func updateWorkflow(
t *testing.T,
th *testutils.EVMBackendTH,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
workflowKey [32]byte, newWorkflowID [32]byte, binaryURL string, configURL string, secretsURL string,
) {
t.Helper()
_, err := wfRegC.UpdateWorkflow(th.ContractsOwner, workflowKey, newWorkflowID, binaryURL, configURL, secretsURL)
require.NoError(t, err, "failed to update workflow")
th.Backend.Commit()
th.Backend.Commit()
th.Backend.Commit()
}
Loading

0 comments on commit 83f7413

Please sign in to comment.