Skip to content

Commit

Permalink
Merge branch '311-aggregator-wait-for-receipt-for-1-minute-if-not-bum…
Browse files Browse the repository at this point in the history
…p-the-fee-v2' into test-aggregator-bump-fee
  • Loading branch information
MarcosNicolau committed Oct 22, 2024
2 parents 4a8a9f5 + 3469101 commit 39bd318
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 64 deletions.
3 changes: 2 additions & 1 deletion aggregator/internal/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,8 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
"senderAddress", hex.EncodeToString(senderAddress[:]),
"batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]))

receipt, err := agg.avsWriter.SendAggregatedResponse(batchIdentifierHash, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature)
onRetry := func() { agg.metrics.IncBumpedGasPriceForAggregatedResponse() }
receipt, err := agg.avsWriter.SendAggregatedResponse(batchIdentifierHash, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature, onRetry)
if err != nil {
agg.walletMutex.Unlock()
agg.logger.Infof("- Unlocked Wallet Resources: Error sending aggregated response for batch %s. Error: %s", hex.EncodeToString(batchIdentifierHash[:]), err)
Expand Down
43 changes: 36 additions & 7 deletions core/chainio/avs_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ import (
"github.com/yetanotherco/aligned_layer/core/utils"
)

const (
gasBumpPercentage int = 10
)

type AvsWriter struct {
*avsregistry.ChainWriter
AvsContractBindings *AvsServiceBindings
Expand Down Expand Up @@ -72,7 +76,10 @@ func NewAvsWriterFromConfig(baseConfig *config.BaseConfig, ecdsaConfig *config.E
}, nil
}

func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature) (*types.Receipt, error) {
// Sends AggregatedResponse and waits for the receipt for three blocks, if not received
// it will try again bumping the last tx gas price based on `CalculateGasPriceBump`
// This process happens indefinitely until the transaction is included.
func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMerkleRoot [32]byte, senderAddress [20]byte, nonSignerStakesAndSignature servicemanager.IBLSSignatureCheckerNonSignerStakesAndSignature, onRetry func()) (*types.Receipt, error) {
txOpts := *w.Signer.GetTxOpts()
txOpts.NoSend = true // simulate the transaction
tx, err := w.AvsContractBindings.ServiceManager.RespondToTaskV2(&txOpts, batchMerkleRoot, senderAddress, nonSignerStakesAndSignature, new(big.Int).SetUint64(5))
Expand All @@ -94,10 +101,20 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
txNonce := new(big.Int).SetUint64(tx.Nonce())
txOpts.NoSend = false
txOpts.Nonce = txNonce
var i uint64 = 1

executeTransaction := func(bumpedGasPrices *big.Int) (*types.Transaction, error) {
txOpts.GasPrice = bumpedGasPrices
lastTxGasPrice := tx.GasPrice()
var i uint64

Check failure on line 106 in core/chainio/avs_writer.go

View workflow job for this annotation

GitHub Actions / lint

S1021: should merge variable declaration with assignment on next line (gosimple)
i = 0
sendTransaction := func() (*types.Receipt, error) {
if i > 0 {
onRetry()
}
i++

bumpedGasPrice := utils.CalculateGasPriceBump(lastTxGasPrice, gasBumpPercentage)
lastTxGasPrice = bumpedGasPrice
txOpts.GasPrice = bumpedGasPrice

w.logger.Infof("Sending ResponseToTask transaction with a gas price of %v", txOpts.GasPrice)
err = w.checkRespondToTaskFeeLimit(tx, txOpts, batchIdentifierHash, senderAddress)

Expand All @@ -112,12 +129,24 @@ func (w *AvsWriter) SendAggregatedResponse(batchIdentifierHash [32]byte, batchMe
if err != nil {
return nil, connection.PermanentError{Inner: err}
}
return tx, nil
}
return tx, nil

ctx, cancel := context.WithTimeout(context.Background(), time.Second*36)
defer cancel()
receipt, err := utils.WaitForTransactionReceipt(w.Client, ctx, tx.Hash())

if receipt != nil {
return receipt, nil
}
// if we are here, this means we have reached the timeout (after three blocks it hasn't been included)
// so we try again by bumping the fee to make sure its included
if err != nil {
return nil, err
}
return nil, fmt.Errorf("transaction failed")
}

return utils.SendTransactionWithInfiniteRetryAndBumpingGasPrice(executeTransaction, w.Client, tx.GasPrice())
return connection.RetryWithData(sendTransaction, 1000, 2, 0)
}

func (w *AvsWriter) checkRespondToTaskFeeLimit(tx *types.Transaction, txOpts bind.TransactOpts, batchIdentifierHash [32]byte, senderAddress [20]byte) error {
Expand Down
4 changes: 0 additions & 4 deletions core/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ func (e PermanentError) Is(err error) bool {

// Same as Retry only that the functionToRetry can return a value upon correct execution
func RetryWithData[T any](functionToRetry func() (*T, error), minDelay uint64, factor float64, maxTries uint64) (*T, error) {
i := 0
f := func() (*T, error) {
val, err := functionToRetry()
i++
if perm, ok := err.(PermanentError); err != nil && ok {
return nil, backoff.Permanent(perm.Inner)
}
Expand Down Expand Up @@ -54,10 +52,8 @@ func RetryWithData[T any](functionToRetry func() (*T, error), minDelay uint64, f
// The function to be retried should return `PermanentError` when the condition for stop retrying
// is met.
func Retry(functionToRetry func() error, minDelay uint64, factor float64, maxTries uint64) error {
i := 0
f := func() error {
err := functionToRetry()
i++
if perm, ok := err.(PermanentError); err != nil && ok {
return backoff.Permanent(perm.Inner)
}
Expand Down
48 changes: 7 additions & 41 deletions core/utils/eth_client_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
eigentypes "github.com/Layr-Labs/eigensdk-go/types"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
connection "github.com/yetanotherco/aligned_layer/core"
)

const maxRetries = 25
Expand Down Expand Up @@ -50,45 +49,12 @@ func BytesToQuorumThresholdPercentages(quorumThresholdPercentagesBytes []byte) e
return quorumThresholdPercentages
}

// Very basic algorithm to calculate the gasPrice bump based on the currentGasPrice and retry iteration.
// It adds a i/10 percentage to the current prices, where i represents the iteration.
func CalculateGasPriceBumpBasedOnRetry(currentGasPrice *big.Int, iteration int) *big.Int {
factor := (new(big.Int).Add(big.NewInt(100), new(big.Int).Mul(big.NewInt(int64(iteration)), big.NewInt(10))))
gasPrice := new(big.Int).Mul(currentGasPrice, factor)
gasPrice = gasPrice.Div(gasPrice, big.NewInt(100))
// Very basic algorithm to calculate the gasPrice bump based on the currentGasPrice and percentage.
// It adds a the percentage to the current gas price.
func CalculateGasPriceBump(currentGasPrice *big.Int, percentage int) *big.Int {
percentageBump := new(big.Int).Div(big.NewInt(int64(percentage)), big.NewInt(100))
bumpAmount := new(big.Int).Mul(currentGasPrice, percentageBump)
bumpedGasPrice := new(big.Int).Add(currentGasPrice, bumpAmount)

return gasPrice
}

// Sends a transaction and waits for the receipt for three blocks, if not received
// it will try again bumping the gas price based on `CalculateGasPriceBumpBasedOnRetry`
// and pass it to executeTransaction (make sure you update the txOpts with the new gasPrice)
// This process happens indefinitely until we get the receipt or the receipt status is an err.
func SendTransactionWithInfiniteRetryAndBumpingGasPrice(executeTransaction func(*big.Int) (*types.Transaction, error), client eth.InstrumentedClient, baseGasPrice *big.Int) (*types.Receipt, error) {
i := 0
sendTransaction := func() (*types.Receipt, error) {
i++
gasPrice := CalculateGasPriceBumpBasedOnRetry(baseGasPrice, i)

tx, err := executeTransaction(gasPrice)
if err != nil {
return nil, err
}

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*2000)
defer cancel()
receipt, err := WaitForTransactionReceipt(client, ctx, tx.Hash())

if receipt != nil {
return receipt, nil
}
// if we are here, this means we have reached the timeout (after three blocks it hasn't been included)
// so we try again by bumping the fee to make sure its included
if err != nil {
return nil, err
}
return nil, fmt.Errorf("transaction failed")

}
return connection.RetryWithData(sendTransaction, 1000, 2, 0)
return bumpedGasPrice
}
112 changes: 106 additions & 6 deletions grafana/provisioning/dashboards/aligned/aggregator_batcher.json
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@
"h": 7,
"w": 10,
"x": 0,
"y": 34
"y": 20
},
"id": 9,
"options": {
Expand Down Expand Up @@ -910,7 +910,7 @@
"h": 7,
"w": 10,
"x": 10,
"y": 34
"y": 20
},
"id": 1,
"options": {
Expand Down Expand Up @@ -974,7 +974,7 @@
"h": 7,
"w": 5,
"x": 0,
"y": 41
"y": 27
},
"id": 8,
"options": {
Expand Down Expand Up @@ -1044,7 +1044,7 @@
"h": 7,
"w": 5,
"x": 5,
"y": 41
"y": 27
},
"id": 7,
"options": {
Expand Down Expand Up @@ -1083,6 +1083,106 @@
"title": "Tasks Received",
"type": "stat"
},
{
"datasource": {
"type": "prometheus",
"uid": "prometheus"
},
"description": "Number of times gas price was bumped while sending an aggregated response.",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 7,
"w": 10,
"x": 10,
"y": 27
},
"id": 20,
"interval": "36",
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "prometheus"
},
"disableTextWrap": false,
"editorMode": "builder",
"expr": "aligned_respond_to_task_gas_price_bumped",
"fullMetaSearch": false,
"includeNullMetadata": true,
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A",
"useBackend": false
}
],
"title": "Bumped gas price for aggregated responses",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
Expand All @@ -1109,7 +1209,7 @@
"h": 7,
"w": 5,
"x": 0,
"y": 48
"y": 34
},
"id": 2,
"options": {
Expand Down Expand Up @@ -1179,7 +1279,7 @@
"h": 7,
"w": 5,
"x": 5,
"y": 48
"y": 34
},
"id": 5,
"options": {
Expand Down
20 changes: 15 additions & 5 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import (
)

type Metrics struct {
ipPortAddress string
logger logging.Logger
numAggregatedResponses prometheus.Counter
numAggregatorReceivedTasks prometheus.Counter
numOperatorTaskResponses prometheus.Counter
ipPortAddress string
logger logging.Logger
numAggregatedResponses prometheus.Counter
numAggregatorReceivedTasks prometheus.Counter
numOperatorTaskResponses prometheus.Counter
numBumpedGasPriceForAggregatedResponse prometheus.Counter
}

const alignedNamespace = "aligned"
Expand All @@ -40,6 +41,11 @@ func NewMetrics(ipPortAddress string, reg prometheus.Registerer, logger logging.
Name: "aggregator_received_tasks",
Help: "Number of tasks received by the Service Manager",
}),
numBumpedGasPriceForAggregatedResponse: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Namespace: alignedNamespace,
Name: "respond_to_task_gas_price_bumped",
Help: "Number of times gas price was bumped while sending aggregated response",
}),
}
}

Expand Down Expand Up @@ -74,3 +80,7 @@ func (m *Metrics) IncAggregatedResponses() {
func (m *Metrics) IncOperatorTaskResponses() {
m.numOperatorTaskResponses.Inc()
}

func (m *Metrics) IncBumpedGasPriceForAggregatedResponse() {
m.numBumpedGasPriceForAggregatedResponse.Inc()
}

0 comments on commit 39bd318

Please sign in to comment.