diff --git a/.github/workflows/test-go-retries.yml b/.github/workflows/test-go-retries.yml deleted file mode 100644 index 175ea1067..000000000 --- a/.github/workflows/test-go-retries.yml +++ /dev/null @@ -1,36 +0,0 @@ -name: test-go-retries - -on: - push: - branches: [main] - pull_request: - branches: ["*"] - paths: - - 'core/**' - - '.github/workflows/test-go-retries.yml' - -jobs: - test: - runs-on: ubuntu-latest - steps: - - name: Clear device space - run: | - sudo rm -rf "$AGENT_TOOLSDIRECTORY" - sudo rm -rf /usr/local/lib/android - sudo rm -rf /opt/ghc - sudo rm -rf /usr/local/.ghcup - sudo rm -rf /usr/share/dotnet - sudo rm -rf /opt/ghc - sudo rm -rf "/usr/local/share/boost" - - uses: actions/checkout@v4 - - uses: actions/setup-go@v5 - with: - go-version: '1.22' - cache: false - - uses: actions-rs/toolchain@v1 - with: - toolchain: stable - - name: foundry-toolchain - uses: foundry-rs/foundry-toolchain@v1.2.0 - - name: Test go Retry Functions - run: make test_go_retries \ No newline at end of file diff --git a/aggregator/pkg/aggregator.go b/aggregator/pkg/aggregator.go index cc3bb3644..66b687722 100644 --- a/aggregator/pkg/aggregator.go +++ b/aggregator/pkg/aggregator.go @@ -5,14 +5,12 @@ import ( "encoding/hex" "fmt" "math/big" - "strings" "sync" "time" gethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/prometheus/client_golang/prometheus" - retry "github.com/yetanotherco/aligned_layer/core" "github.com/yetanotherco/aligned_layer/metrics" sdkclients "github.com/Layr-Labs/eigensdk-go/chainio/clients" @@ -396,7 +394,7 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by quorumNums := eigentypes.QuorumNums{eigentypes.QuorumNum(QUORUM_NUMBER)} quorumThresholdPercentages := eigentypes.QuorumThresholdPercentages{eigentypes.QuorumThresholdPercentage(QUORUM_THRESHOLD)} - err := agg.InitializeNewTaskRetryable(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, agg.AggregatorConfig.Aggregator.BlsServiceTaskTimeout) + err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, agg.AggregatorConfig.Aggregator.BlsServiceTaskTimeout) if err != nil { agg.logger.Fatalf("BLS aggregation service error when initializing new task: %s", err) } @@ -409,30 +407,6 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by // |---RETRYABLE---| -/* -InitializeNewTaskRetryable -Initialize a new task in the BLS Aggregation service - - Errors: - Permanent: - - TaskAlreadyInitializedError (Permanent): Task is already intialized in the BLS Aggregation service (https://github.com/Layr-Labs/eigensdk-go/blob/dev/services/bls_aggregation/blsagg.go#L27). - Transient: - - All others. 
- - Retry times (3 retries): 1 sec, 2 sec, 4 sec
-*/
-func (agg *Aggregator) InitializeNewTaskRetryable(batchIndex uint32, taskCreatedBlock uint32, quorumNums eigentypes.QuorumNums, quorumThresholdPercentages eigentypes.QuorumThresholdPercentages, timeToExpiry time.Duration) error {
-	initializeNewTask_func := func() error {
-		err := agg.blsAggregationService.InitializeNewTask(batchIndex, taskCreatedBlock, quorumNums, quorumThresholdPercentages, timeToExpiry)
-		if err != nil {
-			// Task is already initialized
-			if strings.Contains(err.Error(), "already initialized") {
-				err = retry.PermanentError{Inner: err}
-			}
-		}
-		return err
-	}
-	return retry.Retry(initializeNewTask_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
-}
-
 // Long-lived goroutine that periodically checks and removes old Tasks from stored Maps
 // It runs every GarbageCollectorPeriod and removes all tasks older than GarbageCollectorTasksAge
 // This was added because each task occupies memory in the maps, and we need to free it to avoid a memory leak
diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go
index 1eccfeb78..02bbccd29 100644
--- a/aggregator/pkg/server.go
+++ b/aggregator/pkg/server.go
@@ -50,7 +50,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
 		"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]))
 
 	taskIndex := uint32(0)
-	taskIndex, err := agg.GetTaskIndex(signedTaskResponse.BatchIdentifierHash)
+	taskIndex, err := agg.GetTaskIndexRetryable(signedTaskResponse.BatchIdentifierHash)
 	if err != nil {
 		agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum")
@@ -106,17 +106,25 @@ func (agg *Aggregator) ServerRunning(_ *struct{}, reply *int64) error {
 	return nil
 }
 
-func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error) {
+func GetTaskIndexFunc(agg *Aggregator, batchIdentifierHash [32]byte) func() (uint32, error) {
 	getTaskIndex_func := func() (uint32, error) {
 		agg.taskMutex.Lock()
 		taskIndex, ok := agg.batchesIdxByIdentifierHash[batchIdentifierHash]
 		agg.taskMutex.Unlock()
 		if !ok {
-			return taskIndex, fmt.Errorf("Task not found in the internal map")
+			return taskIndex, fmt.Errorf("task not found in the internal map")
 		} else {
 			return taskIndex, nil
 		}
 	}
+	return getTaskIndex_func
+}
 
-	return retry.RetryWithData(getTaskIndex_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+// GetTaskIndexRetryable checks the internal map for a signed task response and returns its task index.
+/* +- All errors are considered Transient Errors +- Retry times (3 retries): 1 sec, 2 sec, 4 sec +*/ +func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte) (uint32, error) { + return retry.RetryWithData(GetTaskIndexFunc(agg, batchIdentifierHash), retry.EthCallRetryConfig()) } diff --git a/core/chainio/avs_subscriber.go b/core/chainio/avs_subscriber.go index ea810da62..26eefe046 100644 --- a/core/chainio/avs_subscriber.go +++ b/core/chainio/avs_subscriber.go @@ -68,13 +68,13 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema // Subscribe to new tasks sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil) if err != nil { - s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NumRetries, "err", err) + s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.EthCallNumRetries, "err", err) return nil, err } subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil) if err != nil { - s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NumRetries, "err", err) + s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.EthCallNumRetries, "err", err) return nil, err } s.logger.Info("Subscribed to new AlignedLayer V2 tasks") diff --git a/core/chainio/avs_writer.go b/core/chainio/avs_writer.go index 63a3da914..fbdc0b66d 100644 --- a/core/chainio/avs_writer.go +++ b/core/chainio/avs_writer.go @@ -22,6 +22,14 @@ import ( "github.com/yetanotherco/aligned_layer/metrics" ) +const ( + waitForTxMaxInterval = 2 * time.Second + waitForTxNumRetries = 0 + respondToTaskV2NumRetries uint64 = 0 + respondToTaskV2MaxInterval = time.Millisecond * 500 + respondToTaskV2MaxElapsedTime = 0 +) + type AvsWriter struct { *avsregistry.ChainWriter AvsContractBindings *AvsServiceBindings @@ -104,6 +112,18 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe txOpts.NoSend = false i := 0 + // Set Retry config for RespondToTaskV2 + respondToTaskV2Config := retry.EthCallRetryConfig() + respondToTaskV2Config.NumRetries = respondToTaskV2NumRetries + respondToTaskV2Config.MaxInterval = respondToTaskV2MaxInterval + respondToTaskV2Config.MaxElapsedTime = respondToTaskV2MaxElapsedTime + + // Set Retry config for WaitForTxRetryable + waitForTxConfig := retry.EthCallRetryConfig() + waitForTxConfig.MaxInterval = waitForTxMaxInterval + waitForTxConfig.NumRetries = waitForTxNumRetries + waitForTxConfig.MaxElapsedTime = timeToWaitBeforeBump + var sentTxs []*types.Transaction batchMerkleRootHashString := hex.EncodeToString(batchMerkleRoot[:]) @@ -171,7 +191,8 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe sentTxs = append(sentTxs, realTx) w.logger.Infof("Transaction sent, waiting for receipt", "merkle root", batchMerkleRootHashString) - receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, realTx.Hash(), timeToWaitBeforeBump) + receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, realTx.Hash(), waitForTxConfig) + if receipt != nil { w.checkIfAggregatorHadToPaidForBatcher(realTx, batchIdentifierHash) return receipt, nil @@ -191,8 +212,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe // This just retries the bump 
of a fee in case of a timeout
 // The wait is done before on WaitForTransactionReceiptRetryable, and all the functions are retriable,
 // so this retry doesn't need to wait more time
-	maxInterval := time.Millisecond * 500
-	return retry.RetryWithData(respondToTaskV2Func, retry.MinDelay, retry.RetryFactor, 0, maxInterval, 0)
+	return retry.RetryWithData(respondToTaskV2Func, respondToTaskV2Config)
 }
 
 // Calculates the transaction cost from the receipt and compares it with the batcher respondToTaskFeeLimit
diff --git a/core/chainio/retryable.go b/core/chainio/retryable.go
index ad44a2509..8f93100f4 100644
--- a/core/chainio/retryable.go
+++ b/core/chainio/retryable.go
@@ -15,14 +15,7 @@ import (
 
 // |---AVS_WRITER---|
 
-/*
-RespondToTaskV2Retryable
-Send a transaction to the AVS contract to respond to a task.
-- All errors are considered Transient Errors
-- Retry times (3 retries): 12 sec (1 Blocks), 24 sec (2 Blocks), 48 sec (4 Blocks)
-- NOTE: Contract call reverts are not considered `PermanentError`'s as block reorg's may lead to contract call revert in which case the aggregator should retry.
-*/
-func (w *AvsWriter) RespondToTaskV2Retryable(opts *bind.TransactOpts, batchMerkleRoot [32]byte, senderAddress common.Address, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*types.Transaction, error) {
+func RespondToTaskV2Func(w *AvsWriter, opts *bind.TransactOpts, batchMerkleRoot [32]byte, senderAddress common.Address, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) func() (*types.Transaction, error) {
 	respondToTaskV2_func := func() (*types.Transaction, error) {
 		// Try with main connection
 		tx, err := w.AvsContractBindings.ServiceManager.RespondToTaskV2(opts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
@@ -33,21 +26,25 @@ func (w *AvsWriter) RespondToTaskV2Retryable(opts *bind.TransactOpts, batchMerkl
 		return tx, err
 	}
 
-	return retry.RetryWithData(respondToTaskV2_func, retry.MinDelayChain, retry.RetryFactor, retry.NumRetries, retry.MaxIntervalChain, retry.MaxElapsedTime)
+	return respondToTaskV2_func
 }
 
 /*
-BatchesStateRetryable
-Get the state of a batch from the AVS contract.
+RespondToTaskV2Retryable
+Send a transaction to the AVS contract to respond to a task.
 - All errors are considered Transient Errors
-- Retry times (3 retries): 1 sec, 2 sec, 4 sec
+- Retry times (3 retries): 12 sec (1 block), 24 sec (2 blocks), 48 sec (4 blocks)
+- NOTE: Contract call reverts are not considered `PermanentError`s, as a block reorg may lead to a contract call revert, in which case the aggregator should retry.
*/ -func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct { +func (w *AvsWriter) RespondToTaskV2Retryable(opts *bind.TransactOpts, batchMerkleRoot [32]byte, senderAddress common.Address, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*types.Transaction, error) { + return retry.RetryWithData(RespondToTaskV2Func(w, opts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature), retry.ChainRetryConfig()) +} + +func BatchesStateFunc(w *AvsWriter, opts *bind.CallOpts, arg0 [32]byte) func() (struct { TaskCreatedBlock uint32 Responded bool RespondToTaskFeeLimit *big.Int }, error) { - batchesState_func := func() (struct { TaskCreatedBlock uint32 Responded bool @@ -61,16 +58,24 @@ func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (s } return state, err } - return retry.RetryWithData(batchesState_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return batchesState_func } /* -BatcherBalancesRetryable -Get the balance of a batcher from the AVS contract. +BatchesStateRetryable +Get the state of a batch from the AVS contract. - All errors are considered Transient Errors - Retry times (3 retries): 1 sec, 2 sec, 4 sec */ -func (w *AvsWriter) BatcherBalancesRetryable(opts *bind.CallOpts, senderAddress common.Address) (*big.Int, error) { +func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct { + TaskCreatedBlock uint32 + Responded bool + RespondToTaskFeeLimit *big.Int +}, error) { + return retry.RetryWithData(BatchesStateFunc(w, opts, arg0), retry.EthCallRetryConfig()) +} + +func BatcherBalancesFunc(w *AvsWriter, opts *bind.CallOpts, senderAddress common.Address) func() (*big.Int, error) { batcherBalances_func := func() (*big.Int, error) { // Try with main connection batcherBalance, err := w.AvsContractBindings.ServiceManager.BatchersBalances(opts, senderAddress) @@ -80,18 +85,20 @@ func (w *AvsWriter) BatcherBalancesRetryable(opts *bind.CallOpts, senderAddress } return batcherBalance, err } - return retry.RetryWithData(batcherBalances_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return batcherBalances_func } /* -BalanceAtRetryable -Get the balance of aggregatorAddress at blockNumber. -If blockNumber is nil, it gets the latest balance. -TODO: it gets the balance from an Address, not necessarily an aggregator. The name of the parameter should be changed. +BatcherBalancesRetryable +Get the balance of a batcher from the AVS contract. - All errors are considered Transient Errors -- Retry times (3 retries): 1 sec, 2 sec, 4 sec. 
+- Retry times (3 retries): 1 sec, 2 sec, 4 sec */ -func (w *AvsWriter) BalanceAtRetryable(ctx context.Context, aggregatorAddress common.Address, blockNumber *big.Int) (*big.Int, error) { +func (w *AvsWriter) BatcherBalancesRetryable(opts *bind.CallOpts, senderAddress common.Address) (*big.Int, error) { + return retry.RetryWithData(BatcherBalancesFunc(w, opts, senderAddress), retry.EthCallRetryConfig()) +} + +func BalanceAtFunc(w *AvsWriter, ctx context.Context, aggregatorAddress common.Address, blockNumber *big.Int) func() (*big.Int, error) { balanceAt_func := func() (*big.Int, error) { // Try with main connection aggregatorBalance, err := w.Client.BalanceAt(ctx, aggregatorAddress, blockNumber) @@ -101,18 +108,24 @@ func (w *AvsWriter) BalanceAtRetryable(ctx context.Context, aggregatorAddress co } return aggregatorBalance, err } - return retry.RetryWithData(balanceAt_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return balanceAt_func } -// |---AVS_SUBSCRIBER---| - /* -BlockNumberRetryable -Get the latest block number from Ethereum +BalanceAtRetryable +Get the balance of aggregatorAddress at blockNumber. +If blockNumber is nil, it gets the latest balance. +TODO: it gets the balance from an Address, not necessarily an aggregator. The name of the parameter should be changed. - All errors are considered Transient Errors - Retry times (3 retries): 1 sec, 2 sec, 4 sec. */ -func (s *AvsSubscriber) BlockNumberRetryable(ctx context.Context) (uint64, error) { +func (w *AvsWriter) BalanceAtRetryable(ctx context.Context, aggregatorAddress common.Address, blockNumber *big.Int) (*big.Int, error) { + return retry.RetryWithData(BalanceAtFunc(w, ctx, aggregatorAddress, blockNumber), retry.EthCallRetryConfig()) +} + +// |---AVS_SUBSCRIBER---| + +func BlockNumberFunc(s *AvsSubscriber, ctx context.Context) func() (uint64, error) { latestBlock_func := func() (uint64, error) { // Try with main connection latestBlock, err := s.AvsContractBindings.ethClient.BlockNumber(ctx) @@ -122,7 +135,28 @@ func (s *AvsSubscriber) BlockNumberRetryable(ctx context.Context) (uint64, error } return latestBlock, err } - return retry.RetryWithData(latestBlock_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return latestBlock_func +} + +/* +BlockNumberRetryable +Get the latest block number from Ethereum +- All errors are considered Transient Errors +- Retry times (3 retries): 1 sec, 2 sec, 4 sec. +*/ +func (s *AvsSubscriber) BlockNumberRetryable(ctx context.Context) (uint64, error) { + return retry.RetryWithData(BlockNumberFunc(s, ctx), retry.EthCallRetryConfig()) +} + +func FilterBatchV2Func(s *AvsSubscriber, opts *bind.FilterOpts, batchMerkleRoot [][32]byte) func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) { + filterNewBatchV2_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) { + logs, err := s.AvsContractBindings.ServiceManager.FilterNewBatchV2(opts, batchMerkleRoot) + if err != nil { + logs, err = s.AvsContractBindings.ServiceManagerFallback.FilterNewBatchV2(opts, batchMerkleRoot) + } + return logs, err + } + return filterNewBatchV2_func } /* @@ -132,10 +166,18 @@ Get NewBatchV2 logs from the AVS contract. - Retry times (3 retries): 1 sec, 2 sec, 4 sec. 
*/ func (s *AvsSubscriber) FilterBatchV2Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) { - filterNewBatchV2_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) { - return s.AvsContractBindings.ServiceManager.FilterNewBatchV2(opts, batchMerkleRoot) + return retry.RetryWithData(FilterBatchV2Func(s, opts, batchMerkleRoot), retry.EthCallRetryConfig()) +} + +func FilterBatchV3Func(s *AvsSubscriber, opts *bind.FilterOpts, batchMerkleRoot [][32]byte) func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { + filterNewBatchV3_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { + logs, err := s.AvsContractBindings.ServiceManager.FilterNewBatchV3(opts, batchMerkleRoot) + if err != nil { + logs, err = s.AvsContractBindings.ServiceManagerFallback.FilterNewBatchV3(opts, batchMerkleRoot) + } + return logs, err } - return retry.RetryWithData(filterNewBatchV2_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return filterNewBatchV3_func } /* @@ -145,19 +187,10 @@ Get NewBatchV3 logs from the AVS contract. - Retry times (3 retries): 1 sec, 2 sec, 4 sec. */ func (s *AvsSubscriber) FilterBatchV3Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { - filterNewBatchV2_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { - return s.AvsContractBindings.ServiceManager.FilterNewBatchV3(opts, batchMerkleRoot) - } - return retry.RetryWithData(filterNewBatchV2_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.RetryWithData(FilterBatchV3Func(s, opts, batchMerkleRoot), retry.EthCallRetryConfig()) } -/* -BatchesStateRetryable -Get the state of a batch from the AVS contract. -- All errors are considered Transient Errors -- Retry times (3 retries): 1 sec, 2 sec, 4 sec -*/ -func (s *AvsSubscriber) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct { +func BatchStateFunc(s *AvsSubscriber, opts *bind.CallOpts, arg0 [32]byte) func() (struct { TaskCreatedBlock uint32 Responded bool RespondToTaskFeeLimit *big.Int @@ -167,19 +200,31 @@ func (s *AvsSubscriber) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte Responded bool RespondToTaskFeeLimit *big.Int }, error) { - return s.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0) + state, err := s.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0) + if err != nil { + state, err = s.AvsContractBindings.ServiceManagerFallback.BatchesState(opts, arg0) + } + return state, err } - - return retry.RetryWithData(batchState_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return batchState_func } /* -SubscribeNewHeadRetryable -Subscribe to new heads from the Ethereum node. +BatchesStateRetryable +Get the state of a batch from the AVS contract. - All errors are considered Transient Errors -- Retry times (3 retries): 1 sec, 2 sec, 4 sec. 
+- Retry times (3 retries): 1 sec, 2 sec, 4 sec */ -func (s *AvsSubscriber) SubscribeNewHeadRetryable(ctx context.Context, c chan<- *types.Header) (ethereum.Subscription, error) { +func (s *AvsSubscriber) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct { + TaskCreatedBlock uint32 + Responded bool + RespondToTaskFeeLimit *big.Int +}, error) { + + return retry.RetryWithData(BatchStateFunc(s, opts, arg0), retry.EthCallRetryConfig()) +} + +func SubscribeNewHeadFunc(s *AvsSubscriber, ctx context.Context, c chan<- *types.Header) func() (ethereum.Subscription, error) { subscribeNewHead_func := func() (ethereum.Subscription, error) { // Try with main connection sub, err := s.AvsContractBindings.ethClient.SubscribeNewHead(ctx, c) @@ -189,7 +234,29 @@ func (s *AvsSubscriber) SubscribeNewHeadRetryable(ctx context.Context, c chan<- } return sub, err } - return retry.RetryWithData(subscribeNewHead_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return subscribeNewHead_func +} + +/* +SubscribeNewHeadRetryable +Subscribe to new heads from the Ethereum node. +- All errors are considered Transient Errors +- Retry times (3 retries): 1 sec, 2 sec, 4 sec. +*/ +func (s *AvsSubscriber) SubscribeNewHeadRetryable(ctx context.Context, c chan<- *types.Header) (ethereum.Subscription, error) { + return retry.RetryWithData(SubscribeNewHeadFunc(s, ctx, c), retry.EthCallRetryConfig()) +} + +func SubscribeToNewTasksV2Func( + opts *bind.WatchOpts, + serviceManager *servicemanager.ContractAlignedLayerServiceManager, + newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, + batchMerkleRoot [][32]byte, +) func() (event.Subscription, error) { + subscribe_func := func() (event.Subscription, error) { + return serviceManager.WatchNewBatchV2(opts, newTaskCreatedChan, batchMerkleRoot) + } + return subscribe_func } /* @@ -204,10 +271,19 @@ func SubscribeToNewTasksV2Retryable( newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, batchMerkleRoot [][32]byte, ) (event.Subscription, error) { + return retry.RetryWithData(SubscribeToNewTasksV2Func(opts, serviceManager, newTaskCreatedChan, batchMerkleRoot), retry.EthCallRetryConfig()) +} + +func SubscribeToNewTasksV3Func( + opts *bind.WatchOpts, + serviceManager *servicemanager.ContractAlignedLayerServiceManager, + newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, + batchMerkleRoot [][32]byte, +) func() (event.Subscription, error) { subscribe_func := func() (event.Subscription, error) { - return serviceManager.WatchNewBatchV2(opts, newTaskCreatedChan, batchMerkleRoot) + return serviceManager.WatchNewBatchV3(opts, newTaskCreatedChan, batchMerkleRoot) } - return retry.RetryWithData(subscribe_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return subscribe_func } /* @@ -222,8 +298,5 @@ func SubscribeToNewTasksV3Retryable( newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, batchMerkleRoot [][32]byte, ) (event.Subscription, error) { - subscribe_func := func() (event.Subscription, error) { - return serviceManager.WatchNewBatchV3(opts, newTaskCreatedChan, batchMerkleRoot) - } - return retry.RetryWithData(subscribe_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.RetryWithData(SubscribeToNewTasksV3Func(opts, serviceManager, newTaskCreatedChan, batchMerkleRoot), 
retry.EthCallRetryConfig()) } diff --git a/core/retry.go b/core/retry.go index fe872f658..1d462c3d0 100644 --- a/core/retry.go +++ b/core/retry.go @@ -25,15 +25,47 @@ func (e PermanentError) Is(err error) bool { } const ( - MinDelay = 1 * time.Second // Initial delay for retry interval. - MaxInterval = 60 * time.Second // Maximum interval an individual retry may have. - MaxElapsedTime = 0 * time.Second // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries. - RetryFactor float64 = 2 // Multiplier factor computed exponential retry interval is scaled by. - NumRetries uint64 = 3 // Total number of retries attempted. - MinDelayChain = 12 * time.Second // Initial delay for retry interval for contract calls. Corresponds to 1 ethereum block. - MaxIntervalChain = 2 * time.Minute // Maximum interval for an individual retry. + EthCallInitialInterval = 1 * time.Second // Initial delay for retry interval. + EthCallMaxInterval = 60 * time.Second // Maximum interval an individual retry may have. + EthCallMaxElapsedTime = 0 * time.Second // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries. + EthCallRandomizationFactor float64 = 0 // Randomization (Jitter) factor used to map retry interval to a range of values around the computed interval. In precise terms (random value in range [1 - randomizationfactor, 1 + randomizationfactor]). NOTE: This is set to 0 as we do not use jitter in Aligned. + EthCallMultiplier float64 = 2 // Multiplier factor computed exponential retry interval is scaled by. + EthCallNumRetries uint64 = 3 // Total number of retries attempted. + ChainInitialInterval = 12 * time.Second // Initial delay for retry interval for contract calls. Corresponds to 1 ethereum block. + ChainMaxInterval = 2 * time.Minute // Maximum interval for an individual retry. ) +type RetryConfig struct { + InitialInterval time.Duration // Initial delay for retry interval. + MaxInterval time.Duration // Maximum interval an individual retry may have. + MaxElapsedTime time.Duration // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries. + RandomizationFactor float64 + Multiplier float64 + NumRetries uint64 +} + +func EthCallRetryConfig() *RetryConfig { + return &RetryConfig{ + InitialInterval: EthCallInitialInterval, + MaxInterval: EthCallMaxInterval, + MaxElapsedTime: EthCallMaxElapsedTime, + RandomizationFactor: EthCallRandomizationFactor, + Multiplier: EthCallMultiplier, + NumRetries: EthCallNumRetries, + } +} + +func ChainRetryConfig() *RetryConfig { + return &RetryConfig{ + InitialInterval: ChainInitialInterval, + MaxInterval: ChainMaxInterval, + MaxElapsedTime: EthCallMaxElapsedTime, + RandomizationFactor: EthCallRandomizationFactor, + Multiplier: EthCallMultiplier, + NumRetries: EthCallNumRetries, + } +} + /* Retry and RetryWithData are custom retry functions used in Aligned's aggregator and operator to facilitate consistent retry logic across the system. They are interfaces for around Cenk Alti (https://github.com/cenkalti) backoff library (https://github.com/cenkalti/backoff). We would like to thank him for his great work. 
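
For reference, the call-site pattern the new `RetryConfig` enables looks roughly like the sketch below (not part of the patch; the failing closure and the 30-second budget are illustrative). Note that the retry loop still hardcodes `backoff.WithRandomizationFactor(0)`, so the `RandomizationFactor` field is carried in the config but not yet wired through:

```go
package main

import (
	"fmt"
	"time"

	retry "github.com/yetanotherco/aligned_layer/core"
)

func main() {
	// Start from the stock eth-call profile (1s initial interval, x2 multiplier,
	// 3 retries, 60s max interval, no overall time limit)...
	config := retry.EthCallRetryConfig()
	// ...then override individual fields, the same way avs_writer.go builds its
	// waitForTxConfig. The 30s budget here is illustrative.
	config.MaxInterval = 2 * time.Second
	config.MaxElapsedTime = 30 * time.Second

	attempts := 0
	val, err := retry.RetryWithData(func() (int, error) {
		attempts++
		if attempts < 3 {
			return 0, fmt.Errorf("transient failure #%d", attempts) // retried with backoff
		}
		return 42, nil
	}, config)
	fmt.Println(val, err, attempts) // 42 <nil> 3
}
```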
@@ -92,7 +124,7 @@ Reference: https://github.com/cenkalti/backoff/blob/v4/exponential.go#L9 */ // Same as Retry only that the functionToRetry can return a value upon correct execution -func RetryWithData[T any](functionToRetry func() (T, error), minDelay time.Duration, factor float64, maxTries uint64, maxInterval time.Duration, maxElapsedTime time.Duration) (T, error) { +func RetryWithData[T any](functionToRetry func() (T, error), config *RetryConfig) (T, error) { f := func() (T, error) { var ( val T @@ -120,15 +152,15 @@ func RetryWithData[T any](functionToRetry func() (T, error), minDelay time.Durat randomOption := backoff.WithRandomizationFactor(0) - initialRetryOption := backoff.WithInitialInterval(minDelay) - multiplierOption := backoff.WithMultiplier(factor) - maxIntervalOption := backoff.WithMaxInterval(maxInterval) - maxElapsedTimeOption := backoff.WithMaxElapsedTime(maxElapsedTime) + initialRetryOption := backoff.WithInitialInterval(config.InitialInterval) + multiplierOption := backoff.WithMultiplier(config.Multiplier) + maxIntervalOption := backoff.WithMaxInterval(config.MaxInterval) + maxElapsedTimeOption := backoff.WithMaxElapsedTime(config.MaxElapsedTime) expBackoff := backoff.NewExponentialBackOff(randomOption, multiplierOption, initialRetryOption, maxIntervalOption, maxElapsedTimeOption) var maxRetriesBackoff backoff.BackOff - if maxTries > 0 { - maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, maxTries) + if config.NumRetries > 0 { + maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, config.NumRetries) } else { maxRetriesBackoff = expBackoff } @@ -142,7 +174,7 @@ func RetryWithData[T any](functionToRetry func() (T, error), minDelay time.Durat // from the configuration are reached, or until a `PermanentError` is returned. // The function to be retried should return `PermanentError` when the condition for stop retrying // is met. 
-func Retry(functionToRetry func() error, minDelay time.Duration, factor float64, maxTries uint64, maxInterval time.Duration, maxElapsedTime time.Duration) error {
+func Retry(functionToRetry func() error, config *RetryConfig) error {
 	f := func() error {
 		var err error
 		func() {
@@ -167,15 +199,15 @@ func Retry(functionToRetry func() error, minDelay time.Duration, factor float64,
 
 	randomOption := backoff.WithRandomizationFactor(0)
 
-	initialRetryOption := backoff.WithInitialInterval(minDelay)
-	multiplierOption := backoff.WithMultiplier(factor)
-	maxIntervalOption := backoff.WithMaxInterval(maxInterval)
-	maxElapsedTimeOption := backoff.WithMaxElapsedTime(maxElapsedTime)
+	initialRetryOption := backoff.WithInitialInterval(config.InitialInterval)
+	multiplierOption := backoff.WithMultiplier(config.Multiplier)
+	maxIntervalOption := backoff.WithMaxInterval(config.MaxInterval)
+	maxElapsedTimeOption := backoff.WithMaxElapsedTime(config.MaxElapsedTime)
 	expBackoff := backoff.NewExponentialBackOff(randomOption, multiplierOption, initialRetryOption, maxIntervalOption, maxElapsedTimeOption)
 	var maxRetriesBackoff backoff.BackOff
-	if maxTries > 0 {
-		maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, maxTries)
+	if config.NumRetries > 0 {
+		maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, config.NumRetries)
 	} else {
 		maxRetriesBackoff = expBackoff
 	}
diff --git a/core/retry_test.go b/core/retry_test.go
index 4a725fb6a..b7cca8acd 100644
--- a/core/retry_test.go
+++ b/core/retry_test.go
@@ -42,7 +42,16 @@ func TestRetryWithData(t *testing.T) {
 		x, err := DummyFunction(43)
 		return &x, err
 	}
-	_, err := retry.RetryWithData(function, 1000, 2, 3, retry.MaxInterval, retry.MaxElapsedTime)
+
+	config := &retry.RetryConfig{
+		InitialInterval:     1000,
+		MaxInterval:         retry.EthCallMaxInterval,
+		MaxElapsedTime:      retry.EthCallMaxElapsedTime,
+		RandomizationFactor: 0,
+		Multiplier:          retry.EthCallMultiplier,
+		NumRetries:          retry.EthCallNumRetries,
+	}
+	_, err := retry.RetryWithData(function, config)
 	if err != nil {
 		t.Errorf("Retry error!: %s", err)
 	}
@@ -53,7 +62,15 @@ func TestRetry(t *testing.T) {
 		_, err := DummyFunction(43)
 		return err
 	}
-	err := retry.Retry(function, 1000, 2, 3, retry.MaxInterval, retry.MaxElapsedTime)
+	config := &retry.RetryConfig{
+		InitialInterval:     1000,
+		MaxInterval:         retry.EthCallMaxInterval,
+		MaxElapsedTime:      retry.EthCallMaxElapsedTime,
+		RandomizationFactor: 0,
+		Multiplier:          retry.EthCallMultiplier,
+		NumRetries:          retry.EthCallNumRetries,
+	}
+	err := retry.Retry(function, config)
 	if err != nil {
 		t.Errorf("Retry error!: %s", err)
 	}
@@ -153,7 +170,8 @@ func TestWaitForTransactionReceipt(t *testing.T) {
 	}
 
 	// Assert Call succeeds when Anvil running
-	_, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, time.Second*45)
+	receipt_function := utils.WaitForTransactionReceiptFunc(*client, *client, hash, retry.EthCallRetryConfig())
+	_, err = receipt_function()
 	assert.NotNil(t, err, "Error Waiting for Transaction with Anvil Running: %s\n", err)
 	if !strings.Contains(err.Error(), "not found") {
 		t.Errorf("WaitForTransactionReceipt Emitted incorrect error: %s\n", err)
@@ -165,7 +183,8 @@
 		return
 	}
 
-	_, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, time.Second*45)
+	receipt_function = utils.WaitForTransactionReceiptFunc(*client, *client, hash, retry.EthCallRetryConfig())
+	_, err = receipt_function()
 	assert.NotNil(t, err)
 	if _, ok := err.(retry.PermanentError); ok {
 		t.Errorf("WaitForTransactionReceipt Emitted non Transient error: %s\n", err)
@@ -181,7 +200,8 @@ func TestWaitForTransactionReceipt(t *testing.T) {
 		t.Errorf("Error setting up Anvil: %s\n", err)
 	}
 
-	_, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, time.Second*45)
+	receipt_function = utils.WaitForTransactionReceiptFunc(*client, *client, hash, retry.EthCallRetryConfig())
+	_, err = receipt_function()
 	assert.NotNil(t, err)
 	if !strings.Contains(err.Error(), "not found") {
 		t.Errorf("WaitForTransactionReceipt Emitted incorrect error: %s\n", err)
@@ -194,6 +214,50 @@
 	}
 }
 
+func TestGetGasPrice(t *testing.T) {
+
+	cmd, client, err := SetupAnvil(8545)
+	if err != nil {
+		t.Errorf("Error setting up Anvil: %s\n", err)
+	}
+
+	// Assert Call succeeds when Anvil running
+	gas_function := utils.GetGasPriceFunc(*client, *client)
+	_, err = gas_function()
+	assert.Nil(t, err, "Error getting gas price with Anvil running: %s\n", err)
+
+	if err = cmd.Process.Kill(); err != nil {
+		t.Errorf("Error killing process: %v\n", err)
+		return
+	}
+
+	gas_function = utils.GetGasPriceFunc(*client, *client)
+	_, err = gas_function()
+	assert.NotNil(t, err)
+	if _, ok := err.(retry.PermanentError); ok {
+		t.Errorf("GetGasPrice Emitted non Transient error: %s\n", err)
+		return
+	}
+	if !strings.Contains(err.Error(), "connect: connection refused") {
+		t.Errorf("GetGasPrice Emitted non Transient error: %s\n", err)
+		return
+	}
+
+	cmd, client, err = SetupAnvil(8545)
+	if err != nil {
+		t.Errorf("Error setting up Anvil: %s\n", err)
+	}
+
+	gas_function = utils.GetGasPriceFunc(*client, *client)
+	_, err = gas_function()
+	assert.Nil(t, err)
+
+	if err := cmd.Process.Kill(); err != nil {
+		t.Errorf("Error killing process: %v\n", err)
+		return
+	}
+}
+
 // NOTE: The following tests involving starting the aggregator panic after the connection to anvil is cut crashing the test runner.
 // The originates within the eigen-sdk and as of 8/11/24 is currently working to be fixed.
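
As the test changes in this file show, the refactor's testing payoff is that each `*Func` constructor exposes the bare closure, so a test can make exactly one attempt and assert on the raw error instead of waiting out the backoff schedule. A minimal sketch of that pattern, assuming only the signatures introduced in this diff (the helper name is illustrative):

```go
package utils_test

import (
	"math/big"

	"github.com/Layr-Labs/eigensdk-go/chainio/clients/eth"
	"github.com/yetanotherco/aligned_layer/core/utils"
)

// singleAttemptGasPrice makes exactly one gas-price attempt, with no backoff:
// useful for asserting on the raw transient error after Anvil is killed.
func singleAttemptGasPrice(client, fallback eth.InstrumentedClient) (*big.Int, error) {
	gasFunc := utils.GetGasPriceFunc(client, fallback) // returns func() (*big.Int, error)
	return gasFunc()
}
```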
@@ -339,7 +403,8 @@ func TestSubscribeToNewTasksV3(t *testing.T) { t.Errorf("Error setting up Avs Service Bindings: %s\n", err) } - _, err = chainio.SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + sub_func := chainio.SubscribeToNewTasksV3Func(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + _, err = sub_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -347,13 +412,14 @@ func TestSubscribeToNewTasksV3(t *testing.T) { return } - _, err = chainio.SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + sub_func = chainio.SubscribeToNewTasksV3Func(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + _, err = sub_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("SubscribeToNewTasksV3 Emitted non Transient error: %s\n", err) return } - if !strings.Contains(err.Error(), "connect: connection refused") { + if !strings.Contains(err.Error(), "connection reset") { t.Errorf("SubscribeToNewTasksV3 Emitted non Transient error: %s\n", err) return } @@ -363,7 +429,8 @@ func TestSubscribeToNewTasksV3(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = chainio.SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + sub_func = chainio.SubscribeToNewTasksV3Func(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + _, err = sub_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -389,7 +456,8 @@ func TestSubscribeToNewTasksV2(t *testing.T) { t.Errorf("Error setting up Avs Service Bindings: %s\n", err) } - _, err = chainio.SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + sub_func := chainio.SubscribeToNewTasksV2Func(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + _, err = sub_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -397,13 +465,15 @@ func TestSubscribeToNewTasksV2(t *testing.T) { return } - _, err = chainio.SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + sub_func = chainio.SubscribeToNewTasksV2Func(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + _, err = sub_func() + assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("SubscribeToNewTasksV2 Emitted non Transient error: %s\n", err) return } - if !strings.Contains(err.Error(), "connect: connection refused") { + if !strings.Contains(err.Error(), "connection reset") { t.Errorf("SubscribeToNewTasksV2 Emitted non Transient error: %s\n", err) return } @@ -413,7 +483,8 @@ func TestSubscribeToNewTasksV2(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = chainio.SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + sub_func = chainio.SubscribeToNewTasksV2Func(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + _, err = sub_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -434,7 +505,8 @@ func TestBlockNumber(t *testing.T) { if err != nil { return } - _, err = sub.BlockNumberRetryable(context.Background()) + block_func := chainio.BlockNumberFunc(sub, context.Background()) + _, err = block_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -442,7 +514,8 @@ func TestBlockNumber(t *testing.T) { return } - _, err = sub.BlockNumberRetryable(context.Background()) + block_func = chainio.BlockNumberFunc(sub, context.Background()) + _, err = block_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("BlockNumber Emitted non 
Transient error: %s\n", err) @@ -458,7 +531,8 @@ func TestBlockNumber(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = sub.BlockNumberRetryable(context.Background()) + block_func = chainio.BlockNumberFunc(sub, context.Background()) + _, err = block_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -478,7 +552,8 @@ func TestFilterBatchV2(t *testing.T) { if err != nil { return } - _, err = avsSubscriber.FilterBatchV2Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + batch_func := chainio.FilterBatchV2Func(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -486,13 +561,14 @@ func TestFilterBatchV2(t *testing.T) { return } - _, err = avsSubscriber.FilterBatchV2Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + batch_func = chainio.FilterBatchV2Func(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("FilterBatchV2 Emitted non Transient error: %s\n", err) return } - if !strings.Contains(err.Error(), "connect: connection refused") { + if !strings.Contains(err.Error(), "connection refused") { t.Errorf("FilterBatchV2 Emitted non Transient error: %s\n", err) return } @@ -502,7 +578,8 @@ func TestFilterBatchV2(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsSubscriber.FilterBatchV2Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + batch_func = chainio.FilterBatchV2Func(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -522,7 +599,8 @@ func TestFilterBatchV3(t *testing.T) { if err != nil { return } - _, err = avsSubscriber.FilterBatchV3Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + batch_func := chainio.FilterBatchV3Func(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -530,13 +608,14 @@ func TestFilterBatchV3(t *testing.T) { return } - _, err = avsSubscriber.FilterBatchV3Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + batch_func = chainio.FilterBatchV3Func(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("FilerBatchV3 Emitted non Transient error: %s\n", err) return } - if !strings.Contains(err.Error(), "connect: connection refused") { + if !strings.Contains(err.Error(), "connection refused") { t.Errorf("FilterBatchV3 Emitted non Transient error: %s\n", err) return } @@ -546,7 +625,8 @@ func TestFilterBatchV3(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsSubscriber.FilterBatchV3Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + batch_func = chainio.FilterBatchV3Func(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -568,7 +648,8 @@ func TestBatchesStateSubscriber(t *testing.T) { } zero_bytes := [32]byte{} 
- _, err = avsSubscriber.BatchesStateRetryable(nil, zero_bytes) + batch_state_func := chainio.BatchStateFunc(avsSubscriber, nil, zero_bytes) + _, err = batch_state_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -576,13 +657,14 @@ func TestBatchesStateSubscriber(t *testing.T) { return } - _, err = avsSubscriber.BatchesStateRetryable(nil, zero_bytes) + batch_state_func = chainio.BatchStateFunc(avsSubscriber, nil, zero_bytes) + _, err = batch_state_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("BatchesStateSubscriber Emitted non Transient error: %s\n", err) return } - if !strings.Contains(err.Error(), "connect: connection refused") { + if !strings.Contains(err.Error(), "connection refused") { t.Errorf("BatchesStateSubscriber Emitted non Transient error: %s\n", err) return } @@ -592,7 +674,8 @@ func TestBatchesStateSubscriber(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsSubscriber.BatchesStateRetryable(nil, zero_bytes) + batch_state_func = chainio.BatchStateFunc(avsSubscriber, nil, zero_bytes) + _, err = batch_state_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -614,7 +697,8 @@ func TestSubscribeNewHead(t *testing.T) { return } - _, err = avsSubscriber.SubscribeNewHeadRetryable(context.Background(), c) + sub_func := chainio.SubscribeNewHeadFunc(avsSubscriber, context.Background(), c) + _, err = sub_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -622,7 +706,8 @@ func TestSubscribeNewHead(t *testing.T) { return } - _, err = avsSubscriber.SubscribeNewHeadRetryable(context.Background(), c) + sub_func = chainio.SubscribeNewHeadFunc(avsSubscriber, context.Background(), c) + _, err = sub_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("SubscribeNewHead Emitted non Transient error: %s\n", err) @@ -638,7 +723,8 @@ func TestSubscribeNewHead(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsSubscriber.SubscribeNewHeadRetryable(context.Background(), c) + sub_func = chainio.SubscribeNewHeadFunc(avsSubscriber, context.Background(), c) + _, err = sub_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -691,7 +777,8 @@ func TestRespondToTaskV2(t *testing.T) { zero_bytes := [32]byte{} // NOTE: With zero bytes the tx reverts - _, err = w.RespondToTaskV2Retryable(&txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature) + resp_func := chainio.RespondToTaskV2Func(w, &txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature) + _, err = resp_func() assert.NotNil(t, err) if !strings.Contains(err.Error(), "execution reverted") { t.Errorf("RespondToTaskV2 did not emit the expected message: %q doesn't contain %q", err.Error(), "execution reverted: custom error 0x2396d34e:") @@ -701,7 +788,8 @@ func TestRespondToTaskV2(t *testing.T) { t.Errorf("Error killing process: %v\n", err) } - _, err = w.RespondToTaskV2Retryable(&txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature) + resp_func = chainio.RespondToTaskV2Func(w, &txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature) + _, err = resp_func() assert.NotNil(t, err) if _, ok := err.(*backoff.PermanentError); ok { t.Errorf("RespondToTaskV2 Emitted non-Transient error: %s\n", err) @@ -716,7 +804,8 @@ func TestRespondToTaskV2(t *testing.T) { } // NOTE: With zero bytes the tx reverts - _, err = w.RespondToTaskV2Retryable(&txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature) + 
resp_func = chainio.RespondToTaskV2Func(w, &txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature)
+	_, err = resp_func()
 	assert.NotNil(t, err)
 	if !strings.Contains(err.Error(), "execution reverted") {
 		t.Errorf("RespondToTaskV2 did not emit the expected message: %q doesn't contain %q", err.Error(), "execution reverted: custom error 0x2396d34e:")
@@ -744,7 +833,8 @@ func TestBatchesStateWriter(t *testing.T) {
 	var bytes [32]byte
 	num.FillBytes(bytes[:])
 
-	_, err = avsWriter.BatchesStateRetryable(&bind.CallOpts{}, bytes)
+	state_func := chainio.BatchesStateFunc(avsWriter, &bind.CallOpts{}, bytes)
+	_, err = state_func()
 	assert.Nil(t, err)
 
 	if err := cmd.Process.Kill(); err != nil {
@@ -752,7 +842,8 @@
 		return
 	}
 
-	_, err = avsWriter.BatchesStateRetryable(&bind.CallOpts{}, bytes)
+	state_func = chainio.BatchesStateFunc(avsWriter, &bind.CallOpts{}, bytes)
+	_, err = state_func()
 	assert.NotNil(t, err)
 	if _, ok := err.(retry.PermanentError); ok {
 		t.Errorf("BatchesStateWriter Emitted non-Transient error: %s\n", err)
@@ -768,7 +859,8 @@
 		t.Errorf("Error setting up Anvil: %s\n", err)
 	}
 
-	_, err = avsWriter.BatchesStateRetryable(&bind.CallOpts{}, bytes)
+	state_func = chainio.BatchesStateFunc(avsWriter, &bind.CallOpts{}, bytes)
+	_, err = state_func()
 	assert.Nil(t, err)
 
 	if err := cmd.Process.Kill(); err != nil {
@@ -778,7 +870,7 @@
 }
 
 func TestBalanceAt(t *testing.T) {
-	cmd, _, err := SetupAnvil(8545)
+	cmd, client, err := SetupAnvil(8545)
 	if err != nil {
 		t.Errorf("Error setting up Anvil: %s\n", err)
 	}
@@ -789,9 +881,15 @@
 		return
 	}
 	aggregator_address := common.HexToAddress("0x0")
-	blockHeight := big.NewInt(22)
+	// Fetch the latest block number
+	blockNumberUint64, err := client.BlockNumber(context.Background())
+	if err != nil {
+		t.Errorf("Error retrieving Anvil Block Number: %v\n", err)
+	}
+	blockNumber := new(big.Int).SetUint64(blockNumberUint64)
 
-	_, err = avsWriter.BalanceAtRetryable(context.Background(), aggregator_address, blockHeight)
+	balance_func := chainio.BalanceAtFunc(avsWriter, context.Background(), aggregator_address, blockNumber)
+	_, err = balance_func()
 	assert.Nil(t, err)
 
 	if err := cmd.Process.Kill(); err != nil {
@@ -799,7 +897,8 @@
 		return
 	}
 
-	_, err = avsWriter.BalanceAtRetryable(context.Background(), aggregator_address, blockHeight)
+	balance_func = chainio.BalanceAtFunc(avsWriter, context.Background(), aggregator_address, blockNumber)
+	_, err = balance_func()
 	assert.NotNil(t, err)
 	if _, ok := err.(retry.PermanentError); ok {
 		t.Errorf("BalanceAt Emitted non-Transient error: %s\n", err)
@@ -815,7 +914,8 @@
 		t.Errorf("Error setting up Anvil: %s\n", err)
 	}
 
-	_, err = avsWriter.BalanceAtRetryable(context.Background(), aggregator_address, blockHeight)
+	balance_func = chainio.BalanceAtFunc(avsWriter, context.Background(), aggregator_address, blockNumber)
+	_, err = balance_func()
 	assert.Nil(t, err)
 
 	if err := cmd.Process.Kill(); err != nil {
@@ -837,7 +937,8 @@ func TestBatchersBalances(t *testing.T) {
 	}
 	senderAddress := common.HexToAddress("0x0")
 
-	_, err = avsWriter.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress)
+	batcher_func := chainio.BatcherBalancesFunc(avsWriter, &bind.CallOpts{}, senderAddress)
+	_, err = batcher_func()
 	assert.Nil(t, err)
 
 	if err := cmd.Process.Kill(); err != nil {
@@ -845,7 +946,8 @@ func TestBatchersBalances(t *testing.T) {
 		return
 	}
 
-	_, err = avsWriter.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress)
+	batcher_func = chainio.BatcherBalancesFunc(avsWriter, &bind.CallOpts{}, senderAddress)
+	_, err = batcher_func()
 	assert.NotNil(t, err)
 	if _, ok := err.(retry.PermanentError); ok {
 		t.Errorf("BatchersBalances Emitted non-Transient error: %s\n", err)
@@ -861,7 +963,8 @@
 		t.Errorf("Error setting up Anvil: %s\n", err)
 	}
 
-	_, err = avsWriter.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress)
+	batcher_func = chainio.BatcherBalancesFunc(avsWriter, &bind.CallOpts{}, senderAddress)
+	_, err = batcher_func()
 	assert.Nil(t, err)
 
 	if err := cmd.Process.Kill(); err != nil {
diff --git a/core/utils/eth_client_utils.go b/core/utils/eth_client_utils.go
index 36b28f036..4da9bedce 100644
--- a/core/utils/eth_client_utils.go
+++ b/core/utils/eth_client_utils.go
@@ -3,7 +3,6 @@ package utils
 import (
 	"context"
 	"math/big"
-	"time"
 
 	"github.com/Layr-Labs/eigensdk-go/chainio/clients/eth"
 	eigentypes "github.com/Layr-Labs/eigensdk-go/types"
@@ -12,15 +11,7 @@ import (
 	retry "github.com/yetanotherco/aligned_layer/core"
 )
 
-// WaitForTransactionReceiptRetryable repeatedly attempts to fetch the transaction receipt for a given transaction hash.
-// If the receipt is not found, the function will retry with exponential backoff until the specified `waitTimeout` duration is reached.
-// If the receipt is still unavailable after `waitTimeout`, it will return an error.
-//
-// Note: The `time.Second * 2` is set as the max interval in the retry mechanism because we can't reliably measure the specific time the tx will be included in a block.
-// Setting a higher value will imply doing less retries across the waitTimeout, and so we might lose the receipt
-// All errors are considered Transient Errors
-// - Retry times: 0.5s, 1s, 2s, 2s, 2s, ... until it reaches waitTimeout
-func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash, waitTimeout time.Duration) (*types.Receipt, error) {
+func WaitForTransactionReceiptFunc(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash, config *retry.RetryConfig) func() (*types.Receipt, error) {
 	receipt_func := func() (*types.Receipt, error) {
 		receipt, err := client.TransactionReceipt(context.Background(), txHash)
 		if err != nil {
@@ -32,7 +23,19 @@ func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackC
 		}
 		return receipt, nil
 	}
-	return retry.RetryWithData(receipt_func, retry.MinDelay, retry.RetryFactor, 0, time.Second*2, waitTimeout)
+	return receipt_func
+}
+
+// WaitForTransactionReceiptRetryable repeatedly attempts to fetch the transaction receipt for a given transaction hash.
+// If the receipt is not found, the function will retry with exponential backoff until the configured `MaxElapsedTime` is reached.
+// If the receipt is still unavailable by then, it will return an error.
+//
+// Note: callers set `time.Second * 2` as the max interval in the retry mechanism because we can't reliably measure the specific time the tx will be included in a block.
+// Setting a higher value would imply fewer retries across the wait window, and so we might miss the receipt.
+// All errors are considered Transient Errors.
+// - Retry times: 0.5s, 1s, 2s, 2s, 2s, ... until `config.MaxElapsedTime` is reached
+func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash, config *retry.RetryConfig) (*types.Receipt, error) {
+	return retry.RetryWithData(WaitForTransactionReceiptFunc(client, fallbackClient, txHash, config), config)
 }
 
 func BytesToQuorumNumbers(quorumNumbersBytes []byte) eigentypes.QuorumNums {
@@ -81,14 +84,9 @@
 	return bumpedGasPrice
 }
 
-/*
-GetGasPriceRetryable
-Get the gas price from the client with retry logic.
-- All errors are considered Transient Errors
-- Retry times: 1 sec, 2 sec, 4 sec
-*/
-func GetGasPriceRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient) (*big.Int, error) {
-	respondToTaskV2_func := func() (*big.Int, error) {
+func GetGasPriceFunc(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient) func() (*big.Int, error) {
+
+	getGasPrice_func := func() (*big.Int, error) {
 		gasPrice, err := client.SuggestGasPrice(context.Background())
 		if err != nil {
 			gasPrice, err = fallbackClient.SuggestGasPrice(context.Background())
@@ -99,5 +97,15 @@ func GetGasPriceRetryable(client eth.InstrumentedClient, fallbackClient eth.Inst
 		return gasPrice, nil
 	}
 
-	return retry.RetryWithData(respondToTaskV2_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+	return getGasPrice_func
+}
+
+/*
+GetGasPriceRetryable
+Get the gas price from the client with retry logic.
+- All errors are considered Transient Errors
+- Retry times: 1 sec, 2 sec, 4 sec
+*/
+func GetGasPriceRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient) (*big.Int, error) {
+	return retry.RetryWithData(GetGasPriceFunc(client, fallbackClient), retry.EthCallRetryConfig())
 }
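
Putting the pieces together, the receipt wait from `SendAggregatedResponse` now reads roughly as below: the caller owns the retry budget, with `MaxElapsedTime` taking over the role of the removed `waitTimeout` parameter. A sketch under the signatures above (the wrapper function and its name are illustrative):

```go
package example

import (
	"time"

	"github.com/Layr-Labs/eigensdk-go/chainio/clients/eth"
	gethcommon "github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	retry "github.com/yetanotherco/aligned_layer/core"
	"github.com/yetanotherco/aligned_layer/core/utils"
)

// waitForReceipt mirrors how SendAggregatedResponse builds waitForTxConfig:
// poll every <=2s, never cap the retry count, and give up once the bump
// window (timeToWaitBeforeBump) has elapsed.
func waitForReceipt(client, fallback eth.InstrumentedClient, txHash gethcommon.Hash, timeToWaitBeforeBump time.Duration) (*types.Receipt, error) {
	config := retry.EthCallRetryConfig()
	config.NumRetries = 0                        // 0 = no retry-count cap; time-bounded instead
	config.MaxInterval = 2 * time.Second         // keep polls frequent so the receipt isn't missed
	config.MaxElapsedTime = timeToWaitBeforeBump // the old waitTimeout, now carried in the config
	return utils.WaitForTransactionReceiptRetryable(client, fallback, txHash, config)
}
```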