diff --git a/inabox/deploy/config.go b/inabox/deploy/config.go index db62acb307..06e4c3f071 100644 --- a/inabox/deploy/config.go +++ b/inabox/deploy/config.go @@ -309,6 +309,7 @@ func (env *Config) generateOperatorVars(ind int, name, key, churnerUrl, logPath, NODE_PUBLIC_IP_PROVIDER: "mockip", NODE_PUBLIC_IP_CHECK_INTERVAL: "10s", NODE_NUM_CONFIRMATIONS: "0", + NODE_ONCHAIN_METRICS_INTERVAL: "-1", } env.applyDefaults(&v, "NODE", "opr", ind) diff --git a/inabox/deploy/env_vars.go b/inabox/deploy/env_vars.go index fa64211064..8ef944fb30 100644 --- a/inabox/deploy/env_vars.go +++ b/inabox/deploy/env_vars.go @@ -325,6 +325,8 @@ type OperatorVars struct { NODE_LOG_PATH string NODE_LOG_FORMAT string + + NODE_ONCHAIN_METRICS_INTERVAL string } func (vars OperatorVars) getEnvMap() map[string]string { diff --git a/node/config.go b/node/config.go index 970cc355cd..a619c8be05 100644 --- a/node/config.go +++ b/node/config.go @@ -48,6 +48,7 @@ type Config struct { NodeApiPort string EnableMetrics bool MetricsPort string + OnchainMetricsInterval int64 Timeout time.Duration RegisterNodeAtStart bool ExpirationPollIntervalSec uint64 @@ -167,6 +168,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) { NodeApiPort: ctx.GlobalString(flags.NodeApiPortFlag.Name), EnableMetrics: ctx.GlobalBool(flags.EnableMetricsFlag.Name), MetricsPort: ctx.GlobalString(flags.MetricsPortFlag.Name), + OnchainMetricsInterval: ctx.GlobalInt64(flags.OnchainMetricsIntervalFlag.Name), Timeout: timeout, RegisterNodeAtStart: registerNodeAtStart, ExpirationPollIntervalSec: expirationPollIntervalSec, diff --git a/node/flags/flags.go b/node/flags/flags.go index 7da53f6867..f40ac22b5c 100644 --- a/node/flags/flags.go +++ b/node/flags/flags.go @@ -73,6 +73,13 @@ var ( Value: "9091", EnvVar: common.PrefixEnvVar(EnvVarPrefix, "METRICS_PORT"), } + OnchainMetricsIntervalFlag = cli.StringFlag{ + Name: common.PrefixFlag(FlagPrefix, "onchain-metrics-interval"), + Usage: "The interval in seconds at which the node polls the onchain state of the operator and update metrics. <=0 means no poll", + Required: false, + Value: "180", + EnvVar: common.PrefixEnvVar(EnvVarPrefix, "ONCHAIN_METRICS_INTERVAL"), + } TimeoutFlag = cli.StringFlag{ Name: common.PrefixFlag(FlagPrefix, "timeout"), Usage: "Amount of time to wait for GPRC", @@ -238,6 +245,7 @@ var requiredFlags = []cli.Flag{ RetrievalPortFlag, EnableMetricsFlag, MetricsPortFlag, + OnchainMetricsIntervalFlag, EnableNodeApiFlag, NodeApiPortFlag, TimeoutFlag, diff --git a/node/grpc/server_test.go b/node/grpc/server_test.go index 39d645a85f..56c2f00749 100644 --- a/node/grpc/server_test.go +++ b/node/grpc/server_test.go @@ -16,6 +16,7 @@ import ( commonmock "github.com/Layr-Labs/eigenda/common/mock" "github.com/Layr-Labs/eigenda/core" core_mock "github.com/Layr-Labs/eigenda/core/mock" + coremock "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/encoding" "github.com/Layr-Labs/eigenda/encoding/kzg" "github.com/Layr-Labs/eigenda/encoding/kzg/prover" @@ -101,12 +102,7 @@ func newTestServer(t *testing.T, mockValidator bool) *grpc.Server { } noopMetrics := metrics.NewNoopMetrics() reg := prometheus.NewRegistry() - metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090") - store, err := node.NewLevelDBStore(dbPath, logger, metrics, 1e9, 1e9) - if err != nil { - panic("failed to create a new levelDB store") - } - defer os.Remove(dbPath) + tx := &coremock.MockTransactor{} ratelimiter := &commonmock.NoopRatelimiter{} @@ -138,6 +134,13 @@ func newTestServer(t *testing.T, mockValidator bool) *grpc.Server { val = core.NewShardValidator(v, asn, cst, opID) } + metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090", opID, -1, tx, chainState) + store, err := node.NewLevelDBStore(dbPath, logger, metrics, 1e9, 1e9) + if err != nil { + panic("failed to create a new levelDB store") + } + defer os.Remove(dbPath) + node := &node.Node{ Config: config, Logger: logger, diff --git a/node/metrics.go b/node/metrics.go index f523a7a57c..ce7fef5e9c 100644 --- a/node/metrics.go +++ b/node/metrics.go @@ -2,9 +2,13 @@ package node import ( "context" + "math/big" + "sort" "strconv" + "time" "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/core/eth" "github.com/Layr-Labs/eigensdk-go/logging" eigenmetrics "github.com/Layr-Labs/eigensdk-go/metrics" @@ -20,8 +24,8 @@ const ( type Metrics struct { logger logging.Logger - // Whether the node is registered. - Registered prometheus.Gauge + // The quorums the node is registered. + RegisteredQuorums *prometheus.GaugeVec // Accumulated number of RPC requests received. AccNumRequests *prometheus.CounterVec // The latency (in ms) to process the request. @@ -40,22 +44,29 @@ type Metrics struct { registry *prometheus.Registry // socketAddr is the address at which the metrics server will be listening. // should be in format ip:port - socketAddr string + socketAddr string + operatorId core.OperatorID + onchainMetricsInterval int64 + tx core.Transactor + chainState core.ChainState } -func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, logger logging.Logger, socketAddr string) *Metrics { +func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, logger logging.Logger, socketAddr string, operatorId core.OperatorID, onchainMetricsInterval int64, tx core.Transactor, chainState core.ChainState) *Metrics { // Add Go module collectors reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) reg.MustRegister(collectors.NewGoCollector()) metrics := &Metrics{ - Registered: promauto.With(reg).NewGauge( + // The "type" label have values: stake_share, rank. The "stake_share" is stake share (in basis point), + // and the "rank" is operator's ranking (the operator with highest amount of stake ranked as 1) by stake share in the quorum. + RegisteredQuorums: promauto.With(reg).NewGaugeVec( prometheus.GaugeOpts{ Namespace: Namespace, Name: "registered", - Help: "indicator about if DA node is registered", + Help: "the quorums the DA node is registered", }, + []string{"quorum", "type"}, ), // The "status" label has values: success, failure. AccNumRequests: promauto.With(reg).NewCounterVec( @@ -108,10 +119,14 @@ func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, log Help: "the total number of node's socket address updates", }, ), - EigenMetrics: eigenMetrics, - logger: logger.With("component", "NodeMetrics"), - registry: reg, - socketAddr: socketAddr, + EigenMetrics: eigenMetrics, + logger: logger.With("component", "NodeMetrics"), + registry: reg, + socketAddr: socketAddr, + operatorId: operatorId, + onchainMetricsInterval: onchainMetricsInterval, + tx: tx, + chainState: chainState, } return metrics @@ -119,6 +134,10 @@ func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, log func (g *Metrics) Start() { _ = g.EigenMetrics.Start(context.Background(), g.registry) + + if g.onchainMetricsInterval > 0 { + go g.collectOnchainMetrics() + } } func (g *Metrics) RecordRPCRequest(method string, status string) { @@ -150,3 +169,59 @@ func (g *Metrics) AcceptBatches(status string, batchSize uint64) { g.AccuBatches.WithLabelValues("number", status).Inc() g.AccuBatches.WithLabelValues("size", status).Add(float64(batchSize)) } + +func (g *Metrics) collectOnchainMetrics() { + ticker := time.NewTicker(time.Duration(uint64(g.onchainMetricsInterval))) + defer ticker.Stop() + + // 3 chain RPC calls in each cycle. + for range ticker.C { + ctx := context.Background() + blockNum, err := g.tx.GetCurrentBlockNumber(ctx) + if err != nil { + g.logger.Error("Failed to query chain RPC for current block number", "err", err) + continue + } + bitmaps, err := g.tx.GetQuorumBitmapForOperatorsAtBlockNumber(ctx, []core.OperatorID{g.operatorId}, blockNum) + if err != nil { + g.logger.Error("Failed to query chain RPC for quorum bitmap", "blockNumber", blockNum, "err", err) + continue + } + quorumIds := eth.BitmapToQuorumIds(bitmaps[0]) + if len(quorumIds) == 0 { + g.logger.Warn("This node is currently not in any quorum", "blockNumber", blockNum, "operatorId", g.operatorId.Hex()) + continue + } + state, err := g.chainState.GetOperatorState(ctx, uint(blockNum), quorumIds) + if err != nil { + g.logger.Error("Failed to query chain RPC for operator state", "blockNumber", blockNum, "quorumIds", quorumIds, "err", err) + continue + } + type OperatorStakeShare struct { + operatorId core.OperatorID + stakeShare float64 + } + for q, operators := range state.Operators { + operatorStakeShares := make([]*OperatorStakeShare, 0) + for opId, opInfo := range operators { + share, _ := new(big.Int).Div(new(big.Int).Mul(opInfo.Stake, big.NewInt(10000)), state.Totals[q].Stake).Float64() + operatorStakeShares = append(operatorStakeShares, &OperatorStakeShare{operatorId: opId, stakeShare: share}) + } + // Descending order by stake share in the quorum. + sort.Slice(operatorStakeShares, func(i, j int) bool { + if operatorStakeShares[i].stakeShare == operatorStakeShares[j].stakeShare { + return operatorStakeShares[i].operatorId.Hex() < operatorStakeShares[j].operatorId.Hex() + } + return operatorStakeShares[i].stakeShare > operatorStakeShares[j].stakeShare + }) + for i, op := range operatorStakeShares { + if op.operatorId == g.operatorId { + g.RegisteredQuorums.WithLabelValues(string(q), "stake_share").Set(op.stakeShare) + g.RegisteredQuorums.WithLabelValues(string(q), "rank").Set(float64(i + 1)) + g.logger.Info("Current operator registration onchain", "operatorId", g.operatorId.Hex(), "blockNumber", blockNum, "quorumId", q, "stakeShare (basis point)", op.stakeShare, "rank", i+1) + break + } + } + } + } +} diff --git a/node/node.go b/node/node.go index a2c6e3b817..1356e7d41e 100644 --- a/node/node.go +++ b/node/node.go @@ -70,8 +70,6 @@ func NewNode(config *Config, pubIPProvider pubip.Provider, logger logging.Logger promReg := prometheus.NewRegistry() eigenMetrics := metrics.NewEigenMetrics(AppName, ":"+config.MetricsPort, promReg, logger.With("component", "EigenMetrics")) - - metrics := NewMetrics(eigenMetrics, promReg, logger, ":"+config.MetricsPort) rpcCallsCollector := rpccalls.NewCollector(AppName, promReg) // Generate BLS keys @@ -110,6 +108,8 @@ func NewNode(config *Config, pubIPProvider pubip.Provider, logger logging.Logger // Setup Node Api nodeApi := nodeapi.NewNodeApi(AppName, SemVer, ":"+config.NodeApiPort, logger.With("component", "NodeApi")) + metrics := NewMetrics(eigenMetrics, promReg, logger, ":"+config.MetricsPort, config.ID, config.OnchainMetricsInterval, tx, cst) + // Make validator v, err := verifier.NewVerifier(&config.EncoderConfig, false) if err != nil { diff --git a/node/store_test.go b/node/store_test.go index cb28814af0..176345dd93 100644 --- a/node/store_test.go +++ b/node/store_test.go @@ -9,7 +9,10 @@ import ( commonpb "github.com/Layr-Labs/eigenda/api/grpc/common" pb "github.com/Layr-Labs/eigenda/api/grpc/node" "github.com/Layr-Labs/eigenda/core" + "github.com/Layr-Labs/eigenda/core/mock" + coremock "github.com/Layr-Labs/eigenda/core/mock" "github.com/Layr-Labs/eigenda/encoding" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/Layr-Labs/eigenda/node" "github.com/Layr-Labs/eigensdk-go/logging" @@ -177,7 +180,13 @@ func TestStoringBlob(t *testing.T) { noopMetrics := metrics.NewNoopMetrics() reg := prometheus.NewRegistry() logger := logging.NewNoopLogger() - s, _ := node.NewLevelDBStore(t.TempDir(), logger, node.NewMetrics(noopMetrics, reg, logger, ":9090"), staleMeasure, storeDuration) + operatorId := [32]byte(hexutil.MustDecode("0x3fbfefcdc76462d2cdb7d0cea75f27223829481b8b4aa6881c94cb2126a316ad")) + tx := &coremock.MockTransactor{} + dat, _ := mock.MakeChainDataMock(map[uint8]int{ + 0: 6, + 1: 3, + }) + s, _ := node.NewLevelDBStore(t.TempDir(), logger, node.NewMetrics(noopMetrics, reg, logger, ":9090", operatorId, -1, tx, dat), staleMeasure, storeDuration) ctx := context.Background() // Empty store diff --git a/test/integration_test.go b/test/integration_test.go index 9d5e40a314..ecda8fbe85 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -266,14 +266,6 @@ func mustMakeOperators(t *testing.T, cst *coremock.ChainDataMock, logger logging _, v0 := mustMakeTestComponents() val := core.NewShardValidator(v0, asn, cst, id) - noopMetrics := metrics.NewNoopMetrics() - reg := prometheus.NewRegistry() - metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090") - store, err := node.NewLevelDBStore(config.DbPath+"/chunk", logger, metrics, 1e9, 1e9) - if err != nil { - t.Fatal(err) - } - tx := &coremock.MockTransactor{} tx.On("RegisterBLSPublicKey").Return(nil) tx.On("RegisterOperator").Return(nil) @@ -282,6 +274,14 @@ func mustMakeOperators(t *testing.T, cst *coremock.ChainDataMock, logger logging tx.On("GetBlockStaleMeasure").Return(nil) tx.On("GetStoreDurationBlocks").Return(nil) + noopMetrics := metrics.NewNoopMetrics() + reg := prometheus.NewRegistry() + metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090", config.ID, -1, tx, cst) + store, err := node.NewLevelDBStore(config.DbPath+"/chunk", logger, metrics, 1e9, 1e9) + if err != nil { + t.Fatal(err) + } + mockOperatorSocketsFilterer := &coremock.MockOperatorSocketsFilterer{} mockSocketChan := make(chan string)