Skip to content

Commit

Permalink
Measure attestation latency (#424)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Apr 1, 2024
1 parent 09d3031 commit 8a817e5
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 11 deletions.
3 changes: 2 additions & 1 deletion core/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"errors"
"fmt"
"math/big"
"sort"

Expand Down Expand Up @@ -171,7 +172,7 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, state
aggPubKeys[ind].Add(op.PubkeyG2)
}
}
a.Logger.Info("received signature from operator", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "quorumIDs", operatorQuorums)
a.Logger.Info("received signature from operator", "operatorID", operatorIDHex, "operatorAddress", operatorAddr, "socket", socket, "quorumIDs", fmt.Sprint(operatorQuorums))
}

// Aggregate Non signer Pubkey Id
Expand Down
2 changes: 1 addition & 1 deletion disperser/batcher/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ func (b *Batcher) ProcessConfirmedBatch(ctx context.Context, receiptOrErr *Recei
b.logger.Error("failed to update confirmation info", "failed", len(blobsToRetry), "total", len(blobs))
_ = b.handleFailure(ctx, blobsToRetry, FailUpdateConfirmationInfo)
}
b.logger.Debug("Update confirmation info took", "duration", time.Since(stageTimer))
b.logger.Debug("Update confirmation info took", "duration", time.Since(stageTimer).String())
b.Metrics.ObserveLatency("UpdateConfirmationInfo", float64(time.Since(stageTimer).Milliseconds()))
batchSize := int64(0)
for _, blobMeta := range blobs {
Expand Down
14 changes: 10 additions & 4 deletions disperser/batcher/grpc/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/Layr-Labs/eigenda/api/grpc/node"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/disperser"
"github.com/Layr-Labs/eigenda/disperser/batcher"
"github.com/Layr-Labs/eigensdk-go/logging"

"google.golang.org/grpc"
Expand All @@ -22,13 +23,15 @@ type Config struct {
type dispatcher struct {
*Config

logger logging.Logger
logger logging.Logger
metrics *batcher.DispatcherMetrics
}

func NewDispatcher(cfg *Config, logger logging.Logger) *dispatcher {
func NewDispatcher(cfg *Config, logger logging.Logger, metrics *batcher.DispatcherMetrics) *dispatcher {
return &dispatcher{
Config: cfg,
logger: logger.With("component", "Dispatcher"),
Config: cfg,
logger: logger.With("component", "Dispatcher"),
metrics: metrics,
}
}

Expand Down Expand Up @@ -68,19 +71,22 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera
return
}

requestedAt := time.Now()
sig, err := c.sendChunks(ctx, blobMessages, batchHeader, &op)
if err != nil {
update <- core.SignerMessage{
Err: err,
Signature: nil,
Operator: id,
}
c.metrics.ObserveLatency(false, float64(time.Since(requestedAt).Milliseconds()))
} else {
update <- core.SignerMessage{
Signature: sig,
Operator: id,
Err: nil,
}
c.metrics.ObserveLatency(true, float64(time.Since(requestedAt).Milliseconds()))
}

}(core.IndexedOperatorInfo{
Expand Down
26 changes: 26 additions & 0 deletions disperser/batcher/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,15 @@ type FinalizerMetrics struct {
Latency *prometheus.SummaryVec
}

type DispatcherMetrics struct {
Latency *prometheus.SummaryVec
}

type Metrics struct {
*EncodingStreamerMetrics
*TxnManagerMetrics
*FinalizerMetrics
*DispatcherMetrics

registry *prometheus.Registry

Expand Down Expand Up @@ -149,10 +154,23 @@ func NewMetrics(httpPort string, logger logging.Logger) *Metrics {
),
}

dispatcherMatrics := DispatcherMetrics{
Latency: promauto.With(reg).NewSummaryVec(
prometheus.SummaryOpts{
Namespace: namespace,
Name: "attestation_latency_ms",
Help: "attestation latency summary in milliseconds",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
},
[]string{"status"},
),
}

metrics := &Metrics{
EncodingStreamerMetrics: &encodingStreamerMetrics,
TxnManagerMetrics: &txnManagerMetrics,
FinalizerMetrics: &finalizerMetrics,
DispatcherMetrics: &dispatcherMatrics,
Blob: promauto.With(reg).NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Expand Down Expand Up @@ -222,6 +240,14 @@ func (g *Metrics) UpdateAttestation(operatorCount map[core.QuorumID]int, signerC
}
}

func (t *DispatcherMetrics) ObserveLatency(success bool, latencyMS float64) {
label := "success"
if !success {
label = "failure"
}
t.Latency.WithLabelValues(label).Observe(latencyMS)
}

// UpdateCompletedBlob increments the number and updates size of processed blobs.
func (g *Metrics) UpdateCompletedBlob(size int, status disperser.BlobStatus) {
switch status {
Expand Down
6 changes: 3 additions & 3 deletions disperser/cmd/batcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,11 @@ func RunBatcher(ctx *cli.Context) error {
return err
}

metrics := batcher.NewMetrics(config.MetricsConfig.HTTPPort, logger)

dispatcher := dispatcher.NewDispatcher(&dispatcher.Config{
Timeout: config.TimeoutConfig.AttestationTimeout,
}, logger)
}, logger, metrics.DispatcherMetrics)
asgn := &core.StdAssignmentCoordinator{}

client, err := geth.NewMultiHomingClient(config.EthClientConfig, gethcommon.HexToAddress(config.FireblocksConfig.WalletAddress), logger)
Expand Down Expand Up @@ -160,8 +162,6 @@ func RunBatcher(ctx *cli.Context) error {
}
}

metrics := batcher.NewMetrics(config.MetricsConfig.HTTPPort, logger)

if len(config.BatcherConfig.EncoderSocket) == 0 {
return errors.New("encoder socket must be specified")
}
Expand Down
4 changes: 2 additions & 2 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser
dispatcherConfig := &dispatcher.Config{
Timeout: time.Second,
}
dispatcher := dispatcher.NewDispatcher(dispatcherConfig, logger)
batcherMetrics := batcher.NewMetrics("9100", logger)
dispatcher := dispatcher.NewDispatcher(dispatcherConfig, logger, batcherMetrics.DispatcherMetrics)

transactor := &coremock.MockTransactor{}
transactor.On("OperatorIDToAddress").Return(gethcommon.Address{}, nil)
Expand Down Expand Up @@ -166,7 +167,6 @@ func mustMakeDisperser(t *testing.T, cst core.IndexedChainState, store disperser
finalizer := batchermock.NewFinalizer()

disperserMetrics := disperser.NewMetrics("9100", logger)
batcherMetrics := batcher.NewMetrics("9100", logger)
txnManager := batchermock.NewTxnManager()

batcher, err := batcher.NewBatcher(batcherConfig, timeoutConfig, store, dispatcher, cst, asn, encoderClient, agg, &commonmock.MockEthClient{}, finalizer, transactor, txnManager, logger, batcherMetrics, handleBatchLivenessChan)
Expand Down

0 comments on commit 8a817e5

Please sign in to comment.