From 42b14781ff68d23107229c63f4e30dac876542ad Mon Sep 17 00:00:00 2001 From: martin-cll <121895364+martin-cll@users.noreply.github.com> Date: Fri, 20 Sep 2024 22:50:19 +1000 Subject: [PATCH] Skip telemetry for market status bridges (#14490) --- .changeset/slow-lizards-shout.md | 5 + core/services/ocrcommon/telemetry.go | 44 ++++-- core/services/ocrcommon/telemetry_test.go | 143 ++++++++++++++++-- .../telem/telem_enhanced_ea_mercury.pb.go | 2 +- .../telem/telem_enhanced_ea_mercury.proto | 2 +- 5 files changed, 172 insertions(+), 24 deletions(-) create mode 100644 .changeset/slow-lizards-shout.md diff --git a/.changeset/slow-lizards-shout.md b/.changeset/slow-lizards-shout.md new file mode 100644 index 00000000000..3b21c6576e7 --- /dev/null +++ b/.changeset/slow-lizards-shout.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Skip telemetry for market-status bridges #internal diff --git a/core/services/ocrcommon/telemetry.go b/core/services/ocrcommon/telemetry.go index 2a24cc4b9bd..78b6fff7180 100644 --- a/core/services/ocrcommon/telemetry.go +++ b/core/services/ocrcommon/telemetry.go @@ -395,7 +395,11 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced e.lggr.Warnw(fmt.Sprintf("cannot parse EA telemetry, job=%d, id=%s, name=%q", e.job.ID, trr.Task.DotID(), bridgeName), "err", err, "jobID", e.job.ID, "dotID", trr.Task.DotID(), "bridgeName", bridgeName) } - assetSymbol := e.getAssetSymbolFromRequestData(bridgeTask.RequestData) + parsedBridgeData := parseBridgeRequestData(bridgeTask.RequestData, d.FeedVersion) + if parsedBridgeData.IsMarketStatus { + // Only collect telemetry for pricing bridges. + continue + } benchmarkPrice, bidPrice, askPrice := e.getPricesFromBridgeTask(trr, d.TaskRunResults, d.FeedVersion) @@ -432,7 +436,7 @@ func (e *EnhancedTelemetryService[T]) collectMercuryEnhancedTelemetry(d Enhanced Round: int64(d.RepTimestamp.Round), Epoch: int64(d.RepTimestamp.Epoch), BridgeRequestData: bridgeTask.RequestData, - AssetSymbol: assetSymbol, + AssetSymbol: parsedBridgeData.AssetSymbol, Version: uint32(d.FeedVersion), } e.lggr.Debugw(fmt.Sprintf("EA Telemetry = %+v", t), "feedID", e.job.OCR2OracleSpec.FeedID.Hex(), "jobID", e.job.ID, "dotID", trr.Task.DotID(), "bridgeName", bridgeName) @@ -459,12 +463,19 @@ func (e *EnhancedTelemetryService[T]) parseTelemetryAttributes(a string) (teleme return *attrs, nil } -// getAssetSymbolFromRequestData parses the requestData of the bridge to generate an asset symbol pair -func (e *EnhancedTelemetryService[T]) getAssetSymbolFromRequestData(requestData string) string { +type bridgeRequestData struct { + AssetSymbol string + IsMarketStatus bool +} + +// parseRequestData parses the requestData of the bridge. +func parseBridgeRequestData(requestData string, mercuryVersion mercuryutils.FeedVersion) bridgeRequestData { type reqDataPayload struct { - To *string `json:"to"` - From *string `json:"from"` - Address *string `json:"address"` // used for view function ea only + Endpoint *string `json:"endpoint"` + To *string `json:"to"` + From *string `json:"from"` + Address *string `json:"address"` // used for view function ea only + Market *string `json:"market"` // used for market status ea only } type reqData struct { Data reqDataPayload `json:"data"` @@ -473,18 +484,25 @@ func (e *EnhancedTelemetryService[T]) getAssetSymbolFromRequestData(requestData rd := &reqData{} err := json.Unmarshal([]byte(requestData), rd) if err != nil { - return "" + return bridgeRequestData{} + } + + if mercuryVersion == 4 && ((rd.Data.Endpoint != nil && *rd.Data.Endpoint == "market-status") || (rd.Data.Market != nil && *rd.Data.Market != "")) { + return bridgeRequestData{ + AssetSymbol: *rd.Data.Market, + IsMarketStatus: true, + } } if rd.Data.From != nil && rd.Data.To != nil { - return *rd.Data.From + "/" + *rd.Data.To + return bridgeRequestData{AssetSymbol: *rd.Data.From + "/" + *rd.Data.To} } if rd.Data.Address != nil { - return *rd.Data.Address + return bridgeRequestData{AssetSymbol: *rd.Data.Address} } - return "" + return bridgeRequestData{} } // ShouldCollectEnhancedTelemetryMercury checks if enhanced telemetry should be collected and sent @@ -599,8 +617,8 @@ func (e *EnhancedTelemetryService[T]) getPricesFromResultsByOrder(startTask pipe benchmarkPrice = e.parsePriceFromTask(*benchmarkPriceTask) } - // mercury version 2 only supports benchmarkPrice - if mercuryVersion == 2 { + // mercury versions 2 and 4 only supports benchmarkPrice + if mercuryVersion == 2 || mercuryVersion == 4 { return benchmarkPrice, 0, 0 } diff --git a/core/services/ocrcommon/telemetry_test.go b/core/services/ocrcommon/telemetry_test.go index 13b5ef34d43..e672f9f440a 100644 --- a/core/services/ocrcommon/telemetry_test.go +++ b/core/services/ocrcommon/telemetry_test.go @@ -7,19 +7,18 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/shopspring/decimal" + "github.com/smartcontractkit/libocr/offchainreporting2plus/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.uber.org/zap" "google.golang.org/protobuf/proto" - "github.com/smartcontractkit/libocr/offchainreporting2plus/types" - "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" "github.com/smartcontractkit/chainlink-common/pkg/types/mercury" mercuryv1 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v1" mercuryv2 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v2" - + mercuryv4 "github.com/smartcontractkit/chainlink-common/pkg/types/mercury/v4" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils" ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" @@ -274,7 +273,6 @@ func TestSendEATelemetry(t *testing.T) { expectedMessage, _ := proto.Marshal(&expectedTelemetry) wg.Wait() assert.Equal(t, expectedMessage, sentMessage) - //enhancedTelemService.StopOnce("EnhancedTelemetryService", func() error { return nil }) doneCh <- struct{}{} } @@ -446,6 +444,45 @@ var trrsMercuryV2 = pipeline.TaskRunResults{ }, } +var trrsMercuryV4 = pipeline.TaskRunResults{ + pipeline.TaskRunResult{ + Task: &pipeline.BridgeTask{ + Name: "link-usd-test-bridge-v2", + BaseTask: pipeline.NewBaseTask(0, "ds1", nil, nil, 0), + RequestData: `{"data":{"to":"LINK","from":"USD"}}`, + }, + Result: pipeline.Result{ + Value: bridgeResponse, + }, + }, + pipeline.TaskRunResult{ + Task: &pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(1, "ds1_benchmark", nil, nil, 1), + }, + Result: pipeline.Result{ + Value: 123456.123456, + }, + }, + pipeline.TaskRunResult{ + Task: &pipeline.BridgeTask{ + Name: "market-status-bridge", + BaseTask: pipeline.NewBaseTask(2, "ds2", nil, nil, 2), + RequestData: `{"data":{"endpoint":"market-status","market":"forex"}}`, + }, + Result: pipeline.Result{ + Value: bridgeResponse, + }, + }, + pipeline.TaskRunResult{ + Task: &pipeline.JSONParseTask{ + BaseTask: pipeline.NewBaseTask(3, "market_status", nil, nil, 3), + }, + Result: pipeline.Result{ + Value: 2.0, + }, + }, +} + func TestGetPricesFromBridgeByTelemetryField(t *testing.T) { lggr, _ := logger.TestLoggerObserved(t, zap.WarnLevel) e := EnhancedTelemetryService[EnhancedTelemetryMercuryData]{ @@ -618,13 +655,23 @@ func TestShouldCollectEnhancedTelemetryMercury(t *testing.T) { require.Equal(t, ShouldCollectEnhancedTelemetryMercury(j), false) } -func TestGetAssetSymbolFromRequestData(t *testing.T) { - e := EnhancedTelemetryService[EnhancedTelemetryMercuryData]{} - require.Equal(t, e.getAssetSymbolFromRequestData(""), "") +func TestParseBridgeRequestData(t *testing.T) { + require.Equal(t, parseBridgeRequestData("", 2), bridgeRequestData{}) + reqData := `{"data":{"to":"LINK","from":"USD"}}` - require.Equal(t, e.getAssetSymbolFromRequestData(reqData), "USD/LINK") + require.Equal(t, parseBridgeRequestData(reqData, 2), bridgeRequestData{AssetSymbol: "USD/LINK"}) + + reqData = `{"data":{"to":"LINK","from":"USD","market":"forex"}}` + require.Equal(t, parseBridgeRequestData(reqData, 2), bridgeRequestData{AssetSymbol: "USD/LINK"}) + + reqData = `{"data":{"endpoint":"market-status","market":"forex"}}` + require.Equal(t, parseBridgeRequestData(reqData, 4), bridgeRequestData{AssetSymbol: "forex", IsMarketStatus: true}) + + reqData = `{"data":{"market":"metals"}}` + require.Equal(t, parseBridgeRequestData(reqData, 4), bridgeRequestData{AssetSymbol: "metals", IsMarketStatus: true}) + viewFunctionReqData := `{"data":{"address":"0x12345678", "signature": "function stEthPerToken() view returns (int256)"}}` - require.Equal(t, "0x12345678", e.getAssetSymbolFromRequestData(viewFunctionReqData)) + require.Equal(t, parseBridgeRequestData(viewFunctionReqData, 3), bridgeRequestData{AssetSymbol: "0x12345678"}) } func getViewFunctionTaskRunResults() pipeline.TaskRunResults { @@ -1019,3 +1066,81 @@ func TestCollectMercuryEnhancedTelemetryV2(t *testing.T) { require.Contains(t, logs.All()[3].Message, "cannot parse enhanced EA telemetry bid price") chDone <- struct{}{} } + +func TestCollectMercuryEnhancedTelemetryV4(t *testing.T) { + ingressClient := mocks.NewTelemetryService(t) + ingressAgent := telemetry.NewIngressAgentWrapper(ingressClient) + monitoringEndpoint := ingressAgent.GenMonitoringEndpoint("test-network", "test-chainID", "0xa", synchronization.EnhancedEAMercury) + + sentMessageCh := make(chan []byte) + ingressClient.On("Send", mock.Anything, mock.AnythingOfType("[]uint8"), mock.AnythingOfType("string"), mock.AnythingOfType("TelemetryType")).Return().Run(func(args mock.Arguments) { + sentMessageCh <- args[1].([]byte) + }) + + lggr, _ := logger.TestLoggerObserved(t, zap.WarnLevel) + chTelem := make(chan EnhancedTelemetryMercuryData, 100) + chDone := make(chan struct{}) + feedID := common.HexToHash("0x0004") + e := EnhancedTelemetryService[EnhancedTelemetryMercuryData]{ + chDone: chDone, + chTelem: chTelem, + job: &job.Job{ + Type: job.Type(pipeline.OffchainReporting2JobType), + OCR2OracleSpec: &job.OCR2OracleSpec{ + CaptureEATelemetry: true, + FeedID: &feedID, + }, + }, + lggr: lggr, + monitoringEndpoint: monitoringEndpoint, + } + servicetest.Run(t, &e) + + chTelem <- EnhancedTelemetryMercuryData{ + TaskRunResults: trrsMercuryV4, + FeedVersion: 4, + V4Observation: &mercuryv4.Observation{ + BenchmarkPrice: mercury.ObsResult[*big.Int]{Val: big.NewInt(111111)}, + MarketStatus: mercury.ObsResult[uint32]{Val: 2}, + MaxFinalizedTimestamp: mercury.ObsResult[int64]{Val: 321}, + LinkPrice: mercury.ObsResult[*big.Int]{Val: big.NewInt(4321)}, + NativePrice: mercury.ObsResult[*big.Int]{Val: big.NewInt(54321)}, + }, + RepTimestamp: types.ReportTimestamp{ + ConfigDigest: types.ConfigDigest{2}, + Epoch: 11, + Round: 22, + }, + } + + expectedPricingTelemetry := telem.EnhancedEAMercury{ + DataSource: "data-source-name", + DpBenchmarkPrice: 123456.123456, + BridgeTaskRunStartedTimestamp: trrsMercuryV4[0].CreatedAt.UnixMilli(), + BridgeTaskRunEndedTimestamp: trrsMercuryV4[0].FinishedAt.Time.UnixMilli(), + ProviderRequestedTimestamp: 92233720368547760, + ProviderReceivedTimestamp: -92233720368547760, + ProviderDataStreamEstablished: 1, + ProviderIndicatedTime: -123456789, + Feed: common.HexToHash("0x0004").String(), + ObservationBenchmarkPrice: 111111, + ObservationMarketStatus: 2, + ConfigDigest: "0200000000000000000000000000000000000000000000000000000000000000", + Round: 22, + Epoch: 11, + AssetSymbol: "USD/LINK", + ObservationBenchmarkPriceString: "111111", + MaxFinalizedTimestamp: 321, + LinkPrice: 4321, + NativePrice: 54321, + Version: 4, + BridgeRequestData: `{"data":{"to":"LINK","from":"USD"}}`, + } + expectedPricingMessage, _ := proto.Marshal(&expectedPricingTelemetry) + require.Equal(t, expectedPricingMessage, <-sentMessageCh) + + chDone <- struct{}{} + + // Verify that no other telemetry is sent. + require.Len(t, sentMessageCh, 0) +} diff --git a/core/services/synchronization/telem/telem_enhanced_ea_mercury.pb.go b/core/services/synchronization/telem/telem_enhanced_ea_mercury.pb.go index 7be7ad8d706..09eed12ee8a 100644 --- a/core/services/synchronization/telem/telem_enhanced_ea_mercury.pb.go +++ b/core/services/synchronization/telem/telem_enhanced_ea_mercury.pb.go @@ -101,7 +101,7 @@ type EnhancedEAMercury struct { ProviderDataStreamEstablished int64 `protobuf:"varint,12,opt,name=provider_data_stream_established,json=providerDataStreamEstablished,proto3" json:"provider_data_stream_established,omitempty"` ProviderIndicatedTime int64 `protobuf:"varint,13,opt,name=provider_indicated_time,json=providerIndicatedTime,proto3" json:"provider_indicated_time,omitempty"` Feed string `protobuf:"bytes,14,opt,name=feed,proto3" json:"feed,omitempty"` - // v1+v2+v3 + // v1+v2+v3+v4 ObservationBenchmarkPrice int64 `protobuf:"varint,15,opt,name=observation_benchmark_price,json=observationBenchmarkPrice,proto3" json:"observation_benchmark_price,omitempty"` // This value overflows, will be reserved and removed in future versions ObservationBenchmarkPriceString string `protobuf:"bytes,22,opt,name=observation_benchmark_price_string,json=observationBenchmarkPriceString,proto3" json:"observation_benchmark_price_string,omitempty"` // v1+v3 diff --git a/core/services/synchronization/telem/telem_enhanced_ea_mercury.proto b/core/services/synchronization/telem/telem_enhanced_ea_mercury.proto index c96c58f9ea3..d57b7ca836a 100644 --- a/core/services/synchronization/telem/telem_enhanced_ea_mercury.proto +++ b/core/services/synchronization/telem/telem_enhanced_ea_mercury.proto @@ -44,7 +44,7 @@ message EnhancedEAMercury { string feed=14; - // v1+v2+v3 + // v1+v2+v3+v4 int64 observation_benchmark_price=15; // This value overflows, will be reserved and removed in future versions string observation_benchmark_price_string = 22; // v1+v3