Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(operator): GetNotRespondedTasksFrom Retries #1432

Open
wants to merge 20 commits into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aggregator/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func (agg *Aggregator) InitializeNewTask(batchIndex uint32, taskCreatedBlock uin
}
return err
}
return retry.Retry(initializeNewTask_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
return retry.Retry(initializeNewTask_func, retry.DefaultRetryConfig())
}

// Long-lived goroutine that periodically checks and removes old Tasks from stored Maps
Expand Down
18 changes: 11 additions & 7 deletions aggregator/pkg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]))
taskIndex := uint32(0)

taskIndex, err := agg.GetTaskIndex(signedTaskResponse.BatchIdentifierHash)
taskIndex, err := agg.GetTaskIndexRetryable(signedTaskResponse.BatchIdentifierHash)

if err != nil {
agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum")
Expand All @@ -72,7 +72,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t

agg.logger.Info("Starting bls signature process")
go func() {
err := agg.ProcessNewSignature(
err := agg.ProcessNewSignatureRetryable(
context.Background(), taskIndex, signedTaskResponse.BatchIdentifierHash,
&signedTaskResponse.BlsSignature, signedTaskResponse.OperatorId,
)
Expand Down Expand Up @@ -123,7 +123,7 @@ func (agg *Aggregator) ServerRunning(_ *struct{}, reply *int64) error {
- Retry times (3 retries): 12 sec (1 Blocks), 24 sec (2 Blocks), 48 sec (4 Blocks)
- NOTE: TaskNotFound errors from the BLS Aggregation service are Transient errors as block reorg's may lead to these errors being thrown.
*/
func (agg *Aggregator) ProcessNewSignature(ctx context.Context, taskIndex uint32, taskResponse interface{}, blsSignature *bls.Signature, operatorId eigentypes.Bytes32) error {
func (agg *Aggregator) ProcessNewSignatureRetryable(ctx context.Context, taskIndex uint32, taskResponse interface{}, blsSignature *bls.Signature, operatorId eigentypes.Bytes32) error {
processNewSignature_func := func() error {
err := agg.blsAggregationService.ProcessNewSignature(
ctx, taskIndex, taskResponse,
Expand All @@ -137,10 +137,15 @@ func (agg *Aggregator) ProcessNewSignature(ctx context.Context, taskIndex uint32
return err
}

return retry.Retry(processNewSignature_func, retry.MinDelayChain, retry.RetryFactor, retry.NumRetries, retry.MaxIntervalChain, retry.MaxElapsedTime)
return retry.Retry(processNewSignature_func, retry.ChainRetryConfig())
}

func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error) {
// Checks Internal mapping for Signed Task Response, returns its TaskIndex.
/*
- All errors are considered Transient Errors
- Retry times (3 retries): 1 sec, 2 sec, 4 sec
*/
func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte) (uint32, error) {
getTaskIndex_func := func() (uint32, error) {
agg.taskMutex.Lock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response")
Expand All @@ -153,6 +158,5 @@ func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error
return taskIndex, nil
}
}

return retry.RetryWithData(getTaskIndex_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
return retry.RetryWithData(getTaskIndex_func, retry.DefaultRetryConfig())
}
4 changes: 2 additions & 2 deletions core/chainio/avs_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (r *AvsReader) DisabledVerifiers() (*big.Int, error) {

// Returns all the "NewBatchV3" logs that have not been responded starting from the given block number
func (r *AvsReader) GetNotRespondedTasksFrom(fromBlock uint64) ([]servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) {
logs, err := r.AvsContractBindings.ServiceManager.FilterNewBatchV3(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil)
logs, err := r.FilterNewBatchV3Retryable(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil)

if err != nil {
return nil, err
Expand All @@ -94,7 +94,7 @@ func (r *AvsReader) GetNotRespondedTasksFrom(fromBlock uint64) ([]servicemanager
// now check if its finalized or not before appending
batchIdentifier := append(task.BatchMerkleRoot[:], task.SenderAddress[:]...)
batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
state, err := r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(nil, batchIdentifierHash)
state, err := r.BatchesStateRetryable(nil, batchIdentifierHash)

if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions core/chainio/avs_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,13 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema
// Subscribe to new tasks
sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil)
if err != nil {
s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NumRetries, "err", err)
s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.DefaultMaxNumRetries, "err", err)
return nil, err
}

subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil)
if err != nil {
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NumRetries, "err", err)
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.DefaultMaxNumRetries, "err", err)
return nil, err
}
s.logger.Info("Subscribed to new AlignedLayer V2 tasks")
Expand Down
15 changes: 13 additions & 2 deletions core/chainio/avs_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,17 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
txOpts.NoSend = false
i := 0

// Set Retry config for RespondToTaskV2
respondToTaskV2Config := retry.DefaultRetryConfig()
respondToTaskV2Config.MaxNumRetries = 0
respondToTaskV2Config.MaxElapsedTime = 0

// Set Retry config for WaitForTxRetryable
waitForTxConfig := retry.DefaultRetryConfig()
waitForTxConfig.MaxInterval = 2 * time.Second
waitForTxConfig.MaxNumRetries = 0
waitForTxConfig.MaxElapsedTime = timeToWaitBeforeBump

respondToTaskV2Func := func() (*types.Receipt, error) {
gasPrice, err := utils.GetGasPriceRetryable(w.Client, w.ClientFallback)
if err != nil {
Expand Down Expand Up @@ -126,7 +137,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
return nil, err
}

receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, tx.Hash(), timeToWaitBeforeBump)
receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, tx.Hash(), waitForTxConfig)
if receipt != nil {
w.checkIfAggregatorHadToPaidForBatcher(tx, batchIdentifierHash)
return receipt, nil
Expand All @@ -143,7 +154,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
return nil, fmt.Errorf("transaction failed")
}

return retry.RetryWithData(respondToTaskV2Func, retry.MinDelay, retry.RetryFactor, 0, retry.MaxInterval, 0)
return retry.RetryWithData(respondToTaskV2Func, respondToTaskV2Config)
}

// Calculates the transaction cost from the receipt and compares it with the batcher respondToTaskFeeLimit
Expand Down
Loading
Loading