Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(retries): Use config params for retry parameters #1467

Merged
merged 13 commits into from
Nov 22, 2024
7 changes: 3 additions & 4 deletions aggregator/pkg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
// If that's the case, we won't know about the task at this point
// so we make GetTaskIndex retryable, waiting for some seconds,
// before trying to fetch the task again from the map.
taskIndex, err := agg.GetTaskIndexRetryable(signedTaskResponse.BatchIdentifierHash)
taskIndex, err := agg.GetTaskIndexRetryable(signedTaskResponse.BatchIdentifierHash, retry.NetworkRetryParams())

if err != nil {
agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum")
Expand Down Expand Up @@ -114,10 +114,9 @@ func (agg *Aggregator) ServerRunning(_ *struct{}, reply *int64) error {
Checks the internal mapping for the signed task response and returns its TaskIndex.
- All errors are considered transient errors
- Retry times (3 retries): 1 sec, 2 sec, 4 sec

TODO: We should refactor the retry duration, considering extending it to a longer time or a larger number of retries, at least somewhere between 1 and 2 blocks
*/
func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte) (uint32, error) {
func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte, config *retry.RetryParams) (uint32, error) {
getTaskIndex_func := func() (uint32, error) {
agg.taskMutex.Lock()
taskIndex, ok := agg.batchesIdxByIdentifierHash[batchIdentifierHash]
Expand All @@ -129,5 +128,5 @@ func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte) (uint
}
}

return retry.RetryWithData(getTaskIndex_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
return retry.RetryWithData(getTaskIndex_func, config)
}
36 changes: 18 additions & 18 deletions core/chainio/avs_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2)

// Subscribe to new tasks
sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil)
sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NumRetries, "err", err)
s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err)
return nil, err
}

subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil)
subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NumRetries, "err", err)
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err)
return nil, err
}
s.logger.Info("Subscribed to new AlignedLayer V2 tasks")
Expand Down Expand Up @@ -114,14 +114,14 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema
case err := <-sub.Err():
s.logger.Warn("Error in new task subscription", "err", err)
sub.Unsubscribe()
sub, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil)
sub, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
errorChannel <- err
}
case err := <-subFallback.Err():
s.logger.Warn("Error in fallback new task subscription", "err", err)
subFallback.Unsubscribe()
subFallback, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil)
subFallback, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
errorChannel <- err
}
Expand All @@ -137,13 +137,13 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3)

// Subscribe to new tasks
sub, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil)
sub, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
s.logger.Error("Primary failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err)
return nil, err
}

