diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index fdd25b48a79..f9e107cf552 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -17,6 +17,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/custmsg" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" "github.com/smartcontractkit/chainlink-common/pkg/types" "github.com/smartcontractkit/chainlink-common/pkg/workflows" @@ -320,7 +321,7 @@ func (m *mockService) Ready() error { return nil } type mockEngineFactory struct{} -func (m *mockEngineFactory) new(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (syncer.StartReadyCloser, error) { +func (m *mockEngineFactory) new(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error) { return &mockService{}, nil } diff --git a/core/services/workflows/syncer/engine_registry.go b/core/services/workflows/syncer/engine_registry.go index 44f3b7e198b..fa3771c5a0f 100644 --- a/core/services/workflows/syncer/engine_registry.go +++ b/core/services/workflows/syncer/engine_registry.go @@ -1,42 +1,32 @@ package syncer import ( - "context" "errors" "sync" -) - -// StartReadyCloser is an abstraction for engines that can be checked for readiness and closed. -type StartReadyCloser interface { - Start(context.Context) error - - // Ready returns nil if the engine is ready to be used. - Ready() error - // Close closes the engine. - Close() error -} + "github.com/smartcontractkit/chainlink-common/pkg/services" +) type EngineRegistry struct { - engines map[string]StartReadyCloser + engines map[string]services.Service mu sync.RWMutex } func NewEngineRegistry() *EngineRegistry { return &EngineRegistry{ - engines: make(map[string]StartReadyCloser), + engines: make(map[string]services.Service), } } // Add adds an engine to the registry. -func (r *EngineRegistry) Add(id string, engine StartReadyCloser) { +func (r *EngineRegistry) Add(id string, engine services.Service) { r.mu.Lock() defer r.mu.Unlock() r.engines[id] = engine } // Get retrieves an engine from the registry. -func (r *EngineRegistry) Get(id string) (StartReadyCloser, error) { +func (r *EngineRegistry) Get(id string) (services.Service, error) { r.mu.RLock() defer r.mu.RUnlock() engine, found := r.engines[id] @@ -59,7 +49,7 @@ func (r *EngineRegistry) IsRunning(id string) bool { } // Pop removes an engine from the registry and returns the engine if found. -func (r *EngineRegistry) Pop(id string) (StartReadyCloser, error) { +func (r *EngineRegistry) Pop(id string) (services.Service, error) { r.mu.Lock() defer r.mu.Unlock() engine, ok := r.engines[id] diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 700676d17d4..b88527f905d 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -14,6 +14,7 @@ import ( "github.com/jonboulle/clockwork" "github.com/smartcontractkit/chainlink-common/pkg/custmsg" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types/core" pkgworkflows "github.com/smartcontractkit/chainlink-common/pkg/workflows" "github.com/smartcontractkit/chainlink-common/pkg/workflows/secrets" @@ -126,7 +127,7 @@ func newLastFetchedAtMap() *lastFetchedAtMap { } } -type engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (StartReadyCloser, error) +type engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error) // eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding // method that handles the event. @@ -497,7 +498,7 @@ func (h *eventHandler) workflowRegisteredEvent( return nil } -func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner string, name string, config []byte, binary []byte) (StartReadyCloser, error) { +func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner string, name string, config []byte, binary []byte) (services.Service, error) { moduleConfig := &host.ModuleConfig{Logger: h.lggr, Labeler: h.emitter} sdkSpec, err := host.GetWorkflowSpec(ctx, moduleConfig, binary, config) if err != nil { diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go index 5dc124cda19..eb8b338158f 100644 --- a/core/services/workflows/syncer/handler_test.go +++ b/core/services/workflows/syncer/handler_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/smartcontractkit/chainlink-common/pkg/custmsg" + "github.com/smartcontractkit/chainlink-common/pkg/services" pkgworkflows "github.com/smartcontractkit/chainlink-common/pkg/workflows" "github.com/smartcontractkit/chainlink-common/pkg/workflows/secrets" "github.com/smartcontractkit/chainlink/v2/core/capabilities" @@ -65,6 +66,10 @@ func (m *mockEngine) Start(_ context.Context) error { return m.StartErr } +func (m *mockEngine) HealthReport() map[string]error { return nil } + +func (m *mockEngine) Name() string { return "mockEngine" } + func Test_Handler(t *testing.T) { lggr := logger.TestLogger(t) emitter := custmsg.NewLabeler() @@ -226,7 +231,7 @@ func Test_workflowRegisteredHandler(t *testing.T) { configURL: {Body: config, Err: nil}, secretsURL: {Body: []byte("secrets"), Err: nil}, }), - engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (StartReadyCloser, error) { + engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error) { return &mockEngine{}, nil }, GiveConfig: config, @@ -255,7 +260,7 @@ func Test_workflowRegisteredHandler(t *testing.T) { configURL: {Body: config, Err: nil}, secretsURL: {Body: []byte("secrets"), Err: nil}, }), - engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (StartReadyCloser, error) { + engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error) { return &mockEngine{StartErr: assert.AnError}, nil }, GiveConfig: config, @@ -418,7 +423,7 @@ type testCase struct { fetcher FetcherFunc Event func([]byte) WorkflowRegistryWorkflowRegisteredV1 validationFn func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string) - engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (StartReadyCloser, error) + engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error) } func testRunningWorkflow(t *testing.T, tc testCase) {