diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go
index d1ebb1f41..4ec6488d0 100644
--- a/aggregator/pkg/server.go
+++ b/aggregator/pkg/server.go
@@ -54,7 +54,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
     // If that's the case, we won't know about the task at this point
     // so we make GetTaskIndex retryable, waiting for some seconds,
     // before trying to fetch the task again from the map.
-    taskIndex, err := agg.GetTaskIndexRetryable(signedTaskResponse.BatchIdentifierHash)
+    taskIndex, err := agg.GetTaskIndexRetryable(signedTaskResponse.BatchIdentifierHash, retry.NetworkRetryParams())
     if err != nil {
         agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum")
@@ -114,10 +114,9 @@ func (agg *Aggregator) ServerRunning(_ *struct{}, reply *int64) error {
 Checks Internal mapping for Signed Task Response, returns its TaskIndex.
 - All errors are considered Transient Errors
 - Retry times (3 retries): 1 sec, 2 sec, 4 sec
-- TODO: We should refactor the retry duration considering extending it to a larger time or number of retries, at least somewhere between 1 and 2 blocks
 */
-func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte) (uint32, error) {
+func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte, config *retry.RetryParams) (uint32, error) {
     getTaskIndex_func := func() (uint32, error) {
         agg.taskMutex.Lock()
         taskIndex, ok := agg.batchesIdxByIdentifierHash[batchIdentifierHash]
@@ -129,5 +128,5 @@ func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte) (uint
     }
 }
-    return retry.RetryWithData(getTaskIndex_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+    return retry.RetryWithData(getTaskIndex_func, config)
 }
diff --git a/core/chainio/avs_subscriber.go b/core/chainio/avs_subscriber.go
index ea810da62..c40802058 100644
--- a/core/chainio/avs_subscriber.go
+++ b/core/chainio/avs_subscriber.go
@@ -66,15 +66,15 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema
     internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2)
     // Subscribe to new tasks
-    sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil)
+    sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
     if err != nil {
-        s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NumRetries, "err", err)
+        s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err)
         return nil, err
     }
-    subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil)
+    subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
     if err != nil {
-        s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NumRetries, "err", err)
+        s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NetworkNumRetries, "err", err)
         return nil, err
     }
     s.logger.Info("Subscribed to new AlignedLayer V2 tasks")
@@ -114,14 +114,14 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema
         case err := <-sub.Err():
             s.logger.Warn("Error in new task subscription", "err", err)
             sub.Unsubscribe()
-            sub, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil)
+            sub, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
             if err != nil {
                 errorChannel <- err
             }
         case err := <-subFallback.Err():
             s.logger.Warn("Error in fallback new task subscription", "err", err)
             subFallback.Unsubscribe()
-            subFallback, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil)
+            subFallback, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
             if err != nil {
                 errorChannel <- err
             }
@@ -137,13 +137,13 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema
     internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3)
     // Subscribe to new tasks
-    sub, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil)
+    sub, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
     if err != nil {
         s.logger.Error("Primary failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err)
         return nil, err
     }
-    subFallback, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil)
+    subFallback, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
     if err != nil {
         s.logger.Error("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err)
         return nil, err
@@ -185,14 +185,14 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema
         case err := <-sub.Err():
             s.logger.Warn("Error in new task subscription", "err", err)
             sub.Unsubscribe()
-            sub, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil)
+            sub, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
             if err != nil {
                 errorChannel <- err
             }
         case err := <-subFallback.Err():
             s.logger.Warn("Error in fallback new task subscription", "err", err)
             subFallback.Unsubscribe()
-            subFallback, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil)
+            subFallback, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
             if err != nil {
                 errorChannel <- err
             }
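The aggregator and subscriber call sites now pass retry.NetworkRetryParams() explicitly, so a retry schedule can be tuned per call instead of by editing package-level constants. A minimal sketch of how a caller could widen the window toward the one-to-two-block target mentioned in the removed TODO (the NumRetries override is a hypothetical value, not part of this change):

    // Start from the network defaults (1s initial interval, x2 multiplier, 3 retries)
    // and stretch the schedule to roughly 31s of waiting: 1s + 2s + 4s + 8s + 16s.
    params := retry.NetworkRetryParams()
    params.NumRetries = 5 // hypothetical override, not part of this diff

    taskIndex, err := agg.GetTaskIndexRetryable(signedTaskResponse.BatchIdentifierHash, params)
    if err != nil {
        agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum")
    }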
@@ -258,7 +258,7 @@ func (s *AvsSubscriber) processNewBatchV3(batch *servicemanager.ContractAlignedL
 // getLatestNotRespondedTaskFromEthereum queries the blockchain for the latest not responded task using the FilterNewBatch method.
 func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV2() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, error) {
-    latestBlock, err := s.BlockNumberRetryable(context.Background())
+    latestBlock, err := s.BlockNumberRetryable(context.Background(), retry.NetworkRetryParams())
     if err != nil {
         return nil, err
     }
@@ -271,7 +271,7 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV2() (*servicemanag
         fromBlock = latestBlock - BlockInterval
     }
-    logs, err := s.FilterBatchV2Retryable(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil)
+    logs, err := s.FilterBatchV2Retryable(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil, retry.NetworkRetryParams())
     if err != nil {
         return nil, err
     }
@@ -293,7 +293,7 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV2() (*servicemanag
     batchIdentifier := append(lastLog.BatchMerkleRoot[:], lastLog.SenderAddress[:]...)
     batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
-    state, err := s.BatchesStateRetryable(nil, batchIdentifierHash)
+    state, err := s.BatchesStateRetryable(nil, batchIdentifierHash, retry.NetworkRetryParams())
     if err != nil {
         return nil, err
     }
@@ -307,7 +307,7 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV2() (*servicemanag
 // getLatestNotRespondedTaskFromEthereum queries the blockchain for the latest not responded task using the FilterNewBatch method.
 func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV3() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) {
-    latestBlock, err := s.BlockNumberRetryable(context.Background())
+    latestBlock, err := s.BlockNumberRetryable(context.Background(), retry.NetworkRetryParams())
     if err != nil {
         return nil, err
     }
@@ -320,7 +320,7 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV3() (*servicemanag
         fromBlock = latestBlock - BlockInterval
     }
-    logs, err := s.FilterBatchV3Retryable(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil)
+    logs, err := s.FilterBatchV3Retryable(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil, retry.NetworkRetryParams())
     if err != nil {
         return nil, err
     }
@@ -342,7 +342,7 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV3() (*servicemanag
     batchIdentifier := append(lastLog.BatchMerkleRoot[:], lastLog.SenderAddress[:]...)
     batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
-    state, err := s.BatchesStateRetryable(nil, batchIdentifierHash)
+    state, err := s.BatchesStateRetryable(nil, batchIdentifierHash, retry.NetworkRetryParams())
     if err != nil {
         return nil, err
     }
@@ -355,7 +355,7 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV3() (*servicemanag
 }
 func (s *AvsSubscriber) WaitForOneBlock(startBlock uint64) error {
-    currentBlock, err := s.BlockNumberRetryable(context.Background())
+    currentBlock, err := s.BlockNumberRetryable(context.Background(), retry.NetworkRetryParams())
     if err != nil {
         return err
     }
@@ -363,7 +363,7 @@ func (s *AvsSubscriber) WaitForOneBlock(startBlock uint64) error {
     if currentBlock <= startBlock { // should really be == but just in case
         // Subscribe to new head
         c := make(chan *types.Header)
-        sub, err := s.SubscribeNewHeadRetryable(context.Background(), c)
+        sub, err := s.SubscribeNewHeadRetryable(context.Background(), c, retry.NetworkRetryParams())
         if err != nil {
             return err
         }
diff --git a/core/chainio/avs_writer.go b/core/chainio/avs_writer.go
index 63a3da914..d712f9013 100644
--- a/core/chainio/avs_writer.go
+++ b/core/chainio/avs_writer.go
@@ -92,7 +92,7 @@ func NewAvsWriterFromConfig(baseConfig *config.BaseConfig, ecdsaConfig *config.E
 func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature, gasBumpPercentage uint, gasBumpIncrementalPercentage uint, timeToWaitBeforeBump time.Duration, onGasPriceBumped func(*big.Int)) (*types.Receipt, error) {
     txOpts := *w.Signer.GetTxOpts()
     txOpts.NoSend = true // simulate the transaction
-    simTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
+    simTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature, retry.SendToChainRetryParams())
     if err != nil {
         return nil, err
     }
@@ -109,7 +109,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
     batchMerkleRootHashString := hex.EncodeToString(batchMerkleRoot[:])
     respondToTaskV2Func := func() (*types.Receipt, error) {
-        gasPrice, err := utils.GetGasPriceRetryable(w.Client, w.ClientFallback)
+        gasPrice, err := utils.GetGasPriceRetryable(w.Client, w.ClientFallback, retry.NetworkRetryParams())
         if err != nil {
             return nil, err
         }
@@ -144,7 +144,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
             }
         }
         w.logger.Infof("Receipts for old transactions not found, will check if the batch state has been responded", "merkle root", batchMerkleRootHashString)
-        batchState, _ := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash)
+        batchState, _ := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash, retry.NetworkRetryParams())
         if batchState.Responded {
             w.logger.Infof("Batch state has been already responded", "merkle root", batchMerkleRootHashString)
             return nil, nil
@@ -163,7 +163,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
         }
         w.logger.Infof("Sending RespondToTask transaction with a gas price of %v", txOpts.GasPrice, "merkle root", batchMerkleRootHashString)
-        realTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
+        realTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature, retry.SendToChainRetryParams())
         if err != nil {
             w.logger.Errorf("Respond to task transaction err, %v", err, "merkle root", batchMerkleRootHashString)
             return nil, err
@@ -171,7 +171,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
         sentTxs = append(sentTxs, realTx)
         w.logger.Infof("Transaction sent, waiting for receipt", "merkle root", batchMerkleRootHashString)
-        receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, realTx.Hash(), timeToWaitBeforeBump)
+        receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, realTx.Hash(), retry.WaitForTxRetryParams(timeToWaitBeforeBump))
         if receipt != nil {
             w.checkIfAggregatorHadToPaidForBatcher(realTx, batchIdentifierHash)
             return receipt, nil
@@ -191,15 +191,14 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
     // This just retries the bump of a fee in case of a timeout
     // The wait is done before on WaitForTransactionReceiptRetryable, and all the functions are retriable,
     // so this retry doesn't need to wait more time
-    maxInterval := time.Millisecond * 500
-    return retry.RetryWithData(respondToTaskV2Func, retry.MinDelay, retry.RetryFactor, 0, maxInterval, 0)
+    return retry.RetryWithData(respondToTaskV2Func, retry.RespondToTaskV2())
 }
 // Calculates the transaction cost from the receipt and compares it with the batcher respondToTaskFeeLimit
 // if the tx cost was higher, then it means the aggregator has paid the difference for the batcher (txCost - respondToTaskFeeLimit) and so metrics are updated accordingly.
 // otherwise nothing is done.
 func (w *AvsWriter) checkIfAggregatorHadToPaidForBatcher(tx *types.Transaction, batchIdentifierHash [32]byte) {
-    batchState, err := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash)
+    batchState, err := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash, retry.NetworkRetryParams())
     if err != nil {
         return
     }
@@ -225,7 +224,7 @@ func (w *AvsWriter) checkAggAndBatcherHaveEnoughBalance(tx *types.Transaction, t
     txCost := new(big.Int).Mul(txGasAsBigInt, txGasPrice)
     w.logger.Info("Transaction cost", "cost", txCost)
-    batchState, err := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash)
+    batchState, err := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash, retry.NetworkRetryParams())
     if err != nil {
         w.logger.Error("Failed to get batch state", "error", err)
         w.logger.Info("Proceeding to check balances against transaction cost")
@@ -253,7 +252,7 @@ func (w *AvsWriter) compareAggregatorBalance(amount *big.Int, aggregatorAddress
     ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
     defer cancel()
-    aggregatorBalance, err := w.BalanceAtRetryable(ctx, aggregatorAddress, nil)
+    aggregatorBalance, err := w.BalanceAtRetryable(ctx, aggregatorAddress, nil, retry.NetworkRetryParams())
     if err != nil {
         // Ignore and continue.
         w.logger.Error("failed to get aggregator balance: %v", err)
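SendAggregatedResponse now layers three of the new presets: each send retries on the chain schedule, the receipt wait is bounded by WaitForTxRetryParams(timeToWaitBeforeBump), and the send-and-wait closure is re-run by RetryWithData with the RespondToTaskV2 preset (no retry cap and a 500 ms MaxInterval). A simplified sketch of that composition, leaving out the gas-price bumping and balance checks the real function performs:

    sendAndWait := func() (*types.Receipt, error) {
        // Each send attempt is itself retried on the chain schedule (12s, 24s, 48s).
        tx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress,
            nonSignerStakesAndSignature, retry.SendToChainRetryParams())
        if err != nil {
            return nil, err
        }
        // Poll for the receipt (at most every 2s) until timeToWaitBeforeBump has elapsed.
        return utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback,
            tx.Hash(), retry.WaitForTxRetryParams(timeToWaitBeforeBump))
    }

    // The real code bumps the gas price before each new attempt; the outer loop never
    // gives up on its own because RespondToTaskV2() sets NumRetries to 0.
    receipt, err := retry.RetryWithData(sendAndWait, retry.RespondToTaskV2())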
w.logger.Error("Failed to get batcherBalance", "error", err) diff --git a/core/chainio/retryable.go b/core/chainio/retryable.go index ad44a2509..bf4724e2f 100644 --- a/core/chainio/retryable.go +++ b/core/chainio/retryable.go @@ -22,7 +22,7 @@ Send a transaction to the AVS contract to respond to a task. - Retry times (3 retries): 12 sec (1 Blocks), 24 sec (2 Blocks), 48 sec (4 Blocks) - NOTE: Contract call reverts are not considered `PermanentError`'s as block reorg's may lead to contract call revert in which case the aggregator should retry. */ -func (w *AvsWriter) RespondToTaskV2Retryable(opts *bind.TransactOpts, batchMerkleRoot [32]byte, senderAddress common.Address, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*types.Transaction, error) { +func (w *AvsWriter) RespondToTaskV2Retryable(opts *bind.TransactOpts, batchMerkleRoot [32]byte, senderAddress common.Address, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature, config *retry.RetryParams) (*types.Transaction, error) { respondToTaskV2_func := func() (*types.Transaction, error) { // Try with main connection tx, err := w.AvsContractBindings.ServiceManager.RespondToTaskV2(opts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature) @@ -33,7 +33,7 @@ func (w *AvsWriter) RespondToTaskV2Retryable(opts *bind.TransactOpts, batchMerkl return tx, err } - return retry.RetryWithData(respondToTaskV2_func, retry.MinDelayChain, retry.RetryFactor, retry.NumRetries, retry.MaxIntervalChain, retry.MaxElapsedTime) + return retry.RetryWithData(respondToTaskV2_func, config) } /* @@ -42,7 +42,7 @@ Get the state of a batch from the AVS contract. - All errors are considered Transient Errors - Retry times (3 retries): 1 sec, 2 sec, 4 sec */ -func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct { +func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte, config *retry.RetryParams) (struct { TaskCreatedBlock uint32 Responded bool RespondToTaskFeeLimit *big.Int @@ -61,7 +61,7 @@ func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (s } return state, err } - return retry.RetryWithData(batchesState_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.RetryWithData(batchesState_func, config) } /* @@ -70,7 +70,7 @@ Get the balance of a batcher from the AVS contract. - All errors are considered Transient Errors - Retry times (3 retries): 1 sec, 2 sec, 4 sec */ -func (w *AvsWriter) BatcherBalancesRetryable(opts *bind.CallOpts, senderAddress common.Address) (*big.Int, error) { +func (w *AvsWriter) BatcherBalancesRetryable(opts *bind.CallOpts, senderAddress common.Address, config *retry.RetryParams) (*big.Int, error) { batcherBalances_func := func() (*big.Int, error) { // Try with main connection batcherBalance, err := w.AvsContractBindings.ServiceManager.BatchersBalances(opts, senderAddress) @@ -80,7 +80,7 @@ func (w *AvsWriter) BatcherBalancesRetryable(opts *bind.CallOpts, senderAddress } return batcherBalance, err } - return retry.RetryWithData(batcherBalances_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.RetryWithData(batcherBalances_func, config) } /* @@ -91,7 +91,7 @@ TODO: it gets the balance from an Address, not necessarily an aggregator. The na - All errors are considered Transient Errors - Retry times (3 retries): 1 sec, 2 sec, 4 sec. 
 */
-func (w *AvsWriter) BalanceAtRetryable(ctx context.Context, aggregatorAddress common.Address, blockNumber *big.Int) (*big.Int, error) {
+func (w *AvsWriter) BalanceAtRetryable(ctx context.Context, aggregatorAddress common.Address, blockNumber *big.Int, config *retry.RetryParams) (*big.Int, error) {
     balanceAt_func := func() (*big.Int, error) {
         // Try with main connection
         aggregatorBalance, err := w.Client.BalanceAt(ctx, aggregatorAddress, blockNumber)
@@ -101,7 +101,7 @@ func (w *AvsWriter) BalanceAtRetryable(ctx context.Context, aggregatorAddress co
         }
         return aggregatorBalance, err
     }
-    return retry.RetryWithData(balanceAt_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+    return retry.RetryWithData(balanceAt_func, config)
 }
 // |---AVS_SUBSCRIBER---|
 /*
@@ -112,7 +112,7 @@ Get the latest block number from Ethereum
 - All errors are considered Transient Errors
 - Retry times (3 retries): 1 sec, 2 sec, 4 sec.
 */
-func (s *AvsSubscriber) BlockNumberRetryable(ctx context.Context) (uint64, error) {
+func (s *AvsSubscriber) BlockNumberRetryable(ctx context.Context, config *retry.RetryParams) (uint64, error) {
     latestBlock_func := func() (uint64, error) {
         // Try with main connection
         latestBlock, err := s.AvsContractBindings.ethClient.BlockNumber(ctx)
@@ -122,7 +122,7 @@ func (s *AvsSubscriber) BlockNumberRetryable(ctx context.Context) (uint64, error
         }
         return latestBlock, err
     }
-    return retry.RetryWithData(latestBlock_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+    return retry.RetryWithData(latestBlock_func, config)
 }
 /*
@@ -131,11 +131,11 @@ Get NewBatchV2 logs from the AVS contract.
 - All errors are considered Transient Errors
 - Retry times (3 retries): 1 sec, 2 sec, 4 sec.
 */
-func (s *AvsSubscriber) FilterBatchV2Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) {
+func (s *AvsSubscriber) FilterBatchV2Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte, config *retry.RetryParams) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) {
     filterNewBatchV2_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) {
         return s.AvsContractBindings.ServiceManager.FilterNewBatchV2(opts, batchMerkleRoot)
     }
-    return retry.RetryWithData(filterNewBatchV2_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+    return retry.RetryWithData(filterNewBatchV2_func, config)
 }
 /*
@@ -144,11 +144,11 @@ Get NewBatchV3 logs from the AVS contract.
 - All errors are considered Transient Errors
 - Retry times (3 retries): 1 sec, 2 sec, 4 sec.
 */
-func (s *AvsSubscriber) FilterBatchV3Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) {
+func (s *AvsSubscriber) FilterBatchV3Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte, config *retry.RetryParams) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) {
     filterNewBatchV2_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) {
         return s.AvsContractBindings.ServiceManager.FilterNewBatchV3(opts, batchMerkleRoot)
     }
-    return retry.RetryWithData(filterNewBatchV2_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+    return retry.RetryWithData(filterNewBatchV2_func, config)
 }
 /*
@@ -157,7 +157,7 @@ Get the state of a batch from the AVS contract.
 - All errors are considered Transient Errors
 - Retry times (3 retries): 1 sec, 2 sec, 4 sec
 */
-func (s *AvsSubscriber) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct {
+func (s *AvsSubscriber) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte, config *retry.RetryParams) (struct {
     TaskCreatedBlock      uint32
     Responded             bool
     RespondToTaskFeeLimit *big.Int
@@ -170,7 +170,7 @@ func (s *AvsSubscriber) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte
         return s.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0)
     }
-    return retry.RetryWithData(batchState_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+    return retry.RetryWithData(batchState_func, config)
 }
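Every wrapper in retryable.go now has the same shape: close over the primary call, fall back to the secondary binding or client on error, and hand the closure plus the caller's schedule to retry.RetryWithData. A hypothetical new wrapper would follow the same pattern (illustrative only; it assumes the bindings expose an ethClientFallback field and that the underlying client provides ChainID, as go-ethereum's client does):

    // Hypothetical example, not part of this change.
    func (s *AvsSubscriber) ChainIDRetryable(ctx context.Context, config *retry.RetryParams) (*big.Int, error) {
        chainId_func := func() (*big.Int, error) {
            // Try with main connection
            chainId, err := s.AvsContractBindings.ethClient.ChainID(ctx)
            if err != nil {
                // Try with fallback connection
                chainId, err = s.AvsContractBindings.ethClientFallback.ChainID(ctx)
            }
            return chainId, err
        }
        return retry.RetryWithData(chainId_func, config)
    }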
 /*
@@ -179,7 +179,7 @@ Subscribe to new heads from the Ethereum node.
 - All errors are considered Transient Errors
 - Retry times (3 retries): 1 sec, 2 sec, 4 sec.
 */
-func (s *AvsSubscriber) SubscribeNewHeadRetryable(ctx context.Context, c chan<- *types.Header) (ethereum.Subscription, error) {
+func (s *AvsSubscriber) SubscribeNewHeadRetryable(ctx context.Context, c chan<- *types.Header, config *retry.RetryParams) (ethereum.Subscription, error) {
     subscribeNewHead_func := func() (ethereum.Subscription, error) {
         // Try with main connection
         sub, err := s.AvsContractBindings.ethClient.SubscribeNewHead(ctx, c)
@@ -189,7 +189,7 @@ func (s *AvsSubscriber) SubscribeNewHeadRetryable(ctx context.Context, c chan<-
         }
         return sub, err
     }
-    return retry.RetryWithData(subscribeNewHead_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+    return retry.RetryWithData(subscribeNewHead_func, config)
 }
 /*
@@ -203,11 +203,12 @@ func SubscribeToNewTasksV2Retryable(
     serviceManager *servicemanager.ContractAlignedLayerServiceManager,
     newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2,
     batchMerkleRoot [][32]byte,
+    config *retry.RetryParams,
 ) (event.Subscription, error) {
     subscribe_func := func() (event.Subscription, error) {
         return serviceManager.WatchNewBatchV2(opts, newTaskCreatedChan, batchMerkleRoot)
     }
-    return retry.RetryWithData(subscribe_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+    return retry.RetryWithData(subscribe_func, config)
 }
 /*
@@ -221,9 +222,10 @@ func SubscribeToNewTasksV3Retryable(
     serviceManager *servicemanager.ContractAlignedLayerServiceManager,
     newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3,
     batchMerkleRoot [][32]byte,
+    config *retry.RetryParams,
 ) (event.Subscription, error) {
     subscribe_func := func() (event.Subscription, error) {
         return serviceManager.WatchNewBatchV3(opts, newTaskCreatedChan, batchMerkleRoot)
     }
-    return retry.RetryWithData(subscribe_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+    return retry.RetryWithData(subscribe_func, config)
 }
diff --git a/core/retry.go b/core/retry.go
index fe872f658..fb39e8d8d 100644
--- a/core/retry.go
+++ b/core/retry.go
@@ -25,15 +25,83 @@ func (e PermanentError) Is(err error) bool {
 }
 const (
-    MinDelay         = 1 * time.Second  // Initial delay for retry interval.
-    MaxInterval      = 60 * time.Second // Maximum interval an individual retry may have.
-    MaxElapsedTime   = 0 * time.Second  // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries.
-    RetryFactor      float64 = 2        // Multiplier factor computed exponential retry interval is scaled by.
-    NumRetries       uint64  = 3        // Total number of retries attempted.
-    MinDelayChain    = 12 * time.Second // Initial delay for retry interval for contract calls. Corresponds to 1 ethereum block.
-    MaxIntervalChain = 2 * time.Minute  // Maximum interval for an individual retry.
+    NetworkInitialInterval = 1 * time.Second  // Initial delay for retry interval.
+    NetworkMaxInterval     = 60 * time.Second // Maximum interval an individual retry may have.
+    NetworkMaxElapsedTime  = 0 * time.Second  // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries.
+    NetworkRandomizationFactor float64 = 0    // Randomization (Jitter) factor used to map retry interval to a range of values around the computed interval. In precise terms (random value in range [1 - randomizationfactor, 1 + randomizationfactor]). NOTE: This is set to 0 as we do not use jitter in Aligned.
+    NetworkMultiplier float64 = 2 // Multiplier factor computed exponential retry interval is scaled by.
+    NetworkNumRetries uint64  = 3 // Total number of retries attempted.
+
+    // Retry Params for Sending Tx to Chain
+    ChainInitialInterval = 12 * time.Second // Initial delay for retry interval for contract calls. Corresponds to 1 ethereum block.
+    ChainMaxInterval     = 2 * time.Minute  // Maximum interval for an individual retry.
+
+    // Retry Params for WaitForTransactionReceipt in the Fee Bump
+    WaitForTxMaxInterval = 2 * time.Second // Maximum interval for an individual retry.
+    WaitForTxNumRetries  = 0               // Total number of retries attempted. If 0, retries indefinitely until maxElapsedTime is reached.
+
+    // Retry Parameters for RespondToTaskV2 in the Fee Bump
+    RespondToTaskV2MaxInterval    = time.Millisecond * 500 // Maximum interval for an individual retry.
+    RespondToTaskV2MaxElapsedTime = 0                      // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries.
+    RespondToTaskV2NumRetries uint64 = 0                   // Total number of retries attempted. If 0, retries indefinitely until maxElapsedTime is reached.
 )
+
+type RetryParams struct {
+    InitialInterval     time.Duration // Initial delay for retry interval.
+    MaxInterval         time.Duration // Maximum interval an individual retry may have.
+    MaxElapsedTime      time.Duration // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries.
+    RandomizationFactor float64
+    Multiplier          float64
+    NumRetries          uint64
+}
+
+func NetworkRetryParams() *RetryParams {
+    return &RetryParams{
+        InitialInterval:     NetworkInitialInterval,
+        MaxInterval:         NetworkMaxInterval,
+        MaxElapsedTime:      NetworkMaxElapsedTime,
+        RandomizationFactor: NetworkRandomizationFactor,
+        Multiplier:          NetworkMultiplier,
+        NumRetries:          NetworkNumRetries,
+    }
+}
+
+func SendToChainRetryParams() *RetryParams {
+    return &RetryParams{
+        InitialInterval:     ChainInitialInterval,
+        MaxInterval:         ChainMaxInterval,
+        MaxElapsedTime:      NetworkMaxElapsedTime,
+        RandomizationFactor: NetworkRandomizationFactor,
+        Multiplier:          NetworkMultiplier,
+        NumRetries:          NetworkNumRetries,
+    }
+}
+
+func RespondToTaskV2() *RetryParams {
+    return &RetryParams{
+        InitialInterval:     ChainInitialInterval,
+        MaxInterval:         RespondToTaskV2MaxInterval,
+        MaxElapsedTime:      RespondToTaskV2MaxElapsedTime,
+        RandomizationFactor: NetworkRandomizationFactor,
+        Multiplier:          NetworkMultiplier,
+        NumRetries:          RespondToTaskV2NumRetries,
+    }
+}
+
+// WaitForTxRetryParams returns the retry parameters for waiting for a transaction to be included in a block.
+// maxElapsedTime is received as parameter to allow for a custom timeout
+// These parameters are used for the bumping fees logic.
+func WaitForTxRetryParams(maxElapsedTime time.Duration) *RetryParams {
+    return &RetryParams{
+        InitialInterval:     NetworkInitialInterval,
+        MaxInterval:         WaitForTxMaxInterval,
+        MaxElapsedTime:      maxElapsedTime,
+        RandomizationFactor: NetworkRandomizationFactor,
+        Multiplier:          NetworkMultiplier,
+        NumRetries:          WaitForTxNumRetries,
+    }
+}
+
 /*
 Retry and RetryWithData are custom retry functions used in Aligned's aggregator and operator to facilitate consistent retry logic across the system.
 They are interfaces for around Cenk Alti (https://github.com/cenkalti) backoff library (https://github.com/cenkalti/backoff). We would like to thank him for his great work.
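Because RandomizationFactor is fixed at 0, every preset yields a deterministic schedule: NetworkRetryParams waits 1s, 2s, 4s across its three retries, and SendToChainRetryParams waits 12s, 24s, 48s, capped at ChainMaxInterval. A small usage sketch of the struct-based signature (illustrative only; assumes a go-ethereum client is in scope):

    // Illustrative helper, not part of this diff: fetch the latest block number with
    // the default network schedule (sleeps of 1s, 2s and 4s between attempts).
    func latestBlockWithRetry(client *ethclient.Client) (uint64, error) {
        return retry.RetryWithData(func() (uint64, error) {
            return client.BlockNumber(context.Background())
        }, retry.NetworkRetryParams())
    }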
@@ -92,7 +160,7 @@ Reference: https://github.com/cenkalti/backoff/blob/v4/exponential.go#L9
 */
 // Same as Retry only that the functionToRetry can return a value upon correct execution
-func RetryWithData[T any](functionToRetry func() (T, error), minDelay time.Duration, factor float64, maxTries uint64, maxInterval time.Duration, maxElapsedTime time.Duration) (T, error) {
+func RetryWithData[T any](functionToRetry func() (T, error), config *RetryParams) (T, error) {
     f := func() (T, error) {
         var (
             val T
@@ -118,17 +186,16 @@ func RetryWithData[T any](functionToRetry func() (T, error), minDelay time.Durat
         return val, err
     }
-    randomOption := backoff.WithRandomizationFactor(0)
-
-    initialRetryOption := backoff.WithInitialInterval(minDelay)
-    multiplierOption := backoff.WithMultiplier(factor)
-    maxIntervalOption := backoff.WithMaxInterval(maxInterval)
-    maxElapsedTimeOption := backoff.WithMaxElapsedTime(maxElapsedTime)
+    initialRetryOption := backoff.WithInitialInterval(config.InitialInterval)
+    multiplierOption := backoff.WithMultiplier(config.Multiplier)
+    maxIntervalOption := backoff.WithMaxInterval(config.MaxInterval)
+    maxElapsedTimeOption := backoff.WithMaxElapsedTime(config.MaxElapsedTime)
+    randomOption := backoff.WithRandomizationFactor(config.RandomizationFactor)
     expBackoff := backoff.NewExponentialBackOff(randomOption, multiplierOption, initialRetryOption, maxIntervalOption, maxElapsedTimeOption)
     var maxRetriesBackoff backoff.BackOff
-    if maxTries > 0 {
-        maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, maxTries)
+    if config.NumRetries > 0 {
+        maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, config.NumRetries)
     } else {
         maxRetriesBackoff = expBackoff
     }
@@ -142,7 +209,7 @@ func RetryWithData[T any](functionToRetry func() (T, error), minDelay time.Durat
 // from the configuration are reached, or until a `PermanentError` is returned.
 // The function to be retried should return `PermanentError` when the condition for stop retrying
 // is met.
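As the surrounding comments note, the backoff loop only stops early when the function under retry reports a PermanentError; any other error is treated as transient and retried on the configured schedule. A sketch of a closure using that convention (verifyProof and proof are placeholders, and the Inner field name is assumed from the rest of the package):

    verify := func() (bool, error) {
        ok, err := verifyProof(proof) // placeholder call, not a helper in this repo
        if err != nil {
            // A proof that cannot be deserialized will never verify; abort the backoff loop.
            return false, retry.PermanentError{Inner: err} // field name assumed
        }
        return ok, nil // a false result without an error is returned to the caller as-is
    }
    valid, err := retry.RetryWithData(verify, retry.NetworkRetryParams())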
-func Retry(functionToRetry func() error, minDelay time.Duration, factor float64, maxTries uint64, maxInterval time.Duration, maxElapsedTime time.Duration) error {
+func Retry(functionToRetry func() error, config *RetryParams) error {
     f := func() error {
         var err error
         func() {
@@ -165,17 +232,16 @@ func Retry(functionToRetry func() error, minDelay time.Duration, factor float64,
         return err
     }
-    randomOption := backoff.WithRandomizationFactor(0)
-
-    initialRetryOption := backoff.WithInitialInterval(minDelay)
-    multiplierOption := backoff.WithMultiplier(factor)
-    maxIntervalOption := backoff.WithMaxInterval(maxInterval)
-    maxElapsedTimeOption := backoff.WithMaxElapsedTime(maxElapsedTime)
+    initialRetryOption := backoff.WithInitialInterval(config.InitialInterval)
+    multiplierOption := backoff.WithMultiplier(config.Multiplier)
+    maxIntervalOption := backoff.WithMaxInterval(config.MaxInterval)
+    maxElapsedTimeOption := backoff.WithMaxElapsedTime(config.MaxElapsedTime)
+    randomOption := backoff.WithRandomizationFactor(config.RandomizationFactor)
     expBackoff := backoff.NewExponentialBackOff(randomOption, multiplierOption, initialRetryOption, maxIntervalOption, maxElapsedTimeOption)
     var maxRetriesBackoff backoff.BackOff
-    if maxTries > 0 {
-        maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, maxTries)
+    if config.NumRetries > 0 {
+        maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, config.NumRetries)
     } else {
         maxRetriesBackoff = expBackoff
     }
diff --git a/core/retry_test.go b/core/retry_test.go
index 22d886f2e..9b2f416b0 100644
--- a/core/retry_test.go
+++ b/core/retry_test.go
@@ -42,7 +42,15 @@ func TestRetryWithData(t *testing.T) {
         x, err := DummyFunction(43)
         return &x, err
     }
-    _, err := retry.RetryWithData(function, 1000, 2, 3, retry.MaxInterval, retry.MaxElapsedTime)
+    config := &retry.RetryParams{
+        InitialInterval:     1000,
+        MaxInterval:         2,
+        MaxElapsedTime:      3,
+        RandomizationFactor: 0,
+        Multiplier:          retry.NetworkMultiplier,
+        NumRetries:          retry.NetworkNumRetries,
+    }
+    _, err := retry.RetryWithData(function, config)
     if err != nil {
         t.Errorf("Retry error!: %s", err)
     }
@@ -53,7 +61,15 @@ func TestRetry(t *testing.T) {
         _, err := DummyFunction(43)
         return err
     }
-    err := retry.Retry(function, 1000, 2, 3, retry.MaxInterval, retry.MaxElapsedTime)
+    config := &retry.RetryParams{
+        InitialInterval:     1000,
+        MaxInterval:         2,
+        MaxElapsedTime:      3,
+        RandomizationFactor: 0,
+        Multiplier:          retry.NetworkMultiplier,
+        NumRetries:          retry.NetworkNumRetries,
+    }
+    err := retry.Retry(function, config)
     if err != nil {
         t.Errorf("Retry error!: %s", err)
     }
@@ -153,7 +169,7 @@ func TestWaitForTransactionReceipt(t *testing.T) {
     }
     // Assert Call succeeds when Anvil running
-    _, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, time.Second*45)
+    _, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, retry.SendToChainRetryParams())
     assert.NotNil(t, err, "Error Waiting for Transaction with Anvil Running: %s\n", err)
     if !strings.Contains(err.Error(), "not found") {
         t.Errorf("WaitForTransactionReceipt Emitted incorrect error: %s\n", err)
@@ -165,7 +181,7 @@
         return
     }
-    _, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, time.Second*45)
+    _, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, retry.SendToChainRetryParams())
     assert.NotNil(t, err)
     if _, ok := err.(retry.PermanentError); ok {
         t.Errorf("WaitForTransactionReceipt Emitted non Transient error: %s\n", err)
@@ -181,7 +197,7 @@ func TestWaitForTransactionReceipt(t *testing.T) {
         t.Errorf("Error setting up Anvil: %s\n", err)
     }
-    _, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, time.Second*45)
+    _, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, retry.SendToChainRetryParams())
     assert.NotNil(t, err)
     if !strings.Contains(err.Error(), "not found") {
         t.Errorf("WaitForTransactionReceipt Emitted incorrect error: %s\n", err)
@@ -294,7 +310,7 @@ func TestSubscribeToNewTasksV3(t *testing.T) {
         t.Errorf("Error setting up Avs Service Bindings: %s\n", err)
     }
-    _, err = chainio.SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil)
+    _, err = chainio.SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil, retry.NetworkRetryParams())
     assert.Nil(t, err)
     if err := cmd.Process.Kill(); err != nil {
@@ -302,7 +318,7 @@ func TestSubscribeToNewTasksV3(t *testing.T) {
         return
     }
-    _, err = chainio.SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil)
+    _, err = chainio.SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil, retry.NetworkRetryParams())
     assert.NotNil(t, err)
     if _, ok := err.(retry.PermanentError); ok {
         t.Errorf("SubscribeToNewTasksV3 Emitted non Transient error: %s\n", err)
@@ -318,7 +334,7 @@ func TestSubscribeToNewTasksV3(t *testing.T) {
         t.Errorf("Error setting up Anvil: %s\n", err)
     }
-    _, err = chainio.SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil)
+    _, err = chainio.SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil, retry.NetworkRetryParams())
     assert.Nil(t, err)
     if err := cmd.Process.Kill(); err != nil {
@@ -344,7 +360,7 @@ func TestSubscribeToNewTasksV2(t *testing.T) {
         t.Errorf("Error setting up Avs Service Bindings: %s\n", err)
     }
-    _, err = chainio.SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil)
+    _, err = chainio.SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil, retry.NetworkRetryParams())
     assert.Nil(t, err)
     if err := cmd.Process.Kill(); err != nil {
@@ -352,7 +368,7 @@ func TestSubscribeToNewTasksV2(t *testing.T) {
         return
     }
-    _, err = chainio.SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil)
+    _, err = chainio.SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil, retry.NetworkRetryParams())
     assert.NotNil(t, err)
     if _, ok := err.(retry.PermanentError); ok {
         t.Errorf("SubscribeToNewTasksV2 Emitted non Transient error: %s\n", err)
@@ -368,7 +384,7 @@ func TestSubscribeToNewTasksV2(t *testing.T) {
         t.Errorf("Error setting up Anvil: %s\n", err)
     }
-    _, err = chainio.SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil)
+    _, err = chainio.SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil, retry.NetworkRetryParams())
     assert.Nil(t, err)
     if err := cmd.Process.Kill(); err != nil {
@@ -389,7 +405,7 @@ func TestBlockNumber(t *testing.T) {
     if err != nil {
         return
     }
-    _, err = sub.BlockNumberRetryable(context.Background())
+    _, err = sub.BlockNumberRetryable(context.Background(), retry.NetworkRetryParams())
     assert.Nil(t, err)
     if err := cmd.Process.Kill(); err != nil {
@@ -397,7 +413,7 @@
         return
     }
-    _, err = sub.BlockNumberRetryable(context.Background())
+    _, err = sub.BlockNumberRetryable(context.Background(), retry.NetworkRetryParams())
     assert.NotNil(t, err)
     if _, ok := err.(retry.PermanentError); ok {
         t.Errorf("BlockNumber Emitted non Transient error: %s\n", err)
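The tests build RetryParams literals directly, which is also the easiest way to keep unit tests fast. One caveat worth noting: a bare integer assigned to a time.Duration field is nanoseconds, so InitialInterval: 1000 above is one microsecond, not one second. A more explicit variant (a suggestion, not what the tests use):

    // Fast, bounded schedule for unit tests: sleeps of 1ms, 2ms and 4ms, then give up.
    fastParams := &retry.RetryParams{
        InitialInterval:     time.Millisecond,
        MaxInterval:         100 * time.Millisecond,
        MaxElapsedTime:      time.Second,
        RandomizationFactor: 0,
        Multiplier:          retry.NetworkMultiplier,
        NumRetries:          retry.NetworkNumRetries,
    }
    _, err := retry.RetryWithData(function, fastParams) // "function" as defined in TestRetryWithData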
t.Errorf("BlockNumber Emitted non Transient error: %s\n", err) @@ -413,7 +429,7 @@ func TestBlockNumber(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = sub.BlockNumberRetryable(context.Background()) + _, err = sub.BlockNumberRetryable(context.Background(), retry.NetworkRetryParams()) assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -433,7 +449,7 @@ func TestFilterBatchV2(t *testing.T) { if err != nil { return } - _, err = avsSubscriber.FilterBatchV2Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = avsSubscriber.FilterBatchV2Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil, retry.NetworkRetryParams()) assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -441,7 +457,7 @@ func TestFilterBatchV2(t *testing.T) { return } - _, err = avsSubscriber.FilterBatchV2Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = avsSubscriber.FilterBatchV2Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil, retry.NetworkRetryParams()) assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("FilterBatchV2 Emitted non Transient error: %s\n", err) @@ -457,7 +473,7 @@ func TestFilterBatchV2(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsSubscriber.FilterBatchV2Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = avsSubscriber.FilterBatchV2Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil, retry.NetworkRetryParams()) assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -477,7 +493,7 @@ func TestFilterBatchV3(t *testing.T) { if err != nil { return } - _, err = avsSubscriber.FilterBatchV3Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = avsSubscriber.FilterBatchV3Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil, retry.NetworkRetryParams()) assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -485,7 +501,7 @@ func TestFilterBatchV3(t *testing.T) { return } - _, err = avsSubscriber.FilterBatchV3Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = avsSubscriber.FilterBatchV3Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil, retry.NetworkRetryParams()) assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("FilerBatchV3 Emitted non Transient error: %s\n", err) @@ -501,7 +517,7 @@ func TestFilterBatchV3(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsSubscriber.FilterBatchV3Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = avsSubscriber.FilterBatchV3Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil, retry.NetworkRetryParams()) assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -523,7 +539,7 @@ func TestBatchesStateSubscriber(t *testing.T) { } zero_bytes := [32]byte{} - _, err = avsSubscriber.BatchesStateRetryable(nil, zero_bytes) + _, err = avsSubscriber.BatchesStateRetryable(nil, zero_bytes, retry.NetworkRetryParams()) assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -531,7 +547,7 @@ func TestBatchesStateSubscriber(t *testing.T) { return } - _, err = avsSubscriber.BatchesStateRetryable(nil, zero_bytes) + _, err = avsSubscriber.BatchesStateRetryable(nil, zero_bytes, 
retry.NetworkRetryParams()) assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("BatchesStateSubscriber Emitted non Transient error: %s\n", err) @@ -547,7 +563,7 @@ func TestBatchesStateSubscriber(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsSubscriber.BatchesStateRetryable(nil, zero_bytes) + _, err = avsSubscriber.BatchesStateRetryable(nil, zero_bytes, retry.NetworkRetryParams()) assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -569,7 +585,7 @@ func TestSubscribeNewHead(t *testing.T) { return } - _, err = avsSubscriber.SubscribeNewHeadRetryable(context.Background(), c) + _, err = avsSubscriber.SubscribeNewHeadRetryable(context.Background(), c, retry.NetworkRetryParams()) assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -577,7 +593,7 @@ func TestSubscribeNewHead(t *testing.T) { return } - _, err = avsSubscriber.SubscribeNewHeadRetryable(context.Background(), c) + _, err = avsSubscriber.SubscribeNewHeadRetryable(context.Background(), c, retry.NetworkRetryParams()) assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("SubscribeNewHead Emitted non Transient error: %s\n", err) @@ -593,7 +609,7 @@ func TestSubscribeNewHead(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsSubscriber.SubscribeNewHeadRetryable(context.Background(), c) + _, err = avsSubscriber.SubscribeNewHeadRetryable(context.Background(), c, retry.NetworkRetryParams()) assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -646,7 +662,7 @@ func TestRespondToTaskV2(t *testing.T) { zero_bytes := [32]byte{} // NOTE: With zero bytes the tx reverts - _, err = w.RespondToTaskV2Retryable(&txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature) + _, err = w.RespondToTaskV2Retryable(&txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature, retry.SendToChainRetryParams()) assert.NotNil(t, err) if !strings.Contains(err.Error(), "execution reverted") { t.Errorf("RespondToTaskV2 did not emit the expected message: %q doesn't contain %q", err.Error(), "execution reverted: custom error 0x2396d34e:") @@ -656,7 +672,7 @@ func TestRespondToTaskV2(t *testing.T) { t.Errorf("Error killing process: %v\n", err) } - _, err = w.RespondToTaskV2Retryable(&txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature) + _, err = w.RespondToTaskV2Retryable(&txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature, retry.SendToChainRetryParams()) assert.NotNil(t, err) if _, ok := err.(*backoff.PermanentError); ok { t.Errorf("RespondToTaskV2 Emitted non-Transient error: %s\n", err) @@ -671,7 +687,7 @@ func TestRespondToTaskV2(t *testing.T) { } // NOTE: With zero bytes the tx reverts - _, err = w.RespondToTaskV2Retryable(&txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature) + _, err = w.RespondToTaskV2Retryable(&txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature, retry.SendToChainRetryParams()) assert.NotNil(t, err) if !strings.Contains(err.Error(), "execution reverted") { t.Errorf("RespondToTaskV2 did not emit the expected message: %q doesn't contain %q", err.Error(), "execution reverted: custom error 0x2396d34e:") @@ -699,7 +715,7 @@ func TestBatchesStateWriter(t *testing.T) { var bytes [32]byte num.FillBytes(bytes[:]) - _, err = avsWriter.BatchesStateRetryable(&bind.CallOpts{}, bytes) + _, err = avsWriter.BatchesStateRetryable(&bind.CallOpts{}, bytes, retry.NetworkRetryParams()) assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil 
{ @@ -707,7 +723,8 @@ func TestBatchesStateWriter(t *testing.T) { return } - _, err = avsWriter.BatchesStateRetryable(&bind.CallOpts{}, bytes) + _, err = avsWriter.BatchesStateRetryable(&bind.CallOpts{}, bytes, retry.NetworkRetryParams()) + assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("BatchesStateWriter Emitted non-Transient error: %s\n", err) @@ -723,7 +740,7 @@ func TestBatchesStateWriter(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsWriter.BatchesStateRetryable(&bind.CallOpts{}, bytes) + _, err = avsWriter.BatchesStateRetryable(&bind.CallOpts{}, bytes, retry.NetworkRetryParams()) assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -746,7 +763,7 @@ func TestBalanceAt(t *testing.T) { aggregator_address := common.HexToAddress("0x0") blockHeight := big.NewInt(22) - _, err = avsWriter.BalanceAtRetryable(context.Background(), aggregator_address, blockHeight) + _, err = avsWriter.BalanceAtRetryable(context.Background(), aggregator_address, blockHeight, retry.NetworkRetryParams()) assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -754,7 +771,7 @@ func TestBalanceAt(t *testing.T) { return } - _, err = avsWriter.BalanceAtRetryable(context.Background(), aggregator_address, blockHeight) + _, err = avsWriter.BalanceAtRetryable(context.Background(), aggregator_address, blockHeight, retry.NetworkRetryParams()) assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("BalanceAt Emitted non-Transient error: %s\n", err) @@ -770,7 +787,7 @@ func TestBalanceAt(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsWriter.BalanceAtRetryable(context.Background(), aggregator_address, blockHeight) + _, err = avsWriter.BalanceAtRetryable(context.Background(), aggregator_address, blockHeight, retry.NetworkRetryParams()) assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -792,7 +809,7 @@ func TestBatchersBalances(t *testing.T) { } senderAddress := common.HexToAddress("0x0") - _, err = avsWriter.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress) + _, err = avsWriter.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress, retry.NetworkRetryParams()) assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -800,7 +817,7 @@ func TestBatchersBalances(t *testing.T) { return } - _, err = avsWriter.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress) + _, err = avsWriter.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress, retry.NetworkRetryParams()) assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("BatchersBalances Emitted non-Transient error: %s\n", err) @@ -816,7 +833,7 @@ func TestBatchersBalances(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsWriter.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress) + _, err = avsWriter.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress, retry.NetworkRetryParams()) assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { diff --git a/core/utils/eth_client_utils.go b/core/utils/eth_client_utils.go index 36b28f036..bfbcc676f 100644 --- a/core/utils/eth_client_utils.go +++ b/core/utils/eth_client_utils.go @@ -3,7 +3,6 @@ package utils import ( "context" "math/big" - "time" "github.com/Layr-Labs/eigensdk-go/chainio/clients/eth" eigentypes "github.com/Layr-Labs/eigensdk-go/types" @@ -20,7 +19,7 @@ import ( // Setting a higher value will imply doing less retries across the waitTimeout, and so we might lose the receipt // All errors are 
considered Transient Errors // - Retry times: 0.5s, 1s, 2s, 2s, 2s, ... until it reaches waitTimeout -func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash, waitTimeout time.Duration) (*types.Receipt, error) { +func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash, config *retry.RetryParams) (*types.Receipt, error) { receipt_func := func() (*types.Receipt, error) { receipt, err := client.TransactionReceipt(context.Background(), txHash) if err != nil { @@ -32,7 +31,7 @@ func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackC } return receipt, nil } - return retry.RetryWithData(receipt_func, retry.MinDelay, retry.RetryFactor, 0, time.Second*2, waitTimeout) + return retry.RetryWithData(receipt_func, config) } func BytesToQuorumNumbers(quorumNumbersBytes []byte) eigentypes.QuorumNums { @@ -87,7 +86,7 @@ Get the gas price from the client with retry logic. - All errors are considered Transient Errors - Retry times: 1 sec, 2 sec, 4 sec */ -func GetGasPriceRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient) (*big.Int, error) { +func GetGasPriceRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient, config *retry.RetryParams) (*big.Int, error) { respondToTaskV2_func := func() (*big.Int, error) { gasPrice, err := client.SuggestGasPrice(context.Background()) if err != nil { @@ -99,5 +98,5 @@ func GetGasPriceRetryable(client eth.InstrumentedClient, fallbackClient eth.Inst return gasPrice, nil } - return retry.RetryWithData(respondToTaskV2_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.RetryWithData(respondToTaskV2_func, config) }
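With the timeout folded into the params, a caller of WaitForTransactionReceiptRetryable controls both the polling cadence and the overall deadline through one argument: WaitForTxRetryParams(d) polls at most every WaitForTxMaxInterval (2s) and, because its NumRetries is 0, keeps polling until d has elapsed. A usage sketch under those assumptions (the 36-second window is an arbitrary example):

    // Illustrative only: wait for a receipt with an explicit deadline, as
    // SendAggregatedResponse does with timeToWaitBeforeBump.
    func waitForReceipt(client, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash) (*types.Receipt, error) {
        // Polls at most every 2s (WaitForTxMaxInterval) until 36s have elapsed.
        return utils.WaitForTransactionReceiptRetryable(client, fallbackClient, txHash,
            retry.WaitForTxRetryParams(36*time.Second))
    }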