diff --git a/core/aggregation.go b/core/aggregation.go index d0c01c3abb..f0ceed2ae9 100644 --- a/core/aggregation.go +++ b/core/aggregation.go @@ -270,9 +270,35 @@ func (a *StdSignatureAggregator) ReceiveSignatures(ctx context.Context, state *I } func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, ics IndexedChainState, referenceBlockNumber uint, quorumAttestation *QuorumAttestation, quorumIDs []QuorumID) (*SignatureAggregation, error) { + for _, quorumID := range quorumIDs { + _, found := quorumAttestation.QuorumResults[quorumID] + if !found { + return nil, fmt.Errorf("quorum %d not found in quorum attestation", quorumID) + } + + _, found = quorumAttestation.QuorumAggPubKey[quorumID] + if !found { + return nil, fmt.Errorf("quorum %d does not have an aggregated public key", quorumID) + } + + _, found = quorumAttestation.SignersAggPubKey[quorumID] + if !found { + return nil, fmt.Errorf("quorum %d does not have a signers aggregated public key", quorumID) + } + + _, found = quorumAttestation.AggSignature[quorumID] + if !found { + return nil, fmt.Errorf("quorum %d does not have an aggregated signature", quorumID) + } + } + // Aggregate the aggregated signatures. We reuse the first aggregated signature as the accumulator var aggSig *Signature for _, quorumID := range quorumIDs { + if quorumAttestation.AggSignature[quorumID] == nil { + a.Logger.Error("cannot aggregate signature for quorum because aggregated signature is nil", "quorumID", quorumID) + continue + } sig := quorumAttestation.AggSignature[quorumID] if aggSig == nil { aggSig = &Signature{sig.G1Point.Clone()} @@ -284,6 +310,10 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, ics In // Aggregate the aggregated public keys. We reuse the first aggregated public key as the accumulator var aggPubKey *G2Point for _, quorumID := range quorumIDs { + if quorumAttestation.SignersAggPubKey[quorumID] == nil { + a.Logger.Error("cannot aggregate public key for quorum because signers aggregated public key is nil", "quorumID", quorumID) + continue + } apk := quorumAttestation.SignersAggPubKey[quorumID] if aggPubKey == nil { aggPubKey = apk.Clone() @@ -315,6 +345,10 @@ func (a *StdSignatureAggregator) AggregateSignatures(ctx context.Context, ics In quorumAggKeys := make(map[QuorumID]*G1Point, len(quorumIDs)) for _, quorumID := range quorumIDs { + if quorumAttestation.QuorumAggPubKey[quorumID] == nil { + a.Logger.Error("cannot aggregate public key for quorum because aggregated public key is nil", "quorumID", quorumID) + continue + } quorumAggKeys[quorumID] = quorumAttestation.QuorumAggPubKey[quorumID] } diff --git a/disperser/controller/dispatcher.go b/disperser/controller/dispatcher.go index f48a9783ff..366ecad4e8 100644 --- a/disperser/controller/dispatcher.go +++ b/disperser/controller/dispatcher.go @@ -2,6 +2,7 @@ package controller import ( "context" + "encoding/hex" "errors" "fmt" "math" @@ -99,7 +100,7 @@ func (d *Dispatcher) Start(ctx context.Context) error { sigChan, batchData, err := d.HandleBatch(ctx) if err != nil { if errors.Is(err, errNoBlobsToDispatch) { - d.logger.Warn("no blobs to dispatch") + d.logger.Debug("no blobs to dispatch") } else { d.logger.Error("failed to process a batch", "err", err) } @@ -110,6 +111,7 @@ func (d *Dispatcher) Start(ctx context.Context) error { if err != nil { d.logger.Error("failed to handle signatures", "err", err) } + close(sigChan) // TODO(ian-shim): handle errors and mark failed }() } @@ -172,15 +174,24 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage, err := d.blobMetadataStore.PutDispersalRequest(ctx, req) if err != nil { d.logger.Error("failed to put dispersal request", "err", err) + sigChan <- core.SigningMessage{ + Signature: nil, + Operator: opID, + BatchHeaderHash: batchData.BatchHeaderHash, + AttestationLatencyMs: 0, + Err: err, + } return } d.metrics.reportPutDispersalRequestLatency(time.Since(putDispersalRequestStart)) var i int + var lastErr error for i = 0; i < d.NumRequestRetries+1; i++ { sendChunksStart := time.Now() sig, err := d.sendChunks(ctx, client, batch) + lastErr = err sendChunksFinished := time.Now() d.metrics.reportSendChunksLatency(sendChunksFinished.Sub(sendChunksStart)) if err == nil { @@ -200,16 +211,26 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage, Signature: sig, Operator: opID, BatchHeaderHash: batchData.BatchHeaderHash, - AttestationLatencyMs: 0, // TODO: calculate latency + AttestationLatencyMs: float64(time.Since(sendChunksStart)), Err: nil, } - break } - d.logger.Warn("failed to send chunks", "operator", opID.Hex(), "NumAttempts", i, "err", err) + d.logger.Warn("failed to send chunks", "operator", opID.Hex(), "NumAttempts", i, "batchHeader", hex.EncodeToString(batchData.BatchHeaderHash[:]), "err", err) time.Sleep(time.Duration(math.Pow(2, float64(i))) * time.Second) // Wait before retrying } + + if lastErr != nil { + d.logger.Error("failed to send chunks", "operator", opID.Hex(), "NumAttempts", i, "batchHeader", hex.EncodeToString(batchData.BatchHeaderHash[:]), "err", lastErr) + sigChan <- core.SigningMessage{ + Signature: nil, + Operator: opID, + BatchHeaderHash: batchData.BatchHeaderHash, + AttestationLatencyMs: 0, + Err: lastErr, + } + } d.metrics.reportSendChunksRetryCount(float64(i)) }) @@ -221,29 +242,36 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage, // HandleSignatures receives signatures from operators, validates, and aggregates them func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData, sigChan chan core.SigningMessage) error { + if batchData == nil { + return errors.New("batchData is required") + } handleSignaturesStart := time.Now() defer func() { d.metrics.reportHandleSignaturesLatency(time.Since(handleSignaturesStart)) }() + batchHeaderHash := hex.EncodeToString(batchData.BatchHeaderHash[:]) quorumAttestation, err := d.aggregator.ReceiveSignatures(ctx, batchData.OperatorState, batchData.BatchHeaderHash, sigChan) if err != nil { - return fmt.Errorf("failed to receive and validate signatures: %w", err) + return fmt.Errorf("failed to receive and validate signatures for batch %s: %w", batchHeaderHash, err) } receiveSignaturesFinished := time.Now() d.metrics.reportReceiveSignaturesLatency(receiveSignaturesFinished.Sub(handleSignaturesStart)) - quorums := make([]core.QuorumID, len(quorumAttestation.QuorumResults)) - i := 0 + nonZeroQuorums := make([]core.QuorumID, 0) for quorumID := range quorumAttestation.QuorumResults { - quorums[i] = quorumID - i++ + d.logger.Debug("quorum attestation results", "quorumID", quorumID, "result", quorumAttestation.QuorumResults[quorumID]) + nonZeroQuorums = append(nonZeroQuorums, quorumID) } - aggSig, err := d.aggregator.AggregateSignatures(ctx, d.chainState, uint(batchData.Batch.BatchHeader.ReferenceBlockNumber), quorumAttestation, quorums) + if len(nonZeroQuorums) == 0 { + return fmt.Errorf("all quorums received no attestation for batch %s", batchHeaderHash) + } + + aggSig, err := d.aggregator.AggregateSignatures(ctx, d.chainState, uint(batchData.Batch.BatchHeader.ReferenceBlockNumber), quorumAttestation, nonZeroQuorums) aggregateSignaturesFinished := time.Now() d.metrics.reportAggregateSignaturesLatency(aggregateSignaturesFinished.Sub(receiveSignaturesFinished)) if err != nil { - return fmt.Errorf("failed to aggregate signatures: %w", err) + return fmt.Errorf("failed to aggregate signatures for batch %s: %w", batchHeaderHash, err) } err = d.blobMetadataStore.PutAttestation(ctx, &corev2.Attestation{ @@ -253,19 +281,19 @@ func (d *Dispatcher) HandleSignatures(ctx context.Context, batchData *batchData, APKG2: aggSig.AggPubKey, QuorumAPKs: aggSig.QuorumAggPubKeys, Sigma: aggSig.AggSignature, - QuorumNumbers: quorums, + QuorumNumbers: nonZeroQuorums, }) putAttestationFinished := time.Now() d.metrics.reportPutAttestationLatency(putAttestationFinished.Sub(aggregateSignaturesFinished)) if err != nil { - return fmt.Errorf("failed to put attestation: %w", err) + return fmt.Errorf("failed to put attestation for batch %s: %w", batchHeaderHash, err) } err = d.updateBatchStatus(ctx, batchData.BlobKeys, v2.Certified) updateBatchStatusFinished := time.Now() d.metrics.reportUpdateBatchStatusLatency(updateBatchStatusFinished.Sub(putAttestationFinished)) if err != nil { - return fmt.Errorf("failed to mark blobs as certified: %w", err) + return fmt.Errorf("failed to mark blobs as certified for batch %s: %w", batchHeaderHash, err) } return nil @@ -286,16 +314,16 @@ func (d *Dispatcher) NewBatch(ctx context.Context, referenceBlockNumber uint64) return nil, fmt.Errorf("failed to get blob metadata by status: %w", err) } - d.logger.Debug("got new metadatas to make batch", "numBlobs", len(blobMetadatas)) if len(blobMetadatas) == 0 { return nil, errNoBlobsToDispatch } + d.logger.Debug("got new metadatas to make batch", "numBlobs", len(blobMetadatas), "referenceBlockNumber", referenceBlockNumber) state, err := d.GetOperatorState(ctx, blobMetadatas, referenceBlockNumber) getOperatorStateFinished := time.Now() d.metrics.reportGetOperatorStateLatency(getOperatorStateFinished.Sub(getBlobMetadataFinished)) if err != nil { - return nil, fmt.Errorf("failed to get operator state: %w", err) + return nil, fmt.Errorf("failed to get operator state at block %d: %w", referenceBlockNumber, err) } keys := make([]corev2.BlobKey, len(blobMetadatas)) diff --git a/disperser/controller/encoding_manager.go b/disperser/controller/encoding_manager.go index bc6a0c9355..1844b48daf 100644 --- a/disperser/controller/encoding_manager.go +++ b/disperser/controller/encoding_manager.go @@ -128,7 +128,7 @@ func (e *EncodingManager) Start(ctx context.Context) error { err := e.HandleBatch(ctx) if err != nil { if errors.Is(err, errNoBlobsToEncode) { - e.logger.Warn("no blobs to encode") + e.logger.Debug("no blobs to encode") } else { e.logger.Error("failed to process a batch", "err", err) }