diff --git a/core/cmd/shell.go b/core/cmd/shell.go index 966fa1a0ff8..e4f4c5bd6e3 100644 --- a/core/cmd/shell.go +++ b/core/cmd/shell.go @@ -246,9 +246,9 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G } evmFactoryCfg := chainlink.EVMFactoryConfig{ - CSAETHKeystore: keyStore, - ChainOpts: legacyevm.ChainOpts{AppConfig: cfg, MailMon: mailMon, DS: ds}, - MercuryTransmitter: cfg.Mercury().Transmitter(), + CSAETHKeystore: keyStore, + ChainOpts: legacyevm.ChainOpts{AppConfig: cfg, MailMon: mailMon, DS: ds}, + MercuryConfig: cfg.Mercury(), } // evm always enabled for backward compatibility // TODO BCF-2510 this needs to change in order to clear the path for EVM extraction diff --git a/core/internal/cltest/cltest.go b/core/internal/cltest/cltest.go index 32c63e7944c..29515df7034 100644 --- a/core/internal/cltest/cltest.go +++ b/core/internal/cltest/cltest.go @@ -418,8 +418,8 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn MailMon: mailMon, DS: ds, }, - CSAETHKeystore: keyStore, - MercuryTransmitter: cfg.Mercury().Transmitter(), + CSAETHKeystore: keyStore, + MercuryConfig: cfg.Mercury(), } if cfg.EVMEnabled() { diff --git a/core/services/llo/codecs.go b/core/services/llo/codecs.go index 7813c8923ea..2ccadfe330b 100644 --- a/core/services/llo/codecs.go +++ b/core/services/llo/codecs.go @@ -1,6 +1,7 @@ package llo import ( + "github.com/smartcontractkit/chainlink-common/pkg/logger" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" "github.com/smartcontractkit/chainlink-data-streams/llo" @@ -8,11 +9,11 @@ import ( ) // NOTE: All supported codecs must be specified here -func NewReportCodecs() map[llotypes.ReportFormat]llo.ReportCodec { +func NewReportCodecs(lggr logger.Logger) map[llotypes.ReportFormat]llo.ReportCodec { codecs := make(map[llotypes.ReportFormat]llo.ReportCodec) codecs[llotypes.ReportFormatJSON] = llo.JSONReportCodec{} - codecs[llotypes.ReportFormatEVMPremiumLegacy] = evm.ReportCodecPremiumLegacy{} + codecs[llotypes.ReportFormatEVMPremiumLegacy] = evm.NewReportCodecPremiumLegacy(lggr) return codecs } diff --git a/core/services/llo/codecs_test.go b/core/services/llo/codecs_test.go index 4a7f3f65571..3af881a1de0 100644 --- a/core/services/llo/codecs_test.go +++ b/core/services/llo/codecs_test.go @@ -6,10 +6,11 @@ import ( "github.com/stretchr/testify/assert" llotypes "github.com/smartcontractkit/chainlink-common/pkg/types/llo" + "github.com/smartcontractkit/chainlink/v2/core/logger" ) func Test_NewReportCodecs(t *testing.T) { - c := NewReportCodecs() + c := NewReportCodecs(logger.TestLogger(t)) assert.Contains(t, c, llotypes.ReportFormatJSON, "expected JSON to be supported") assert.Contains(t, c, llotypes.ReportFormatEVMPremiumLegacy, "expected EVMPremiumLegacy to be supported") diff --git a/core/services/llo/delegate.go b/core/services/llo/delegate.go index 52153ef9379..a8939e47662 100644 --- a/core/services/llo/delegate.go +++ b/core/services/llo/delegate.go @@ -91,7 +91,7 @@ func NewDelegate(cfg DelegateConfig) (job.ServiceCtx, error) { if cfg.ShouldRetireCache == nil { return nil, errors.New("ShouldRetireCache must not be nil") } - reportCodecs := NewReportCodecs() + reportCodecs := NewReportCodecs(logger.Named(lggr, "ReportCodecs")) var t TelemeterService if cfg.CaptureEATelemetry { diff --git a/core/services/llo/evm/report_codec_premium_legacy.go b/core/services/llo/evm/report_codec_premium_legacy.go index b2236d2a85c..2ab9488c758 100644 --- a/core/services/llo/evm/report_codec_premium_legacy.go +++ b/core/services/llo/evm/report_codec_premium_legacy.go @@ -32,7 +32,7 @@ type ReportCodecPremiumLegacy struct { logger.Logger } -func NewReportCodecPremiumLegacy(lggr logger.Logger) llo.ReportCodec { +func NewReportCodecPremiumLegacy(lggr logger.Logger) ReportCodecPremiumLegacy { return ReportCodecPremiumLegacy{logger.Sugared(lggr).Named("ReportCodecPremiumLegacy")} } diff --git a/core/services/llo/mercurytransmitter/server.go b/core/services/llo/mercurytransmitter/server.go index be9f5cffa22..2fe8890fd48 100644 --- a/core/services/llo/mercurytransmitter/server.go +++ b/core/services/llo/mercurytransmitter/server.go @@ -10,6 +10,10 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + + ocr2types "github.com/smartcontractkit/libocr/offchainreporting2plus/types" + commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -49,6 +53,10 @@ var ( ) ) +type ReportPacker interface { + Pack(digest types.ConfigDigest, seqNr uint64, report ocr2types.Report, sigs []ocr2types.AttributedOnchainSignature) ([]byte, error) +} + // A server handles the queue for a given mercury server type server struct { @@ -65,7 +73,8 @@ type server struct { url string - reportCodecPremiumLegacy llo.ReportCodec + evmPremiumLegacyPacker ReportPacker + jsonPacker ReportPacker transmitSuccessCount prometheus.Counter transmitDuplicateCount prometheus.Counter @@ -93,6 +102,7 @@ func newServer(lggr logger.Logger, verboseLogging bool, cfg QueueConfig, client make(chan [32]byte, int(cfg.TransmitQueueMaxSize())), serverURL, evm.NewReportCodecPremiumLegacy(lggr), + llo.JSONReportCodec{}, transmitSuccessCount.WithLabelValues(donIDStr, serverURL), transmitDuplicateCount.WithLabelValues(donIDStr, serverURL), transmitConnectionErrorCount.WithLabelValues(donIDStr, serverURL), @@ -225,11 +235,11 @@ func (s *server) transmit(ctx context.Context, t *Transmission) (*pb.TransmitReq switch t.Report.Info.ReportFormat { case llotypes.ReportFormatJSON: - payload, err = llo.JSONReportCodec{}.Pack(t.ConfigDigest, t.SeqNr, t.Report.Report, t.Sigs) + payload, err = s.jsonPacker.Pack(t.ConfigDigest, t.SeqNr, t.Report.Report, t.Sigs) case llotypes.ReportFormatEVMPremiumLegacy: - payload, err = s.reportCodecPremiumLegacy.Pack(t.ConfigDigest, t.SeqNr, t.Report.Report, t.Sigs) + payload, err = s.evmPremiumLegacyPacker.Pack(t.ConfigDigest, t.SeqNr, t.Report.Report, t.Sigs) default: - return nil, nil, fmt.Errorf("Transmit failed; unsupported report format: %q", t.Report.Info.ReportFormat) + return nil, nil, fmt.Errorf("Transmit failed; don't know how to Pack unsupported report format: %q", t.Report.Info.ReportFormat) } if err != nil { diff --git a/core/services/llo/mercurytransmitter/transmitter_test.go b/core/services/llo/mercurytransmitter/transmitter_test.go index db3d0d2e584..b1364643ddc 100644 --- a/core/services/llo/mercurytransmitter/transmitter_test.go +++ b/core/services/llo/mercurytransmitter/transmitter_test.go @@ -135,7 +135,7 @@ func Test_Transmitter_runQueueLoop(t *testing.T) { orm := NewORM(db, donID) cfg := mockCfg{} - s := newServer(lggr, cfg, c, orm, sURL) + s := newServer(lggr, true, cfg, c, orm, sURL) t.Run("pulls from queue and transmits successfully", func(t *testing.T) { transmit := make(chan *pb.TransmitRequest, 1) diff --git a/core/services/ocr2/plugins/llo/integration_test.go b/core/services/ocr2/plugins/llo/integration_test.go index bdd773910f4..043ce34c946 100644 --- a/core/services/ocr2/plugins/llo/integration_test.go +++ b/core/services/ocr2/plugins/llo/integration_test.go @@ -509,7 +509,8 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi assert.Equal(t, expectedBid.String(), reportElems["bid"].(*big.Int).String()) assert.Equal(t, expectedAsk.String(), reportElems["ask"].(*big.Int).String()) - t.Run(fmt.Sprintf("emulate mercury server verifying report (local verification) - node %x", req.pk), func(t *testing.T) { + // emulate mercury server verifying report (local verification) + { rv := mercuryverifier.NewVerifier() reportSigners, err := rv.Verify(mercuryverifier.SignedReport{ @@ -522,14 +523,13 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi require.NoError(t, err) assert.GreaterOrEqual(t, len(reportSigners), int(fNodes+1)) assert.Subset(t, signerAddresses, reportSigners) - }) + } - t.Run(fmt.Sprintf("test on-chain verification - node %x", req.pk), func(t *testing.T) { - t.Run("destination verifier", func(t *testing.T) { - _, err = verifierProxy.Verify(steve, req.req.Payload, []byte{}) - require.NoError(t, err) - }) - }) + // test on-chain verification + { + _, err = verifierProxy.Verify(steve, req.req.Payload, []byte{}) + require.NoError(t, err) + } t.Logf("oracle %x reported for 0x%x", req.pk[:], feedID[:]) @@ -597,7 +597,8 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi var greenDigest ocr2types.ConfigDigest allReports := make(map[types.ConfigDigest][]datastreamsllo.Report) - t.Run("start off with blue=production, green=staging (specimen reports)", func(t *testing.T) { + // start off with blue=production, green=staging (specimen reports) + { // Set config on configurator blueDigest = setProductionConfig( t, donID, steve, backend, configurator, configuratorAddress, nodes, oracles, @@ -617,8 +618,9 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi assert.Equal(t, "2976.39", r.Values[0].(*datastreamsllo.Decimal).String()) break } - }) - t.Run("setStagingConfig does not affect production", func(t *testing.T) { + } + // setStagingConfig does not affect production + { greenDigest = setStagingConfig( t, donID, steve, backend, configurator, configuratorAddress, nodes, oracles, blueDigest, ) @@ -639,8 +641,9 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi } assert.Equal(t, blueDigest, r.ConfigDigest) } - }) - t.Run("promoteStagingConfig flow has clean and gapless hand off from old production to newly promoted staging instance, leaving old production instance in 'retired' state", func(t *testing.T) { + } + // promoteStagingConfig flow has clean and gapless hand off from old production to newly promoted staging instance, leaving old production instance in 'retired' state + { promoteStagingConfig(t, donID, steve, backend, configurator, configuratorAddress, false) // NOTE: Wait for first non-specimen report for the newly promoted (green) instance @@ -704,8 +707,9 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi assert.Less(t, finalBlueReport.ValidAfterSeconds, finalBlueReport.ObservationTimestampSeconds) assert.Equal(t, finalBlueReport.ObservationTimestampSeconds, initialPromotedGreenReport.ValidAfterSeconds) assert.Less(t, initialPromotedGreenReport.ValidAfterSeconds, initialPromotedGreenReport.ObservationTimestampSeconds) - }) - t.Run("retired instance does not produce reports", func(t *testing.T) { + } + // retired instance does not produce reports + { // NOTE: Wait for five "green" reports to be produced and assert no "blue" reports i := 0 @@ -721,8 +725,9 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi assert.False(t, r.Specimen) assert.Equal(t, greenDigest, r.ConfigDigest) } - }) - t.Run("setStagingConfig replaces 'retired' instance with new config and starts producing specimen reports again", func(t *testing.T) { + } + // setStagingConfig replaces 'retired' instance with new config and starts producing specimen reports again + { blueDigest = setStagingConfig( t, donID, steve, backend, configurator, configuratorAddress, nodes, oracles, greenDigest, ) @@ -740,8 +745,9 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi } assert.Equal(t, greenDigest, r.ConfigDigest) } - }) - t.Run("promoteStagingConfig swaps the instances again", func(t *testing.T) { + } + // promoteStagingConfig swaps the instances again + { // TODO: Check that once an instance enters 'retired' state, it // doesn't produce reports or bother making observations promoteStagingConfig(t, donID, steve, backend, configurator, configuratorAddress, true) @@ -766,8 +772,9 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi assert.Less(t, finalGreenReport.ValidAfterSeconds, finalGreenReport.ObservationTimestampSeconds) assert.Equal(t, finalGreenReport.ObservationTimestampSeconds, initialPromotedBlueReport.ValidAfterSeconds) assert.Less(t, initialPromotedBlueReport.ValidAfterSeconds, initialPromotedBlueReport.ObservationTimestampSeconds) - }) - t.Run("adding a new channel definition is picked up on the fly", func(t *testing.T) { + } + // adding a new channel definition is picked up on the fly + { channelDefinitions[2] = llotypes.ChannelDefinition{ ReportFormat: llotypes.ReportFormatJSON, Streams: []llotypes.Stream{ @@ -805,7 +812,7 @@ channelDefinitionsContractFromBlock = %d`, serverURL, serverPubKey, donID, confi assert.Len(t, r.Values, 1) assert.Equal(t, "2976.39", r.Values[0].(*datastreamsllo.Decimal).String()) } - }) + } t.Run("deleting the jobs turns off oracles and cleans up resources", func(t *testing.T) { t.Skip("TODO - MERC-3524") }) diff --git a/core/services/ocrcommon/telemetry_test.go b/core/services/ocrcommon/telemetry_test.go index 8fac0ab2cbf..e499550dd4f 100644 --- a/core/services/ocrcommon/telemetry_test.go +++ b/core/services/ocrcommon/telemetry_test.go @@ -1,6 +1,7 @@ package ocrcommon import ( + "fmt" "math/big" "sync" "testing" @@ -1024,9 +1025,8 @@ func TestCollectMercuryEnhancedTelemetryV1(t *testing.T) { } wg.Wait() - require.Equal(t, 2, logs.Len()) - require.Contains(t, logs.All()[0].Message, `cannot get bridge response from bridge task, id=ds1, name="test-mercury-bridge-1"`) - require.Contains(t, logs.All()[1].Message, "cannot parse EA telemetry") + require.Equal(t, 1, logs.Len()) + require.Contains(t, logs.All()[0].Message, "cannot parse EA telemetry") chDone <- struct{}{} } @@ -1140,11 +1140,11 @@ func TestCollectMercuryEnhancedTelemetryV2(t *testing.T) { } wg.Wait() - require.Equal(t, 4, logs.Len()) + require.Equal(t, 3, logs.Len()) + fmt.Println(logs.All()) require.Contains(t, logs.All()[0].Message, "cannot parse enhanced EA telemetry bid price") require.Contains(t, logs.All()[1].Message, "cannot get bridge response from bridge task") - require.Contains(t, logs.All()[2].Message, "cannot parse EA telemetry") - require.Contains(t, logs.All()[3].Message, "cannot parse enhanced EA telemetry bid price") + require.Contains(t, logs.All()[2].Message, "cannot parse enhanced EA telemetry bid price") chDone <- struct{}{} } diff --git a/core/services/relay/evm/evm.go b/core/services/relay/evm/evm.go index fcf03cd8138..103d5274671 100644 --- a/core/services/relay/evm/evm.go +++ b/core/services/relay/evm/evm.go @@ -39,6 +39,7 @@ import ( txm "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" + coreconfig "github.com/smartcontractkit/chainlink/v2/core/config" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" "github.com/smartcontractkit/chainlink/v2/core/services/llo" "github.com/smartcontractkit/chainlink/v2/core/services/llo/bm" @@ -163,7 +164,7 @@ type CSAETHKeystore interface { } type MercuryConfig interface { - Transmitter() mercury.TransmitterConfig + Transmitter() coreconfig.MercuryTransmitter VerboseLogging() bool }