From 8a944818e16111c811751787e6b99e82fd01df3c Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Fri, 19 Apr 2024 04:28:10 +0000 Subject: [PATCH 1/6] Add operator registered quorum metrics --- node/config.go | 2 ++ node/flags/flags.go | 8 ++++++ node/grpc/server_test.go | 4 ++- node/metrics.go | 55 ++++++++++++++++++++++++++++++++-------- node/node.go | 4 +-- node/store_test.go | 6 ++++- test/integration_test.go | 16 ++++++------ 7 files changed, 72 insertions(+), 23 deletions(-) diff --git a/node/config.go b/node/config.go index 970cc355cd..a619c8be05 100644 --- a/node/config.go +++ b/node/config.go @@ -48,6 +48,7 @@ type Config struct { NodeApiPort string EnableMetrics bool MetricsPort string + OnchainMetricsInterval int64 Timeout time.Duration RegisterNodeAtStart bool ExpirationPollIntervalSec uint64 @@ -167,6 +168,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) { NodeApiPort: ctx.GlobalString(flags.NodeApiPortFlag.Name), EnableMetrics: ctx.GlobalBool(flags.EnableMetricsFlag.Name), MetricsPort: ctx.GlobalString(flags.MetricsPortFlag.Name), + OnchainMetricsInterval: ctx.GlobalInt64(flags.OnchainMetricsIntervalFlag.Name), Timeout: timeout, RegisterNodeAtStart: registerNodeAtStart, ExpirationPollIntervalSec: expirationPollIntervalSec, diff --git a/node/flags/flags.go b/node/flags/flags.go index 7da53f6867..b05d7f0e24 100644 --- a/node/flags/flags.go +++ b/node/flags/flags.go @@ -73,6 +73,13 @@ var ( Value: "9091", EnvVar: common.PrefixEnvVar(EnvVarPrefix, "METRICS_PORT"), } + OnchainMetricsIntervalFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "onchain-metrics-interval"), + Usage: "The interval in seconds at which the node polls the onchain state of the operator and update metrics. 
<=0 means no poll", + Required: false, + Value: "60", + EnvVar: common.PrefixEnvVar(EnvVarPrefix, "ONCHAIN_METRICS_INTERVAL"), + } TimeoutFlag = cli.StringFlag{ Name: common.PrefixFlag(FlagPrefix, "timeout"), Usage: "Amount of time to wait for GPRC", @@ -238,6 +245,7 @@ var requiredFlags = []cli.Flag{ RetrievalPortFlag, EnableMetricsFlag, MetricsPortFlag, + OnchainMetricsIntervalFlag, EnableNodeApiFlag, NodeApiPortFlag, TimeoutFlag, diff --git a/node/grpc/server_test.go b/node/grpc/server_test.go index 39d645a85f..24e94ac849 100644 --- a/node/grpc/server_test.go +++ b/node/grpc/server_test.go @@ -16,6 +16,7 @@ import ( commonmock "github.com/Layr-Labs/eigenda/common/mock" "github.com/Layr-Labs/eigenda/core" core_mock "github.com/Layr-Labs/eigenda/core/mock" + coremock "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigenda/encoding/kzg" "github.com/Layr-Labs/eigenda/encoding/kzg/prover" @@ -101,7 +102,8 @@ func newTestServer(t *testing.T, mockValidator bool) *grpc.Server { } noopMetrics := metrics.NewNoopMetrics() reg := prometheus.NewRegistry() - metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090") + tx := &coremock.MockTransactor{} + metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090", opID, -1, tx) store, err := node.NewLevelDBStore(dbPath, logger, metrics, 1e9, 1e9) if err != nil { panic("failed to create a new levelDB store") diff --git a/node/metrics.go b/node/metrics.go index f523a7a57c..e85d9fce62 100644 --- a/node/metrics.go +++ b/node/metrics.go @@ -3,8 +3,10 @@ package node import ( "context" "strconv" + "time" "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/core/eth" "github.com/Layr-Labs/eigensdk-go/logging" eigenmetrics "github.com/Layr-Labs/eigensdk-go/metrics" @@ -20,8 +22,8 @@ const ( type Metrics struct { logger logging.Logger - // Whether the node is registered. - Registered prometheus.Gauge + // The quorums the node is registered. 
+ RegisteredQuorums *prometheus.GaugeVec // Accumulated number of RPC requests received. AccNumRequests *prometheus.CounterVec // The latency (in ms) to process the request. @@ -40,22 +42,26 @@ type Metrics struct { registry *prometheus.Registry // socketAddr is the address at which the metrics server will be listening. // should be in format ip:port - socketAddr string + socketAddr string + operatorId core.OperatorID + onchainMetricsInterval int64 + tx core.Transactor } -func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, logger logging.Logger, socketAddr string) *Metrics { +func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, logger logging.Logger, socketAddr string, operatorId core.OperatorID, onchainMetricsInterval int64, tx core.Transactor) *Metrics { // Add Go module collectors reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) reg.MustRegister(collectors.NewGoCollector()) metrics := &Metrics{ - Registered: promauto.With(reg).NewGauge( + RegisteredQuorums: promauto.With(reg).NewGaugeVec( prometheus.GaugeOpts{ Namespace: Namespace, - Name: "registered", - Help: "indicator about if DA node is registered", + Name: "registered_quorums", + Help: "the quorums the DA node is registered", }, + []string{"quorum"}, ), // The "status" label has values: success, failure. 
AccNumRequests: promauto.With(reg).NewCounterVec( @@ -108,10 +114,13 @@ func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, log Help: "the total number of node's socket address updates", }, ), - EigenMetrics: eigenMetrics, - logger: logger.With("component", "NodeMetrics"), - registry: reg, - socketAddr: socketAddr, + EigenMetrics: eigenMetrics, + logger: logger.With("component", "NodeMetrics"), + registry: reg, + socketAddr: socketAddr, + operatorId: operatorId, + onchainMetricsInterval: onchainMetricsInterval, + tx: tx, } return metrics @@ -119,6 +128,10 @@ func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, log func (g *Metrics) Start() { _ = g.EigenMetrics.Start(context.Background(), g.registry) + + if g.onchainMetricsInterval > 0 { + go g.collectOnchainMetrics() + } } func (g *Metrics) RecordRPCRequest(method string, status string) { @@ -150,3 +163,23 @@ func (g *Metrics) AcceptBatches(status string, batchSize uint64) { g.AccuBatches.WithLabelValues("number", status).Inc() g.AccuBatches.WithLabelValues("size", status).Add(float64(batchSize)) } + +func (g *Metrics) collectOnchainMetrics() { + ticker := time.NewTicker(time.Duration(uint64(g.onchainMetricsInterval))) + defer ticker.Stop() + for range ticker.C { + bitmap, err := g.tx.GetCurrentQuorumBitmapByOperatorId(context.Background(), g.operatorId) + if err != nil { + g.logger.Error("Failed to GetOperatorStakes from the Chain for metrics", "err", err) + continue + } + quorums := eth.BitmapToQuorumIds(bitmap) + if len(quorums) == 0 { + g.logger.Info("Warning: this node is no longer in any quorum") + continue + } + for _, q := range quorums { + g.RegisteredQuorums.WithLabelValues(string(q)).Set(float64(1.0)) + } + } +} diff --git a/node/node.go b/node/node.go index a2c6e3b817..b7b32beb0d 100644 --- a/node/node.go +++ b/node/node.go @@ -70,8 +70,6 @@ func NewNode(config *Config, pubIPProvider pubip.Provider, logger logging.Logger promReg := 
prometheus.NewRegistry() eigenMetrics := metrics.NewEigenMetrics(AppName, ":"+config.MetricsPort, promReg, logger.With("component", "EigenMetrics")) - - metrics := NewMetrics(eigenMetrics, promReg, logger, ":"+config.MetricsPort) rpcCallsCollector := rpccalls.NewCollector(AppName, promReg) // Generate BLS keys @@ -110,6 +108,8 @@ func NewNode(config *Config, pubIPProvider pubip.Provider, logger logging.Logger // Setup Node Api nodeApi := nodeapi.NewNodeApi(AppName, SemVer, ":"+config.NodeApiPort, logger.With("component", "NodeApi")) + metrics := NewMetrics(eigenMetrics, promReg, logger, ":"+config.MetricsPort, config.ID, config.OnchainMetricsInterval, tx) + // Make validator v, err := verifier.NewVerifier(&config.EncoderConfig, false) if err != nil { diff --git a/node/store_test.go b/node/store_test.go index cb28814af0..456f51b771 100644 --- a/node/store_test.go +++ b/node/store_test.go @@ -9,7 +9,9 @@ import ( commonpb "github.com/Layr-Labs/eigenda/api/grpc/common" pb "github.com/Layr-Labs/eigenda/api/grpc/node" "github.com/Layr-Labs/eigenda/core" + coremock "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/encoding" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/Layr-Labs/eigenda/node" "github.com/Layr-Labs/eigensdk-go/logging" @@ -177,7 +179,9 @@ func TestStoringBlob(t *testing.T) { noopMetrics := metrics.NewNoopMetrics() reg := prometheus.NewRegistry() logger := logging.NewNoopLogger() - s, _ := node.NewLevelDBStore(t.TempDir(), logger, node.NewMetrics(noopMetrics, reg, logger, ":9090"), staleMeasure, storeDuration) + operatorId := [32]byte(hexutil.MustDecode("0x3fbfefcdc76462d2cdb7d0cea75f27223829481b8b4aa6881c94cb2126a316ad")) + tx := &coremock.MockTransactor{} + s, _ := node.NewLevelDBStore(t.TempDir(), logger, node.NewMetrics(noopMetrics, reg, logger, ":9090", operatorId, -1, tx), staleMeasure, storeDuration) ctx := context.Background() // Empty store diff --git a/test/integration_test.go b/test/integration_test.go index 
9d5e40a314..9c1fd99169 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -266,14 +266,6 @@ func mustMakeOperators(t *testing.T, cst *coremock.ChainDataMock, logger logging _, v0 := mustMakeTestComponents() val := core.NewShardValidator(v0, asn, cst, id) - noopMetrics := metrics.NewNoopMetrics() - reg := prometheus.NewRegistry() - metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090") - store, err := node.NewLevelDBStore(config.DbPath+"/chunk", logger, metrics, 1e9, 1e9) - if err != nil { - t.Fatal(err) - } - tx := &coremock.MockTransactor{} tx.On("RegisterBLSPublicKey").Return(nil) tx.On("RegisterOperator").Return(nil) @@ -282,6 +274,14 @@ func mustMakeOperators(t *testing.T, cst *coremock.ChainDataMock, logger logging tx.On("GetBlockStaleMeasure").Return(nil) tx.On("GetStoreDurationBlocks").Return(nil) + noopMetrics := metrics.NewNoopMetrics() + reg := prometheus.NewRegistry() + metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090", config.ID, -1, tx) + store, err := node.NewLevelDBStore(config.DbPath+"/chunk", logger, metrics, 1e9, 1e9) + if err != nil { + t.Fatal(err) + } + mockOperatorSocketsFilterer := &coremock.MockOperatorSocketsFilterer{} mockSocketChan := make(chan string) From 3292263e9e382c05c8bcace724bb006fc0ef1bda Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Fri, 19 Apr 2024 19:10:46 +0000 Subject: [PATCH 2/6] improve --- node/grpc/server_test.go | 13 +++++---- node/metrics.go | 63 +++++++++++++++++++++++++++++++++------- node/node.go | 2 +- node/store_test.go | 7 ++++- test/integration_test.go | 2 +- 5 files changed, 67 insertions(+), 20 deletions(-) diff --git a/node/grpc/server_test.go b/node/grpc/server_test.go index 24e94ac849..56c2f00749 100644 --- a/node/grpc/server_test.go +++ b/node/grpc/server_test.go @@ -103,12 +103,6 @@ func newTestServer(t *testing.T, mockValidator bool) *grpc.Server { noopMetrics := metrics.NewNoopMetrics() reg := prometheus.NewRegistry() tx := &coremock.MockTransactor{} - metrics := 
node.NewMetrics(noopMetrics, reg, logger, ":9090", opID, -1, tx) - store, err := node.NewLevelDBStore(dbPath, logger, metrics, 1e9, 1e9) - if err != nil { - panic("failed to create a new levelDB store") - } - defer os.Remove(dbPath) ratelimiter := &commonmock.NoopRatelimiter{} @@ -140,6 +134,13 @@ func newTestServer(t *testing.T, mockValidator bool) *grpc.Server { val = core.NewShardValidator(v, asn, cst, opID) } + metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090", opID, -1, tx, chainState) + store, err := node.NewLevelDBStore(dbPath, logger, metrics, 1e9, 1e9) + if err != nil { + panic("failed to create a new levelDB store") + } + defer os.Remove(dbPath) + node := &node.Node{ Config: config, Logger: logger, diff --git a/node/metrics.go b/node/metrics.go index e85d9fce62..91fc6d744b 100644 --- a/node/metrics.go +++ b/node/metrics.go @@ -2,6 +2,8 @@ package node import ( "context" + "math/big" + "sort" "strconv" "time" @@ -23,7 +25,7 @@ type Metrics struct { logger logging.Logger // The quorums the node is registered. - RegisteredQuorums *prometheus.GaugeVec + Registered *prometheus.GaugeVec // Accumulated number of RPC requests received. AccNumRequests *prometheus.CounterVec // The latency (in ms) to process the request. 
@@ -46,22 +48,25 @@ type Metrics struct { operatorId core.OperatorID onchainMetricsInterval int64 tx core.Transactor + chainState core.ChainState } -func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, logger logging.Logger, socketAddr string, operatorId core.OperatorID, onchainMetricsInterval int64, tx core.Transactor) *Metrics { +func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, logger logging.Logger, socketAddr string, operatorId core.OperatorID, onchainMetricsInterval int64, tx core.Transactor, chainState core.ChainState) *Metrics { // Add Go module collectors reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) reg.MustRegister(collectors.NewGoCollector()) metrics := &Metrics{ - RegisteredQuorums: promauto.With(reg).NewGaugeVec( + // The "type" label have values: stake, rank. The "stake" is stake share (in basis point), + // and the "rank" is operator's ranking by stake share in the quorum. + Registered: promauto.With(reg).NewGaugeVec( prometheus.GaugeOpts{ Namespace: Namespace, - Name: "registered_quorums", + Name: "registered", Help: "the quorums the DA node is registered", }, - []string{"quorum"}, + []string{"quorum", "type"}, ), // The "status" label has values: success, failure. AccNumRequests: promauto.With(reg).NewCounterVec( @@ -121,6 +126,7 @@ func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, log operatorId: operatorId, onchainMetricsInterval: onchainMetricsInterval, tx: tx, + chainState: chainState, } return metrics @@ -167,19 +173,54 @@ func (g *Metrics) AcceptBatches(status string, batchSize uint64) { func (g *Metrics) collectOnchainMetrics() { ticker := time.NewTicker(time.Duration(uint64(g.onchainMetricsInterval))) defer ticker.Stop() + + // 3 chain RPC calls in each cycle. 
for range ticker.C { - bitmap, err := g.tx.GetCurrentQuorumBitmapByOperatorId(context.Background(), g.operatorId) + ctx := context.Background() + blockNum, err := g.tx.GetCurrentBlockNumber(ctx) + if err != nil { + g.logger.Error("Failed to query chain RPC for current block number", "err", err) + continue + } + bitmaps, err := g.tx.GetQuorumBitmapForOperatorsAtBlockNumber(ctx, []core.OperatorID{g.operatorId}, blockNum) if err != nil { - g.logger.Error("Failed to GetOperatorStakes from the Chain for metrics", "err", err) + g.logger.Error("Failed to query chain RPC for quorum bitmap", "err", err) continue } - quorums := eth.BitmapToQuorumIds(bitmap) + quorums := eth.BitmapToQuorumIds(bitmaps[0]) if len(quorums) == 0 { - g.logger.Info("Warning: this node is no longer in any quorum") + g.logger.Info("Warning: this node is no longer in any quorum", "blockNumber", blockNum, "operatorId", g.operatorId.Hex()) continue } - for _, q := range quorums { - g.RegisteredQuorums.WithLabelValues(string(q)).Set(float64(1.0)) + state, err := g.chainState.GetOperatorState(ctx, uint(blockNum), quorums) + if err != nil { + g.logger.Error("Failed to query chain RPC for operator state", "blockNumber", blockNum, "quorumIds", quorums, "err", err) + continue + } + type OperatorStakeShare struct { + operatorId core.OperatorID + stakeShare float64 + } + for q, operators := range state.Operators { + operatorStakeShares := make([]*OperatorStakeShare, 0) + for opId, opInfo := range operators { + share, _ := new(big.Int).Div(new(big.Int).Mul(opInfo.Stake, big.NewInt(10000)), state.Totals[q].Stake).Float64() + operatorStakeShares = append(operatorStakeShares, &OperatorStakeShare{operatorId: opId, stakeShare: share}) + } + sort.Slice(operatorStakeShares, func(i, j int) bool { + if operatorStakeShares[i].stakeShare == operatorStakeShares[j].stakeShare { + return operatorStakeShares[i].operatorId.Hex() < operatorStakeShares[j].operatorId.Hex() + } + return operatorStakeShares[i].stakeShare > 
operatorStakeShares[j].stakeShare + }) + for i, op := range operatorStakeShares { + if op.operatorId == g.operatorId { + g.logger.Info("Current operator registration onchain", "operatorId", g.operatorId.Hex(), "blockNumber", blockNum, "quorumId", q, "stakeShare", op.stakeShare, "rank", i+1) + g.Registered.WithLabelValues(string(q), "stake").Set(op.stakeShare) + g.Registered.WithLabelValues(string(q), "rank").Set(float64(i + 1)) + break + } + } } } } diff --git a/node/node.go b/node/node.go index b7b32beb0d..1356e7d41e 100644 --- a/node/node.go +++ b/node/node.go @@ -108,7 +108,7 @@ func NewNode(config *Config, pubIPProvider pubip.Provider, logger logging.Logger // Setup Node Api nodeApi := nodeapi.NewNodeApi(AppName, SemVer, ":"+config.NodeApiPort, logger.With("component", "NodeApi")) - metrics := NewMetrics(eigenMetrics, promReg, logger, ":"+config.MetricsPort, config.ID, config.OnchainMetricsInterval, tx) + metrics := NewMetrics(eigenMetrics, promReg, logger, ":"+config.MetricsPort, config.ID, config.OnchainMetricsInterval, tx, cst) // Make validator v, err := verifier.NewVerifier(&config.EncoderConfig, false) diff --git a/node/store_test.go b/node/store_test.go index 456f51b771..176345dd93 100644 --- a/node/store_test.go +++ b/node/store_test.go @@ -9,6 +9,7 @@ import ( commonpb "github.com/Layr-Labs/eigenda/api/grpc/common" pb "github.com/Layr-Labs/eigenda/api/grpc/node" "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/core/mock" coremock "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/encoding" "github.com/ethereum/go-ethereum/common/hexutil" @@ -181,7 +182,11 @@ func TestStoringBlob(t *testing.T) { logger := logging.NewNoopLogger() operatorId := [32]byte(hexutil.MustDecode("0x3fbfefcdc76462d2cdb7d0cea75f27223829481b8b4aa6881c94cb2126a316ad")) tx := &coremock.MockTransactor{} - s, _ := node.NewLevelDBStore(t.TempDir(), logger, node.NewMetrics(noopMetrics, reg, logger, ":9090", operatorId, -1, tx), staleMeasure, 
storeDuration) + dat, _ := mock.MakeChainDataMock(map[uint8]int{ + 0: 6, + 1: 3, + }) + s, _ := node.NewLevelDBStore(t.TempDir(), logger, node.NewMetrics(noopMetrics, reg, logger, ":9090", operatorId, -1, tx, dat), staleMeasure, storeDuration) ctx := context.Background() // Empty store diff --git a/test/integration_test.go b/test/integration_test.go index 9c1fd99169..ecda8fbe85 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -276,7 +276,7 @@ func mustMakeOperators(t *testing.T, cst *coremock.ChainDataMock, logger logging noopMetrics := metrics.NewNoopMetrics() reg := prometheus.NewRegistry() - metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090", config.ID, -1, tx) + metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090", config.ID, -1, tx, cst) store, err := node.NewLevelDBStore(config.DbPath+"/chunk", logger, metrics, 1e9, 1e9) if err != nil { t.Fatal(err) From 370591076cf9a17dc47911e0f2d500a1d8f604ee Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Fri, 19 Apr 2024 19:33:46 +0000 Subject: [PATCH 3/6] fix --- node/metrics.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/node/metrics.go b/node/metrics.go index 91fc6d744b..734a7eebc9 100644 --- a/node/metrics.go +++ b/node/metrics.go @@ -58,8 +58,8 @@ func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, log reg.MustRegister(collectors.NewGoCollector()) metrics := &Metrics{ - // The "type" label have values: stake, rank. The "stake" is stake share (in basis point), - // and the "rank" is operator's ranking by stake share in the quorum. + // The "type" label have values: stake_share, rank. The "stake_share" is stake share (in basis point), + // and the "rank" is operator's ranking (the operator with highest amount of stake ranked as 1) by stake share in the quorum. 
Registered: promauto.With(reg).NewGaugeVec( prometheus.GaugeOpts{ Namespace: Namespace, @@ -184,17 +184,17 @@ func (g *Metrics) collectOnchainMetrics() { } bitmaps, err := g.tx.GetQuorumBitmapForOperatorsAtBlockNumber(ctx, []core.OperatorID{g.operatorId}, blockNum) if err != nil { - g.logger.Error("Failed to query chain RPC for quorum bitmap", "err", err) + g.logger.Error("Failed to query chain RPC for quorum bitmap", "blockNumber", blockNum, "err", err) continue } - quorums := eth.BitmapToQuorumIds(bitmaps[0]) - if len(quorums) == 0 { - g.logger.Info("Warning: this node is no longer in any quorum", "blockNumber", blockNum, "operatorId", g.operatorId.Hex()) + quorumIds := eth.BitmapToQuorumIds(bitmaps[0]) + if len(quorumIds) == 0 { + g.logger.Warn("This node is currently not in any quorum", "blockNumber", blockNum, "operatorId", g.operatorId.Hex()) continue } - state, err := g.chainState.GetOperatorState(ctx, uint(blockNum), quorums) + state, err := g.chainState.GetOperatorState(ctx, uint(blockNum), quorumIds) if err != nil { - g.logger.Error("Failed to query chain RPC for operator state", "blockNumber", blockNum, "quorumIds", quorums, "err", err) + g.logger.Error("Failed to query chain RPC for operator state", "blockNumber", blockNum, "quorumIds", quorumIds, "err", err) continue } type OperatorStakeShare struct { @@ -207,6 +207,7 @@ func (g *Metrics) collectOnchainMetrics() { share, _ := new(big.Int).Div(new(big.Int).Mul(opInfo.Stake, big.NewInt(10000)), state.Totals[q].Stake).Float64() operatorStakeShares = append(operatorStakeShares, &OperatorStakeShare{operatorId: opId, stakeShare: share}) } + // Descending order by stake share in the quorum. 
sort.Slice(operatorStakeShares, func(i, j int) bool { if operatorStakeShares[i].stakeShare == operatorStakeShares[j].stakeShare { return operatorStakeShares[i].operatorId.Hex() < operatorStakeShares[j].operatorId.Hex() @@ -215,9 +216,9 @@ func (g *Metrics) collectOnchainMetrics() { }) for i, op := range operatorStakeShares { if op.operatorId == g.operatorId { - g.logger.Info("Current operator registration onchain", "operatorId", g.operatorId.Hex(), "blockNumber", blockNum, "quorumId", q, "stakeShare", op.stakeShare, "rank", i+1) - g.Registered.WithLabelValues(string(q), "stake").Set(op.stakeShare) + g.Registered.WithLabelValues(string(q), "stake_share").Set(op.stakeShare) g.Registered.WithLabelValues(string(q), "rank").Set(float64(i + 1)) + g.logger.Info("Current operator registration onchain", "operatorId", g.operatorId.Hex(), "blockNumber", blockNum, "quorumId", q, "stakeShare (basis point)", op.stakeShare, "rank", i+1) break } } From 6da9de8196171ca6585da025ad38c13be9669a64 Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Fri, 19 Apr 2024 19:35:10 +0000 Subject: [PATCH 4/6] fix --- node/flags/flags.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/flags/flags.go b/node/flags/flags.go index b05d7f0e24..f40ac22b5c 100644 --- a/node/flags/flags.go +++ b/node/flags/flags.go @@ -77,7 +77,7 @@ var ( Name: common.PrefixFlag(FlagPrefix, "onchain-metrics-interval"), Usage: "The interval in seconds at which the node polls the onchain state of the operator and update metrics. 
<=0 means no poll", Required: false, - Value: "60", + Value: "180", EnvVar: common.PrefixEnvVar(EnvVarPrefix, "ONCHAIN_METRICS_INTERVAL"), } TimeoutFlag = cli.StringFlag{ From 9eac11a25628f83fa5d5e0031563934aadfd1d5f Mon Sep 17 00:00:00 2001 From: Jian Xiao Date: Fri, 19 Apr 2024 21:02:19 +0000 Subject: [PATCH 5/6] rename --- node/metrics.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/node/metrics.go b/node/metrics.go index 734a7eebc9..ce7fef5e9c 100644 --- a/node/metrics.go +++ b/node/metrics.go @@ -25,7 +25,7 @@ type Metrics struct { logger logging.Logger // The quorums the node is registered. - Registered *prometheus.GaugeVec + RegisteredQuorums *prometheus.GaugeVec // Accumulated number of RPC requests received. AccNumRequests *prometheus.CounterVec // The latency (in ms) to process the request. @@ -60,7 +60,7 @@ func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, log metrics := &Metrics{ // The "type" label have values: stake_share, rank. The "stake_share" is stake share (in basis point), // and the "rank" is operator's ranking (the operator with highest amount of stake ranked as 1) by stake share in the quorum. 
-	Registered: promauto.With(reg).NewGaugeVec(
+	RegisteredQuorums: promauto.With(reg).NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: Namespace,
 			Name:      "registered",
@@ -173,6 +173,6 @@ func (g *Metrics) AcceptBatches(status string, batchSize uint64) {
 
 func (g *Metrics) collectOnchainMetrics() {
-	ticker := time.NewTicker(time.Duration(uint64(g.onchainMetricsInterval)))
+	ticker := time.NewTicker(time.Duration(g.onchainMetricsInterval) * time.Second)
 	defer ticker.Stop()
 
 	// 3 chain RPC calls in each cycle.
@@ -216,8 +216,8 @@ func (g *Metrics) collectOnchainMetrics() {
 			})
 			for i, op := range operatorStakeShares {
 				if op.operatorId == g.operatorId {
-					g.RegisteredQuorums.WithLabelValues(string(q), "stake_share").Set(op.stakeShare)
-					g.RegisteredQuorums.WithLabelValues(string(q), "rank").Set(float64(i + 1))
+					g.RegisteredQuorums.WithLabelValues(strconv.Itoa(int(q)), "stake_share").Set(op.stakeShare)
+					g.RegisteredQuorums.WithLabelValues(strconv.Itoa(int(q)), "rank").Set(float64(i + 1))
 					g.logger.Info("Current operator registration onchain", "operatorId", g.operatorId.Hex(), "blockNumber", blockNum, "quorumId", q, "stakeShare (basis point)", op.stakeShare, "rank", i+1)
 					break
 				}
 			}

From 1bc0e4b76ca5f6093a5e5c9f72e223d864463cbe Mon Sep 17 00:00:00 2001
From: Jian Xiao
Date: Fri, 19 Apr 2024 21:41:39 +0000
Subject: [PATCH 6/6] fix

---
 inabox/deploy/config.go   | 1 +
 inabox/deploy/env_vars.go | 2 ++
 2 files changed, 3 insertions(+)

diff --git a/inabox/deploy/config.go b/inabox/deploy/config.go
index db62acb307..06e4c3f071 100644
--- a/inabox/deploy/config.go
+++ b/inabox/deploy/config.go
@@ -309,6 +309,7 @@ func (env *Config) generateOperatorVars(ind int, name, key, churnerUrl, logPath,
 		NODE_PUBLIC_IP_PROVIDER:       "mockip",
 		NODE_PUBLIC_IP_CHECK_INTERVAL: "10s",
 		NODE_NUM_CONFIRMATIONS:        "0",
+		NODE_ONCHAIN_METRICS_INTERVAL: "-1",
 	}
 	env.applyDefaults(&v, "NODE", "opr", ind)
 
diff --git a/inabox/deploy/env_vars.go b/inabox/deploy/env_vars.go
index fa64211064..8ef944fb30 100644
--- a/inabox/deploy/env_vars.go
+++ b/inabox/deploy/env_vars.go
@@ -325,6 +325,8 @@ type OperatorVars struct {
 	NODE_LOG_PATH string
 
 	NODE_LOG_FORMAT string
+
+	NODE_ONCHAIN_METRICS_INTERVAL string
 }
 
 func (vars OperatorVars) getEnvMap() map[string]string {