Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KS-616] asset job updates #15573

Merged
merged 7 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/big-camels-report.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#bugfix fix non-idempotent loopp registry.Register
89 changes: 60 additions & 29 deletions core/services/ocr2/plugins/mercury/plugin.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mercury

import (
"context"
"encoding/json"
"fmt"
"os/exec"
Expand Down Expand Up @@ -79,14 +80,13 @@ func NewServices(
return nil, errors.New("expected job to have a non-nil PipelineSpec")
}

var err error
var pluginConfig config.PluginConfig
if len(jb.OCR2OracleSpec.PluginConfig) == 0 {
if !enableTriggerCapability {
return nil, fmt.Errorf("at least one transmission option must be configured")
}
} else {
err = json.Unmarshal(jb.OCR2OracleSpec.PluginConfig.Bytes(), &pluginConfig)
err := json.Unmarshal(jb.OCR2OracleSpec.PluginConfig.Bytes(), &pluginConfig)
if err != nil {
return nil, errors.WithStack(err)
}
Expand All @@ -101,8 +101,8 @@ func NewServices(
// encapsulate all the subservices and ensure we close them all if any fail to start
srvs := []job.ServiceCtx{ocr2Provider}
abort := func() {
if err = services.MultiCloser(srvs).Close(); err != nil {
lggr.Errorw("Error closing unused services", "err", err)
if cerr := services.MultiCloser(srvs).Close(); cerr != nil {
lggr.Errorw("Error closing unused services", "err", cerr)
}
}
saver := ocrcommon.NewResultRunSaver(pipelineRunner, lggr, cfg.MaxSuccessfulRuns(), cfg.ResultWriteQueueDepth())
Expand All @@ -112,6 +112,7 @@ func NewServices(
var (
factory ocr3types.MercuryPluginFactory
factoryServices []job.ServiceCtx
fErr error
)
fCfg := factoryCfg{
orm: orm,
Expand All @@ -127,31 +128,31 @@ func NewServices(
}
switch feedID.Version() {
case 1:
factory, factoryServices, err = newv1factory(fCfg)
if err != nil {
factory, factoryServices, fErr = newv1factory(fCfg)
if fErr != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v1 factory: %w", err)
return nil, fmt.Errorf("failed to create mercury v1 factory: %w", fErr)
}
srvs = append(srvs, factoryServices...)
case 2:
factory, factoryServices, err = newv2factory(fCfg)
if err != nil {
factory, factoryServices, fErr = newv2factory(fCfg)
if fErr != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v2 factory: %w", err)
return nil, fmt.Errorf("failed to create mercury v2 factory: %w", fErr)
}
srvs = append(srvs, factoryServices...)
case 3:
factory, factoryServices, err = newv3factory(fCfg)
if err != nil {
factory, factoryServices, fErr = newv3factory(fCfg)
if fErr != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v3 factory: %w", err)
return nil, fmt.Errorf("failed to create mercury v3 factory: %w", fErr)
}
srvs = append(srvs, factoryServices...)
case 4:
factory, factoryServices, err = newv4factory(fCfg)
if err != nil {
factory, factoryServices, fErr = newv4factory(fCfg)
if fErr != nil {
abort()
return nil, fmt.Errorf("failed to create mercury v4 factory: %w", err)
return nil, fmt.Errorf("failed to create mercury v4 factory: %w", fErr)
}
srvs = append(srvs, factoryServices...)
default:
Expand Down Expand Up @@ -214,13 +215,14 @@ func newv4factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loop mode, the factory is grpc server, and we need to handle the server lifecycle
// and unregistration of the loop
factoryServer := loop.NewMercuryV4Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
srvs = append(srvs, factoryServer, unregisterer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
Expand Down Expand Up @@ -253,13 +255,14 @@ func newv3factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
// and unregistration of the loop
factoryServer := loop.NewMercuryV3Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
srvs = append(srvs, factoryServer, unregisterer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
Expand Down Expand Up @@ -292,13 +295,14 @@ func newv2factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
// and unregistration of the loop
factoryServer := loop.NewMercuryV2Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
srvs = append(srvs, factoryServer, unregisterer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
Expand Down Expand Up @@ -329,13 +333,14 @@ func newv1factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
loopEnabled := loopCmd != ""

if loopEnabled {
cmdFn, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
cmdFn, unregisterer, opts, mercuryLggr, err := initLoop(loopCmd, factoryCfg.cfg, factoryCfg.feedID, factoryCfg.lggr)
if err != nil {
return nil, nil, fmt.Errorf("failed to init loop for feed %s: %w", factoryCfg.feedID, err)
}
// in loopp mode, the factory is grpc server, and we need to handle the server lifecycle
// and unregistration of the loop
factoryServer := loop.NewMercuryV1Service(mercuryLggr, opts, cmdFn, factoryCfg.ocr2Provider, ds)
srvs = append(srvs, factoryServer)
srvs = append(srvs, factoryServer, unregisterer)
// adapt the grpc server to the vanilla mercury plugin factory interface used by the oracle
factory = factoryServer
} else {
Expand All @@ -344,20 +349,46 @@ func newv1factory(factoryCfg factoryCfg) (ocr3types.MercuryPluginFactory, []job.
return factory, srvs, nil
}

func initLoop(cmd string, cfg Config, feedID utils.FeedID, lggr logger.Logger) (func() *exec.Cmd, loop.GRPCOpts, logger.Logger, error) {
func initLoop(cmd string, cfg Config, feedID utils.FeedID, lggr logger.Logger) (func() *exec.Cmd, *loopUnregisterCloser, loop.GRPCOpts, logger.Logger, error) {
lggr.Debugw("Initializing Mercury loop", "command", cmd)
mercuryLggr := lggr.Named(fmt.Sprintf("MercuryV%d", feedID.Version())).Named(feedID.String())
envVars, err := plugins.ParseEnvFile(env.MercuryPlugin.Env.Get())
if err != nil {
return nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to parse mercury env file: %w", err)
return nil, nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to parse mercury env file: %w", err)
}
loopID := mercuryLggr.Name()
cmdFn, opts, err := cfg.RegisterLOOP(plugins.CmdConfig{
ID: mercuryLggr.Name(),
ID: loopID,
Cmd: cmd,
Env: envVars,
})
if err != nil {
return nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to register loop: %w", err)
return nil, nil, loop.GRPCOpts{}, nil, fmt.Errorf("failed to register loop: %w", err)
}
return cmdFn, newLoopUnregister(cfg, loopID), opts, mercuryLggr, nil
}

// loopUnregisterCloser is a helper to unregister a loop
// as a service
// TODO BCF-3451 all other jobs that use custom plugin providers that should be refactored to use this pattern
// perhaps it can be implemented in the delegate on job delete.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

type loopUnregisterCloser struct {
r plugins.RegistrarConfig
id string
}

func (l *loopUnregisterCloser) Close() error {
l.r.UnregisterLOOP(l.id)
return nil
}

func (l *loopUnregisterCloser) Start(ctx context.Context) error {
return nil
}

func newLoopUnregister(r plugins.RegistrarConfig, id string) *loopUnregisterCloser {
return &loopUnregisterCloser{
r: r,
id: id,
}
return cmdFn, opts, mercuryLggr, nil
}
Loading
Loading