subFallback, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil)
subFallback, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err)
return nil, err
Expand Down Expand Up @@ -185,14 +185,14 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema
case err := <-sub.Err():
s.logger.Warn("Error in new task subscription", "err", err)
sub.Unsubscribe()
sub, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil)
sub, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
errorChannel <- err
}
case err := <-subFallback.Err():
s.logger.Warn("Error in fallback new task subscription", "err", err)
subFallback.Unsubscribe()
subFallback, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil)
subFallback, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if err != nil {
errorChannel <- err
}
Expand Down Expand Up @@ -258,7 +258,7 @@ func (s *AvsSubscriber) processNewBatchV3(batch *servicemanager.ContractAlignedL
// getLatestNotRespondedTaskFromEthereumV2 queries the blockchain for the latest task that has not been responded to, using the FilterBatchV2Retryable method.
func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV2() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, error) {

latestBlock, err := s.BlockNumberRetryable(context.Background())
latestBlock, err := s.BlockNumberRetryable(context.Background(), retry.NetworkRetryParams())
if err != nil {
return nil, err
}
Expand All @@ -271,7 +271,7 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV2() (*servicemanag
fromBlock = latestBlock - BlockInterval
}

logs, err := s.FilterBatchV2Retryable(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil)
logs, err := s.FilterBatchV2Retryable(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil, retry.NetworkRetryParams())
if err != nil {
return nil, err
}
Expand All @@ -293,7 +293,7 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV2() (*servicemanag

batchIdentifier := append(lastLog.BatchMerkleRoot[:], lastLog.SenderAddress[:]...)
batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
state, err := s.BatchesStateRetryable(nil, batchIdentifierHash)
state, err := s.BatchesStateRetryable(nil, batchIdentifierHash, retry.NetworkRetryParams())
if err != nil {
return nil, err
}
Expand All @@ -307,7 +307,7 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV2() (*servicemanag

// getLatestNotRespondedTaskFromEthereumV3 queries the blockchain for the latest task that has not been responded to, using the FilterBatchV3Retryable method.
func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV3() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) {
latestBlock, err := s.BlockNumberRetryable(context.Background())
latestBlock, err := s.BlockNumberRetryable(context.Background(), retry.NetworkRetryParams())
if err != nil {
return nil, err
}
Expand All @@ -320,7 +320,7 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV3() (*servicemanag
fromBlock = latestBlock - BlockInterval
}

logs, err := s.FilterBatchV3Retryable(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil)
logs, err := s.FilterBatchV3Retryable(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil, retry.NetworkRetryParams())
if err != nil {
return nil, err
}
Expand All @@ -342,7 +342,7 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV3() (*servicemanag

batchIdentifier := append(lastLog.BatchMerkleRoot[:], lastLog.SenderAddress[:]...)
batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
state, err := s.BatchesStateRetryable(nil, batchIdentifierHash)
state, err := s.BatchesStateRetryable(nil, batchIdentifierHash, retry.NetworkRetryParams())
if err != nil {
return nil, err
}
Expand All @@ -355,15 +355,15 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV3() (*servicemanag
}

func (s *AvsSubscriber) WaitForOneBlock(startBlock uint64) error {
currentBlock, err := s.BlockNumberRetryable(context.Background())
currentBlock, err := s.BlockNumberRetryable(context.Background(), retry.NetworkRetryParams())
if err != nil {
return err
}

if currentBlock <= startBlock { // should really be == but just in case
// Subscribe to new head
c := make(chan *types.Header)
sub, err := s.SubscribeNewHeadRetryable(context.Background(), c)
sub, err := s.SubscribeNewHeadRetryable(context.Background(), c, retry.NetworkRetryParams())
if err != nil {
return err
}
Expand Down
21 changes: 10 additions & 11 deletions core/chainio/avs_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func NewAvsWriterFromConfig(baseConfig *config.BaseConfig, ecdsaConfig *config.E
func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature, gasBumpPercentage uint, gasBumpIncrementalPercentage uint, timeToWaitBeforeBump time.Duration, onGasPriceBumped func(*big.Int)) (*types.Receipt, error) {
txOpts := *w.Signer.GetTxOpts()
txOpts.NoSend = true // simulate the transaction
simTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
simTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature, retry.SendToChainRetryParams())
uri-99 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
Expand All @@ -109,7 +109,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
batchMerkleRootHashString := hex.EncodeToString(batchMerkleRoot[:])

respondToTaskV2Func := func() (*types.Receipt, error) {
gasPrice, err := utils.GetGasPriceRetryable(w.Client, w.ClientFallback)
gasPrice, err := utils.GetGasPriceRetryable(w.Client, w.ClientFallback, retry.NetworkRetryParams())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -144,7 +144,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
}
}
w.logger.Infof("Receipts for old transactions not found, will check if the batch state has been responded", "merkle root", batchMerkleRootHashString)
batchState, _ := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash)
batchState, _ := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash, retry.NetworkRetryParams())
if batchState.Responded {
w.logger.Infof("Batch state has been already responded", "merkle root", batchMerkleRootHashString)
return nil, nil
Expand All @@ -163,15 +163,15 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
}

w.logger.Infof("Sending RespondToTask transaction with a gas price of %v", txOpts.GasPrice, "merkle root", batchMerkleRootHashString)
realTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
realTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature, retry.SendToChainRetryParams())
if err != nil {
w.logger.Errorf("Respond to task transaction err, %v", err, "merkle root", batchMerkleRootHashString)
return nil, err
}
sentTxs = append(sentTxs, realTx)

w.logger.Infof("Transaction sent, waiting for receipt", "merkle root", batchMerkleRootHashString)
receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, realTx.Hash(), timeToWaitBeforeBump)
receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, realTx.Hash(), retry.WaitForTxRetryParams(timeToWaitBeforeBump))
if receipt != nil {
w.checkIfAggregatorHadToPaidForBatcher(realTx, batchIdentifierHash)
return receipt, nil
Expand All @@ -191,15 +191,14 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
// This just retries the fee bump in case of a timeout.
// The wait is already done in WaitForTransactionReceiptRetryable, and all the functions are retryable,
// so this retry doesn't need to wait any longer.
maxInterval := time.Millisecond * 500
return retry.RetryWithData(respondToTaskV2Func, retry.MinDelay, retry.RetryFactor, 0, maxInterval, 0)
return retry.RetryWithData(respondToTaskV2Func, retry.RespondToTaskV2())
}

// checkIfAggregatorHadToPaidForBatcher calculates the transaction cost from the receipt and compares it with the batcher's respondToTaskFeeLimit.
// If the tx cost was higher, the aggregator has paid the difference for the batcher (txCost - respondToTaskFeeLimit), so metrics are updated accordingly.
// Otherwise nothing is done.
func (w *AvsWriter) checkIfAggregatorHadToPaidForBatcher(tx *types.Transaction, batchIdentifierHash [32]byte) {
batchState, err := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash)
batchState, err := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash, retry.NetworkRetryParams())
if err != nil {
return
}
Expand All @@ -225,7 +224,7 @@ func (w *AvsWriter) checkAggAndBatcherHaveEnoughBalance(tx *types.Transaction, t
txCost := new(big.Int).Mul(txGasAsBigInt, txGasPrice)
w.logger.Info("Transaction cost", "cost", txCost)

batchState, err := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash)
batchState, err := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash, retry.NetworkRetryParams())
if err != nil {
w.logger.Error("Failed to get batch state", "error", err)
w.logger.Info("Proceeding to check balances against transaction cost")
Expand Down Expand Up @@ -253,7 +252,7 @@ func (w *AvsWriter) compareAggregatorBalance(amount *big.Int, aggregatorAddress
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

aggregatorBalance, err := w.BalanceAtRetryable(ctx, aggregatorAddress, nil)
aggregatorBalance, err := w.BalanceAtRetryable(ctx, aggregatorAddress, nil, retry.NetworkRetryParams())
if err != nil {
// Ignore and continue.
w.logger.Error("failed to get aggregator balance: %v", err)
Expand All @@ -268,7 +267,7 @@ func (w *AvsWriter) compareAggregatorBalance(amount *big.Int, aggregatorAddress

func (w *AvsWriter) compareBatcherBalance(amount *big.Int, senderAddress [20]byte) error {
// Get batcher balance
batcherBalance, err := w.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress)
batcherBalance, err := w.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress, retry.NetworkRetryParams())
if err != nil {
// Ignore and continue.
w.logger.Error("Failed to get batcherBalance", "error", err)
Expand Down
Loading