Skip to content

Commit

Permalink
use in operator
Browse files Browse the repository at this point in the history
  • Loading branch information
PatStiles committed Nov 21, 2024
1 parent 73b6e6e commit 1c9f3a8
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 63 deletions.
36 changes: 18 additions & 18 deletions core/chainio/avs_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2)

// Subscribe to new tasks
sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil)
sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.EthCallRetryConfig())
if err != nil {
s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NumRetries, "err", err)
s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.EthCallNumRetries, "err", err)
return nil, err
}

subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil)
subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.EthCallRetryConfig())
if err != nil {
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NumRetries, "err", err)
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.EthCallNumRetries, "err", err)
return nil, err
}
s.logger.Info("Subscribed to new AlignedLayer V2 tasks")
Expand Down Expand Up @@ -114,14 +114,14 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema
case err := <-sub.Err():
s.logger.Warn("Error in new task subscription", "err", err)
sub.Unsubscribe()
sub, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil)
sub, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.EthCallRetryConfig())
if err != nil {
errorChannel <- err
}
case err := <-subFallback.Err():
s.logger.Warn("Error in fallback new task subscription", "err", err)
subFallback.Unsubscribe()
subFallback, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil)
subFallback, err = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.EthCallRetryConfig())
if err != nil {
errorChannel <- err
}
Expand All @@ -137,13 +137,13 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3)

