Skip to content

Commit

Permalink
Query the operator stake at current block number (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix authored Nov 14, 2023
1 parent 324f90b commit 5d260be
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 9 deletions.
1 change: 1 addition & 0 deletions disperser/cmd/dataapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func RunDataApi(ctx *cli.Context) error {
sharedStorage,
promClient,
subgraphClient,
tx,
chainState,
logger,
metrics,
Expand Down
16 changes: 8 additions & 8 deletions disperser/dataapi/metrics_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package dataapi
import (
"context"
"errors"
"fmt"
"math/big"
"time"

"github.com/Layr-Labs/eigenda/core"
"github.com/gammazero/workerpool"
)

Expand All @@ -22,7 +22,11 @@ func (s *server) getMetric(ctx context.Context, startTime int64, endTime int64,
return nil, err
}

totalStake, err := s.calculateTotalStake(operators)
blockNumber, err := s.transactor.GetCurrentBlockNumber(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get current block number: %w", err)
}
totalStake, err := s.calculateTotalStake(operators, blockNumber)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -69,25 +73,21 @@ func (s *server) getThroughput(ctx context.Context, start int64, end int64) ([]*
return calculateAverageThroughput(result.Values, avgThroughputWindowSize), nil
}

func (s *server) calculateTotalStake(operators []*Operator) (int64, error) {
func (s *server) calculateTotalStake(operators []*Operator, blockNumber uint32) (int64, error) {
var (
totalStakeByOperatorChan = make(chan *big.Int, len(operators))
pool = workerpool.New(maxWorkersGetOperatorState)
)

for _, o := range operators {
var (
operatorId core.OperatorID
blockNumber = uint(o.BlockNumber)
)
operatorId, err := ConvertHexadecimalToBytes(o.OperatorId)
if err != nil {
s.logger.Error("Failed to convert operator id to hex string: ", "operatorId", operatorId, "err", err)
return 0, err
}

pool.Submit(func() {
operatorState, err := s.chainState.GetOperatorStateByOperator(context.Background(), blockNumber, operatorId)
operatorState, err := s.chainState.GetOperatorStateByOperator(context.Background(), uint(blockNumber), operatorId)
if err != nil {
s.logger.Error("Failed to get operator state: ", "operatorId", operatorId, "blockNumber", blockNumber, "err", err)
totalStakeByOperatorChan <- big.NewInt(-1)
Expand Down
4 changes: 4 additions & 0 deletions disperser/dataapi/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"

"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/dataapi/docs"
"github.com/gin-contrib/cors"
Expand Down Expand Up @@ -79,6 +80,7 @@ type (
blobstore disperser.BlobStore
promClient PrometheusClient
subgraphClient SubgraphClient
transactor core.Transactor
chainState core.ChainState

metrics *Metrics
Expand All @@ -90,6 +92,7 @@ func NewServer(
blobstore disperser.BlobStore,
promClient PrometheusClient,
subgraphClient SubgraphClient,
transactor core.Transactor,
chainState core.ChainState,
logger common.Logger,
metrics *Metrics,
Expand All @@ -102,6 +105,7 @@ func NewServer(
blobstore: blobstore,
promClient: promClient,
subgraphClient: subgraphClient,
transactor: transactor,
chainState: chainState,
metrics: metrics,
}
Expand Down
4 changes: 3 additions & 1 deletion disperser/dataapi/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ var (
subgraphClient = dataapi.NewSubgraphClient(mockSubgraphApi)
config = dataapi.Config{ServerMode: "test", SocketAddr: ":8080"}

mockTx = &coremock.MockTransactor{}
mockChainState, _ = coremock.NewChainDataMock(core.OperatorIndex(1))
testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, subgraphClient, mockChainState, &commock.Logger{}, dataapi.NewMetrics("9001", &commock.Logger{}))
testDataApiServer = dataapi.NewServer(config, blobstore, prometheusClient, subgraphClient, mockTx, mockChainState, &commock.Logger{}, dataapi.NewMetrics("9001", &commock.Logger{}))
expectedBatchHeaderHash = [32]byte{1, 2, 3}
expectedBlobIndex = uint32(1)
expectedRequestedAt = uint64(5567830000000000000)
Expand Down Expand Up @@ -159,6 +160,7 @@ func TestFetchMetricsHandler(t *testing.T) {

matrix := make(model.Matrix, 0)
matrix = append(matrix, s)
mockTx.On("GetCurrentBlockNumber").Return(uint32(1), nil)
mockSubgraphApi.On("QueryBatches").Return(subgraphBatches, nil)
mockSubgraphApi.On("QueryOperators").Return(subgraphOperatorRegistereds, nil)
mockPrometheusApi.On("QueryRange").Return(matrix, nil, nil)
Expand Down

0 comments on commit 5d260be

Please sign in to comment.