return attestation err result to dispatcher
ian-shim committed Dec 17, 2024
1 parent b679a14 commit ebde2d5
Showing 4 changed files with 66 additions and 19 deletions.
12 changes: 12 additions & 0 deletions core/aggregation.go
@@ -273,6 +273,10 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, ics In
// Aggregate the aggregated signatures. We reuse the first aggregated signature as the accumulator
var aggSig *Signature
for _, quorumID := range quorumIDs {
+ if quorumAttestation.AggSignature[quorumID] == nil {
+ a.Logger.Error("cannot aggregate signature for quorum because aggregated signature is nil", "quorumID", quorumID)
+ continue
+ }
sig := quorumAttestation.AggSignature[quorumID]
if aggSig == nil {
aggSig = &Signature{sig.G1Point.Clone()}
@@ -284,6 +288,10 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, ics In
// Aggregate the aggregated public keys. We reuse the first aggregated public key as the accumulator
var aggPubKey *G2Point
for _, quorumID := range quorumIDs {
+ if quorumAttestation.SignersAggPubKey[quorumID] == nil {
+ a.Logger.Error("cannot aggregate public key for quorum because signers aggregated public key is nil", "quorumID", quorumID)
+ continue
+ }
apk := quorumAttestation.SignersAggPubKey[quorumID]
if aggPubKey == nil {
aggPubKey = apk.Clone()
@@ -315,6 +323,10 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, ics In

quorumAggKeys := make(map[QuorumID]*G1Point, len(quorumIDs))
for _, quorumID := range quorumIDs {
+ if quorumAttestation.QuorumAggPubKey[quorumID] == nil {
+ a.Logger.Error("cannot aggregate public key for quorum because aggregated public key is nil", "quorumID", quorumID)
+ continue
+ }
quorumAggKeys[quorumID] = quorumAttestation.QuorumAggPubKey[quorumID]
}

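All three guards added above address the same failure mode: a quorum that collected no signatures leaves a nil entry in the attestation maps, and cloning or aggregating that entry would panic the aggregator. A minimal sketch of the pattern, with QuorumID and Signature reduced to toy stand-ins for the real core types:

```go
package main

import "fmt"

// Simplified stand-ins for core.QuorumID and core.Signature.
type QuorumID uint8
type Signature struct{ data string }

func main() {
	// Quorum 1 received no signatures, so its map entry is nil.
	aggSig := map[QuorumID]*Signature{
		0: {data: "sig0"},
		1: nil,
	}

	for _, quorumID := range []QuorumID{0, 1} {
		if aggSig[quorumID] == nil {
			// Without this guard, aggSig[quorumID].data below would panic.
			fmt.Printf("skipping quorum %d: aggregated signature is nil\n", quorumID)
			continue
		}
		fmt.Printf("folding in quorum %d signature %q\n", quorumID, aggSig[quorumID].data)
	}
}
```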
60 changes: 44 additions & 16 deletions disperser/controller/dispatcher.go
@@ -2,6 +2,7 @@ package controller

import (
"context"
"encoding/hex"
"errors"
"fmt"
"math"
@@ -99,7 +100,7 @@ func (d *Dispatcher) Start(ctx context.Context) error {
sigChan, batchData, err := d.HandleBatch(ctx)
if err != nil {
if errors.Is(err, errNoBlobsToDispatch) {
d.logger.Warn("no blobs to dispatch")
d.logger.Debug("no blobs to dispatch")
} else {
d.logger.Error("failed to process a batch", "err", err)
}
@@ -110,6 +111,7 @@ func (d *Dispatcher) Start(ctx context.Context) error {
if err != nil {
d.logger.Error("failed to handle signatures", "err", err)
}
+ close(sigChan)
// TODO(ian-shim): handle errors and mark failed
}()
}
@@ -172,15 +174,24 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
err := d.blobMetadataStore.PutDispersalRequest(ctx, req)
if err != nil {
d.logger.Error("failed to put dispersal request", "err", err)
+ sigChan <- core.SigningMessage{
+ Signature: nil,
+ Operator: opID,
+ BatchHeaderHash: batchData.BatchHeaderHash,
+ AttestationLatencyMs: 0,
+ Err: err,
+ }
return
}

d.metrics.reportPutDispersalRequestLatency(time.Since(putDispersalRequestStart))

var i int
+ var lastErr error
for i = 0; i < d.NumRequestRetries+1; i++ {
sendChunksStart := time.Now()
sig, err := d.sendChunks(ctx, client, batch)
+ lastErr = err
sendChunksFinished := time.Now()
d.metrics.reportSendChunksLatency(sendChunksFinished.Sub(sendChunksStart))
if err == nil {
@@ -200,16 +211,26 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,
Signature: sig,
Operator: opID,
BatchHeaderHash: batchData.BatchHeaderHash,
- AttestationLatencyMs: 0, // TODO: calculate latency
+ AttestationLatencyMs: float64(time.Since(sendChunksStart).Milliseconds()), // report milliseconds to match the field's unit
Err: nil,
}

break
}

d.logger.Warn("failed to send chunks", "operator", opID.Hex(), "NumAttempts", i, "err", err)
d.logger.Warn("failed to send chunks", "operator", opID.Hex(), "NumAttempts", i, "batchHeader", hex.EncodeToString(batchData.BatchHeaderHash[:]), "err", err)
time.Sleep(time.Duration(math.Pow(2, float64(i))) * time.Second) // Wait before retrying
}

+ if lastErr != nil {
+ d.logger.Error("failed to send chunks", "operator", opID.Hex(), "NumAttempts", i, "batchHeader", hex.EncodeToString(batchData.BatchHeaderHash[:]), "err", lastErr)
+ sigChan <- core.SigningMessage{
+ Signature: nil,
+ Operator: opID,
+ BatchHeaderHash: batchData.BatchHeaderHash,
+ AttestationLatencyMs: 0,
+ Err: lastErr,
+ }
+ }
d.metrics.reportSendChunksRetryCount(float64(i))
})
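For reference, the retry loop's backoff arithmetic in isolation: math.Pow(2, i) seconds doubles the wait after each failed attempt, and the new lastErr bookkeeping ensures the final failure reaches the signature channel rather than being dropped. A sketch under an assumed retry budget of 3 (the real value comes from the dispatcher config as d.NumRequestRetries):

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	// Assumed retry budget; illustrative only.
	const numRequestRetries = 3

	for i := 0; i < numRequestRetries+1; i++ {
		wait := time.Duration(math.Pow(2, float64(i))) * time.Second
		fmt.Printf("attempt %d failed, sleeping %v before retry\n", i, wait)
	}
	// Prints waits of 1s, 2s, 4s, 8s; after the final attempt, lastErr
	// is reported through sigChan instead of being silently dropped.
}
```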

@@ -221,29 +242,36 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage,

// HandleSignatures receives signatures from operators, validates, and aggregates them
func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData, sigChan chan core.SigningMessage) error {
+ if batchData == nil {
+ return errors.New("batchData is required")
+ }
handleSignaturesStart := time.Now()
defer func() {
d.metrics.reportHandleSignaturesLatency(time.Since(handleSignaturesStart))
}()

+ batchHeaderHash := hex.EncodeToString(batchData.BatchHeaderHash[:])
quorumAttestation, err := d.aggregator.ReceiveSignatures(ctx, batchData.OperatorState, batchData.BatchHeaderHash, sigChan)
if err != nil {
return fmt.Errorf("failed to receive and validate signatures: %w", err)
return fmt.Errorf("failed to receive and validate signatures for batch %s: %w", batchHeaderHash, err)
}
receiveSignaturesFinished := time.Now()
d.metrics.reportReceiveSignaturesLatency(receiveSignaturesFinished.Sub(handleSignaturesStart))

- quorums := make([]core.QuorumID, len(quorumAttestation.QuorumResults))
- i := 0
+ nonZeroQuorums := make([]core.QuorumID, 0)
for quorumID := range quorumAttestation.QuorumResults {
- quorums[i] = quorumID
- i++
+ d.logger.Debug("quorum attestation results", "quorumID", quorumID, "result", quorumAttestation.QuorumResults[quorumID])
+ nonZeroQuorums = append(nonZeroQuorums, quorumID)
}
- aggSig, err := d.aggregator.AggregateSignatures(ctx, d.chainState, uint(batchData.Batch.BatchHeader.ReferenceBlockNumber), quorumAttestation, quorums)
+ if len(nonZeroQuorums) == 0 {
+ return fmt.Errorf("all quorums received no attestation for batch %s", batchHeaderHash)
+ }
+
+ aggSig, err := d.aggregator.AggregateSignatures(ctx, d.chainState, uint(batchData.Batch.BatchHeader.ReferenceBlockNumber), quorumAttestation, nonZeroQuorums)
aggregateSignaturesFinished := time.Now()
d.metrics.reportAggregateSignaturesLatency(aggregateSignaturesFinished.Sub(receiveSignaturesFinished))
if err != nil {
return fmt.Errorf("failed to aggregate signatures: %w", err)
return fmt.Errorf("failed to aggregate signatures for batch %s: %w", batchHeaderHash, err)
}

err = d.blobMetadataStore.PutAttestation(ctx, &corev2.Attestation{
@@ -253,19 +281,19 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData,
APKG2: aggSig.AggPubKey,
QuorumAPKs: aggSig.QuorumAggPubKeys,
Sigma: aggSig.AggSignature,
- QuorumNumbers: quorums,
+ QuorumNumbers: nonZeroQuorums,
})
putAttestationFinished := time.Now()
d.metrics.reportPutAttestationLatency(putAttestationFinished.Sub(aggregateSignaturesFinished))
if err != nil {
return fmt.Errorf("failed to put attestation: %w", err)
return fmt.Errorf("failed to put attestation for batch %s: %w", batchHeaderHash, err)
}

err = d.updateBatchStatus(ctx, batchData.BlobKeys, v2.Certified)
updateBatchStatusFinished := time.Now()
d.metrics.reportUpdateBatchStatusLatency(updateBatchStatusFinished.Sub(putAttestationFinished))
if err != nil {
return fmt.Errorf("failed to mark blobs as certified: %w", err)
return fmt.Errorf("failed to mark blobs as certified for batch %s: %w", batchHeaderHash, err)
}

return nil
@@ -286,16 +314,16 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64)
return nil, fmt.Errorf("failed to get blob metadata by status: %w", err)
}

d.logger.Debug("got new metadatas to make batch", "numBlobs", len(blobMetadatas))
if len(blobMetadatas) == 0 {
return nil, errNoBlobsToDispatch
}
d.logger.Debug("got new metadatas to make batch", "numBlobs", len(blobMetadatas), "referenceBlockNumber", referenceBlockNumber)

state, err := d.GetOperatorState(ctx, blobMetadatas, referenceBlockNumber)
getOperatorStateFinished := time.Now()
d.metrics.reportGetOperatorStateLatency(getOperatorStateFinished.Sub(getBlobMetadataFinished))
if err != nil {
return nil, fmt.Errorf("failed to get operator state: %w", err)
return nil, fmt.Errorf("failed to get operator state at block %d: %w", referenceBlockNumber, err)
}

keys := make([]corev2.BlobKey, len(blobMetadatas))
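Taken together, these dispatcher changes tighten the signature channel's contract: every operator now yields exactly one SigningMessage, carrying either a signature or an error, so the aggregator can count replies instead of blocking on operators that failed early, and Start closes the channel only after HandleSignatures returns. A stripped-down sketch of that producer/consumer flow (types and counts are illustrative, not the real core API):

```go
package main

import (
	"errors"
	"fmt"
)

// Trimmed-down stand-in for core.SigningMessage.
type signingMessage struct {
	operator string
	err      error
}

func main() {
	const numOperators = 3 // illustrative; the real count comes from the batch's operator state
	sigChan := make(chan signingMessage, numOperators)

	// Producer (HandleBatch's per-operator goroutines): exactly one
	// message per operator, success or failure.
	sigChan <- signingMessage{operator: "op0"}
	sigChan <- signingMessage{operator: "op1", err: errors.New("failed to send chunks")}
	sigChan <- signingMessage{operator: "op2"}

	// Consumer (ReceiveSignatures, called from HandleSignatures): drain
	// one message per operator; errors count as received results.
	for i := 0; i < numOperators; i++ {
		msg := <-sigChan
		if msg.err != nil {
			fmt.Printf("%s: attestation failed: %v\n", msg.operator, msg.err)
			continue
		}
		fmt.Printf("%s: signature collected\n", msg.operator)
	}

	// The caller closes the channel only after the consumer returns,
	// mirroring the close(sigChan) added in this commit.
	close(sigChan)
}
```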
2 changes: 1 addition & 1 deletion disperser/controller/encoding_manager.go
@@ -128,7 +128,7 @@ func (e *EncodingManager) Start(ctx context.Context) error {
err := e.HandleBatch(ctx)
if err != nil {
if errors.Is(err, errNoBlobsToEncode) {
e.logger.Warn("no blobs to encode")
e.logger.Debug("no blobs to encode")
} else {
e.logger.Error("failed to process a batch", "err", err)
}
11 changes: 9 additions & 2 deletions node/grpc/server_v2.go
@@ -4,6 +4,9 @@ import (
"context"
"encoding/hex"
"fmt"
"runtime"
"time"

"github.com/Layr-Labs/eigenda/api"
pb "github.com/Layr-Labs/eigenda/api/grpc/node/v2"
"github.com/Layr-Labs/eigenda/common"
@@ -14,8 +17,6 @@ import (
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/prometheus/client_golang/prometheus"
"github.com/shirou/gopsutil/mem"
"runtime"
"time"
)

// ServerV2 implements the Node v2 proto APIs.
@@ -76,6 +77,12 @@ func (s *ServerV2) StoreChunks(ctx context.Context, in *pb.StoreChunksRequest) (
if s.node.StoreV2 == nil {
return nil, api.NewErrorInternal("v2 store not initialized")
}

+ // TODO(ian-shim): support remote signer
+ if s.node.KeyPair == nil {
+ return nil, api.NewErrorInternal("missing key pair")
+ }

batch, err := s.validateStoreChunksRequest(in)
if err != nil {
return nil, err
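The new guard follows the same fail-fast theme as the aggregator checks: StoreChunks eventually signs with the node's key pair, so a missing key pair is surfaced as a clean internal error up front instead of a nil-pointer panic mid-request. A minimal sketch of the ordering, with a purely illustrative node skeleton:

```go
package main

import (
	"errors"
	"fmt"
)

// Illustrative skeleton, not the real node.Node type.
type node struct {
	storeV2 any
	keyPair any
}

// storeChunks mirrors the handler's fail-fast ordering: verify every
// dependency before touching the request payload.
func storeChunks(n *node) error {
	if n.storeV2 == nil {
		return errors.New("v2 store not initialized")
	}
	if n.keyPair == nil {
		// A remote signer would be consulted here once supported.
		return errors.New("missing key pair")
	}
	return nil // request validation and storage would follow
}

func main() {
	fmt.Println(storeChunks(&node{storeV2: struct{}{}})) // prints: missing key pair
}
```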
