Skip to content

Commit

Permalink
Create dummy registry
Browse files Browse the repository at this point in the history
  • Loading branch information
cedric-cordenier committed Nov 6, 2024
1 parent 96b0cfc commit 0d7ff26
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 3 deletions.
10 changes: 7 additions & 3 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) {

// TODO: wire this up to config so we only instantiate it
// if a workflow registry address is provided.
workflowRegistrySyncer := syncer.NewWorkflowRegistry()
srvcs = append(srvcs, workflowRegistrySyncer)

var externalPeerWrapper p2ptypes.PeerWrapper
if cfg.Capabilities().Peering().Enabled() {
var dispatcher remotetypes.Dispatcher
Expand Down Expand Up @@ -468,6 +465,13 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
webhookJobRunner = delegates[job.Webhook].(*webhook.Delegate).WebhookJobRunner()
)

workflowRegistrySyncer := &syncer.WorkflowRegistry{
Logger: globalLogger,
Store: workflowORM,
Registry: opts.CapabilitiesRegistry,
}
srvcs = append(srvcs, workflowRegistrySyncer)

delegates[job.Workflow] = workflows.NewDelegate(
globalLogger,
opts.CapabilitiesRegistry,
Expand Down
Empty file.
Binary file added core/services/workflows/syncer/workflow.wasm
Binary file not shown.
67 changes: 67 additions & 0 deletions core/services/workflows/syncer/workflow_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,86 @@ package syncer

import (
"context"
_ "embed"
"sync"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host"

"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
)

const (
workflowID = ""
workflowOwner = ""
workflowName = ""
)

var (
//go:embed config.yaml
config []byte

//go:embed workflow.wasm
workflow []byte
)

type WorkflowRegistry struct {
services.StateMachine
wg sync.WaitGroup
Logger logger.Logger
Registry core.CapabilitiesRegistry
Store store.Store
subServices []job.ServiceCtx
}

func (w *WorkflowRegistry) Start(ctx context.Context) error {
w.wg.Add(1)
go func() {
w.Logger.Info("starting hardcoded workflow...")

moduleConfig := &host.ModuleConfig{Logger: logger.NullLogger, IsUncompressed: true}
spec, err := host.GetWorkflowSpec(ctx, moduleConfig, workflow, config)
if err != nil {
w.Logger.Errorf("failed to get workflow spec", err)
}

cfg := workflows.Config{
Lggr: w.Logger,
Workflow: *spec,
WorkflowID: workflowID,
WorkflowOwner: workflowOwner,
WorkflowName: workflowName,
Registry: w.Registry,
Store: w.Store,
Config: config,
Binary: workflow,
SecretsFetcher: w,
}
engine, err := workflows.NewEngine(ctx, cfg)
if err != nil {
w.Logger.Errorf("failed to create engine: %w", err)
}
err = engine.Start(ctx)
if err != nil {
w.Logger.Errorf("failed to start hardcoded workflow: %w", err)
}
w.subServices = []job.ServiceCtx{engine}
}()
return nil
}

func (w *WorkflowRegistry) Close() error {
for _, s := range w.subServices {
err := s.Close()
if err != nil {
w.Logger.Errorf("could not close hardcoded engine: %w", err)
}
}

return nil
}

Expand Down

0 comments on commit 0d7ff26

Please sign in to comment.