Skip to content

Commit

Permalink
Wf syncer rebuild state (#15387)
Browse files Browse the repository at this point in the history
* workflow registry sychronisation at startup

* cleanup after rebase

* lint

* tidy

* common bump

* common version

* cv

* cv

* cv

* lint

* lint
  • Loading branch information
ettec authored Nov 27, 2024
1 parent 5c73741 commit 142f67c
Show file tree
Hide file tree
Showing 6 changed files with 461 additions and 185 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"crypto/rand"
"encoding/hex"
"encoding/json"
"fmt"
"testing"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"

"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
Expand All @@ -26,6 +28,111 @@ import (
"github.com/stretchr/testify/require"
)

type testEvtHandler struct {
events []syncer.Event
}

func (m *testEvtHandler) Handle(ctx context.Context, event syncer.Event) error {
m.events = append(m.events, event)
return nil
}

func newTestEvtHandler() *testEvtHandler {
return &testEvtHandler{
events: make([]syncer.Event, 0),
}
}

type testWorkflowRegistryContractLoader struct {
}

func (m *testWorkflowRegistryContractLoader) LoadWorkflows(ctx context.Context) (*types.Head, error) {
return &types.Head{
Height: "0",
Hash: nil,
Timestamp: 0,
}, nil
}

func Test_InitialStateSync(t *testing.T) {
ctx := coretestutils.Context(t)
lggr := logger.TestLogger(t)
backendTH := testutils.NewEVMBackendTH(t)
donID := uint32(1)

// Deploy a test workflow_registry
wfRegistryAddr, _, wfRegistryC, err := workflow_registry_wrapper.DeployWorkflowRegistry(backendTH.ContractsOwner, backendTH.Backend.Client())
backendTH.Backend.Commit()
require.NoError(t, err)

// Build the ContractReader config
contractReaderCfg := evmtypes.ChainReaderConfig{
Contracts: map[string]evmtypes.ChainContractReader{
syncer.WorkflowRegistryContractName: {
ContractABI: workflow_registry_wrapper.WorkflowRegistryABI,
Configs: map[string]*evmtypes.ChainReaderDefinition{
syncer.GetWorkflowMetadataListByDONMethodName: {
ChainSpecificName: syncer.GetWorkflowMetadataListByDONMethodName,
},
},
},
},
}

contractReaderCfgBytes, err := json.Marshal(contractReaderCfg)
require.NoError(t, err)

contractReader, err := backendTH.NewContractReader(ctx, t, contractReaderCfgBytes)
require.NoError(t, err)

err = contractReader.Bind(ctx, []types.BoundContract{{Name: syncer.WorkflowRegistryContractName, Address: wfRegistryAddr.Hex()}})
require.NoError(t, err)

// setup contract state to allow the secrets to be updated
updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true)
updateAuthorizedAddress(t, backendTH, wfRegistryC, []common.Address{backendTH.ContractsOwner.From}, true)

// The number of workflows should be greater than the workflow registry contracts pagination limit to ensure
// that the syncer will query the contract multiple times to get the full list of workflows
numberWorkflows := 250
for i := 0; i < numberWorkflows; i++ {
var workflowID [32]byte
_, err = rand.Read((workflowID)[:])
require.NoError(t, err)
workflow := RegisterWorkflowCMD{
Name: fmt.Sprintf("test-wf-%d", i),
DonID: donID,
Status: uint8(1),
SecretsURL: "someurl",
}
workflow.ID = workflowID
registerWorkflow(t, backendTH, wfRegistryC, workflow)
}

testEventHandler := newTestEvtHandler()
loader := syncer.NewWorkflowRegistryContractLoader(wfRegistryAddr.Hex(), donID, contractReader, testEventHandler)

// Create the worker
worker := syncer.NewWorkflowRegistry(
lggr,
contractReader,
wfRegistryAddr.Hex(),
syncer.WorkflowEventPollerConfig{
QueryCount: 20,
},
testEventHandler,
loader,
syncer.WithTicker(make(chan time.Time)),
)

servicetest.Run(t, worker)

assert.Len(t, testEventHandler.events, numberWorkflows)
for _, event := range testEventHandler.events {
assert.Equal(t, syncer.WorkflowRegisteredEvent, event.GetEventType())
}
}

func Test_SecretsWorker(t *testing.T) {
var (
ctx = coretestutils.Context(t)
Expand All @@ -49,7 +156,7 @@ func Test_SecretsWorker(t *testing.T) {
fetcherFn = func(_ context.Context, _ string) ([]byte, error) {
return []byte(wantContents), nil
}
contractName = syncer.ContractName
contractName = syncer.WorkflowRegistryContractName
forceUpdateSecretsEvent = string(syncer.ForceUpdateSecretsEvent)
)

Expand Down Expand Up @@ -81,6 +188,9 @@ func Test_SecretsWorker(t *testing.T) {
ChainSpecificName: forceUpdateSecretsEvent,
ReadType: evmtypes.Event,
},
syncer.GetWorkflowMetadataListByDONMethodName: {
ChainSpecificName: syncer.GetWorkflowMetadataListByDONMethodName,
},
},
},
},
Expand Down Expand Up @@ -112,26 +222,21 @@ func Test_SecretsWorker(t *testing.T) {
require.NoError(t, err)
require.Equal(t, contents, giveContents)

// Create the worker
worker := syncer.NewWorkflowRegistry(
lggr,
orm,
contractReader,
fetcherFn,
wfRegistryAddr.Hex(),
nil,
nil,
emitter,
syncer.WithTicker(giveTicker.C),
)
handler := syncer.NewEventHandler(lggr, orm, fetcherFn, nil, nil,
emitter, nil)

servicetest.Run(t, worker)
worker := syncer.NewWorkflowRegistry(lggr, contractReader, wfRegistryAddr.Hex(),
syncer.WorkflowEventPollerConfig{
QueryCount: 20,
}, handler, &testWorkflowRegistryContractLoader{}, syncer.WithTicker(giveTicker.C))

// setup contract state to allow the secrets to be updated
updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true)
updateAuthorizedAddress(t, backendTH, wfRegistryC, []common.Address{backendTH.ContractsOwner.From}, true)
registerWorkflow(t, backendTH, wfRegistryC, giveWorkflow)

servicetest.Run(t, worker)

// generate a log event
requestForceUpdateSecrets(t, backendTH, wfRegistryC, giveSecretsURL)

Expand Down
63 changes: 63 additions & 0 deletions core/services/workflows/syncer/contract_reader_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 142f67c

Please sign in to comment.