Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor workflow registry to use querykeys api #15638

Merged
merged 13 commits into from
Dec 12, 2024
6 changes: 0 additions & 6 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -583,12 +583,6 @@ packages:
github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer:
interfaces:
ORM:
ContractReader:
config:
mockname: "Mock{{ .InterfaceName }}"
filename: contract_reader_mock.go
inpackage: true
dir: "{{ .InterfaceDir }}"
Handler:
config:
mockname: "Mock{{ .InterfaceName }}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"encoding/base64"
"encoding/hex"
"fmt"
rand2 "math/rand/v2"
"strings"
"sync"
"testing"
"time"

Expand All @@ -31,17 +33,38 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/utils/crypto"

"github.com/stretchr/testify/require"

crypto2 "github.com/ethereum/go-ethereum/crypto"
)

type testEvtHandler struct {
events []syncer.Event
mux sync.Mutex
}

func (m *testEvtHandler) Handle(ctx context.Context, event syncer.Event) error {
m.mux.Lock()
defer m.mux.Unlock()
m.events = append(m.events, event)
return nil
}

func (m *testEvtHandler) ClearEvents() {
m.mux.Lock()
defer m.mux.Unlock()
m.events = make([]syncer.Event, 0)
}

func (m *testEvtHandler) GetEvents() []syncer.Event {
m.mux.Lock()
defer m.mux.Unlock()

eventsCopy := make([]syncer.Event, len(m.events))
copy(eventsCopy, m.events)

return eventsCopy
}

func newTestEvtHandler() *testEvtHandler {
return &testEvtHandler{
events: make([]syncer.Event, 0),
Expand All @@ -68,6 +91,138 @@ func (m *testWorkflowRegistryContractLoader) LoadWorkflows(ctx context.Context,
}, nil
}

func Test_EventHandlerStateSync(t *testing.T) {
lggr := logger.TestLogger(t)
backendTH := testutils.NewEVMBackendTH(t)
donID := uint32(1)

eventPollTicker := time.NewTicker(50 * time.Millisecond)
defer eventPollTicker.Stop()

// Deploy a test workflow_registry
wfRegistryAddr, _, wfRegistryC, err := workflow_registry_wrapper.DeployWorkflowRegistry(backendTH.ContractsOwner, backendTH.Backend.Client())
backendTH.Backend.Commit()
require.NoError(t, err)

// setup contract state to allow the secrets to be updated
updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true)
updateAuthorizedAddress(t, backendTH, wfRegistryC, []common.Address{backendTH.ContractsOwner.From}, true)

// Create some initial static state
numberWorkflows := 20
for i := 0; i < numberWorkflows; i++ {
var workflowID [32]byte
_, err = rand.Read((workflowID)[:])
require.NoError(t, err)
workflow := RegisterWorkflowCMD{
Name: fmt.Sprintf("test-wf-%d", i),
DonID: donID,
Status: uint8(1),
SecretsURL: "someurl",
}
workflow.ID = workflowID
registerWorkflow(t, backendTH, wfRegistryC, workflow)
}

testEventHandler := newTestEvtHandler()
loader := syncer.NewWorkflowRegistryContractLoader(lggr, wfRegistryAddr.Hex(), func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return backendTH.NewContractReader(ctx, t, bytes)
}, testEventHandler)

// Create the registry
registry := syncer.NewWorkflowRegistry(
lggr,
func(ctx context.Context, bytes []byte) (syncer.ContractReader, error) {
return backendTH.NewContractReader(ctx, t, bytes)
},
wfRegistryAddr.Hex(),
syncer.WorkflowEventPollerConfig{
QueryCount: 20,
},
testEventHandler,
loader,
&testDonNotifier{
don: capabilities.DON{
ID: donID,
},
err: nil,
},
syncer.WithTicker(eventPollTicker.C),
)

servicetest.Run(t, registry)

require.Eventually(t, func() bool {
numEvents := len(testEventHandler.GetEvents())
return numEvents == numberWorkflows
}, 5*time.Second, time.Second)

for _, event := range testEventHandler.GetEvents() {
assert.Equal(t, syncer.WorkflowRegisteredEvent, event.GetEventType())
}

testEventHandler.ClearEvents()

