Skip to content

Commit

Permalink
return attestation err result to dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Dec 17, 2024
1 parent b679a14 commit 46955b1
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 11 deletions.
45 changes: 35 additions & 10 deletions disperser/controller/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"context"
"encoding/hex"
"errors"
"fmt"
"math"
Expand Down Expand Up @@ -99,7 +100,7 @@ func (d *Dispatcher) Start(ctx context.Context) error {
sigChan, batchData, err := d.HandleBatch(ctx)
if err != nil {
if errors.Is(err, errNoBlobsToDispatch) {
d.logger.Warn("no blobs to dispatch")
d.logger.Debug("no blobs to dispatch")
} else {
d.logger.Error("failed to process a batch", "err", err)
}
Expand All @@ -110,6 +111,7 @@ func (d *Dispatcher) Start(ctx context.Context) error {
if err != nil {
d.logger.Error("failed to handle signatures", "err", err)
}
close(sigChan)
// TODO(ian-shim): handle errors and mark failed
}()
}
Expand Down Expand Up @@ -172,15 +174,24 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
err := d.blobMetadataStore.PutDispersalRequest(ctx, req)
if err != nil {
d.logger.Error("failed to put dispersal request", "err", err)
sigChan <- core.SigningMessage{
Signature: nil,
Operator: opID,
BatchHeaderHash: batchData.BatchHeaderHash,
AttestationLatencyMs: 0,
Err: err,
}
return
}

d.metrics.reportPutDispersalRequestLatency(time.Since(putDispersalRequestStart))

var i int
var lastErr error
for i = 0; i < d.NumRequestRetries+1; i++ {
sendChunksStart := time.Now()
sig, err := d.sendChunks(ctx, client, batch)
lastErr = err
sendChunksFinished := time.Now()
d.metrics.reportSendChunksLatency(sendChunksFinished.Sub(sendChunksStart))
if err == nil {
Expand All @@ -200,16 +211,26 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
Signature: sig,
Operator: opID,
BatchHeaderHash: batchData.BatchHeaderHash,
AttestationLatencyMs: 0, // TODO: calculate latency
AttestationLatencyMs: float64(time.Since(sendChunksStart)),
Err: nil,
}

break
}

d.logger.Warn("failed to send chunks", "operator", opID.Hex(), "NumAttempts", i, "err", err)
d.logger.Warn("failed to send chunks", "operator", opID.Hex(), "NumAttempts", i, "batcHeader", hex.EncodeToString(batchData.BatchHeaderHash[:]), "err", err)
time.Sleep(time.Duration(math.Pow(2, float64(i))) * time.Second) // Wait before retrying
}

if lastErr != nil {
d.logger.Error("failed to send chunks", "operator", opID.Hex(), "NumAttempts", i, "batcHeader", hex.EncodeToString(batchData.BatchHeaderHash[:]), "err", lastErr)
sigChan <- core.SigningMessage{
Signature: nil,
Operator: opID,
BatchHeaderHash: batchData.BatchHeaderHash,
AttestationLatencyMs: 0,
Err: lastErr,
}
}
d.metrics.reportSendChunksRetryCount(float64(i))
})

Expand All @@ -221,14 +242,18 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,

// HandleSignatures receives signatures from operators, validates, and aggregates them
func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData, sigChan chan core.SigningMessage) error {
if batchData == nil {
return errors.New("batchData is required")
}
handleSignaturesStart := time.Now()
defer func() {
d.metrics.reportHandleSignaturesLatency(time.Since(handleSignaturesStart))
}()

batchHeaderHash := hex.EncodeToString(batchData.BatchHeaderHash[:])
quorumAttestation, err := d.aggregator.ReceiveSignatures(ctx, batchData.OperatorState, batchData.BatchHeaderHash, sigChan)
if err != nil {
return fmt.Errorf("failed to receive and validate signatures: %w", err)
return fmt.Errorf("failed to receive and validate signatures for batch %s: %w", batchHeaderHash, err)
}
receiveSignaturesFinished := time.Now()
d.metrics.reportReceiveSignaturesLatency(receiveSignaturesFinished.Sub(handleSignaturesStart))
Expand All @@ -243,7 +268,7 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
aggregateSignaturesFinished := time.Now()
d.metrics.reportAggregateSignaturesLatency(aggregateSignaturesFinished.Sub(receiveSignaturesFinished))
if err != nil {
return fmt.Errorf("failed to aggregate signatures: %w", err)
return fmt.Errorf("failed to aggregate signatures for batch %s: %w", batchHeaderHash, err)
}

err = d.blobMetadataStore.PutAttestation(ctx, &corev2.Attestation{
Expand All @@ -258,14 +283,14 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
putAttestationFinished := time.Now()
d.metrics.reportPutAttestationLatency(putAttestationFinished.Sub(aggregateSignaturesFinished))
if err != nil {
return fmt.Errorf("failed to put attestation: %w", err)
return fmt.Errorf("failed to put attestation for batch %s: %w", batchHeaderHash, err)
}

err = d.updateBatchStatus(ctx, batchData.BlobKeys, v2.Certified)
updateBatchStatusFinished := time.Now()
d.metrics.reportUpdateBatchStatusLatency(updateBatchStatusFinished.Sub(putAttestationFinished))
if err != nil {
return fmt.Errorf("failed to mark blobs as certified: %w", err)
return fmt.Errorf("failed to mark blobs as certified for batch %s: %w", batchHeaderHash, err)
}

return nil
Expand All @@ -286,16 +311,16 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
return nil, fmt.Errorf("failed to get blob metadata by status: %w", err)
}

d.logger.Debug("got new metadatas to make batch", "numBlobs", len(blobMetadatas))
if len(blobMetadatas) == 0 {
return nil, errNoBlobsToDispatch
}
d.logger.Debug("got new metadatas to make batch", "numBlobs", len(blobMetadatas), "referenceBlockNumber", referenceBlockNumber)

state, err := d.GetOperatorState(ctx, blobMetadatas, referenceBlockNumber)
getOperatorStateFinished := time.Now()
d.metrics.reportGetOperatorStateLatency(getOperatorStateFinished.Sub(getBlobMetadataFinished))
if err != nil {
return nil, fmt.Errorf("failed to get operator state: %w", err)
return nil, fmt.Errorf("failed to get operator state at block %d: %w", referenceBlockNumber, err)
}

keys := make([]corev2.BlobKey, len(blobMetadatas))
Expand Down
2 changes: 1 addition & 1 deletion disperser/controller/encoding_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (e *EncodingManager) Start(ctx context.Context) error {
err := e.HandleBatch(ctx)
if err != nil {
if errors.Is(err, errNoBlobsToEncode) {
e.logger.Warn("no blobs to encode")
e.logger.Debug("no blobs to encode")
} else {
e.logger.Error("failed to process a batch", "err", err)
}
Expand Down

0 comments on commit 46955b1

Please sign in to comment.