Skip to content

Commit

Permalink
Log semver scan operator id as hex (#802)
Browse files Browse the repository at this point in the history
  • Loading branch information
pschork authored Oct 31, 2024
1 parent 15bd7ee commit f4015d6
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 41 deletions.
67 changes: 56 additions & 11 deletions disperser/common/semver/semver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package semver

import (
"context"
"math/big"
"strings"
"sync"
"time"
Expand All @@ -13,19 +14,57 @@ import (
"google.golang.org/grpc/credentials/insecure"
)

func ScanOperators(operators map[core.OperatorID]*core.IndexedOperatorInfo, numWorkers int, nodeInfoTimeout time.Duration, logger logging.Logger) map[string]int {
type SemverMetrics struct {
Semver string `json:"semver"`
Operators uint8 `json:"count"`
OperatorIds []string `json:"operators"`
QuorumStakePercentage map[uint8]float64 `json:"stake_percentage"`
}

func ScanOperators(operators map[core.OperatorID]*core.IndexedOperatorInfo, operatorState *core.OperatorState, useRetrievalSocket bool, numWorkers int, nodeInfoTimeout time.Duration, logger logging.Logger) map[string]*SemverMetrics {
var wg sync.WaitGroup
var mu sync.Mutex
semvers := make(map[string]int)
semvers := make(map[string]*SemverMetrics)
operatorChan := make(chan core.OperatorID, len(operators))
worker := func() {
for operatorId := range operatorChan {
operatorSocket := core.OperatorSocket(operators[operatorId].Socket)
dispersalSocket := operatorSocket.GetDispersalSocket()
semver := GetSemverInfo(context.Background(), dispersalSocket, operatorId, logger, nodeInfoTimeout)
var socket string
if useRetrievalSocket {
socket = operatorSocket.GetRetrievalSocket()
} else {
socket = operatorSocket.GetDispersalSocket()
}
semver := GetSemverInfo(context.Background(), socket, useRetrievalSocket, operatorId, logger, nodeInfoTimeout)

mu.Lock()
semvers[semver]++
if _, exists := semvers[semver]; !exists {
semvers[semver] = &SemverMetrics{
Semver: semver,
Operators: 1,
OperatorIds: []string{operatorId.Hex()},
QuorumStakePercentage: make(map[uint8]float64),
}
} else {
semvers[semver].Operators += 1
semvers[semver].OperatorIds = append(semvers[semver].OperatorIds, operatorId.Hex())
}

// Calculate stake percentage for each quorum
for quorum, totalOperatorInfo := range operatorState.Totals {
stakePercentage := float64(0)
if stake, ok := operatorState.Operators[quorum][operatorId]; ok {
totalStake := new(big.Float).SetInt(totalOperatorInfo.Stake)
operatorStake := new(big.Float).SetInt(stake.Stake)
stakePercentage, _ = new(big.Float).Mul(big.NewFloat(100), new(big.Float).Quo(operatorStake, totalStake)).Float64()
}

if _, exists := semvers[semver].QuorumStakePercentage[quorum]; !exists {
semvers[semver].QuorumStakePercentage[quorum] = stakePercentage
} else {
semvers[semver].QuorumStakePercentage[quorum] += stakePercentage
}
}
mu.Unlock()
}
wg.Done()
Expand All @@ -49,16 +88,22 @@ func ScanOperators(operators map[core.OperatorID]*core.IndexedOperatorInfo, numW
}

// query operator host info endpoint if available
func GetSemverInfo(ctx context.Context, socket string, operatorId core.OperatorID, logger logging.Logger, timeout time.Duration) string {
func GetSemverInfo(ctx context.Context, socket string, userRetrievalClient bool, operatorId core.OperatorID, logger logging.Logger, timeout time.Duration) string {
conn, err := grpc.Dial(socket, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return "unreachable"
}
defer conn.Close()
ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
client := node.NewDispersalClient(conn)
reply, err := client.NodeInfo(ctxWithTimeout, &node.NodeInfoRequest{})
var reply *node.NodeInfoReply
if userRetrievalClient {
client := node.NewRetrievalClient(conn)
reply, err = client.NodeInfo(ctxWithTimeout, &node.NodeInfoRequest{})
} else {
client := node.NewDispersalClient(conn)
reply, err = client.NodeInfo(ctxWithTimeout, &node.NodeInfoRequest{})
}
if err != nil {
var semver string
if strings.Contains(err.Error(), "unknown method NodeInfo") {
Expand All @@ -73,15 +118,15 @@ func GetSemverInfo(ctx context.Context, socket string, operatorId core.OperatorI
semver = "error"
}

logger.Warn("NodeInfo", "operatorId", operatorId, "semver", semver, "error", err)
logger.Warn("NodeInfo", "operatorId", operatorId.Hex(), "semver", semver, "error", err)
return semver
}

// local node source compiles without semver
if reply.Semver == "" {
reply.Semver = "src-compile"
reply.Semver = "0.8.4"
}

logger.Info("NodeInfo", "operatorId", operatorId, "socker", socket, "semver", reply.Semver, "os", reply.Os, "arch", reply.Arch, "numCpu", reply.NumCpu, "memBytes", reply.MemBytes)
logger.Info("NodeInfo", "operatorId", operatorId.Hex(), "socket", socket, "userRetrievalClient", userRetrievalClient, "semver", reply.Semver, "os", reply.Os, "arch", reply.Arch, "numCpu", reply.NumCpu, "memBytes", reply.MemBytes)
return reply.Semver
}
7 changes: 4 additions & 3 deletions disperser/dataapi/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/common/blobstore"
"github.com/Layr-Labs/eigenda/disperser/common/semver"
"github.com/Layr-Labs/eigenda/operators"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -110,9 +111,9 @@ func (g *Metrics) IncrementNotFoundRequestNum(method string) {
}

// UpdateSemverMetrics updates the semver metrics
func (g *Metrics) UpdateSemverCounts(semverData map[string]int) {
for semver, count := range semverData {
g.Semvers.WithLabelValues(semver).Set(float64(count))
func (g *Metrics) UpdateSemverCounts(semverData map[string]*semver.SemverMetrics) {
for semver, metrics := range semverData {
g.Semvers.WithLabelValues(semver).Set(float64(metrics.Operators))
}
}

Expand Down
13 changes: 9 additions & 4 deletions disperser/dataapi/queried_operators_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,15 +291,20 @@ func (s *server) scanOperatorsHostInfo(ctx context.Context) (*SemverReportRespon
if err != nil {
return nil, fmt.Errorf("failed to fetch current block number - %s", err)
}
operatorState, err := s.indexedChainState.GetIndexedOperatorState(context.Background(), currentBlock, []core.QuorumID{0, 1, 2})
operators, err := s.indexedChainState.GetIndexedOperators(context.Background(), currentBlock)
if err != nil {
return nil, fmt.Errorf("failed to fetch indexed operator state - %s", err)
return nil, fmt.Errorf("failed to fetch indexed operator info - %s", err)
}
s.logger.Info("Queried indexed operators", "operators", len(operators), "block", currentBlock)
operatorState, err := s.chainState.GetOperatorState(context.Background(), currentBlock, []core.QuorumID{0, 1, 2})
if err != nil {
return nil, fmt.Errorf("failed to fetch operator state - %s", err)
}
s.logger.Info("Queried operator state", "count", len(operatorState.IndexedOperators))

nodeInfoWorkers := 20
nodeInfoTimeout := time.Duration(1 * time.Second)
semvers := semver.ScanOperators(operatorState.IndexedOperators, nodeInfoWorkers, nodeInfoTimeout, s.logger)
useRetrievalClient := false
semvers := semver.ScanOperators(operators, operatorState, useRetrievalClient, nodeInfoWorkers, nodeInfoTimeout, s.logger)

// Create HostInfoReportResponse instance
semverReport := &SemverReportResponse{
Expand Down
3 changes: 2 additions & 1 deletion disperser/dataapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/common/semver"
"github.com/Layr-Labs/eigenda/disperser/dataapi/docs"
"github.com/gin-contrib/cors"
"github.com/gin-contrib/logger"
Expand Down Expand Up @@ -181,7 +182,7 @@ type (
RetrievalOnline bool `json:"retrieval_online"`
}
SemverReportResponse struct {
Semver map[string]int `json:"semver"`
Semver map[string]*semver.SemverMetrics `json:"semver"`
}

ErrorResponse struct {
Expand Down
65 changes: 50 additions & 15 deletions tools/semverscan/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,38 +59,73 @@ func RunScan(ctx *cli.Context) error {
if err != nil {
log.Fatalln("could not start tcp listener", err)
}
cs := eth.NewChainState(tx, gethClient)
chainState := eth.NewChainState(tx, gethClient)

logger.Info("Connecting to subgraph", "url", config.ChainStateConfig.Endpoint)
ics := thegraph.MakeIndexedChainState(config.ChainStateConfig, cs, logger)
ics := thegraph.MakeIndexedChainState(config.ChainStateConfig, chainState, logger)

currentBlock, err := ics.GetCurrentBlockNumber()
if err != nil {
return fmt.Errorf("failed to fetch current block number - %s", err)
}
operatorState, err := ics.GetIndexedOperatorState(context.Background(), currentBlock, []core.QuorumID{0, 1, 2})
operatorState, err := chainState.GetOperatorState(context.Background(), currentBlock, []core.QuorumID{0, 1, 2})
if err != nil {
return fmt.Errorf("failed to fetch indexed operator state - %s", err)
return fmt.Errorf("failed to fetch operator state - %s", err)
}
logger.Info("Queried operator state", "count", len(operatorState.IndexedOperators))
operators, err := ics.GetIndexedOperators(context.Background(), currentBlock)
if err != nil {
return fmt.Errorf("failed to fetch indexed operators info - %s", err)
}
if config.OperatorId != "" {
operatorId, err := core.OperatorIDFromHex(config.OperatorId)
if err != nil {
return fmt.Errorf("failed to parse operator id %s - %v", config.OperatorId, err)
}
for operator := range operators {
if operator.Hex() != operatorId.Hex() {
delete(operators, operator)
}
}
}
logger.Info("Queried operator state", "count", len(operators))

semvers := semver.ScanOperators(operatorState.IndexedOperators, config.Workers, config.Timeout, logger)
semvers := semver.ScanOperators(operators, operatorState, config.UseRetrievalClient, config.Workers, config.Timeout, logger)
for semver, metrics := range semvers {
logger.Info("Semver Report", "semver", semver, "operators", metrics.Operators, "stake", metrics.QuorumStakePercentage)
}
displayResults(semvers)
return nil
}

func displayResults(results map[string]int) {
func displayResults(results map[string]*semver.SemverMetrics) {
tw := table.NewWriter()
rowAutoMerge := table.RowConfig{AutoMerge: true}
tw.AppendHeader(table.Row{"semver", "install %", "operators", "quorum 0 stake %", "quorum 1 stake %", "quorum 2 stake %"}, rowAutoMerge)
//tw.AppendHeader(table.Row{"", "", "quorum 0", "quorum 1", "quorum 2"})

rowHeader := table.Row{"semver", "count"}
tw.AppendHeader(rowHeader)

total := 0
for semver, count := range results {
tw.AppendRow(table.Row{semver, count})
total += count
total_operators := 0
total_semver_pct := 0.0
total_stake_q0 := 0.0
total_stake_q1 := 0.0
total_stake_q2 := 0.0
for _, metrics := range results {
total_operators += int(metrics.Operators)
total_stake_q0 += metrics.QuorumStakePercentage[0]
total_stake_q1 += metrics.QuorumStakePercentage[1]
total_stake_q2 += metrics.QuorumStakePercentage[2]
}
for semver, metrics := range results {
semver_pct := 100 * (float64(metrics.Operators) / float64(total_operators))
total_semver_pct += semver_pct
tw.AppendRow(table.Row{semver, semver_pct, metrics.Operators, metrics.QuorumStakePercentage[0], metrics.QuorumStakePercentage[1], metrics.QuorumStakePercentage[2]})
}
tw.AppendFooter(table.Row{"total", total})
tw.AppendFooter(table.Row{"totals", total_semver_pct, total_operators, total_stake_q0, total_stake_q1, total_stake_q2})
tw.SetColumnConfigs([]table.ColumnConfig{
{Number: 1, AutoMerge: true},
{Number: 3, AlignHeader: 2},
{Number: 4, AlignHeader: 2},
{Number: 5, AlignHeader: 2},
})

fmt.Println(tw.Render())
}
11 changes: 7 additions & 4 deletions tools/semverscan/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import (
)

type Config struct {
LoggerConfig common.LoggerConfig
Workers int
OperatorId string
Timeout time.Duration
LoggerConfig common.LoggerConfig
Workers int
OperatorId string
Timeout time.Duration
UseRetrievalClient bool

ChainStateConfig thegraph.Config
EthClientConfig geth.EthClientConfig

Expand All @@ -27,6 +29,7 @@ func ReadConfig(ctx *cli.Context) *Config {
Timeout: ctx.Duration(flags.TimeoutFlag.Name),
Workers: ctx.Int(flags.WorkersFlag.Name),
OperatorId: ctx.String(flags.OperatorIdFlag.Name),
UseRetrievalClient: ctx.Bool(flags.UseRetrievalClientFlag.Name),
ChainStateConfig: thegraph.ReadCLIConfig(ctx),
EthClientConfig: geth.ReadEthClientConfig(ctx),
BLSOperatorStateRetrieverAddr: ctx.GlobalString(flags.BlsOperatorStateRetrieverFlag.Name),
Expand Down
13 changes: 10 additions & 3 deletions tools/semverscan/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,25 +31,31 @@ var (
/* Optional Flags*/
TimeoutFlag = cli.DurationFlag{
Name: common.PrefixFlag(FlagPrefix, "timeout"),
Usage: "seconds to wait for GPRC response",
Usage: "Seconds to wait for GPRC response",
Required: false,
EnvVar: common.PrefixEnvVar(envPrefix, "TIMEOUT"),
Value: 3 * time.Second,
}
WorkersFlag = cli.UintFlag{
Name: common.PrefixFlag(FlagPrefix, "workers"),
Usage: "maximum number of concurrent node info requests",
Usage: "Maximum number of concurrent node info requests",
Required: false,
EnvVar: common.PrefixEnvVar(envPrefix, "WORKERS"),
Value: 10,
}
OperatorIdFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "operator-id"),
Usage: "operator id to scan",
Usage: "Operator ID to scan",
Required: false,
EnvVar: common.PrefixEnvVar(envPrefix, "OPERATOR_ID"),
Value: "",
}
UseRetrievalClientFlag = cli.BoolFlag{
Name: common.PrefixFlag(FlagPrefix, "use-retrieval-client"),
Usage: "Use retrieval client to get operator info (default: false)",
Required: false,
EnvVar: common.PrefixEnvVar(envPrefix, "USE_RETRIEVAL_CLIENT"),
}
)

var requiredFlags = []cli.Flag{
Expand All @@ -61,6 +67,7 @@ var optionalFlags = []cli.Flag{
TimeoutFlag,
WorkersFlag,
OperatorIdFlag,
UseRetrievalClientFlag,
}

// Flags contains the list of configuration options available to the binary.
Expand Down

0 comments on commit f4015d6

Please sign in to comment.