Skip to content

Commit

Permalink
return attestation err result to dispatcher
Browse files (browse the repository at this point in the history)
  • Loading branch information
ian-shim committed Dec 17, 2024
1 parent b679a14 commit edbfff7
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 17 deletions.
34 changes: 34 additions & 0 deletions core/aggregation.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -270,9 +270,35 @@ func (a *StdSignatureAggregator) ReceiveSignatures(ctx context.Context, state *I
}

func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, ics IndexedChainState, referenceBlockNumber uint, quorumAttestation *QuorumAttestation, quorumIDs []QuorumID) (*SignatureAggregation, error) {
for _, quorumID := range quorumIDs {
_, found := quorumAttestation.QuorumResults[quorumID]
if !found {
return nil, fmt.Errorf("quorum %d not found in quorum attestation", quorumID)
}

_, found = quorumAttestation.QuorumAggPubKey[quorumID]
if !found {
return nil, fmt.Errorf("quorum %d does not have an aggregated public key", quorumID)
}

_, found = quorumAttestation.SignersAggPubKey[quorumID]
if !found {
return nil, fmt.Errorf("quorum %d does not have a signers aggregated public key", quorumID)
}

_, found = quorumAttestation.AggSignature[quorumID]
if !found {
return nil, fmt.Errorf("quorum %d does not have an aggregated signature", quorumID)
}
}

// Aggregate the aggregated signatures. We reuse the first aggregated signature as the accumulator
var aggSig *Signature
for _, quorumID := range quorumIDs {
if quorumAttestation.AggSignature[quorumID] == nil {
a.Logger.Error("cannot aggregate signature for quorum because aggregated signature is nil", "quorumID", quorumID)
continue
}
sig := quorumAttestation.AggSignature[quorumID]
if aggSig == nil {
aggSig = &Signature{sig.G1Point.Clone()}
Expand All @@ -284,6 +310,10 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, ics In
// Aggregate the aggregated public keys. We reuse the first aggregated public key as the accumulator
var aggPubKey *G2Point
for _, quorumID := range quorumIDs {
if quorumAttestation.SignersAggPubKey[quorumID] == nil {
a.Logger.Error("cannot aggregate public key for quorum because signers aggregated public key is nil", "quorumID", quorumID)
continue
}
apk := quorumAttestation.SignersAggPubKey[quorumID]
if aggPubKey == nil {
aggPubKey = apk.Clone()
Expand Down Expand Up @@ -315,6 +345,10 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, ics In

quorumAggKeys := make(map[QuorumID]*G1Point, len(quorumIDs))
for _, quorumID := range quorumIDs {
if quorumAttestation.QuorumAggPubKey[quorumID] == nil {
a.Logger.Error("cannot aggregate public key for quorum because aggregated public key is nil", "quorumID", quorumID)
continue
}
quorumAggKeys[quorumID] = quorumAttestation.QuorumAggPubKey[quorumID]
}

Expand Down
60 changes: 44 additions & 16 deletions disperser/controller/dispatcher.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -2,6 +2,7 @@ package controller

import (
"context"
"encoding/hex"
"errors"
"fmt"
"math"
Expand Down Expand Up @@ -99,7 +100,7 @@ func (d *Dispatcher) Start(ctx context.Context) error {
sigChan, batchData, err := d.HandleBatch(ctx)
if err != nil {
if errors.Is(err, errNoBlobsToDispatch) {
d.logger.Warn("no blobs to dispatch")
d.logger.Debug("no blobs to dispatch")
} else {
d.logger.Error("failed to process a batch", "err", err)
}
Expand All @@ -110,6 +111,7 @@ func (d *Dispatcher) Start(ctx context.Context) error {
if err != nil {
d.logger.Error("failed to handle signatures", "err", err)
}
close(sigChan)
// TODO(ian-shim): handle errors and mark failed
}()
}
Expand Down Expand Up @@ -172,15 +174,24 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
err := d.blobMetadataStore.PutDispersalRequest(ctx, req)
if err != nil {
d.logger.Error("failed to put dispersal request", "err", err)
sigChan <- core.SigningMessage{
Signature: nil,
Operator: opID,
BatchHeaderHash: batchData.BatchHeaderHash,
AttestationLatencyMs: 0,
Err: err,
}
return
}

d.metrics.reportPutDispersalRequestLatency(time.Since(putDispersalRequestStart))

var i int
var lastErr error
for i = 0; i < d.NumRequestRetries+1; i++ {
sendChunksStart := time.Now()
sig, err := d.sendChunks(ctx, client, batch)
lastErr = err
sendChunksFinished := time.Now()
d.metrics.reportSendChunksLatency(sendChunksFinished.Sub(sendChunksStart))
if err == nil {
Expand All @@ -200,16 +211,26 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
Signature: sig,
Operator: opID,
BatchHeaderHash: batchData.BatchHeaderHash,
AttestationLatencyMs: 0, // TODO: calculate latency
AttestationLatencyMs: float64(time.Since(sendChunksStart)),
Err: nil,
}

break
}

d.logger.Warn("failed to send chunks", "operator", opID.Hex(), "NumAttempts", i, "err", err)
d.logger.Warn("failed to send chunks", "operator", opID.Hex(), "NumAttempts", i, "batchHeader", hex.EncodeToString(batchData.BatchHeaderHash[:]), "err", err)
time.Sleep(time.Duration(math.Pow(2, float64(i))) * time.Second) // Wait before retrying
}

if lastErr != nil {
d.logger.Error("failed to send chunks", "operator", opID.Hex(), "NumAttempts", i, "batchHeader", hex.EncodeToString(batchData.BatchHeaderHash[:]), "err", lastErr)
sigChan <- core.SigningMessage{
Signature: nil,
Operator: opID,
BatchHeaderHash: batchData.BatchHeaderHash,
AttestationLatencyMs: 0,
Err: lastErr,
}
}
d.metrics.reportSendChunksRetryCount(float64(i))
})

Expand All @@ -221,29 +242,36 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,

// HandleSignatures receives signatures from operators, validates, and aggregates them
func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData, sigChan chan core.SigningMessage) error {
if batchData == nil {
return errors.New("batchData is required")
}
handleSignaturesStart := time.Now()
defer func() {
d.metrics.reportHandleSignaturesLatency(time.Since(handleSignaturesStart))
}()

batchHeaderHash := hex.EncodeToString(batchData.BatchHeaderHash[:])
quorumAttestation, err := d.aggregator.ReceiveSignatures(ctx, batchData.OperatorState, batchData.BatchHeaderHash, sigChan)
if err != nil {
return fmt.Errorf("failed to receive and validate signatures: %w", err)
return fmt.Errorf("failed to receive and validate signatures for batch %s: %w", batchHeaderHash, err)
}
receiveSignaturesFinished := time.Now()
d.metrics.reportReceiveSignaturesLatency(receiveSignaturesFinished.Sub(handleSignaturesStart))

quorums := make([]core.QuorumID, len(quorumAttestation.QuorumResults))
i := 0
nonZeroQuorums := make([]core.QuorumID, 0)
for quorumID := range quorumAttestation.QuorumResults {
quorums[i] = quorumID
i++
d.logger.Debug("quorum attestation results", "quorumID", quorumID, "result", quorumAttestation.QuorumResults[quorumID])
nonZeroQuorums = append(nonZeroQuorums, quorumID)
}
aggSig, err := d.aggregator.AggregateSignatures(ctx, d.chainState, uint(batchData.Batch.BatchHeader.ReferenceBlockNumber), quorumAttestation, quorums)
if len(nonZeroQuorums) == 0 {
return fmt.Errorf("all quorums received no attestation for batch %s", batchHeaderHash)
}

aggSig, err := d.aggregator.AggregateSignatures(ctx, d.chainState, uint(batchData.Batch.BatchHeader.ReferenceBlockNumber), quorumAttestation, nonZeroQuorums)
aggregateSignaturesFinished := time.Now()
d.metrics.reportAggregateSignaturesLatency(aggregateSignaturesFinished.Sub(receiveSignaturesFinished))
if err != nil {
return fmt.Errorf("failed to aggregate signatures: %w", err)
return fmt.Errorf("failed to aggregate signatures for batch %s: %w", batchHeaderHash, err)
}

err = d.blobMetadataStore.PutAttestation(ctx, &corev2.Attestation{
Expand All @@ -253,19 +281,19 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
APKG2: aggSig.AggPubKey,
QuorumAPKs: aggSig.QuorumAggPubKeys,
Sigma: aggSig.AggSignature,
QuorumNumbers: quorums,
QuorumNumbers: nonZeroQuorums,
})
putAttestationFinished := time.Now()
d.metrics.reportPutAttestationLatency(putAttestationFinished.Sub(aggregateSignaturesFinished))
if err != nil {
return fmt.Errorf("failed to put attestation: %w", err)
return fmt.Errorf("failed to put attestation for batch %s: %w", batchHeaderHash, err)
}

err = d.updateBatchStatus(ctx, batchData.BlobKeys, v2.Certified)
updateBatchStatusFinished := time.Now()
d.metrics.reportUpdateBatchStatusLatency(updateBatchStatusFinished.Sub(putAttestationFinished))
if err != nil {
return fmt.Errorf("failed to mark blobs as certified: %w", err)
return fmt.Errorf("failed to mark blobs as certified for batch %s: %w", batchHeaderHash, err)
}

return nil
Expand All @@ -286,16 +314,16 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
return nil, fmt.Errorf("failed to get blob metadata by status: %w", err)
}

d.logger.Debug("got new metadatas to make batch", "numBlobs", len(blobMetadatas))
if len(blobMetadatas) == 0 {
return nil, errNoBlobsToDispatch
}
d.logger.Debug("got new metadatas to make batch", "numBlobs", len(blobMetadatas), "referenceBlockNumber", referenceBlockNumber)

state, err := d.GetOperatorState(ctx, blobMetadatas, referenceBlockNumber)
getOperatorStateFinished := time.Now()
d.metrics.reportGetOperatorStateLatency(getOperatorStateFinished.Sub(getBlobMetadataFinished))
if err != nil {
return nil, fmt.Errorf("failed to get operator state: %w", err)
return nil, fmt.Errorf("failed to get operator state at block %d: %w", referenceBlockNumber, err)
}

keys := make([]corev2.BlobKey, len(blobMetadatas))
Expand Down
2 changes: 1 addition & 1 deletion disperser/controller/encoding_manager.go
Original file line number | Diff line number | Diff line change
Expand Up @@ -128,7 +128,7 @@ func (e *EncodingManager) Start(ctx context.Context) error {
err := e.HandleBatch(ctx)
if err != nil {
if errors.Is(err, errNoBlobsToEncode) {
e.logger.Warn("no blobs to encode")
e.logger.Debug("no blobs to encode")
} else {
e.logger.Error("failed to process a batch", "err", err)
}
Expand Down

0 comments on commit edbfff7

Please sign in to comment.