Skip to content

Commit

Permalink
Add operator registration metric at node (#502)
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix authored Apr 19, 2024
1 parent a05def3 commit 898eb10
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 27 deletions.
1 change: 1 addition & 0 deletions inabox/deploy/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ func (env *Config) generateOperatorVars(ind int, name, key, churnerUrl, logPath,
NODE_PUBLIC_IP_PROVIDER: "mockip",
NODE_PUBLIC_IP_CHECK_INTERVAL: "10s",
NODE_NUM_CONFIRMATIONS: "0",
NODE_ONCHAIN_METRICS_INTERVAL: "-1",
}

env.applyDefaults(&v, "NODE", "opr", ind)
Expand Down
2 changes: 2 additions & 0 deletions inabox/deploy/env_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ type OperatorVars struct {
NODE_LOG_PATH string

NODE_LOG_FORMAT string

NODE_ONCHAIN_METRICS_INTERVAL string
}

func (vars OperatorVars) getEnvMap() map[string]string {
Expand Down
2 changes: 2 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Config struct {
NodeApiPort string
EnableMetrics bool
MetricsPort string
OnchainMetricsInterval int64
Timeout time.Duration
RegisterNodeAtStart bool
ExpirationPollIntervalSec uint64
Expand Down Expand Up @@ -167,6 +168,7 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
NodeApiPort: ctx.GlobalString(flags.NodeApiPortFlag.Name),
EnableMetrics: ctx.GlobalBool(flags.EnableMetricsFlag.Name),
MetricsPort: ctx.GlobalString(flags.MetricsPortFlag.Name),
OnchainMetricsInterval: ctx.GlobalInt64(flags.OnchainMetricsIntervalFlag.Name),
Timeout: timeout,
RegisterNodeAtStart: registerNodeAtStart,
ExpirationPollIntervalSec: expirationPollIntervalSec,
Expand Down
8 changes: 8 additions & 0 deletions node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ var (
Value: "9091",
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "METRICS_PORT"),
}
OnchainMetricsIntervalFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "onchain-metrics-interval"),
Usage: "The interval in seconds at which the node polls the onchain state of the operator and update metrics. <=0 means no poll",
Required: false,
Value: "180",
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "ONCHAIN_METRICS_INTERVAL"),
}
TimeoutFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "timeout"),
Usage: "Amount of time to wait for GPRC",
Expand Down Expand Up @@ -238,6 +245,7 @@ var requiredFlags = []cli.Flag{
RetrievalPortFlag,
EnableMetricsFlag,
MetricsPortFlag,
OnchainMetricsIntervalFlag,
EnableNodeApiFlag,
NodeApiPortFlag,
TimeoutFlag,
Expand Down
15 changes: 9 additions & 6 deletions node/grpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
commonmock "github.com/Layr-Labs/eigenda/common/mock"
"github.com/Layr-Labs/eigenda/core"
core_mock "github.com/Layr-Labs/eigenda/core/mock"
coremock "github.com/Layr-Labs/eigenda/core/mock"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/Layr-Labs/eigenda/encoding/kzg"
"github.com/Layr-Labs/eigenda/encoding/kzg/prover"
Expand Down Expand Up @@ -101,12 +102,7 @@ func newTestServer(t *testing.T, mockValidator bool) *grpc.Server {
}
noopMetrics := metrics.NewNoopMetrics()
reg := prometheus.NewRegistry()
metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090")
store, err := node.NewLevelDBStore(dbPath, logger, metrics, 1e9, 1e9)
if err != nil {
panic("failed to create a new levelDB store")
}
defer os.Remove(dbPath)
tx := &coremock.MockTransactor{}

ratelimiter := &commonmock.NoopRatelimiter{}

Expand Down Expand Up @@ -138,6 +134,13 @@ func newTestServer(t *testing.T, mockValidator bool) *grpc.Server {
val = core.NewShardValidator(v, asn, cst, opID)
}

metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090", opID, -1, tx, chainState)
store, err := node.NewLevelDBStore(dbPath, logger, metrics, 1e9, 1e9)
if err != nil {
panic("failed to create a new levelDB store")
}
defer os.Remove(dbPath)

node := &node.Node{
Config: config,
Logger: logger,
Expand Down
95 changes: 85 additions & 10 deletions node/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package node

import (
"context"
"math/big"
"sort"
"strconv"
"time"

"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/eth"
"github.com/Layr-Labs/eigensdk-go/logging"
eigenmetrics "github.com/Layr-Labs/eigensdk-go/metrics"

Expand All @@ -20,8 +24,8 @@ const (
type Metrics struct {
logger logging.Logger

// Whether the node is registered.
Registered prometheus.Gauge
// The quorums the node is registered.
RegisteredQuorums *prometheus.GaugeVec
// Accumulated number of RPC requests received.
AccNumRequests *prometheus.CounterVec
// The latency (in ms) to process the request.
Expand All @@ -40,22 +44,29 @@ type Metrics struct {
registry *prometheus.Registry
// socketAddr is the address at which the metrics server will be listening.
// should be in format ip:port
socketAddr string
socketAddr string
operatorId core.OperatorID
onchainMetricsInterval int64
tx core.Transactor
chainState core.ChainState
}

func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, logger logging.Logger, socketAddr string) *Metrics {
func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, logger logging.Logger, socketAddr string, operatorId core.OperatorID, onchainMetricsInterval int64, tx core.Transactor, chainState core.ChainState) *Metrics {

// Add Go module collectors
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
reg.MustRegister(collectors.NewGoCollector())

metrics := &Metrics{
Registered: promauto.With(reg).NewGauge(
// The "type" label have values: stake_share, rank. The "stake_share" is stake share (in basis point),
// and the "rank" is operator's ranking (the operator with highest amount of stake ranked as 1) by stake share in the quorum.
RegisteredQuorums: promauto.With(reg).NewGaugeVec(
prometheus.GaugeOpts{
Namespace: Namespace,
Name: "registered",
Help: "indicator about if DA node is registered",
Help: "the quorums the DA node is registered",
},
[]string{"quorum", "type"},
),
// The "status" label has values: success, failure.
AccNumRequests: promauto.With(reg).NewCounterVec(
Expand Down Expand Up @@ -108,17 +119,25 @@ func NewMetrics(eigenMetrics eigenmetrics.Metrics, reg *prometheus.Registry, log
Help: "the total number of node's socket address updates",
},
),
EigenMetrics: eigenMetrics,
logger: logger.With("component", "NodeMetrics"),
registry: reg,
socketAddr: socketAddr,
EigenMetrics: eigenMetrics,
logger: logger.With("component", "NodeMetrics"),
registry: reg,
socketAddr: socketAddr,
operatorId: operatorId,
onchainMetricsInterval: onchainMetricsInterval,
tx: tx,
chainState: chainState,
}

return metrics
}

func (g *Metrics) Start() {
_ = g.EigenMetrics.Start(context.Background(), g.registry)

if g.onchainMetricsInterval > 0 {
go g.collectOnchainMetrics()
}
}

func (g *Metrics) RecordRPCRequest(method string, status string) {
Expand Down Expand Up @@ -150,3 +169,59 @@ func (g *Metrics) AcceptBatches(status string, batchSize uint64) {
g.AccuBatches.WithLabelValues("number", status).Inc()
g.AccuBatches.WithLabelValues("size", status).Add(float64(batchSize))
}

func (g *Metrics) collectOnchainMetrics() {
ticker := time.NewTicker(time.Duration(uint64(g.onchainMetricsInterval)))
defer ticker.Stop()

// 3 chain RPC calls in each cycle.
for range ticker.C {
ctx := context.Background()
blockNum, err := g.tx.GetCurrentBlockNumber(ctx)
if err != nil {
g.logger.Error("Failed to query chain RPC for current block number", "err", err)
continue
}
bitmaps, err := g.tx.GetQuorumBitmapForOperatorsAtBlockNumber(ctx, []core.OperatorID{g.operatorId}, blockNum)
if err != nil {
g.logger.Error("Failed to query chain RPC for quorum bitmap", "blockNumber", blockNum, "err", err)
continue
}
quorumIds := eth.BitmapToQuorumIds(bitmaps[0])
if len(quorumIds) == 0 {
g.logger.Warn("This node is currently not in any quorum", "blockNumber", blockNum, "operatorId", g.operatorId.Hex())
continue
}
state, err := g.chainState.GetOperatorState(ctx, uint(blockNum), quorumIds)
if err != nil {
g.logger.Error("Failed to query chain RPC for operator state", "blockNumber", blockNum, "quorumIds", quorumIds, "err", err)
continue
}
type OperatorStakeShare struct {
operatorId core.OperatorID
stakeShare float64
}
for q, operators := range state.Operators {
operatorStakeShares := make([]*OperatorStakeShare, 0)
for opId, opInfo := range operators {
share, _ := new(big.Int).Div(new(big.Int).Mul(opInfo.Stake, big.NewInt(10000)), state.Totals[q].Stake).Float64()
operatorStakeShares = append(operatorStakeShares, &OperatorStakeShare{operatorId: opId, stakeShare: share})
}
// Descending order by stake share in the quorum.
sort.Slice(operatorStakeShares, func(i, j int) bool {
if operatorStakeShares[i].stakeShare == operatorStakeShares[j].stakeShare {
return operatorStakeShares[i].operatorId.Hex() < operatorStakeShares[j].operatorId.Hex()
}
return operatorStakeShares[i].stakeShare > operatorStakeShares[j].stakeShare
})
for i, op := range operatorStakeShares {
if op.operatorId == g.operatorId {
g.RegisteredQuorums.WithLabelValues(string(q), "stake_share").Set(op.stakeShare)
g.RegisteredQuorums.WithLabelValues(string(q), "rank").Set(float64(i + 1))
g.logger.Info("Current operator registration onchain", "operatorId", g.operatorId.Hex(), "blockNumber", blockNum, "quorumId", q, "stakeShare (basis point)", op.stakeShare, "rank", i+1)
break
}
}
}
}
}
4 changes: 2 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ func NewNode(config *Config, pubIPProvider pubip.Provider, logger logging.Logger

promReg := prometheus.NewRegistry()
eigenMetrics := metrics.NewEigenMetrics(AppName, ":"+config.MetricsPort, promReg, logger.With("component", "EigenMetrics"))

metrics := NewMetrics(eigenMetrics, promReg, logger, ":"+config.MetricsPort)
rpcCallsCollector := rpccalls.NewCollector(AppName, promReg)

// Generate BLS keys
Expand Down Expand Up @@ -110,6 +108,8 @@ func NewNode(config *Config, pubIPProvider pubip.Provider, logger logging.Logger
// Setup Node Api
nodeApi := nodeapi.NewNodeApi(AppName, SemVer, ":"+config.NodeApiPort, logger.With("component", "NodeApi"))

metrics := NewMetrics(eigenMetrics, promReg, logger, ":"+config.MetricsPort, config.ID, config.OnchainMetricsInterval, tx, cst)

// Make validator
v, err := verifier.NewVerifier(&config.EncoderConfig, false)
if err != nil {
Expand Down
11 changes: 10 additions & 1 deletion node/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (
commonpb "github.com/Layr-Labs/eigenda/api/grpc/common"
pb "github.com/Layr-Labs/eigenda/api/grpc/node"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/mock"
coremock "github.com/Layr-Labs/eigenda/core/mock"
"github.com/Layr-Labs/eigenda/encoding"
"github.com/ethereum/go-ethereum/common/hexutil"

"github.com/Layr-Labs/eigenda/node"
"github.com/Layr-Labs/eigensdk-go/logging"
Expand Down Expand Up @@ -177,7 +180,13 @@ func TestStoringBlob(t *testing.T) {
noopMetrics := metrics.NewNoopMetrics()
reg := prometheus.NewRegistry()
logger := logging.NewNoopLogger()
s, _ := node.NewLevelDBStore(t.TempDir(), logger, node.NewMetrics(noopMetrics, reg, logger, ":9090"), staleMeasure, storeDuration)
operatorId := [32]byte(hexutil.MustDecode("0x3fbfefcdc76462d2cdb7d0cea75f27223829481b8b4aa6881c94cb2126a316ad"))
tx := &coremock.MockTransactor{}
dat, _ := mock.MakeChainDataMock(map[uint8]int{
0: 6,
1: 3,
})
s, _ := node.NewLevelDBStore(t.TempDir(), logger, node.NewMetrics(noopMetrics, reg, logger, ":9090", operatorId, -1, tx, dat), staleMeasure, storeDuration)
ctx := context.Background()

// Empty store
Expand Down
16 changes: 8 additions & 8 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,6 @@ func mustMakeOperators(t *testing.T, cst *coremock.ChainDataMock, logger logging
_, v0 := mustMakeTestComponents()
val := core.NewShardValidator(v0, asn, cst, id)

noopMetrics := metrics.NewNoopMetrics()
reg := prometheus.NewRegistry()
metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090")
store, err := node.NewLevelDBStore(config.DbPath+"/chunk", logger, metrics, 1e9, 1e9)
if err != nil {
t.Fatal(err)
}

tx := &coremock.MockTransactor{}
tx.On("RegisterBLSPublicKey").Return(nil)
tx.On("RegisterOperator").Return(nil)
Expand All @@ -282,6 +274,14 @@ func mustMakeOperators(t *testing.T, cst *coremock.ChainDataMock, logger logging
tx.On("GetBlockStaleMeasure").Return(nil)
tx.On("GetStoreDurationBlocks").Return(nil)

noopMetrics := metrics.NewNoopMetrics()
reg := prometheus.NewRegistry()
metrics := node.NewMetrics(noopMetrics, reg, logger, ":9090", config.ID, -1, tx, cst)
store, err := node.NewLevelDBStore(config.DbPath+"/chunk", logger, metrics, 1e9, 1e9)
if err != nil {
t.Fatal(err)
}

mockOperatorSocketsFilterer := &coremock.MockOperatorSocketsFilterer{}

mockSocketChan := make(chan string)
Expand Down

0 comments on commit 898eb10

Please sign in to comment.