Skip to content

Commit

Permalink
refactor: swap to services.Service
Browse files Browse the repository at this point in the history
  • Loading branch information
MStreet3 committed Dec 10, 2024
1 parent 60be59d commit 044a707
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/workflows"
Expand Down Expand Up @@ -320,7 +321,7 @@ func (m *mockService) Ready() error { return nil }

type mockEngineFactory struct{}

func (m *mockEngineFactory) new(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (syncer.StartReadyCloser, error) {
func (m *mockEngineFactory) new(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error) {
return &mockService{}, nil
}

Expand Down
24 changes: 7 additions & 17 deletions core/services/workflows/syncer/engine_registry.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,32 @@
package syncer

import (
"context"
"errors"
"sync"
)

// StartReadyCloser is an abstraction for engines that can be checked for readiness and closed.
type StartReadyCloser interface {
Start(context.Context) error

// Ready returns nil if the engine is ready to be used.
Ready() error

// Close closes the engine.
Close() error
}
"github.com/smartcontractkit/chainlink-common/pkg/services"
)

type EngineRegistry struct {
engines map[string]StartReadyCloser
engines map[string]services.Service
mu sync.RWMutex
}

func NewEngineRegistry() *EngineRegistry {
return &EngineRegistry{
engines: make(map[string]StartReadyCloser),
engines: make(map[string]services.Service),
}
}

// Add adds an engine to the registry.
func (r *EngineRegistry) Add(id string, engine StartReadyCloser) {
func (r *EngineRegistry) Add(id string, engine services.Service) {
r.mu.Lock()
defer r.mu.Unlock()
r.engines[id] = engine
}

// Get retrieves an engine from the registry.
func (r *EngineRegistry) Get(id string) (StartReadyCloser, error) {
func (r *EngineRegistry) Get(id string) (services.Service, error) {
r.mu.RLock()
defer r.mu.RUnlock()
engine, found := r.engines[id]
Expand All @@ -59,7 +49,7 @@ func (r *EngineRegistry) IsRunning(id string) bool {
}

// Pop removes an engine from the registry and returns the engine if found.
func (r *EngineRegistry) Pop(id string) (StartReadyCloser, error) {
func (r *EngineRegistry) Pop(id string) (services.Service, error) {
r.mu.Lock()
defer r.mu.Unlock()
engine, ok := r.engines[id]
Expand Down
5 changes: 3 additions & 2 deletions core/services/workflows/syncer/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/jonboulle/clockwork"

"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
pkgworkflows "github.com/smartcontractkit/chainlink-common/pkg/workflows"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/secrets"
Expand Down Expand Up @@ -126,7 +127,7 @@ func newLastFetchedAtMap() *lastFetchedAtMap {
}
}

type engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (StartReadyCloser, error)
type engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error)

// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding
// method that handles the event.
Expand Down Expand Up @@ -497,7 +498,7 @@ func (h *eventHandler) workflowRegisteredEvent(
return nil
}

func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner string, name string, config []byte, binary []byte) (StartReadyCloser, error) {
func (h *eventHandler) engineFactoryFn(ctx context.Context, id string, owner string, name string, config []byte, binary []byte) (services.Service, error) {
moduleConfig := &host.ModuleConfig{Logger: h.lggr, Labeler: h.emitter}
sdkSpec, err := host.GetWorkflowSpec(ctx, moduleConfig, binary, config)
if err != nil {
Expand Down
11 changes: 8 additions & 3 deletions core/services/workflows/syncer/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/smartcontractkit/chainlink-common/pkg/custmsg"
"github.com/smartcontractkit/chainlink-common/pkg/services"
pkgworkflows "github.com/smartcontractkit/chainlink-common/pkg/workflows"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/secrets"
"github.com/smartcontractkit/chainlink/v2/core/capabilities"
Expand Down Expand Up @@ -65,6 +66,10 @@ func (m *mockEngine) Start(_ context.Context) error {
return m.StartErr
}

func (m *mockEngine) HealthReport() map[string]error { return nil }

func (m *mockEngine) Name() string { return "mockEngine" }

func Test_Handler(t *testing.T) {
lggr := logger.TestLogger(t)
emitter := custmsg.NewLabeler()
Expand Down Expand Up @@ -226,7 +231,7 @@ func Test_workflowRegisteredHandler(t *testing.T) {
configURL: {Body: config, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
}),
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (StartReadyCloser, error) {
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error) {
return &mockEngine{}, nil
},
GiveConfig: config,
Expand Down Expand Up @@ -255,7 +260,7 @@ func Test_workflowRegisteredHandler(t *testing.T) {
configURL: {Body: config, Err: nil},
secretsURL: {Body: []byte("secrets"), Err: nil},
}),
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (StartReadyCloser, error) {
engineFactoryFn: func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error) {
return &mockEngine{StartErr: assert.AnError}, nil
},
GiveConfig: config,
Expand Down Expand Up @@ -418,7 +423,7 @@ type testCase struct {
fetcher FetcherFunc
Event func([]byte) WorkflowRegistryWorkflowRegisteredV1
validationFn func(t *testing.T, ctx context.Context, event WorkflowRegistryWorkflowRegisteredV1, h *eventHandler, wfOwner []byte, wfName string, wfID string)
engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (StartReadyCloser, error)
engineFactoryFn func(ctx context.Context, wfid string, owner string, name string, config []byte, binary []byte) (services.Service, error)
}

func testRunningWorkflow(t *testing.T, tc testCase) {
Expand Down

0 comments on commit 044a707

Please sign in to comment.