diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 112b87cf0af..4c87b94ed4f 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -215,9 +215,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) { // TODO: wire this up to config so we only instantiate it // if a workflow registry address is provided. - workflowRegistrySyncer := syncer.NewWorkflowRegistry() - srvcs = append(srvcs, workflowRegistrySyncer) - var externalPeerWrapper p2ptypes.PeerWrapper if cfg.Capabilities().Peering().Enabled() { var dispatcher remotetypes.Dispatcher @@ -468,6 +465,13 @@ func NewApplication(opts ApplicationOpts) (Application, error) { webhookJobRunner = delegates[job.Webhook].(*webhook.Delegate).WebhookJobRunner() ) + workflowRegistrySyncer := &syncer.WorkflowRegistry{ + Logger: globalLogger, + Store: workflowORM, + Registry: opts.CapabilitiesRegistry, + } + srvcs = append(srvcs, workflowRegistrySyncer) + delegates[job.Workflow] = workflows.NewDelegate( globalLogger, opts.CapabilitiesRegistry, diff --git a/core/services/workflows/syncer/config.yaml b/core/services/workflows/syncer/config.yaml new file mode 100644 index 00000000000..e69de29bb2d diff --git a/core/services/workflows/syncer/workflow.wasm b/core/services/workflows/syncer/workflow.wasm new file mode 100755 index 00000000000..bb1c8c2a8ae Binary files /dev/null and b/core/services/workflows/syncer/workflow.wasm differ diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index 1d42e9d5deb..8f99896a6e3 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -2,19 +2,86 @@ package syncer import ( "context" + _ "embed" + "sync" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/types/core" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" +) + +const ( + workflowID = "" + workflowOwner = "" + workflowName = "" +) + +var ( + //go:embed config.yaml + config []byte + + //go:embed workflow.wasm + workflow []byte ) type WorkflowRegistry struct { services.StateMachine + wg sync.WaitGroup + Logger logger.Logger + Registry core.CapabilitiesRegistry + Store store.Store + subServices []job.ServiceCtx } func (w *WorkflowRegistry) Start(ctx context.Context) error { + w.wg.Add(1) + go func() { + w.Logger.Info("starting hardcoded workflow...") + + moduleConfig := &host.ModuleConfig{Logger: logger.NullLogger, IsUncompressed: true} + spec, err := host.GetWorkflowSpec(ctx, moduleConfig, workflow, config) + if err != nil { + w.Logger.Errorf("failed to get workflow spec", err) + } + + cfg := workflows.Config{ + Lggr: w.Logger, + Workflow: *spec, + WorkflowID: workflowID, + WorkflowOwner: workflowOwner, + WorkflowName: workflowName, + Registry: w.Registry, + Store: w.Store, + Config: config, + Binary: workflow, + SecretsFetcher: w, + } + engine, err := workflows.NewEngine(ctx, cfg) + if err != nil { + w.Logger.Errorf("failed to create engine: %w", err) + } + err = engine.Start(ctx) + if err != nil { + w.Logger.Errorf("failed to start hardcoded workflow: %w", err) + } + w.subServices = []job.ServiceCtx{engine} + }() return nil } func (w *WorkflowRegistry) Close() error { + for _, s := range w.subServices { + err := s.Close() + if err != nil { + w.Logger.Errorf("could not close hardcoded engine: %w", err) + } + } + return nil }