diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index e830b141d68..802dc427c93 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -21,7 +21,6 @@ import ( evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" - "github.com/smartcontractkit/chainlink/v2/core/utils/signalers" "github.com/stretchr/testify/require" ) @@ -34,7 +33,7 @@ func Test_SecretsWorker(t *testing.T) { db = pgtest.NewSqlxDB(t) orm = syncer.NewWorkflowRegistryDS(db, lggr) - giveTicker = signalers.MakeTicker(ctx.Done(), 500*time.Millisecond) + giveTicker = time.NewTicker(500 * time.Millisecond) giveSecretsURL = "https://original-url.com" donID = uint32(1) giveWorkflow = RegisterWorkflowCMD{ @@ -52,6 +51,8 @@ func Test_SecretsWorker(t *testing.T) { forceUpdateSecretsEvent = string(syncer.ForceUpdateSecretsEvent) ) + defer giveTicker.Stop() + // fill ID with randomd data var giveID [32]byte _, err := rand.Read((giveID)[:]) @@ -116,7 +117,7 @@ func Test_SecretsWorker(t *testing.T) { contractReader, fetcherFn, wfRegistryAddr.Hex(), - syncer.WithTicker(giveTicker), + syncer.WithTicker(giveTicker.C), ) servicetest.Run(t, worker) diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index 5e712cb762f..ff77da9ea6f 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -18,7 +18,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper" "github.com/smartcontractkit/chainlink/v2/core/logger" evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" - "github.com/smartcontractkit/chainlink/v2/core/utils/signalers" ) const name = "WorkflowRegistrySyncer" @@ -107,7 +106,7 @@ type workflowRegistry struct { wg sync.WaitGroup // ticker is the interval at which the workflowRegistry will poll the contract for events. - ticker <-chan struct{} + ticker <-chan time.Time lggr logger.Logger orm WorkflowRegistryDS @@ -136,7 +135,7 @@ type workflowRegistry struct { // WithTicker allows external callers to provide a ticker to the workflowRegistry. This is useful // for overriding the default tick interval. -func WithTicker(ticker <-chan struct{}) func(*workflowRegistry) { +func WithTicker(ticker <-chan time.Time) func(*workflowRegistry) { return func(wr *workflowRegistry) { wr.ticker = ticker } @@ -272,7 +271,7 @@ func (w *workflowRegistry) syncEventsLoop(ctx context.Context) { } } - ticker = w.getTicker(ctx) + ticker = w.getTicker() signals = make(map[WorkflowRegistryEventType]chan struct{}, 0) ) @@ -364,9 +363,9 @@ func (w *workflowRegistry) orderAndSend( // getTicker returns the ticker that the workflowRegistry will use to poll for events. If the ticker // is nil, then a default ticker is returned. -func (w *workflowRegistry) getTicker(ctx context.Context) <-chan struct{} { +func (w *workflowRegistry) getTicker() <-chan time.Time { if w.ticker == nil { - return signalers.MakeTicker(ctx.Done(), defaultTickInterval) + return time.NewTicker(defaultTickInterval).C } return w.ticker diff --git a/core/services/workflows/syncer/workflow_registry_test.go b/core/services/workflows/syncer/workflow_registry_test.go index 0fb1224d432..d979437d54d 100644 --- a/core/services/workflows/syncer/workflow_registry_test.go +++ b/core/services/workflows/syncer/workflow_registry_test.go @@ -54,16 +54,13 @@ func Test_Workflow_Registry_Syncer(t *testing.T) { gateway = func(_ context.Context, _ string) ([]byte, error) { return []byte(wantContents), nil } - ticker = make(chan struct{}) - worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress) + ticker = make(chan time.Time) + worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, WithTicker(ticker)) ) // Cleanup the worker defer cancel() - // Override the ticker - worker.ticker = ticker - // Seed the DB with an original entry _, err = orm.Create(ctx, giveURL, hex.EncodeToString(giveHash), giveContents) require.NoError(t, err) @@ -93,7 +90,7 @@ func Test_Workflow_Registry_Syncer(t *testing.T) { servicetest.Run(t, worker) // Send a tick to start a query - ticker <- struct{}{} + ticker <- time.Now() // Require the secrets contents to eventually be updated require.Eventually(t, func() bool { diff --git a/core/utils/signalers/signalers.go b/core/utils/signalers/signalers.go deleted file mode 100644 index b05af179251..00000000000 --- a/core/utils/signalers/signalers.go +++ /dev/null @@ -1,24 +0,0 @@ -package signalers - -import "time" - -func MakeTicker(stop <-chan struct{}, d time.Duration) <-chan struct{} { - ticker := make(chan struct{}) - internalTicker := time.NewTicker(d) - - go func() { - defer close(ticker) - defer internalTicker.Stop() - - for { - select { - case <-stop: - return - case <-internalTicker.C: - ticker <- struct{}{} - } - } - }() - - return ticker -}