diff --git a/aggregator/pkg/aggregator.go b/aggregator/pkg/aggregator.go index 7691d32e3..fdb7c8b45 100644 --- a/aggregator/pkg/aggregator.go +++ b/aggregator/pkg/aggregator.go @@ -419,7 +419,7 @@ func (agg *Aggregator) InitializeNewTask(batchIndex uint32, taskCreatedBlock uin } return err } - return retry.Retry(initializeNewTask_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.Retry(initializeNewTask_func, retry.DefaultRetryConfig()) } // Long-lived goroutine that periodically checks and removes old Tasks from stored Maps diff --git a/aggregator/pkg/server.go b/aggregator/pkg/server.go index 0c3ee5c8b..e36d1a294 100644 --- a/aggregator/pkg/server.go +++ b/aggregator/pkg/server.go @@ -53,7 +53,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t "operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:])) taskIndex := uint32(0) - taskIndex, err := agg.GetTaskIndex(signedTaskResponse.BatchIdentifierHash) + taskIndex, err := agg.GetTaskIndexRetryable(signedTaskResponse.BatchIdentifierHash) if err != nil { agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum") @@ -72,7 +72,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t agg.logger.Info("Starting bls signature process") go func() { - err := agg.ProcessNewSignature( + err := agg.ProcessNewSignatureRetryable( context.Background(), taskIndex, signedTaskResponse.BatchIdentifierHash, &signedTaskResponse.BlsSignature, signedTaskResponse.OperatorId, ) @@ -123,7 +123,7 @@ func (agg *Aggregator) ServerRunning(_ *struct{}, reply *int64) error { - Retry times (3 retries): 12 sec (1 Blocks), 24 sec (2 Blocks), 48 sec (4 Blocks) - NOTE: TaskNotFound errors from the BLS Aggregation service are Transient errors as block reorg's may lead to these errors being thrown. */ -func (agg *Aggregator) ProcessNewSignature(ctx context.Context, taskIndex uint32, taskResponse interface{}, blsSignature *bls.Signature, operatorId eigentypes.Bytes32) error { +func (agg *Aggregator) ProcessNewSignatureRetryable(ctx context.Context, taskIndex uint32, taskResponse interface{}, blsSignature *bls.Signature, operatorId eigentypes.Bytes32) error { processNewSignature_func := func() error { err := agg.blsAggregationService.ProcessNewSignature( ctx, taskIndex, taskResponse, @@ -137,10 +137,15 @@ func (agg *Aggregator) ProcessNewSignature(ctx context.Context, taskIndex uint32 return err } - return retry.Retry(processNewSignature_func, retry.MinDelayChain, retry.RetryFactor, retry.NumRetries, retry.MaxIntervalChain, retry.MaxElapsedTime) + return retry.Retry(processNewSignature_func, retry.ChainRetryConfig()) } -func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error) { +// Checks Internal mapping for Signed Task Response, returns its TaskIndex. +/* +- All errors are considered Transient Errors +- Retry times (3 retries): 1 sec, 2 sec, 4 sec +*/ +func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte) (uint32, error) { getTaskIndex_func := func() (uint32, error) { agg.taskMutex.Lock() agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response") @@ -153,6 +158,5 @@ func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error return taskIndex, nil } } - - return retry.RetryWithData(getTaskIndex_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.RetryWithData(getTaskIndex_func, retry.DefaultRetryConfig()) } diff --git a/core/chainio/avs_reader.go b/core/chainio/avs_reader.go index 69b8f281b..227072076 100644 --- a/core/chainio/avs_reader.go +++ b/core/chainio/avs_reader.go @@ -77,7 +77,7 @@ func (r *AvsReader) DisabledVerifiers() (*big.Int, error) { // Returns all the "NewBatchV3" logs that have not been responded starting from the given block number func (r *AvsReader) GetNotRespondedTasksFrom(fromBlock uint64) ([]servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) { - logs, err := r.AvsContractBindings.ServiceManager.FilterNewBatchV3(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil) + logs, err := r.FilterNewBatchV3Retryable(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil) if err != nil { return nil, err @@ -94,7 +94,7 @@ func (r *AvsReader) GetNotRespondedTasksFrom(fromBlock uint64) ([]servicemanager // now check if its finalized or not before appending batchIdentifier := append(task.BatchMerkleRoot[:], task.SenderAddress[:]...) batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier)) - state, err := r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(nil, batchIdentifierHash) + state, err := r.BatchesStateRetryable(nil, batchIdentifierHash) if err != nil { return nil, err diff --git a/core/chainio/avs_subscriber.go b/core/chainio/avs_subscriber.go index ea810da62..c2f5e918a 100644 --- a/core/chainio/avs_subscriber.go +++ b/core/chainio/avs_subscriber.go @@ -68,13 +68,13 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema // Subscribe to new tasks sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil) if err != nil { - s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NumRetries, "err", err) + s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.DefaultMaxNumRetries, "err", err) return nil, err } subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil) if err != nil { - s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NumRetries, "err", err) + s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.DefaultMaxNumRetries, "err", err) return nil, err } s.logger.Info("Subscribed to new AlignedLayer V2 tasks") diff --git a/core/chainio/avs_writer.go b/core/chainio/avs_writer.go index 2f3e25e69..386402294 100644 --- a/core/chainio/avs_writer.go +++ b/core/chainio/avs_writer.go @@ -93,6 +93,17 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe txOpts.NoSend = false i := 0 + // Set Retry config for RespondToTaskV2 + respondToTaskV2Config := retry.DefaultRetryConfig() + respondToTaskV2Config.MaxNumRetries = 0 + respondToTaskV2Config.MaxElapsedTime = 0 + + // Set Retry config for WaitForTxRetryable + waitForTxConfig := retry.DefaultRetryConfig() + waitForTxConfig.MaxInterval = 2 * time.Second + waitForTxConfig.MaxNumRetries = 0 + waitForTxConfig.MaxElapsedTime = timeToWaitBeforeBump + respondToTaskV2Func := func() (*types.Receipt, error) { gasPrice, err := utils.GetGasPriceRetryable(w.Client, w.ClientFallback) if err != nil { @@ -126,7 +137,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe return nil, err } - receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, tx.Hash(), timeToWaitBeforeBump) + receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, tx.Hash(), waitForTxConfig) if receipt != nil { w.checkIfAggregatorHadToPaidForBatcher(tx, batchIdentifierHash) return receipt, nil @@ -143,7 +154,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe return nil, fmt.Errorf("transaction failed") } - return retry.RetryWithData(respondToTaskV2Func, retry.MinDelay, retry.RetryFactor, 0, retry.MaxInterval, 0) + return retry.RetryWithData(respondToTaskV2Func, respondToTaskV2Config) } // Calculates the transaction cost from the receipt and compares it with the batcher respondToTaskFeeLimit diff --git a/core/chainio/retryable.go b/core/chainio/retryable.go index 0ac44a59a..0384bbdf2 100644 --- a/core/chainio/retryable.go +++ b/core/chainio/retryable.go @@ -15,14 +15,7 @@ import ( // |---AVS_WRITER---| -/* -RespondToTaskV2Retryable -Send a transaction to the AVS contract to respond to a task. -- All errors are considered Transient Errors -- Retry times (3 retries): 12 sec (1 Blocks), 24 sec (2 Blocks), 48 sec (4 Blocks) -- NOTE: Contract call reverts are not considered `PermanentError`'s as block reorg's may lead to contract call revert in which case the aggregator should retry. -*/ -func (w *AvsWriter) RespondToTaskV2Retryable(opts *bind.TransactOpts, batchMerkleRoot [32]byte, senderAddress common.Address, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*types.Transaction, error) { +func RespondToTaskV2(w *AvsWriter, opts *bind.TransactOpts, batchMerkleRoot [32]byte, senderAddress common.Address, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) func() (*types.Transaction, error) { respondToTaskV2_func := func() (*types.Transaction, error) { // Try with main connection tx, err := w.AvsContractBindings.ServiceManager.RespondToTaskV2(opts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature) @@ -32,21 +25,25 @@ func (w *AvsWriter) RespondToTaskV2Retryable(opts *bind.TransactOpts, batchMerkl } return tx, err } - return retry.RetryWithData(respondToTaskV2_func, retry.MinDelayChain, retry.RetryFactor, retry.NumRetries, retry.MaxIntervalChain, retry.MaxElapsedTime) + return respondToTaskV2_func } /* -BatchesStateRetryable -Get the state of a batch from the AVS contract. +RespondToTaskV2Retryable +Send a transaction to the AVS contract to respond to a task. - All errors are considered Transient Errors -- Retry times (3 retries): 1 sec, 2 sec, 4 sec +- Retry times (3 retries): 12 sec (1 Blocks), 24 sec (2 Blocks), 48 sec (4 Blocks) +- NOTE: Contract call reverts are not considered `PermanentError`'s as block reorg's may lead to contract call revert in which case the aggregator should retry. */ -func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct { +func (w *AvsWriter) RespondToTaskV2Retryable(opts *bind.TransactOpts, batchMerkleRoot [32]byte, senderAddress common.Address, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*types.Transaction, error) { + return retry.RetryWithData(RespondToTaskV2(w, opts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature), retry.ChainRetryConfig()) +} + +func BatchesState(w *AvsWriter, opts *bind.CallOpts, arg0 [32]byte) func() (struct { TaskCreatedBlock uint32 Responded bool RespondToTaskFeeLimit *big.Int }, error) { - batchesState_func := func() (struct { TaskCreatedBlock uint32 Responded bool @@ -60,16 +57,24 @@ func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (s } return state, err } - return retry.RetryWithData(batchesState_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return batchesState_func } /* -BatcherBalancesRetryable -Get the balance of a batcher from the AVS contract. +BatchesStateRetryable +Get the state of a batch from the AVS contract. - All errors are considered Transient Errors - Retry times (3 retries): 1 sec, 2 sec, 4 sec */ -func (w *AvsWriter) BatcherBalancesRetryable(opts *bind.CallOpts, senderAddress common.Address) (*big.Int, error) { +func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct { + TaskCreatedBlock uint32 + Responded bool + RespondToTaskFeeLimit *big.Int +}, error) { + return retry.RetryWithData(BatchesState(w, opts, arg0), retry.DefaultRetryConfig()) +} + +func BatcherBalances(w *AvsWriter, opts *bind.CallOpts, senderAddress common.Address) func() (*big.Int, error) { batcherBalances_func := func() (*big.Int, error) { // Try with main connection batcherBalance, err := w.AvsContractBindings.ServiceManager.BatchersBalances(opts, senderAddress) @@ -79,18 +84,20 @@ func (w *AvsWriter) BatcherBalancesRetryable(opts *bind.CallOpts, senderAddress } return batcherBalance, err } - return retry.RetryWithData(batcherBalances_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return batcherBalances_func } /* -BalanceAtRetryable -Get the balance of aggregatorAddress at blockNumber. -If blockNumber is nil, it gets the latest balance. -TODO: it gets the balance from an Address, not necessarily an aggregator. The name of the parameter should be changed. +BatcherBalancesRetryable +Get the balance of a batcher from the AVS contract. - All errors are considered Transient Errors -- Retry times (3 retries): 1 sec, 2 sec, 4 sec. +- Retry times (3 retries): 1 sec, 2 sec, 4 sec */ -func (w *AvsWriter) BalanceAtRetryable(ctx context.Context, aggregatorAddress common.Address, blockNumber *big.Int) (*big.Int, error) { +func (w *AvsWriter) BatcherBalancesRetryable(opts *bind.CallOpts, senderAddress common.Address) (*big.Int, error) { + return retry.RetryWithData(BatcherBalances(w, opts, senderAddress), retry.DefaultRetryConfig()) +} + +func BalanceAt(w *AvsWriter, ctx context.Context, aggregatorAddress common.Address, blockNumber *big.Int) func() (*big.Int, error) { balanceAt_func := func() (*big.Int, error) { // Try with main connection aggregatorBalance, err := w.Client.BalanceAt(ctx, aggregatorAddress, blockNumber) @@ -100,18 +107,24 @@ func (w *AvsWriter) BalanceAtRetryable(ctx context.Context, aggregatorAddress co } return aggregatorBalance, err } - return retry.RetryWithData(balanceAt_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return balanceAt_func } -// |---AVS_SUBSCRIBER---| - /* -BlockNumberRetryable -Get the latest block number from Ethereum +BalanceAtRetryable +Get the balance of aggregatorAddress at blockNumber. +If blockNumber is nil, it gets the latest balance. +TODO: it gets the balance from an Address, not necessarily an aggregator. The name of the parameter should be changed. - All errors are considered Transient Errors - Retry times (3 retries): 1 sec, 2 sec, 4 sec. */ -func (s *AvsSubscriber) BlockNumberRetryable(ctx context.Context) (uint64, error) { +func (w *AvsWriter) BalanceAtRetryable(ctx context.Context, aggregatorAddress common.Address, blockNumber *big.Int) (*big.Int, error) { + return retry.RetryWithData(BalanceAt(w, ctx, aggregatorAddress, blockNumber), retry.DefaultRetryConfig()) +} + +// |---AVS_SUBSCRIBER---| + +func BlockNumber(s *AvsSubscriber, ctx context.Context) func() (uint64, error) { latestBlock_func := func() (uint64, error) { // Try with main connection latestBlock, err := s.AvsContractBindings.ethClient.BlockNumber(ctx) @@ -121,7 +134,28 @@ func (s *AvsSubscriber) BlockNumberRetryable(ctx context.Context) (uint64, error } return latestBlock, err } - return retry.RetryWithData(latestBlock_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return latestBlock_func +} + +/* +BlockNumberRetryable +Get the latest block number from Ethereum +- All errors are considered Transient Errors +- Retry times (3 retries): 1 sec, 2 sec, 4 sec. +*/ +func (s *AvsSubscriber) BlockNumberRetryable(ctx context.Context) (uint64, error) { + return retry.RetryWithData(BlockNumber(s, ctx), retry.DefaultRetryConfig()) +} + +func FilterBatchV2(s *AvsSubscriber, opts *bind.FilterOpts, batchMerkleRoot [][32]byte) func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) { + filterNewBatchV2_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) { + logs, err := s.AvsContractBindings.ServiceManager.FilterNewBatchV2(opts, batchMerkleRoot) + if err != nil { + logs, err = s.AvsContractBindings.ServiceManagerFallback.FilterNewBatchV2(opts, batchMerkleRoot) + } + return logs, err + } + return filterNewBatchV2_func } /* @@ -131,10 +165,18 @@ Get NewBatchV2 logs from the AVS contract. - Retry times (3 retries): 1 sec, 2 sec, 4 sec. */ func (s *AvsSubscriber) FilterBatchV2Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) { - filterNewBatchV2_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) { - return s.AvsContractBindings.ServiceManager.FilterNewBatchV2(opts, batchMerkleRoot) + return retry.RetryWithData(FilterBatchV2(s, opts, batchMerkleRoot), retry.DefaultRetryConfig()) +} + +func FilterBatchV3(s *AvsSubscriber, opts *bind.FilterOpts, batchMerkleRoot [][32]byte) func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { + filterNewBatchV3_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { + logs, err := s.AvsContractBindings.ServiceManager.FilterNewBatchV3(opts, batchMerkleRoot) + if err != nil { + logs, err = s.AvsContractBindings.ServiceManagerFallback.FilterNewBatchV3(opts, batchMerkleRoot) + } + return logs, err } - return retry.RetryWithData(filterNewBatchV2_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return filterNewBatchV3_func } /* @@ -144,19 +186,10 @@ Get NewBatchV3 logs from the AVS contract. - Retry times (3 retries): 1 sec, 2 sec, 4 sec. */ func (s *AvsSubscriber) FilterBatchV3Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { - filterNewBatchV2_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { - return s.AvsContractBindings.ServiceManager.FilterNewBatchV3(opts, batchMerkleRoot) - } - return retry.RetryWithData(filterNewBatchV2_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.RetryWithData(FilterBatchV3(s, opts, batchMerkleRoot), retry.DefaultRetryConfig()) } -/* -BatchesStateRetryable -Get the state of a batch from the AVS contract. -- All errors are considered Transient Errors -- Retry times (3 retries): 1 sec, 2 sec, 4 sec -*/ -func (s *AvsSubscriber) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct { +func BatchState(s *AvsSubscriber, opts *bind.CallOpts, arg0 [32]byte) func() (struct { TaskCreatedBlock uint32 Responded bool RespondToTaskFeeLimit *big.Int @@ -166,19 +199,31 @@ func (s *AvsSubscriber) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte Responded bool RespondToTaskFeeLimit *big.Int }, error) { - return s.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0) + state, err := s.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0) + if err != nil { + state, err = s.AvsContractBindings.ServiceManagerFallback.BatchesState(opts, arg0) + } + return state, err } - - return retry.RetryWithData(batchState_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return batchState_func } /* -SubscribeNewHeadRetryable -Subscribe to new heads from the Ethereum node. +BatchesStateRetryable +Get the state of a batch from the AVS contract. - All errors are considered Transient Errors -- Retry times (3 retries): 1 sec, 2 sec, 4 sec. +- Retry times (3 retries): 1 sec, 2 sec, 4 sec */ -func (s *AvsSubscriber) SubscribeNewHeadRetryable(ctx context.Context, c chan<- *types.Header) (ethereum.Subscription, error) { +func (s *AvsSubscriber) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct { + TaskCreatedBlock uint32 + Responded bool + RespondToTaskFeeLimit *big.Int +}, error) { + + return retry.RetryWithData(BatchState(s, opts, arg0), retry.DefaultRetryConfig()) +} + +func SubscribeNewHead(s *AvsSubscriber, ctx context.Context, c chan<- *types.Header) func() (ethereum.Subscription, error) { subscribeNewHead_func := func() (ethereum.Subscription, error) { // Try with main connection sub, err := s.AvsContractBindings.ethClient.SubscribeNewHead(ctx, c) @@ -188,7 +233,29 @@ func (s *AvsSubscriber) SubscribeNewHeadRetryable(ctx context.Context, c chan<- } return sub, err } - return retry.RetryWithData(subscribeNewHead_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return subscribeNewHead_func +} + +/* +SubscribeNewHeadRetryable +Subscribe to new heads from the Ethereum node. +- All errors are considered Transient Errors +- Retry times (3 retries): 1 sec, 2 sec, 4 sec. +*/ +func (s *AvsSubscriber) SubscribeNewHeadRetryable(ctx context.Context, c chan<- *types.Header) (ethereum.Subscription, error) { + return retry.RetryWithData(SubscribeNewHead(s, ctx, c), retry.DefaultRetryConfig()) +} + +func SubscribeToNewTasksV2( + opts *bind.WatchOpts, + serviceManager *servicemanager.ContractAlignedLayerServiceManager, + newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, + batchMerkleRoot [][32]byte, +) func() (event.Subscription, error) { + subscribe_func := func() (event.Subscription, error) { + return serviceManager.WatchNewBatchV2(opts, newTaskCreatedChan, batchMerkleRoot) + } + return subscribe_func } /* @@ -203,10 +270,19 @@ func SubscribeToNewTasksV2Retryable( newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, batchMerkleRoot [][32]byte, ) (event.Subscription, error) { + return retry.RetryWithData(SubscribeToNewTasksV2(opts, serviceManager, newTaskCreatedChan, batchMerkleRoot), retry.DefaultRetryConfig()) +} + +func SubscribeToNewTasksV3( + opts *bind.WatchOpts, + serviceManager *servicemanager.ContractAlignedLayerServiceManager, + newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, + batchMerkleRoot [][32]byte, +) func() (event.Subscription, error) { subscribe_func := func() (event.Subscription, error) { - return serviceManager.WatchNewBatchV2(opts, newTaskCreatedChan, batchMerkleRoot) + return serviceManager.WatchNewBatchV3(opts, newTaskCreatedChan, batchMerkleRoot) } - return retry.RetryWithData(subscribe_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return subscribe_func } /* @@ -221,8 +297,53 @@ func SubscribeToNewTasksV3Retryable( newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, batchMerkleRoot [][32]byte, ) (event.Subscription, error) { - subscribe_func := func() (event.Subscription, error) { - return serviceManager.WatchNewBatchV3(opts, newTaskCreatedChan, batchMerkleRoot) + return retry.RetryWithData(SubscribeToNewTasksV3(opts, serviceManager, newTaskCreatedChan, batchMerkleRoot), retry.DefaultRetryConfig()) +} + +func FilterNewBatchV3(r *AvsReader, opts *bind.FilterOpts, batchMerkleRoot [][32]byte) func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { + filter_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { + return r.AvsContractBindings.ServiceManager.FilterNewBatchV3(opts, batchMerkleRoot) + } + return filter_func +} + +/* +FilterBatchV3Retryable +Get NewBatchV3 logs from the AVS contract. +- All errors are considered Transient Errors +- Retry times (3 retries): 1 sec, 2 sec, 4 sec. +*/ +func (r *AvsReader) FilterNewBatchV3Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) { + + return retry.RetryWithData(FilterNewBatchV3(r, opts, batchMerkleRoot), retry.DefaultRetryConfig()) +} + +func ReaderBatchesState(r *AvsReader, opts *bind.CallOpts, arg0 [32]byte) func() (struct { + TaskCreatedBlock uint32 + Responded bool + RespondToTaskFeeLimit *big.Int +}, error) { + batchState_func := func() (struct { + TaskCreatedBlock uint32 + Responded bool + RespondToTaskFeeLimit *big.Int + }, error) { + return r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0) } - return retry.RetryWithData(subscribe_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return batchState_func +} + +/* +BatchesStateRetryable +Get the state of V3 batches from the AVS contract. +- All errors are considered Transient Errors +- Retry times (3 retries): 1 sec, 2 sec, 4 sec +*/ +func (r *AvsReader) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct { + TaskCreatedBlock uint32 + Responded bool + RespondToTaskFeeLimit *big.Int +}, error) { + + return retry.RetryWithData(ReaderBatchesState(r, opts, arg0), retry.DefaultRetryConfig()) } diff --git a/core/retry.go b/core/retry.go index 551be59c7..5d70882a8 100644 --- a/core/retry.go +++ b/core/retry.go @@ -25,15 +25,47 @@ func (e PermanentError) Is(err error) bool { } const ( - MinDelay = 1 * time.Second // Initial delay for retry interval. - MaxInterval = 60 * time.Second // Maximum interval an individual retry may have. - MaxElapsedTime = 0 * time.Second // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries. - RetryFactor float64 = 2 // Multiplier factor computed exponential retry interval is scaled by. - NumRetries uint64 = 3 // Total number of retries attempted. - MinDelayChain = 12 * time.Second // Initial delay for retry interval for contract calls. Corresponds to 1 ethereum block. - MaxIntervalChain = 2 * time.Minute // Maximum interval for an individual retry. + DefaultInitialInterval = 1 * time.Second // Initial delay for retry interval. + DefaultMaxInterval = 60 * time.Second // Maximum interval an individual retry may have. + DefaultMaxElapsedTime = 0 * time.Second // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries. + DefaultRandomizationFactor float64 = 0 // Randomization (Jitter) factor used to map retry interval to a range of values around the computed interval. In precise terms (random value in range [1 - randomizationfactor, 1 + randomizationfactor]). NOTE: This is set to 0 as we do not use jitter in Aligned. + DefaultMultiplier float64 = 2 // Multiplier factor computed exponential retry interval is scaled by. + DefaultMaxNumRetries uint64 = 3 // Total number of retries attempted. + ChainInitialInterval = 12 * time.Second // Initial delay for retry interval for contract calls. Corresponds to 1 ethereum block. + ChainMaxInterval = 2 * time.Minute // Maximum interval for an individual retry. ) +type RetryConfig struct { + InitialInterval time.Duration // Initial delay for retry interval. + MaxInterval time.Duration // Maximum interval an individual retry may have. + MaxElapsedTime time.Duration // Maximum time all retries may take. `0` corresponds to no limit on the time of the retries. + RandomizationFactor float64 + Multiplier float64 + MaxNumRetries uint64 +} + +func DefaultRetryConfig() *RetryConfig { + return &RetryConfig{ + InitialInterval: DefaultInitialInterval, + MaxInterval: DefaultMaxInterval, + MaxElapsedTime: DefaultMaxElapsedTime, + RandomizationFactor: DefaultRandomizationFactor, + Multiplier: DefaultMultiplier, + MaxNumRetries: DefaultMaxNumRetries, + } +} + +func ChainRetryConfig() *RetryConfig { + return &RetryConfig{ + InitialInterval: ChainInitialInterval, + MaxInterval: ChainMaxInterval, + MaxElapsedTime: DefaultMaxElapsedTime, + RandomizationFactor: DefaultRandomizationFactor, + Multiplier: DefaultMultiplier, + MaxNumRetries: DefaultMaxNumRetries, + } +} + /* Retry and RetryWithData are custom retry functions used in Aligned's aggregator and operator to facilitate consistent retry logic across the system. They are interfaces for around Cenk Alti (https://github.com/cenkalti) backoff library (https://github.com/cenkalti/backoff). We would like to thank him for his great work. @@ -92,7 +124,8 @@ Reference: https://github.com/cenkalti/backoff/blob/v4/exponential.go#L9 */ // Same as Retry only that the functionToRetry can return a value upon correct execution -func RetryWithData[T any](functionToRetry func() (T, error), minDelay time.Duration, factor float64, maxTries uint64, maxInterval time.Duration, maxElapsedTime time.Duration) (T, error) { +func RetryWithData[T any](functionToRetry func() (T, error), config *RetryConfig) (T, error) { + //func RetryWithData[T any](functionToRetry func() (T, error), minDelay time.Duration, factor float64, maxTries uint64, maxInterval time.Duration, maxElapsedTime time.Duration) (T, error) { f := func() (T, error) { var ( val T @@ -120,15 +153,15 @@ func RetryWithData[T any](functionToRetry func() (T, error), minDelay time.Durat randomOption := backoff.WithRandomizationFactor(0) - initialRetryOption := backoff.WithInitialInterval(minDelay) - multiplierOption := backoff.WithMultiplier(factor) - maxIntervalOption := backoff.WithMaxInterval(maxInterval) - maxElapsedTimeOption := backoff.WithMaxElapsedTime(maxElapsedTime) + initialRetryOption := backoff.WithInitialInterval(config.InitialInterval) + multiplierOption := backoff.WithMultiplier(config.Multiplier) + maxIntervalOption := backoff.WithMaxInterval(config.MaxInterval) + maxElapsedTimeOption := backoff.WithMaxElapsedTime(config.MaxElapsedTime) expBackoff := backoff.NewExponentialBackOff(randomOption, multiplierOption, initialRetryOption, maxIntervalOption, maxElapsedTimeOption) var maxRetriesBackoff backoff.BackOff - if maxTries > 0 { - maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, maxTries) + if config.MaxNumRetries > 0 { + maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, config.MaxNumRetries) } else { maxRetriesBackoff = expBackoff } @@ -142,7 +175,7 @@ func RetryWithData[T any](functionToRetry func() (T, error), minDelay time.Durat // from the configuration are reached, or until a `PermanentError` is returned. // The function to be retried should return `PermanentError` when the condition for stop retrying // is met. -func Retry(functionToRetry func() error, minDelay time.Duration, factor float64, maxTries uint64, maxInterval time.Duration, maxElapsedTime time.Duration) error { +func Retry(functionToRetry func() error, config *RetryConfig) error { f := func() error { var err error func() { @@ -167,15 +200,15 @@ func Retry(functionToRetry func() error, minDelay time.Duration, factor float64, randomOption := backoff.WithRandomizationFactor(0) - initialRetryOption := backoff.WithInitialInterval(minDelay) - multiplierOption := backoff.WithMultiplier(factor) - maxIntervalOption := backoff.WithMaxInterval(maxInterval) - maxElapsedTimeOption := backoff.WithMaxElapsedTime(maxElapsedTime) + initialRetryOption := backoff.WithInitialInterval(config.InitialInterval) + multiplierOption := backoff.WithMultiplier(config.Multiplier) + maxIntervalOption := backoff.WithMaxInterval(config.MaxInterval) + maxElapsedTimeOption := backoff.WithMaxElapsedTime(config.MaxElapsedTime) expBackoff := backoff.NewExponentialBackOff(randomOption, multiplierOption, initialRetryOption, maxIntervalOption, maxElapsedTimeOption) var maxRetriesBackoff backoff.BackOff - if maxTries > 0 { - maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, maxTries) + if config.MaxNumRetries > 0 { + maxRetriesBackoff = backoff.WithMaxRetries(expBackoff, config.MaxNumRetries) } else { maxRetriesBackoff = expBackoff } diff --git a/core/retry_test.go b/core/retry_test.go index 4a725fb6a..8873da85d 100644 --- a/core/retry_test.go +++ b/core/retry_test.go @@ -42,7 +42,16 @@ func TestRetryWithData(t *testing.T) { x, err := DummyFunction(43) return &x, err } - _, err := retry.RetryWithData(function, 1000, 2, 3, retry.MaxInterval, retry.MaxElapsedTime) + + config := &retry.RetryConfig{ + InitialInterval: 1000, + MaxInterval: 2, + MaxElapsedTime: 3, + RandomizationFactor: 0, + Multiplier: retry.DefaultMultiplier, + MaxNumRetries: retry.DefaultMaxNumRetries, + } + _, err := retry.RetryWithData(function, config) if err != nil { t.Errorf("Retry error!: %s", err) } @@ -53,7 +62,15 @@ func TestRetry(t *testing.T) { _, err := DummyFunction(43) return err } - err := retry.Retry(function, 1000, 2, 3, retry.MaxInterval, retry.MaxElapsedTime) + config := &retry.RetryConfig{ + InitialInterval: 1000, + MaxInterval: 2, + MaxElapsedTime: 3, + RandomizationFactor: 0, + Multiplier: retry.DefaultMultiplier, + MaxNumRetries: retry.DefaultMaxNumRetries, + } + err := retry.Retry(function, config) if err != nil { t.Errorf("Retry error!: %s", err) } @@ -153,7 +170,8 @@ func TestWaitForTransactionReceipt(t *testing.T) { } // Assert Call succeeds when Anvil running - _, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, time.Second*45) + receipt_function := utils.WaitForTransactionReceipt(*client, *client, hash, retry.DefaultRetryConfig()) + _, err = receipt_function() assert.NotNil(t, err, "Error Waiting for Transaction with Anvil Running: %s\n", err) if !strings.Contains(err.Error(), "not found") { t.Errorf("WaitForTransactionReceipt Emitted incorrect error: %s\n", err) @@ -165,7 +183,8 @@ func TestWaitForTransactionReceipt(t *testing.T) { return } - _, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, time.Second*45) + receipt_function = utils.WaitForTransactionReceipt(*client, *client, hash, retry.DefaultRetryConfig()) + _, err = receipt_function() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("WaitForTransactionReceipt Emitted non Transient error: %s\n", err) @@ -181,7 +200,8 @@ func TestWaitForTransactionReceipt(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = utils.WaitForTransactionReceiptRetryable(*client, *client, hash, time.Second*45) + receipt_function = utils.WaitForTransactionReceipt(*client, *client, hash, retry.DefaultRetryConfig()) + _, err = receipt_function() assert.NotNil(t, err) if !strings.Contains(err.Error(), "not found") { t.Errorf("WaitForTransactionReceipt Emitted incorrect error: %s\n", err) @@ -339,7 +359,8 @@ func TestSubscribeToNewTasksV3(t *testing.T) { t.Errorf("Error setting up Avs Service Bindings: %s\n", err) } - _, err = chainio.SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + sub_func := chainio.SubscribeToNewTasksV3(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + _, err = sub_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -347,13 +368,14 @@ func TestSubscribeToNewTasksV3(t *testing.T) { return } - _, err = chainio.SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + sub_func = chainio.SubscribeToNewTasksV3(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + _, err = sub_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("SubscribeToNewTasksV3 Emitted non Transient error: %s\n", err) return } - if !strings.Contains(err.Error(), "connect: connection refused") { + if !strings.Contains(err.Error(), "connection reset") { t.Errorf("SubscribeToNewTasksV3 Emitted non Transient error: %s\n", err) return } @@ -363,7 +385,8 @@ func TestSubscribeToNewTasksV3(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = chainio.SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + sub_func = chainio.SubscribeToNewTasksV3(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + _, err = sub_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -389,7 +412,8 @@ func TestSubscribeToNewTasksV2(t *testing.T) { t.Errorf("Error setting up Avs Service Bindings: %s\n", err) } - _, err = chainio.SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + sub_func := chainio.SubscribeToNewTasksV2(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + _, err = sub_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -397,13 +421,15 @@ func TestSubscribeToNewTasksV2(t *testing.T) { return } - _, err = chainio.SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + sub_func = chainio.SubscribeToNewTasksV2(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + _, err = sub_func() + assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("SubscribeToNewTasksV2 Emitted non Transient error: %s\n", err) return } - if !strings.Contains(err.Error(), "connect: connection refused") { + if !strings.Contains(err.Error(), "connection reset") { t.Errorf("SubscribeToNewTasksV2 Emitted non Transient error: %s\n", err) return } @@ -413,7 +439,8 @@ func TestSubscribeToNewTasksV2(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = chainio.SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + sub_func = chainio.SubscribeToNewTasksV2(&bind.WatchOpts{}, s.ServiceManager, channel, nil) + _, err = sub_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -434,7 +461,8 @@ func TestBlockNumber(t *testing.T) { if err != nil { return } - _, err = sub.BlockNumberRetryable(context.Background()) + block_func := chainio.BlockNumber(sub, context.Background()) + _, err = block_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -442,7 +470,8 @@ func TestBlockNumber(t *testing.T) { return } - _, err = sub.BlockNumberRetryable(context.Background()) + block_func = chainio.BlockNumber(sub, context.Background()) + _, err = block_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("BlockNumber Emitted non Transient error: %s\n", err) @@ -458,7 +487,8 @@ func TestBlockNumber(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = sub.BlockNumberRetryable(context.Background()) + block_func = chainio.BlockNumber(sub, context.Background()) + _, err = block_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -478,7 +508,8 @@ func TestFilterBatchV2(t *testing.T) { if err != nil { return } - _, err = avsSubscriber.FilterBatchV2Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + batch_func := chainio.FilterBatchV2(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -486,13 +517,14 @@ func TestFilterBatchV2(t *testing.T) { return } - _, err = avsSubscriber.FilterBatchV2Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + batch_func = chainio.FilterBatchV2(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("FilterBatchV2 Emitted non Transient error: %s\n", err) return } - if !strings.Contains(err.Error(), "connect: connection refused") { + if !strings.Contains(err.Error(), "connection refused") { t.Errorf("FilterBatchV2 Emitted non Transient error: %s\n", err) return } @@ -502,7 +534,8 @@ func TestFilterBatchV2(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsSubscriber.FilterBatchV2Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + batch_func = chainio.FilterBatchV2(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -522,7 +555,8 @@ func TestFilterBatchV3(t *testing.T) { if err != nil { return } - _, err = avsSubscriber.FilterBatchV3Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + batch_func := chainio.FilterBatchV3(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -530,13 +564,14 @@ func TestFilterBatchV3(t *testing.T) { return } - _, err = avsSubscriber.FilterBatchV3Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + batch_func = chainio.FilterBatchV3(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("FilerBatchV3 Emitted non Transient error: %s\n", err) return } - if !strings.Contains(err.Error(), "connect: connection refused") { + if !strings.Contains(err.Error(), "connection refused") { t.Errorf("FilterBatchV3 Emitted non Transient error: %s\n", err) return } @@ -546,7 +581,8 @@ func TestFilterBatchV3(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsSubscriber.FilterBatchV3Retryable(&bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + batch_func = chainio.FilterBatchV3(avsSubscriber, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -568,7 +604,8 @@ func TestBatchesStateSubscriber(t *testing.T) { } zero_bytes := [32]byte{} - _, err = avsSubscriber.BatchesStateRetryable(nil, zero_bytes) + batch_state_func := chainio.BatchState(avsSubscriber, nil, zero_bytes) + _, err = batch_state_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -576,13 +613,14 @@ func TestBatchesStateSubscriber(t *testing.T) { return } - _, err = avsSubscriber.BatchesStateRetryable(nil, zero_bytes) + batch_state_func = chainio.BatchState(avsSubscriber, nil, zero_bytes) + _, err = batch_state_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("BatchesStateSubscriber Emitted non Transient error: %s\n", err) return } - if !strings.Contains(err.Error(), "connect: connection refused") { + if !strings.Contains(err.Error(), "connection refused") { t.Errorf("BatchesStateSubscriber Emitted non Transient error: %s\n", err) return } @@ -592,7 +630,8 @@ func TestBatchesStateSubscriber(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsSubscriber.BatchesStateRetryable(nil, zero_bytes) + batch_state_func = chainio.BatchState(avsSubscriber, nil, zero_bytes) + _, err = batch_state_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -614,7 +653,8 @@ func TestSubscribeNewHead(t *testing.T) { return } - _, err = avsSubscriber.SubscribeNewHeadRetryable(context.Background(), c) + sub_func := chainio.SubscribeNewHead(avsSubscriber, context.Background(), c) + _, err = sub_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -622,7 +662,8 @@ func TestSubscribeNewHead(t *testing.T) { return } - _, err = avsSubscriber.SubscribeNewHeadRetryable(context.Background(), c) + sub_func = chainio.SubscribeNewHead(avsSubscriber, context.Background(), c) + _, err = sub_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("SubscribeNewHead Emitted non Transient error: %s\n", err) @@ -638,7 +679,8 @@ func TestSubscribeNewHead(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsSubscriber.SubscribeNewHeadRetryable(context.Background(), c) + sub_func = chainio.SubscribeNewHead(avsSubscriber, context.Background(), c) + _, err = sub_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -691,7 +733,8 @@ func TestRespondToTaskV2(t *testing.T) { zero_bytes := [32]byte{} // NOTE: With zero bytes the tx reverts - _, err = w.RespondToTaskV2Retryable(&txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature) + resp_func := chainio.RespondToTaskV2(w, &txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature) + _, err = resp_func() assert.NotNil(t, err) if !strings.Contains(err.Error(), "execution reverted") { t.Errorf("RespondToTaskV2 did not emit the expected message: %q doesn't contain %q", err.Error(), "execution reverted: custom error 0x2396d34e:") @@ -701,7 +744,8 @@ func TestRespondToTaskV2(t *testing.T) { t.Errorf("Error killing process: %v\n", err) } - _, err = w.RespondToTaskV2Retryable(&txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature) + resp_func = chainio.RespondToTaskV2(w, &txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature) + _, err = resp_func() assert.NotNil(t, err) if _, ok := err.(*backoff.PermanentError); ok { t.Errorf("RespondToTaskV2 Emitted non-Transient error: %s\n", err) @@ -716,7 +760,8 @@ func TestRespondToTaskV2(t *testing.T) { } // NOTE: With zero bytes the tx reverts - _, err = w.RespondToTaskV2Retryable(&txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature) + resp_func = chainio.RespondToTaskV2(w, &txOpts, zero_bytes, aggregator_address, nonSignerStakesAndSignature) + _, err = resp_func() assert.NotNil(t, err) if !strings.Contains(err.Error(), "execution reverted") { t.Errorf("RespondToTaskV2 did not emit the expected message: %q doesn't contain %q", err.Error(), "execution reverted: custom error 0x2396d34e:") @@ -744,7 +789,8 @@ func TestBatchesStateWriter(t *testing.T) { var bytes [32]byte num.FillBytes(bytes[:]) - _, err = avsWriter.BatchesStateRetryable(&bind.CallOpts{}, bytes) + state_func := chainio.BatchesState(avsWriter, &bind.CallOpts{}, bytes) + _, err = state_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -752,7 +798,8 @@ func TestBatchesStateWriter(t *testing.T) { return } - _, err = avsWriter.BatchesStateRetryable(&bind.CallOpts{}, bytes) + state_func = chainio.BatchesState(avsWriter, &bind.CallOpts{}, bytes) + _, err = state_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("BatchesStateWriter Emitted non-Transient error: %s\n", err) @@ -768,7 +815,8 @@ func TestBatchesStateWriter(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsWriter.BatchesStateRetryable(&bind.CallOpts{}, bytes) + state_func = chainio.BatchesState(avsWriter, &bind.CallOpts{}, bytes) + _, err = state_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -778,7 +826,7 @@ func TestBatchesStateWriter(t *testing.T) { } func TestBalanceAt(t *testing.T) { - cmd, _, err := SetupAnvil(8545) + cmd, client, err := SetupAnvil(8545) if err != nil { t.Errorf("Error setting up Anvil: %s\n", err) } @@ -789,9 +837,15 @@ func TestBalanceAt(t *testing.T) { return } aggregator_address := common.HexToAddress("0x0") - blockHeight := big.NewInt(22) + // Fetch the latest block number + blockNumberUint64, err := client.BlockNumber(context.Background()) + blockNumber := new(big.Int).SetUint64(blockNumberUint64) + if err != nil { + t.Errorf("Error retrieving Anvil Block Number: %v\n", err) + } - _, err = avsWriter.BalanceAtRetryable(context.Background(), aggregator_address, blockHeight) + balance_func := chainio.BalanceAt(avsWriter, context.Background(), aggregator_address, blockNumber) + _, err = balance_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -799,7 +853,8 @@ func TestBalanceAt(t *testing.T) { return } - _, err = avsWriter.BalanceAtRetryable(context.Background(), aggregator_address, blockHeight) + balance_func = chainio.BalanceAt(avsWriter, context.Background(), aggregator_address, blockNumber) + _, err = balance_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("BalanceAt Emitted non-Transient error: %s\n", err) @@ -815,7 +870,8 @@ func TestBalanceAt(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsWriter.BalanceAtRetryable(context.Background(), aggregator_address, blockHeight) + balance_func = chainio.BalanceAt(avsWriter, context.Background(), aggregator_address, blockNumber) + _, err = balance_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -837,7 +893,8 @@ func TestBatchersBalances(t *testing.T) { } senderAddress := common.HexToAddress("0x0") - _, err = avsWriter.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress) + batcher_func := chainio.BatcherBalances(avsWriter, &bind.CallOpts{}, senderAddress) + _, err = batcher_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { @@ -845,7 +902,8 @@ func TestBatchersBalances(t *testing.T) { return } - _, err = avsWriter.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress) + batcher_func = chainio.BatcherBalances(avsWriter, &bind.CallOpts{}, senderAddress) + _, err = batcher_func() assert.NotNil(t, err) if _, ok := err.(retry.PermanentError); ok { t.Errorf("BatchersBalances Emitted non-Transient error: %s\n", err) @@ -861,7 +919,147 @@ func TestBatchersBalances(t *testing.T) { t.Errorf("Error setting up Anvil: %s\n", err) } - _, err = avsWriter.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress) + batcher_func = chainio.BatcherBalances(avsWriter, &bind.CallOpts{}, senderAddress) + _, err = batcher_func() + assert.Nil(t, err) + + if err := cmd.Process.Kill(); err != nil { + t.Errorf("Error killing process: %v\n", err) + return + } +} + +func TestFilterNewBatchV3(t *testing.T) { + cmd, _, err := SetupAnvil(8545) + if err != nil { + t.Errorf("Error setting up Anvil: %s\n", err) + } + aggregatorConfig := config.NewAggregatorConfig("../config-files/config-aggregator-test.yaml") + avsReader, err := chainio.NewAvsReaderFromConfig(aggregatorConfig.BaseConfig, aggregatorConfig.EcdsaConfig) + if err != nil { + return + } + filter_func := chainio.FilterNewBatchV3(avsReader, &bind.FilterOpts{Start: 1, End: nil, Context: context.Background()}, nil) + _, err = filter_func() + assert.Nil(t, err) + if err := cmd.Process.Kill(); err != nil { + t.Errorf("Error killing process: %v\n", err) + return + } + filter_func = chainio.FilterNewBatchV3(avsReader, &bind.FilterOpts{Start: 1, End: nil, Context: context.Background()}, nil) + _, err = filter_func() + assert.NotNil(t, err) + if _, ok := err.(retry.PermanentError); ok { + t.Errorf("BatchersBalances Emitted non-Transient error: %s\n", err) + return + } + if !strings.Contains(err.Error(), "connection reset") { + t.Errorf("BatchersBalances did not return expected error: %s\n", err) + return + } + cmd, _, err = SetupAnvil(8545) + if err != nil { + t.Errorf("Error setting up Anvil: %s\n", err) + } + filter_func = chainio.FilterNewBatchV3(avsReader, &bind.FilterOpts{Start: 1, End: nil, Context: context.Background()}, nil) + _, err = filter_func() + assert.Nil(t, err) + if err := cmd.Process.Kill(); err != nil { + t.Errorf("Error killing process: %v\n", err) + return + } +} +func TestBatchesStateReader(t *testing.T) { + cmd, _, err := SetupAnvil(8545) + if err != nil { + t.Errorf("Error setting up Anvil: %s\n", err) + } + + aggregatorConfig := config.NewAggregatorConfig("../config-files/config-aggregator-test.yaml") + avsReader, err := chainio.NewAvsReaderFromConfig(aggregatorConfig.BaseConfig, aggregatorConfig.EcdsaConfig) + if err != nil { + return + } + num := big.NewInt(6) + + var bytes [32]byte + num.FillBytes(bytes[:]) + + state_func := chainio.ReaderBatchesState(avsReader, &bind.CallOpts{}, bytes) + _, err = state_func() + assert.Nil(t, err) + + if err := cmd.Process.Kill(); err != nil { + t.Errorf("error killing process: %v\n", err) + return + } + + state_func = chainio.ReaderBatchesState(avsReader, &bind.CallOpts{}, bytes) + _, err = state_func() + assert.NotNil(t, err) + if _, ok := err.(retry.PermanentError); ok { + t.Errorf("BatchesStateReader Emitted non-Transient error: %s\n", err) + return + } + if !strings.Contains(err.Error(), "connection reset") { + t.Errorf("BatchesStateReader did not contain expected error: %s\n", err) + return + } + + cmd, _, err = SetupAnvil(8545) + if err != nil { + t.Errorf("Error setting up Anvil: %s\n", err) + } + + state_func = chainio.ReaderBatchesState(avsReader, &bind.CallOpts{}, bytes) + _, err = state_func() + assert.Nil(t, err) + + if err := cmd.Process.Kill(); err != nil { + t.Errorf("Error killing process: %v\n", err) + return + } +} + +func TestReaderFilterBatchV3(t *testing.T) { + cmd, _, err := SetupAnvil(8545) + if err != nil { + t.Errorf("Error setting up Anvil: %s\n", err) + } + + aggregatorConfig := config.NewAggregatorConfig("../config-files/config-aggregator-test.yaml") + avsReader, err := chainio.NewAvsReaderFromConfig(aggregatorConfig.BaseConfig, aggregatorConfig.EcdsaConfig) + if err != nil { + return + } + batch_func := chainio.FilterNewBatchV3(avsReader, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() + assert.Nil(t, err) + + if err := cmd.Process.Kill(); err != nil { + t.Errorf("Error killing process: %v\n", err) + return + } + + batch_func = chainio.FilterNewBatchV3(avsReader, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() + assert.NotNil(t, err) + if _, ok := err.(retry.PermanentError); ok { + t.Errorf("FilerBatchV3 Emitted non Transient error: %s\n", err) + return + } + if !strings.Contains(err.Error(), "connection reset") { + t.Errorf("FilterBatchV3 Emitted non Transient error: %s\n", err) + return + } + + cmd, _, err = SetupAnvil(8545) + if err != nil { + t.Errorf("Error setting up Anvil: %s\n", err) + } + + batch_func = chainio.FilterNewBatchV3(avsReader, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil) + _, err = batch_func() assert.Nil(t, err) if err := cmd.Process.Kill(); err != nil { diff --git a/core/utils/eth_client_utils.go b/core/utils/eth_client_utils.go index 36b28f036..f48094e62 100644 --- a/core/utils/eth_client_utils.go +++ b/core/utils/eth_client_utils.go @@ -3,7 +3,6 @@ package utils import ( "context" "math/big" - "time" "github.com/Layr-Labs/eigensdk-go/chainio/clients/eth" eigentypes "github.com/Layr-Labs/eigensdk-go/types" @@ -12,15 +11,7 @@ import ( retry "github.com/yetanotherco/aligned_layer/core" ) -// WaitForTransactionReceiptRetryable repeatedly attempts to fetch the transaction receipt for a given transaction hash. -// If the receipt is not found, the function will retry with exponential backoff until the specified `waitTimeout` duration is reached. -// If the receipt is still unavailable after `waitTimeout`, it will return an error. -// -// Note: The `time.Second * 2` is set as the max interval in the retry mechanism because we can't reliably measure the specific time the tx will be included in a block. -// Setting a higher value will imply doing less retries across the waitTimeout, and so we might lose the receipt -// All errors are considered Transient Errors -// - Retry times: 0.5s, 1s, 2s, 2s, 2s, ... until it reaches waitTimeout -func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash, waitTimeout time.Duration) (*types.Receipt, error) { +func WaitForTransactionReceipt(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash, config *retry.RetryConfig) func() (*types.Receipt, error) { receipt_func := func() (*types.Receipt, error) { receipt, err := client.TransactionReceipt(context.Background(), txHash) if err != nil { @@ -32,7 +23,19 @@ func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackC } return receipt, nil } - return retry.RetryWithData(receipt_func, retry.MinDelay, retry.RetryFactor, 0, time.Second*2, waitTimeout) + return receipt_func +} + +// WaitForTransactionReceiptRetryable repeatedly attempts to fetch the transaction receipt for a given transaction hash. +// If the receipt is not found, the function will retry with exponential backoff until the specified `waitTimeout` duration is reached. +// If the receipt is still unavailable after `waitTimeout`, it will return an error. +// +// Note: The `time.Second * 2` is set as the max interval in the retry mechanism because we can't reliably measure the specific time the tx will be included in a block. +// Setting a higher value will imply doing less retries across the waitTimeout, and so we might lose the receipt +// All errors are considered Transient Errors +// - Retry times: 0.5s, 1s, 2s, 2s, 2s, ... until it reaches waitTimeout +func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash, config *retry.RetryConfig) (*types.Receipt, error) { + return retry.RetryWithData(WaitForTransactionReceipt(client, fallbackClient, txHash, config), config) } func BytesToQuorumNumbers(quorumNumbersBytes []byte) eigentypes.QuorumNums { @@ -81,6 +84,7 @@ func CalculateGasPriceBumpBasedOnRetry(currentGasPrice *big.Int, baseBumpPercent return bumpedGasPrice } +//TODO: move to retryable function file /* GetGasPriceRetryable Get the gas price from the client with retry logic. @@ -99,5 +103,5 @@ func GetGasPriceRetryable(client eth.InstrumentedClient, fallbackClient eth.Inst return gasPrice, nil } - return retry.RetryWithData(respondToTaskV2_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) + return retry.RetryWithData(respondToTaskV2_func, retry.DefaultRetryConfig()) }