feat(aggregator): Aggregator Retry improvements #1425

Closed
wants to merge 39 commits into from
Changes from 18 commits
39 commits
4f2daf2
progress
PatStiles Nov 14, 2024
3ae2fc0
pass in config file
PatStiles Nov 14, 2024
54781ac
Add missing comments + retryable markers
PatStiles Nov 14, 2024
85921cd
separate retryable logic from functions
PatStiles Nov 14, 2024
c3124c3
refactor tests
PatStiles Nov 14, 2024
62f1fff
rm cmts
PatStiles Nov 14, 2024
f0cc51e
Merge branch 'staging' into 1401-1414-aggregator-retry-improvements
MarcosNicolau Nov 19, 2024
4a3014b
rm left over cmt
PatStiles Nov 19, 2024
4d72472
mv waitForTxConfig
PatStiles Nov 19, 2024
44cb844
fix comment
PatStiles Nov 19, 2024
459ee8c
add fallback to FilterBatch
PatStiles Nov 19, 2024
b4f982d
add fallback to BatchState
PatStiles Nov 19, 2024
8564934
fix test
PatStiles Nov 19, 2024
f6bd5f5
NumRetries -> MaxNumRetries
PatStiles Nov 19, 2024
4612e2a
nit
PatStiles Nov 19, 2024
ff78ccc
grab block height from rpc client
PatStiles Nov 19, 2024
9b51c3c
fix log
PatStiles Nov 19, 2024
6c8b3dc
Merge branch 'staging' into 1401-1414-aggregator-retry-improvements
PatStiles Nov 19, 2024
55a197e
remove setting num retries
PatStiles Nov 19, 2024
ea9affb
revert MaxNumRetries -> NumRetries, change default name to rpc
PatStiles Nov 19, 2024
bb593d1
nit
PatStiles Nov 19, 2024
635448f
make WaitForTxRetry retry values constants
PatStiles Nov 19, 2024
1a702fe
change var name
PatStiles Nov 19, 2024
0845e66
naming
PatStiles Nov 19, 2024
261aa1c
rm extra comment
PatStiles Nov 19, 2024
ad756a2
missed functions outside chainio
PatStiles Nov 20, 2024
65d1dbe
add missing test for GetGasPrice
PatStiles Nov 20, 2024
218a07c
function name change
PatStiles Nov 20, 2024
57369ec
Merge branch 'staging' into 1401-1414-aggregator-retry-improvements
PatStiles Nov 20, 2024
ec54b47
fix lint
PatStiles Nov 20, 2024
555233a
fix gas test
PatStiles Nov 20, 2024
5f437ae
Merge branch 'staging' into 1401-1414-aggregator-retry-improvements
PatStiles Nov 20, 2024
087b5f3
add constants + use config in SendAggregatedResponse
PatStiles Nov 20, 2024
2f2434f
Merge branch 'staging' into 1401-1414-aggregator-retry-improvements
PatStiles Nov 20, 2024
97dcd8c
fix: GetTaskIndexFunc
uri-99 Nov 20, 2024
a07e43f
chore: remove comment
uri-99 Nov 20, 2024
4ecde3c
Remove retry from initialize new task retryable
PatStiles Nov 21, 2024
c8cd0b2
remove retry tests
PatStiles Nov 21, 2024
a6e984c
Merge branch 'staging' into 1401-1414-aggregator-retry-improvements
MauroToscano Nov 21, 2024
2 changes: 1 addition & 1 deletion aggregator/pkg/aggregator.go
@@ -419,7 +419,7 @@ func (agg *Aggregator) InitializeNewTask(batchIndex uint32, taskCreatedBlock uin
}
return err
}
- return retry.Retry(initializeNewTask_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+ return retry.Retry(initializeNewTask_func, retry.DefaultRetryConfig())
}

// Long-lived goroutine that periodically checks and removes old Tasks from stored Maps
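For context, the call sites in this PR replace the old positional arguments (retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime) with a consolidated config object. The sketch below is an assumption reconstructed from the identifiers that appear in this diff (DefaultRetryConfig, ChainRetryConfig, DefaultMaxNumRetries, and the MaxNumRetries/MaxInterval/MaxElapsedTime fields set in avs_writer.go); the concrete field set and default values in the repository may differ.

```go
package retry

import "time"

// RetryConfig bundles the backoff parameters that were previously passed
// positionally to Retry and RetryWithData.
type RetryConfig struct {
	MinDelay       time.Duration // first backoff delay
	RetryFactor    float64       // multiplier applied to the delay after each failed attempt
	MaxNumRetries  uint64        // 0 means retry indefinitely
	MaxInterval    time.Duration // upper bound on a single backoff delay
	MaxElapsedTime time.Duration // 0 disables the overall deadline
}

// DefaultMaxNumRetries matches the "3 retries" figures quoted in the comments
// of this PR; the actual constant value is an assumption.
const DefaultMaxNumRetries uint64 = 3

// DefaultRetryConfig would cover ordinary RPC calls (1 s, 2 s, 4 s delays).
func DefaultRetryConfig() *RetryConfig {
	return &RetryConfig{
		MinDelay:       1 * time.Second,
		RetryFactor:    2,
		MaxNumRetries:  DefaultMaxNumRetries,
		MaxInterval:    60 * time.Second,
		MaxElapsedTime: 2 * time.Minute,
	}
}

// ChainRetryConfig would stretch delays to block time (12 s, 24 s, 48 s) for
// calls whose failures can be caused by reorgs or not-yet-mined state.
func ChainRetryConfig() *RetryConfig {
	cfg := DefaultRetryConfig()
	cfg.MinDelay = 12 * time.Second
	cfg.MaxInterval = 2 * time.Minute
	return cfg
}
```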
18 changes: 11 additions & 7 deletions aggregator/pkg/server.go
@@ -53,7 +53,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t
"operatorId", hex.EncodeToString(signedTaskResponse.OperatorId[:]))
taskIndex := uint32(0)

- taskIndex, err := agg.GetTaskIndex(signedTaskResponse.BatchIdentifierHash)
+ taskIndex, err := agg.GetTaskIndexRetryable(signedTaskResponse.BatchIdentifierHash)

if err != nil {
agg.logger.Warn("Task not found in the internal map, operator signature will be lost. Batch may not reach quorum")
@@ -72,7 +72,7 @@ func (agg *Aggregator) ProcessOperatorSignedTaskResponseV2(signedTaskResponse *t

agg.logger.Info("Starting bls signature process")
go func() {
- err := agg.ProcessNewSignature(
+ err := agg.ProcessNewSignatureRetryable(
context.Background(), taskIndex, signedTaskResponse.BatchIdentifierHash,
&signedTaskResponse.BlsSignature, signedTaskResponse.OperatorId,
)
@@ -123,7 +123,7 @@ func (agg *Aggregator) ServerRunning(_ *struct{}, reply *int64) error {
- Retry times (3 retries): 12 sec (1 Blocks), 24 sec (2 Blocks), 48 sec (4 Blocks)
- NOTE: TaskNotFound errors from the BLS Aggregation service are Transient errors as block reorg's may lead to these errors being thrown.
*/
- func (agg *Aggregator) ProcessNewSignature(ctx context.Context, taskIndex uint32, taskResponse interface{}, blsSignature *bls.Signature, operatorId eigentypes.Bytes32) error {
+ func (agg *Aggregator) ProcessNewSignatureRetryable(ctx context.Context, taskIndex uint32, taskResponse interface{}, blsSignature *bls.Signature, operatorId eigentypes.Bytes32) error {
processNewSignature_func := func() error {
err := agg.blsAggregationService.ProcessNewSignature(
ctx, taskIndex, taskResponse,
@@ -137,10 +137,15 @@ func (agg *Aggregator) ProcessNewSignature(ctx context.Context, taskIndex uint32
return err
}

- return retry.Retry(processNewSignature_func, retry.MinDelayChain, retry.RetryFactor, retry.NumRetries, retry.MaxIntervalChain, retry.MaxElapsedTime)
+ return retry.Retry(processNewSignature_func, retry.ChainRetryConfig())
}

- func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error) {
+ // Checks Internal mapping for Signed Task Response, returns its TaskIndex.
+ /*
+ - All errors are considered Transient Errors
+ - Retry times (3 retries): 1 sec, 2 sec, 4 sec
+ */
+ func (agg *Aggregator) GetTaskIndexRetryable(batchIdentifierHash [32]byte) (uint32, error) {
getTaskIndex_func := func() (uint32, error) {
agg.taskMutex.Lock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Locked Resources: Starting processing of Response")
@@ -153,6 +158,5 @@ func (agg *Aggregator) GetTaskIndex(batchIdentifierHash [32]byte) (uint32, error
return taskIndex, nil
}
}

- return retry.RetryWithData(getTaskIndex_func, retry.MinDelay, retry.RetryFactor, retry.NumRetries, retry.MaxInterval, retry.MaxElapsedTime)
+ return retry.RetryWithData(getTaskIndex_func, retry.DefaultRetryConfig())
}
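The retry timings quoted in the comments above (1 s, 2 s, 4 s for the default config; 12 s, 24 s, 48 s for the chain config) correspond to a standard exponential backoff loop. The sketch below shows how Retry and RetryWithData could consume the config; it assumes the RetryConfig fields from the earlier sketch and is illustrative only, not the repository's implementation.

```go
// Retry runs fn with exponential backoff until it succeeds, the retry budget
// is exhausted, or the overall deadline passes.
func Retry(fn func() error, cfg *RetryConfig) error {
	start := time.Now()
	delay := cfg.MinDelay
	var err error
	for attempt := uint64(0); ; attempt++ {
		if err = fn(); err == nil {
			return nil
		}
		// MaxNumRetries == 0 retries forever, matching the avs_writer usage below.
		if cfg.MaxNumRetries > 0 && attempt+1 > cfg.MaxNumRetries {
			return err
		}
		// MaxElapsedTime == 0 disables the overall deadline.
		if cfg.MaxElapsedTime > 0 && time.Since(start) >= cfg.MaxElapsedTime {
			return err
		}
		time.Sleep(delay)
		delay = time.Duration(float64(delay) * cfg.RetryFactor)
		if delay > cfg.MaxInterval {
			delay = cfg.MaxInterval
		}
	}
}

// RetryWithData is the same loop for functions that return a value, such as
// getTaskIndex_func above.
func RetryWithData[T any](fn func() (T, error), cfg *RetryConfig) (T, error) {
	var result T
	err := Retry(func() error {
		r, innerErr := fn()
		if innerErr == nil {
			result = r
		}
		return innerErr
	}, cfg)
	return result, err
}
```

With DefaultRetryConfig this makes one initial attempt plus three retries, sleeping 1 s, 2 s, and 4 s between them; ChainRetryConfig yields the 12 s, 24 s, 48 s schedule described in the comment on ProcessNewSignatureRetryable.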
4 changes: 2 additions & 2 deletions core/chainio/avs_subscriber.go
@@ -68,13 +68,13 @@ func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicema
// Subscribe to new tasks
sub, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil)
if err != nil {
s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NumRetries, "err", err)
s.logger.Error("Primary failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.DefaultMaxNumRetries, "err", err)
return nil, err
}

subFallback, err := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil)
if err != nil {
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.NumRetries, "err", err)
s.logger.Error("Fallback failed to subscribe to new AlignedLayer V2 tasks after %d retries", retry.DefaultMaxNumRetries, "err", err)
return nil, err
}
s.logger.Info("Subscribed to new AlignedLayer V2 tasks")
15 changes: 13 additions & 2 deletions core/chainio/avs_writer.go
@@ -93,6 +93,17 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
txOpts.NoSend = false
i := 0

+ // Set Retry config for RespondToTaskV2
+ respondToTaskV2Config := retry.DefaultRetryConfig()
+ respondToTaskV2Config.MaxNumRetries = 0
+ respondToTaskV2Config.MaxElapsedTime = 0
+
+ // Set Retry config for WaitForTxRetryable
+ waitForTxConfig := retry.DefaultRetryConfig()
+ waitForTxConfig.MaxInterval = 2 * time.Second
+ waitForTxConfig.MaxNumRetries = 0
+ waitForTxConfig.MaxElapsedTime = timeToWaitBeforeBump

respondToTaskV2Func := func() (*types.Receipt, error) {
gasPrice, err := utils.GetGasPriceRetryable(w.Client, w.ClientFallback)
if err != nil {
@@ -126,7 +137,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
return nil, err
}

- receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, tx.Hash(), timeToWaitBeforeBump)
+ receipt, err := utils.WaitForTransactionReceiptRetryable(w.Client, w.ClientFallback, tx.Hash(), waitForTxConfig)
if receipt != nil {
w.checkIfAggregatorHadToPaidForBatcher(tx, batchIdentifierHash)
return receipt, nil
@@ -143,7 +154,7 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
return nil, fmt.Errorf("transaction failed")
}

- return retry.RetryWithData(respondToTaskV2Func, retry.MinDelay, retry.RetryFactor, 0, retry.MaxInterval, 0)
+ return retry.RetryWithData(respondToTaskV2Func, respondToTaskV2Config)
}

// Calculates the transaction cost from the receipt and compares it with the batcher respondToTaskFeeLimit
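The avs_writer change derives two per-call configs from the defaults: respondToTaskV2Config retries indefinitely (MaxNumRetries = 0, MaxElapsedTime = 0) while the gas price is re-fetched and bumped on each attempt, and waitForTxConfig caps the per-poll interval at 2 s and bounds the whole wait by timeToWaitBeforeBump. The sketch below shows one way WaitForTransactionReceiptRetryable could accept the new config argument; it uses go-ethereum's ethclient API directly, and the client types, import paths, and helper body are assumptions rather than the repository's utils code.

```go
package utils

import (
	"context"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethclient"

	// Hypothetical import path for the retry package sketched earlier.
	"github.com/yetanotherco/aligned_layer/core/retry"
)

// WaitForTransactionReceiptRetryable polls the primary client for the receipt
// and falls back to the secondary client on error, letting the passed-in
// config bound both the poll interval and the total time waited.
func WaitForTransactionReceiptRetryable(
	client, fallbackClient *ethclient.Client,
	txHash common.Hash,
	cfg *retry.RetryConfig,
) (*types.Receipt, error) {
	receiptFn := func() (*types.Receipt, error) {
		receipt, err := client.TransactionReceipt(context.Background(), txHash)
		if err == nil {
			return receipt, nil
		}
		// Primary failed for this attempt: try the fallback client.
		return fallbackClient.TransactionReceipt(context.Background(), txHash)
	}
	return retry.RetryWithData(receiptFn, cfg)
}
```

With waitForTxConfig as built above, this polls roughly every 1-2 seconds and gives up after timeToWaitBeforeBump, at which point the outer respondToTaskV2Func loop bumps the gas price and resends the transaction.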