Skip to content

Commit

Permalink
Parallel Comp (LLO) cleanup and minor optimizations (#15368)
Browse files Browse the repository at this point in the history
* Parallel Comp (LLO) cleanup and minor optimizations

- Add ChannelDefinitionCacheFactory tests
- Cleanup TODOs/FIXMEs
- Add comments/docs
- Include Don ID in LLO extra hash
- Optimize log poller calls

* Fix test

* Fix linter issue re: append

* Update core/services/llo/evm/report_codec_premium_legacy.go

Co-authored-by: msuchacz-cll <[email protected]>

---------

Co-authored-by: msuchacz-cll <[email protected]>
  • Loading branch information
samsondav and msuchacz-cll authored Nov 26, 2024
1 parent b79da55 commit d77db32
Show file tree
Hide file tree
Showing 25 changed files with 304 additions and 233 deletions.
2 changes: 0 additions & 2 deletions core/services/llo/channel_definition_cache_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ type channelDefinitionCacheFactory struct {
mu sync.Mutex
}

// TODO: Test this
// MERC-3653
func (f *channelDefinitionCacheFactory) NewCache(cfg lloconfig.PluginConfig) (llotypes.ChannelDefinitionCache, error) {
if cfg.ChannelDefinitions != "" {
return NewStaticChannelDefinitionCache(f.lggr, cfg.ChannelDefinitions)
Expand Down
58 changes: 58 additions & 0 deletions core/services/llo/channel_definition_cache_factory_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package llo

import (
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/v2/core/logger"
lloconfig "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/plugins/llo/config"
)

func Test_ChannelDefinitionCacheFactory(t *testing.T) {
lggr := logger.TestLogger(t)
cdcFactory := NewChannelDefinitionCacheFactory(lggr, nil, nil, nil)

t.Run("NewCache", func(t *testing.T) {
t.Run("when ChannelDefinitions is present, returns static cache", func(t *testing.T) {
_, err := cdcFactory.NewCache(lloconfig.PluginConfig{ChannelDefinitions: "..."})
require.EqualError(t, err, "failed to unmarshal static channel definitions: invalid character '.' looking for beginning of value")

cdc, err := cdcFactory.NewCache(lloconfig.PluginConfig{ChannelDefinitions: "{}"})
require.NoError(t, err)
require.IsType(t, &staticCDC{}, cdc)
})
t.Run("when ChannelDefinitions is not present, returns dynamic cache", func(t *testing.T) {
cdc, err := cdcFactory.NewCache(lloconfig.PluginConfig{
ChannelDefinitionsContractAddress: common.HexToAddress("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
DonID: 1,
})
require.NoError(t, err)
require.IsType(t, &channelDefinitionCache{}, cdc)

// returns error if you try to do it again with the same addr/donID
_, err = cdcFactory.NewCache(lloconfig.PluginConfig{
ChannelDefinitionsContractAddress: common.HexToAddress("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
DonID: 1,
})
require.EqualError(t, err, "cache already exists for contract address 0xaAaAaAaaAaAaAaaAaAAAAAAAAaaaAaAaAaaAaaAa and don ID 1")

// is fine if you do it again with different addr
cdc, err = cdcFactory.NewCache(lloconfig.PluginConfig{
ChannelDefinitionsContractAddress: common.HexToAddress("0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"),
DonID: 1,
})
require.NoError(t, err)
require.IsType(t, &channelDefinitionCache{}, cdc)

// is fine if you do it again with different don ID
cdc, err = cdcFactory.NewCache(lloconfig.PluginConfig{
ChannelDefinitionsContractAddress: common.HexToAddress("0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
DonID: 2,
})
require.NoError(t, err)
require.IsType(t, &channelDefinitionCache{}, cdc)
})
})
}
4 changes: 2 additions & 2 deletions core/services/llo/codecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
)

// NOTE: All supported codecs must be specified here
func NewReportCodecs(lggr logger.Logger) map[llotypes.ReportFormat]llo.ReportCodec {
func NewReportCodecs(lggr logger.Logger, donID uint32) map[llotypes.ReportFormat]llo.ReportCodec {
codecs := make(map[llotypes.ReportFormat]llo.ReportCodec)

codecs[llotypes.ReportFormatJSON] = llo.JSONReportCodec{}
codecs[llotypes.ReportFormatEVMPremiumLegacy] = evm.NewReportCodecPremiumLegacy(lggr)
codecs[llotypes.ReportFormatEVMPremiumLegacy] = evm.NewReportCodecPremiumLegacy(lggr, donID)

return codecs
}
2 changes: 1 addition & 1 deletion core/services/llo/codecs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func Test_NewReportCodecs(t *testing.T) {
c := NewReportCodecs(logger.TestLogger(t))
c := NewReportCodecs(logger.TestLogger(t), 1)

assert.Contains(t, c, llotypes.ReportFormatJSON, "expected JSON to be supported")
assert.Contains(t, c, llotypes.ReportFormatEVMPremiumLegacy, "expected EVMPremiumLegacy to be supported")
Expand Down
3 changes: 0 additions & 3 deletions core/services/llo/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,6 @@ func ExtractStreamValue(trrs pipeline.TaskRunResults) (llo.StreamValue, error) {
// by the pipeline executor
finaltrrs := trrs.Terminals()

// TODO: Special handling for missing native/link streams?
// https://smartcontract-it.atlassian.net/browse/MERC-5949

// HACK: Right now we rely on the number of outputs to determine whether
// its a Decimal or a Quote.
// This isn't very robust or future-proof but is sufficient to support v0.3
Expand Down
7 changes: 4 additions & 3 deletions core/services/llo/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewDelegate(cfg DelegateConfig) (job.ServiceCtx, error) {
} else {
codecLggr = corelogger.NullLogger
}
reportCodecs := NewReportCodecs(codecLggr)
reportCodecs := NewReportCodecs(codecLggr, cfg.DonID)

var t TelemeterService
if cfg.CaptureEATelemetry {
Expand Down Expand Up @@ -134,8 +134,9 @@ func (d *delegate) Start(ctx context.Context) error {
lggr = logger.With(lggr, "instanceType", "Green")
}
ocrLogger := logger.NewOCRWrapper(NewSuppressedLogger(lggr, d.cfg.ReportingPluginConfig.VerboseLogging), d.cfg.TraceLogging, func(msg string) {
// TODO: do we actually need to DB-persist errors?
// MERC-3524
// NOTE: Some OCR loggers include a DB-persist here
// We do not DB persist errors in LLO, since they could be quite voluminous and ought to be present in logs anyway.
// This is a performance optimization
})

oracle, err := ocr2plus.NewOracle(ocr2plus.OCR3OracleArgs[llotypes.ReportInfo]{
Expand Down
54 changes: 0 additions & 54 deletions core/services/llo/evm/report_codec.go

This file was deleted.

34 changes: 26 additions & 8 deletions core/services/llo/evm/report_codec_premium_legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/shopspring/decimal"
Expand All @@ -30,10 +31,11 @@ var (

type ReportCodecPremiumLegacy struct {
logger.Logger
donID uint32
}

func NewReportCodecPremiumLegacy(lggr logger.Logger) ReportCodecPremiumLegacy {
return ReportCodecPremiumLegacy{logger.Sugared(lggr).Named("ReportCodecPremiumLegacy")}
func NewReportCodecPremiumLegacy(lggr logger.Logger, donID uint32) ReportCodecPremiumLegacy {
return ReportCodecPremiumLegacy{logger.Sugared(lggr).Named("ReportCodecPremiumLegacy"), donID}
}

type ReportFormatEVMPremiumLegacyOpts struct {
Expand Down Expand Up @@ -119,7 +121,7 @@ func (r ReportCodecPremiumLegacy) Pack(digest types.ConfigDigest, seqNr uint64,
ss = append(ss, s)
vs[i] = v
}
reportCtx := LegacyReportContext(digest, seqNr)
reportCtx := LegacyReportContext(digest, seqNr, r.donID)
rawReportCtx := evmutil.RawReportContext(reportCtx)

payload, err := mercury.PayloadTypes.Pack(rawReportCtx, []byte(report), rs, ss, vs)
Expand Down Expand Up @@ -181,9 +183,25 @@ func extractPrice(price llo.StreamValue) (decimal.Decimal, error) {
}
}

// TODO: Consider embedding the DON ID here?
// MERC-3524
var LLOExtraHash = common.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001")
const PluginVersion uint32 = 1 // the legacy mercury plugin is 0

// Uniquely identifies this as LLO plugin, rather than the legacy plugin (which
// uses all zeroes).
//
// This is quite a hack but serves the purpose of uniquely identifying
// dons/plugin versions to the mercury server without having to modify any
// existing tooling or breaking backwards compatibility. It should be safe
// since the DonID is encoded into the config digest anyway so report context
// is already dependent on it, and all LLO jobs in the same don are expected to
// have the same don ID set.
//
// Packs donID+pluginVersion as (uint32, uint32), for example donID=2,
// PluginVersion=1 Yields:
// 0x0000000000000000000000000000000000000000000000000000000200000001
func LLOExtraHash(donID uint32) common.Hash {
combined := uint64(donID)<<32 | uint64(PluginVersion)
return common.BigToHash(new(big.Int).SetUint64(combined))
}

func SeqNrToEpochAndRound(seqNr uint64) (epoch uint32, round uint8) {
// Simulate 256 rounds/epoch
Expand All @@ -192,14 +210,14 @@ func SeqNrToEpochAndRound(seqNr uint64) (epoch uint32, round uint8) {
return
}

func LegacyReportContext(cd ocr2types.ConfigDigest, seqNr uint64) ocr2types.ReportContext {
func LegacyReportContext(cd ocr2types.ConfigDigest, seqNr uint64, donID uint32) ocr2types.ReportContext {
epoch, round := SeqNrToEpochAndRound(seqNr)
return ocr2types.ReportContext{
ReportTimestamp: ocr2types.ReportTimestamp{
ConfigDigest: cd,
Epoch: uint32(epoch),
Round: uint8(round),
},
ExtraHash: LLOExtraHash, // ExtraHash is always zero for mercury, we use LLOExtraHash here to differentiate from the legacy plugin
ExtraHash: LLOExtraHash(donID), // ExtraHash is always zero for mercury, we use LLOExtraHash here to differentiate from the legacy plugin
}
}
8 changes: 7 additions & 1 deletion core/services/llo/evm/report_codec_premium_legacy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func newValidPremiumLegacyReport() llo.Report {
}

func Test_ReportCodecPremiumLegacy(t *testing.T) {
rc := ReportCodecPremiumLegacy{logger.TestLogger(t)}
rc := ReportCodecPremiumLegacy{logger.TestLogger(t), 2}

feedID := [32]uint8{0x1, 0x2, 0x3, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0}
cd := llotypes.ChannelDefinition{Opts: llotypes.ChannelOpts(fmt.Sprintf(`{"baseUSDFee":"10.50","expirationWindow":60,"feedId":"0x%x","multiplier":10}`, feedID))}
Expand Down Expand Up @@ -225,3 +225,9 @@ func Test_ExtractReportValues(t *testing.T) {
assert.Equal(t, &llo.Quote{Bid: decimal.NewFromInt(37), Benchmark: decimal.NewFromInt(38), Ask: decimal.NewFromInt(39)}, quote)
})
}

func Test_LLOExtraHash(t *testing.T) {
donID := uint32(8)
extraHash := LLOExtraHash(donID)
assert.Equal(t, "0x0000000000000000000000000000000000000000000000000000000800000001", extraHash.String())
}
13 changes: 7 additions & 6 deletions core/services/llo/keyring.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ type Key interface {
}

type onchainKeyring struct {
lggr logger.Logger
keys map[llotypes.ReportFormat]Key
lggr logger.Logger
keys map[llotypes.ReportFormat]Key
donID uint32
}

func NewOnchainKeyring(lggr logger.Logger, keys map[llotypes.ReportFormat]Key) LLOOnchainKeyring {
func NewOnchainKeyring(lggr logger.Logger, keys map[llotypes.ReportFormat]Key, donID uint32) LLOOnchainKeyring {
return &onchainKeyring{
lggr.Named("OnchainKeyring"), keys,
lggr.Named("OnchainKeyring"), keys, donID,
}
}

Expand Down Expand Up @@ -83,7 +84,7 @@ func (okr *onchainKeyring) Sign(digest types.ConfigDigest, seqNr uint64, r ocr3t
rf := r.Info.ReportFormat
if key, exists := okr.keys[rf]; exists {
// NOTE: Must use legacy Sign method for compatibility with v0.3 report verification
rc := evm.LegacyReportContext(digest, seqNr)
rc := evm.LegacyReportContext(digest, seqNr, okr.donID)
return key.Sign(rc, r.Report)
}
default:
Expand All @@ -101,7 +102,7 @@ func (okr *onchainKeyring) Verify(key types.OnchainPublicKey, digest types.Confi
rf := r.Info.ReportFormat
if verifier, exists := okr.keys[rf]; exists {
// NOTE: Must use legacy Verify method for compatibility with v0.3 report verification
rc := evm.LegacyReportContext(digest, seqNr)
rc := evm.LegacyReportContext(digest, seqNr, okr.donID)
return verifier.Verify(key, rc, r.Report, signature)
}
default:
Expand Down
2 changes: 1 addition & 1 deletion core/services/llo/keyring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func Test_Keyring(t *testing.T) {
llotypes.ReportFormatJSON: &mockKey{format: llotypes.ReportFormatJSON, maxSignatureLen: 2, sig: []byte("sig-2")},
}

kr := NewOnchainKeyring(lggr, ks)
kr := NewOnchainKeyring(lggr, ks, 2)

cases := []struct {
format llotypes.ReportFormat
Expand Down
2 changes: 1 addition & 1 deletion core/services/llo/mercurytransmitter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func newServer(lggr logger.Logger, verboseLogging bool, cfg QueueConfig, client
NewTransmitQueue(lggr, serverURL, int(cfg.TransmitQueueMaxSize()), pm),
make(chan [32]byte, int(cfg.TransmitQueueMaxSize())),
serverURL,
evm.NewReportCodecPremiumLegacy(codecLggr),
evm.NewReportCodecPremiumLegacy(codecLggr, pm.DonID()),
llo.JSONReportCodec{},
promTransmitSuccessCount.WithLabelValues(donIDStr, serverURL),
promTransmitDuplicateCount.WithLabelValues(donIDStr, serverURL),
Expand Down
Loading

0 comments on commit d77db32

Please sign in to comment.