// Subscribe to new tasks
sub, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil)
sub, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.EthCallRetryConfig())
if err != nil {
s.logger.Error("Primary failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err)
return nil, err
}

subFallback, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil)
subFallback, err := SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.EthCallRetryConfig())
if err != nil {
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries, "err", err)
return nil, err
Expand Down Expand Up @@ -185,14 +185,14 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema
case err := <-sub.Err():
s.logger.Warn("Error in new task subscription", "err", err)
sub.Unsubscribe()
sub, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil)
sub, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.EthCallRetryConfig())
if err != nil {
errorChannel <- err
}
case err := <-subFallback.Err():
s.logger.Warn("Error in fallback new task subscription", "err", err)
subFallback.Unsubscribe()
subFallback, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil)
subFallback, err = SubscribeToNewTasksV3Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.EthCallRetryConfig())
if err != nil {
errorChannel <- err
}
Expand Down Expand Up @@ -258,7 +258,7 @@ func (s *AvsSubscriber) processNewBatchV3(batch *servicemanager.ContractAlignedL
// getLatestNotRespondedTaskFromEthereum queries the blockchain for the latest not responded task using the FilterNewBatch method.
func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV2() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, error) {

latestBlock, err := s.BlockNumberRetryable(context.Background())
latestBlock, err := s.BlockNumberRetryable(context.Background(), retry.EthCallRetryConfig())
if err != nil {
return nil, err
}
Expand All @@ -271,7 +271,7 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV2() (*servicemanag
fromBlock = latestBlock - BlockInterval
}

logs, err := s.FilterBatchV2Retryable(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil)
logs, err := s.FilterBatchV2Retryable(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil, retry.EthCallRetryConfig())
if err != nil {
return nil, err
}
Expand All @@ -293,7 +293,7 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV2() (*servicemanag

batchIdentifier := append(lastLog.BatchMerkleRoot[:], lastLog.SenderAddress[:]...)
batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
state, err := s.BatchesStateRetryable(nil, batchIdentifierHash)
state, err := s.BatchesStateRetryable(nil, batchIdentifierHash, retry.EthCallRetryConfig())
if err != nil {
return nil, err
}
Expand All @@ -307,7 +307,7 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV2() (*servicemanag

// getLatestNotRespondedTaskFromEthereum queries the blockchain for the latest not responded task using the FilterNewBatch method.
func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV3() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) {
latestBlock, err := s.BlockNumberRetryable(context.Background())
latestBlock, err := s.BlockNumberRetryable(context.Background(), retry.EthCallRetryConfig())
if err != nil {
return nil, err
}
Expand All @@ -320,7 +320,7 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV3() (*servicemanag
fromBlock = latestBlock - BlockInterval
}

logs, err := s.FilterBatchV3Retryable(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil)
logs, err := s.FilterBatchV3Retryable(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil, retry.EthCallRetryConfig())
if err != nil {
return nil, err
}
Expand All @@ -342,7 +342,7 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV3() (*servicemanag

batchIdentifier := append(lastLog.BatchMerkleRoot[:], lastLog.SenderAddress[:]...)
batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
state, err := s.BatchesStateRetryable(nil, batchIdentifierHash)
state, err := s.BatchesStateRetryable(nil, batchIdentifierHash, retry.EthCallRetryConfig())
if err != nil {
return nil, err
}
Expand All @@ -355,15 +355,15 @@ func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV3() (*servicemanag
}

func (s *AvsSubscriber) WaitForOneBlock(startBlock uint64) error {
currentBlock, err := s.BlockNumberRetryable(context.Background())
currentBlock, err := s.BlockNumberRetryable(context.Background(), retry.EthCallRetryConfig())
if err != nil {
return err
}

if currentBlock <= startBlock { // should really be == but just in case
// Subscribe to new head
c := make(chan *types.Header)
sub, err := s.SubscribeNewHeadRetryable(context.Background(), c)
sub, err := s.SubscribeNewHeadRetryable(context.Background(), c, retry.EthCallRetryConfig())
if err != nil {
return err
}
Expand Down
14 changes: 7 additions & 7 deletions core/chainio/avs_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func NewAvsWriterFromConfig(baseConfig *config.BaseConfig, ecdsaConfig *config.E
func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature, gasBumpPercentage uint, gasBumpIncrementalPercentage uint, timeToWaitBeforeBump time.Duration, onGasPriceBumped func(*big.Int)) (*types.Receipt, error) {
txOpts := *w.Signer.GetTxOpts()
txOpts.NoSend = true // simulate the transaction
simTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
simTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature, retry.ChainRetryConfig())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
}
}
w.logger.Infof("Receipts for old transactions not found, will check if the batch state has been responded", "merkle root", batchMerkleRootHashString)
batchState, _ := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash)
batchState, _ := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash, retry.ChainRetryConfig())
if batchState.Responded {
w.logger.Infof("Batch state has been already responded", "merkle root", batchMerkleRootHashString)
return nil, nil
Expand All @@ -183,7 +183,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
}

w.logger.Infof("Sending RespondToTask transaction with a gas price of %v", txOpts.GasPrice, "merkle root", batchMerkleRootHashString)
realTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
realTx, err := w.RespondToTaskV2Retryable(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature, retry.ChainRetryConfig())
if err != nil {
w.logger.Errorf("Respond to task transaction err, %v", err, "merkle root", batchMerkleRootHashString)
return nil, err
Expand Down Expand Up @@ -218,7 +218,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
// if the tx cost was higher, then it means the aggregator has paid the difference for the batcher (txCost - respondToTaskFeeLimit) and so metrics are updated accordingly.
// otherwise nothing is done.
func (w *AvsWriter) checkIfAggregatorHadToPaidForBatcher(tx *types.Transaction, batchIdentifierHash [32]byte) {
batchState, err := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash)
batchState, err := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash, retry.EthCallRetryConfig())
if err != nil {
return
}
Expand All @@ -244,7 +244,7 @@ func (w *AvsWriter) checkAggAndBatcherHaveEnoughBalance(tx *types.Transaction, t
txCost := new(big.Int).Mul(txGasAsBigInt, txGasPrice)
w.logger.Info("Transaction cost", "cost", txCost)

batchState, err := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash)
batchState, err := w.BatchesStateRetryable(&bind.CallOpts{}, batchIdentifierHash, retry.EthCallRetryConfig())
if err != nil {
w.logger.Error("Failed to get batch state", "error", err)
w.logger.Info("Proceeding to check balances against transaction cost")
Expand Down Expand Up @@ -272,7 +272,7 @@ func (w *AvsWriter) compareAggregatorBalance(amount *big.Int, aggregatorAddress
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

aggregatorBalance, err := w.BalanceAtRetryable(ctx, aggregatorAddress, nil)
aggregatorBalance, err := w.BalanceAtRetryable(ctx, aggregatorAddress, nil, retry.EthCallRetryConfig())
if err != nil {
// Ignore and continue.
w.logger.Error("failed to get aggregator balance: %v", err)
Expand All @@ -287,7 +287,7 @@ func (w *AvsWriter) compareAggregatorBalance(amount *big.Int, aggregatorAddress

func (w *AvsWriter) compareBatcherBalance(amount *big.Int, senderAddress [20]byte) error {
// Get batcher balance
batcherBalance, err := w.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress)
batcherBalance, err := w.BatcherBalancesRetryable(&bind.CallOpts{}, senderAddress, retry.EthCallRetryConfig())
if err != nil {
// Ignore and continue.
w.logger.Error("Failed to get batcherBalance", "error", err)
Expand Down
Loading

0 comments on commit 1c9f3a8

Please sign in to comment.