// Create different event types for a number of workflows and confirm that the event handler processes them in order
numberOfEventCycles := 50
for i := 0; i < numberOfEventCycles; i++ {
var workflowID [32]byte
_, err = rand.Read((workflowID)[:])
require.NoError(t, err)
workflow := RegisterWorkflowCMD{
Name: "test-wf-register-event",
DonID: donID,
Status: uint8(1),
SecretsURL: "",
}
workflow.ID = workflowID

// Generate events of different types with some jitter
registerWorkflow(t, backendTH, wfRegistryC, workflow)
time.Sleep(time.Millisecond * time.Duration(rand2.IntN(10)))
data := append(backendTH.ContractsOwner.From.Bytes(), []byte(workflow.Name)...)
workflowKey := crypto2.Keccak256Hash(data)
activateWorkflow(t, backendTH, wfRegistryC, workflowKey)
time.Sleep(time.Millisecond * time.Duration(rand2.IntN(10)))
pauseWorkflow(t, backendTH, wfRegistryC, workflowKey)
time.Sleep(time.Millisecond * time.Duration(rand2.IntN(10)))
var newWorkflowID [32]byte
_, err = rand.Read((newWorkflowID)[:])
require.NoError(t, err)
updateWorkflow(t, backendTH, wfRegistryC, workflowKey, newWorkflowID, workflow.BinaryURL+"2", workflow.ConfigURL, workflow.SecretsURL)
time.Sleep(time.Millisecond * time.Duration(rand2.IntN(10)))
deleteWorkflow(t, backendTH, wfRegistryC, workflowKey)
}

// Confirm the expected number of events are received in the correct order
require.Eventually(t, func() bool {
events := testEventHandler.GetEvents()
numEvents := len(events)
expectedNumEvents := 5 * numberOfEventCycles

if numEvents == expectedNumEvents {
// verify the events are the expected types in the expected order
for idx, event := range events {
switch idx % 5 {
case 0:
assert.Equal(t, syncer.WorkflowRegisteredEvent, event.GetEventType())
case 1:
assert.Equal(t, syncer.WorkflowActivatedEvent, event.GetEventType())
case 2:
assert.Equal(t, syncer.WorkflowPausedEvent, event.GetEventType())
case 3:
assert.Equal(t, syncer.WorkflowUpdatedEvent, event.GetEventType())
case 4:
assert.Equal(t, syncer.WorkflowDeletedEvent, event.GetEventType())
}
}
return true
}

return false
}, 50*time.Second, time.Second)
}

func Test_InitialStateSync(t *testing.T) {
lggr := logger.TestLogger(t)
backendTH := testutils.NewEVMBackendTH(t)
Expand Down Expand Up @@ -128,10 +283,10 @@ func Test_InitialStateSync(t *testing.T) {
servicetest.Run(t, worker)

require.Eventually(t, func() bool {
return len(testEventHandler.events) == numberWorkflows
return len(testEventHandler.GetEvents()) == numberWorkflows
}, 5*time.Second, time.Second)

for _, event := range testEventHandler.events {
for _, event := range testEventHandler.GetEvents() {
assert.Equal(t, syncer.WorkflowRegisteredEvent, event.GetEventType())
}
}
Expand Down Expand Up @@ -497,3 +652,59 @@ func requestForceUpdateSecrets(
th.Backend.Commit()
th.Backend.Commit()
}

func activateWorkflow(
t *testing.T,
th *testutils.EVMBackendTH,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
workflowKey [32]byte,
) {
t.Helper()
_, err := wfRegC.ActivateWorkflow(th.ContractsOwner, workflowKey)
require.NoError(t, err, "failed to activate workflow")
th.Backend.Commit()
th.Backend.Commit()
th.Backend.Commit()
}

func pauseWorkflow(
t *testing.T,
th *testutils.EVMBackendTH,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
workflowKey [32]byte,
) {
t.Helper()
_, err := wfRegC.PauseWorkflow(th.ContractsOwner, workflowKey)
require.NoError(t, err, "failed to pause workflow")
th.Backend.Commit()
th.Backend.Commit()
th.Backend.Commit()
}

func deleteWorkflow(
t *testing.T,
th *testutils.EVMBackendTH,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
workflowKey [32]byte,
) {
t.Helper()
_, err := wfRegC.DeleteWorkflow(th.ContractsOwner, workflowKey)
require.NoError(t, err, "failed to delete workflow")
th.Backend.Commit()
th.Backend.Commit()
th.Backend.Commit()
}

func updateWorkflow(
t *testing.T,
th *testutils.EVMBackendTH,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
workflowKey [32]byte, newWorkflowID [32]byte, binaryURL string, configURL string, secretsURL string,
) {
t.Helper()
_, err := wfRegC.UpdateWorkflow(th.ContractsOwner, workflowKey, newWorkflowID, binaryURL, configURL, secretsURL)
require.NoError(t, err, "failed to update workflow")
th.Backend.Commit()
th.Backend.Commit()
th.Backend.Commit()
}
Loading
Loading