Skip to content

Commit

Permalink
Merge branch 'staging' into feat/batcher-queue-len-size-metric
Browse files Browse the repository at this point in the history
  • Loading branch information
Julian Ventura committed Dec 16, 2024
2 parents 1bd4ad8 + 062eeb9 commit 71af2ab
Show file tree
Hide file tree
Showing 8 changed files with 888 additions and 269 deletions.
17 changes: 17 additions & 0 deletions aggregator/pkg/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ type Aggregator struct {
// Stores the TaskResponse for each batch by batchIdentifierHash
batchDataByIdentifierHash map[[32]byte]BatchData

// Stores the start time for each batch of the aggregator by task index
batchStartTimeByIdx map[uint32]time.Time

// This task index is to communicate with the local BLS
// Service.
// Note: In case of a reboot it can start from 0 again
Expand All @@ -78,6 +81,7 @@ type Aggregator struct {
// - batchCreatedBlockByIdx
// - batchDataByIdentifierHash
// - nextBatchIndex
// - batchStartTimeByIdx
taskMutex *sync.Mutex

// Mutex to protect ethereum wallet
Expand Down Expand Up @@ -124,6 +128,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
batchesIdxByIdentifierHash := make(map[[32]byte]uint32)
batchDataByIdentifierHash := make(map[[32]byte]BatchData)
batchCreatedBlockByIdx := make(map[uint32]uint64)
batchStartTimeByIdx := make(map[uint32]time.Time)

chainioConfig := sdkclients.BuildAllConfig{
EthHttpUrl: aggregatorConfig.BaseConfig.EthRpcUrl,
Expand Down Expand Up @@ -172,6 +177,7 @@ func NewAggregator(aggregatorConfig config.AggregatorConfig) (*Aggregator, error
batchesIdxByIdentifierHash: batchesIdxByIdentifierHash,
batchDataByIdentifierHash: batchDataByIdentifierHash,
batchCreatedBlockByIdx: batchCreatedBlockByIdx,
batchStartTimeByIdx: batchStartTimeByIdx,
nextBatchIndex: nextBatchIndex,
taskMutex: &sync.Mutex{},
walletMutex: &sync.Mutex{},
Expand Down Expand Up @@ -233,6 +239,7 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA
batchIdentifierHash := agg.batchesIdentifierHashByIdx[blsAggServiceResp.TaskIndex]
batchData := agg.batchDataByIdentifierHash[batchIdentifierHash]
taskCreatedBlock := agg.batchCreatedBlockByIdx[blsAggServiceResp.TaskIndex]
taskCreatedAt := agg.batchStartTimeByIdx[blsAggServiceResp.TaskIndex]
agg.taskMutex.Unlock()
agg.AggregatorConfig.BaseConfig.Logger.Info("- Unlocked Resources: Fetching task data")

Expand Down Expand Up @@ -266,6 +273,9 @@ func (agg *Aggregator) handleBlsAggServiceResponse(blsAggServiceResp blsagg.BlsA

agg.telemetry.LogQuorumReached(batchData.BatchMerkleRoot)

// Only observe quorum reached if successful
agg.metrics.ObserveTaskQuorumReached(time.Since(taskCreatedAt))

agg.logger.Info("Threshold reached", "taskIndex", blsAggServiceResp.TaskIndex,
"batchIdentifierHash", "0x"+hex.EncodeToString(batchIdentifierHash[:]))

Expand Down Expand Up @@ -320,6 +330,8 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
agg.metrics.IncBumpedGasPriceForAggregatedResponse()
agg.telemetry.BumpedTaskGasPrice(batchMerkleRoot, bumpedGasPrice.String())
}

startTime := time.Now()
receipt, err := agg.avsWriter.SendAggregatedResponse(
batchIdentifierHash,
batchMerkleRoot,
Expand All @@ -338,6 +350,9 @@ func (agg *Aggregator) sendAggregatedResponse(batchIdentifierHash [32]byte, batc
return nil, err
}

// We only send the latency metric if the response is successul
agg.metrics.ObserveLatencyForRespondToTask(time.Since(startTime))

agg.walletMutex.Unlock()
agg.logger.Infof("- Unlocked Wallet Resources: Sending aggregated response for batch %s", hex.EncodeToString(batchIdentifierHash[:]))

Expand Down Expand Up @@ -383,6 +398,7 @@ func (agg *Aggregator) AddNewTask(batchMerkleRoot [32]byte, senderAddress [20]by
BatchMerkleRoot: batchMerkleRoot,
SenderAddress: senderAddress,
}
agg.batchStartTimeByIdx[batchIndex] = time.Now()
agg.logger.Info(
"Task Info added in aggregator:",
"Task", batchIndex,
Expand Down Expand Up @@ -447,6 +463,7 @@ func (agg *Aggregator) ClearTasksFromMaps() {
delete(agg.batchCreatedBlockByIdx, i)
delete(agg.batchesIdentifierHashByIdx, i)
delete(agg.batchDataByIdentifierHash, batchIdentifierHash)
delete(agg.batchStartTimeByIdx, i)
} else {
agg.logger.Warn("Task not found in maps", "taskIndex", i)
}
Expand Down
121 changes: 75 additions & 46 deletions batcher/aligned-batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use retry::batcher_retryables::{
user_balance_is_unlocked_retryable,
};
use retry::{retry_function, RetryError};
use tokio::time::timeout;
use tokio::time::{timeout, Instant};
use types::batch_state::BatchState;
use types::user_state::UserState;

Expand Down Expand Up @@ -1441,28 +1441,12 @@ impl Batcher {
let batch_merkle_root_hex = hex::encode(batch_merkle_root);
info!("Batch merkle root: 0x{}", batch_merkle_root_hex);
let file_name = batch_merkle_root_hex.clone() + ".json";

info!("Uploading batch to S3...");
self.upload_batch_to_s3(batch_bytes, &file_name).await?;

if let Err(e) = self
.telemetry
.task_uploaded_to_s3(&batch_merkle_root_hex)
.await
{
warn!("Failed to send task status to telemetry: {:?}", e);
};
info!("Batch sent to S3 with name: {}", file_name);

info!("Uploading batch to contract");
let batch_data_pointer: String = "".to_owned() + &self.download_endpoint + "/" + &file_name;

let num_proofs_in_batch = leaves.len();

let gas_per_proof = (CONSTANT_GAS_COST
+ ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF * num_proofs_in_batch as u128)
/ num_proofs_in_batch as u128;

let fee_per_proof = U256::from(gas_per_proof) * gas_price;
let fee_for_aggregator = (U256::from(AGGREGATOR_GAS_COST)
* gas_price
Expand All @@ -1478,12 +1462,31 @@ impl Batcher {
respond_to_task_fee_limit,
);

let proof_submitters = finalized_batch.iter().map(|entry| entry.sender).collect();
let proof_submitters: Vec<Address> =
finalized_batch.iter().map(|entry| entry.sender).collect();

self.simulate_create_new_task(
*batch_merkle_root,
batch_data_pointer.clone(),
proof_submitters.clone(),
fee_params.clone(),
)
.await?;

self.metrics
.gas_price_used_on_latest_batch
.set(gas_price.as_u64() as i64);

info!("Uploading batch to S3...");
self.upload_batch_to_s3(batch_bytes, &file_name).await?;
if let Err(e) = self
.telemetry
.task_uploaded_to_s3(&batch_merkle_root_hex)
.await
{
warn!("Failed to send task status to telemetry: {:?}", e);
};
info!("Batch sent to S3 with name: {}", file_name);
if let Err(e) = self
.telemetry
.task_created(
Expand All @@ -1496,6 +1499,7 @@ impl Batcher {
warn!("Failed to send task status to telemetry: {:?}", e);
};

info!("Submitting batch to contract");
match self
.create_new_task(
*batch_merkle_root,
Expand Down Expand Up @@ -1533,27 +1537,7 @@ impl Batcher {
proof_submitters: Vec<Address>,
fee_params: CreateNewTaskFeeParams,
) -> Result<TransactionReceipt, BatcherError> {
// First, we simulate the tx
retry_function(
|| {
simulate_create_new_task_retryable(
batch_merkle_root,
batch_data_pointer.clone(),
proof_submitters.clone(),
fee_params.clone(),
&self.payment_service,
&self.payment_service_fallback,
)
},
ETHEREUM_CALL_MIN_RETRY_DELAY,
ETHEREUM_CALL_BACKOFF_FACTOR,
ETHEREUM_CALL_MAX_RETRIES,
ETHEREUM_CALL_MAX_RETRY_DELAY,
)
.await
.map_err(|e| e.inner())?;

// Then, we send the real tx
let start = Instant::now();
let result = retry_function(
|| {
create_new_task_retryable(
Expand All @@ -1572,6 +1556,11 @@ impl Batcher {
ETHEREUM_CALL_MAX_RETRY_DELAY,
)
.await;
self.metrics
.create_new_task_duration
.set(start.elapsed().as_millis() as i64);
// Set to zero since it is not always executed
self.metrics.cancel_create_new_task_duration.set(0);
match result {
Ok(receipt) => {
if let Err(e) = self
Expand All @@ -1592,17 +1581,49 @@ impl Batcher {
}
}

/// Simulates the `create_new_task` transaction by sending an `eth_call` to the RPC node.
/// This function does not mutate the state but verifies if it will revert under the given conditions.
async fn simulate_create_new_task(
&self,
batch_merkle_root: [u8; 32],
batch_data_pointer: String,
proof_submitters: Vec<Address>,
fee_params: CreateNewTaskFeeParams,
) -> Result<(), BatcherError> {
retry_function(
|| {
simulate_create_new_task_retryable(
batch_merkle_root,
batch_data_pointer.clone(),
proof_submitters.clone(),
fee_params.clone(),
&self.payment_service,
&self.payment_service_fallback,
)
},
ETHEREUM_CALL_MIN_RETRY_DELAY,
ETHEREUM_CALL_BACKOFF_FACTOR,
ETHEREUM_CALL_MAX_RETRIES,
ETHEREUM_CALL_MAX_RETRY_DELAY,
)
.await
.map_err(|e| e.inner())?;

Ok(())
}

/// Sends a transaction to Ethereum with the same nonce as the previous one to override it.
/// Retries on recoverable errors with exponential backoff.
/// Bumps the fee if not included in 6 blocks, using `calculate_bumped_gas_price`.
/// In the first 5 attemps, bumps the fee every 3 blocks. Then exponential backoff takes over.
/// After 2 hours (attempt 13), retries occur hourly for 1 day (33 retries).
pub async fn cancel_create_new_task_tx(&self, old_tx_gas_price: U256) {
info!("Cancelling createNewTask transaction...");
let start = Instant::now();
let iteration = Arc::new(Mutex::new(0));
let previous_gas_price = Arc::new(Mutex::new(old_tx_gas_price));

if let Err(e) = retry_function(
match retry_function(
|| async {
let mut iteration = iteration.lock().await;
let mut previous_gas_price = previous_gas_price.lock().await;
Expand Down Expand Up @@ -1638,11 +1659,12 @@ impl Batcher {
)
.await
{
error!("Could not cancel createNewTask transaction: {e}");
return;
Ok(_) => info!("createNewTask transaction successfully canceled"),
Err(e) => error!("Could not cancel createNewTask transaction: {e}"),
};

info!("createNewTask transaction successfully canceled");
self.metrics
.cancel_create_new_task_duration
.set(start.elapsed().as_millis() as i64);
}

/// Only relevant for testing and for users to easily use Aligned
Expand Down Expand Up @@ -1784,7 +1806,8 @@ impl Batcher {
batch_bytes: &[u8],
file_name: &str,
) -> Result<(), BatcherError> {
retry_function(
let start = Instant::now();
let result = retry_function(
|| {
Self::upload_batch_to_s3_retryable(
batch_bytes,
Expand All @@ -1799,7 +1822,13 @@ impl Batcher {
ETHEREUM_CALL_MAX_RETRY_DELAY,
)
.await
.map_err(|e| BatcherError::BatchUploadError(e.to_string()))
.map_err(|e| BatcherError::BatchUploadError(e.to_string()));

self.metrics
.s3_duration
.set(start.elapsed().as_micros() as i64);

result
}

async fn upload_batch_to_s3_retryable(
Expand Down
18 changes: 18 additions & 0 deletions batcher/aligned-batcher/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ pub struct BatcherMetrics {
pub broken_ws_connections: IntCounter,
pub queue_len: IntGauge,
pub queue_size_bytes: IntGauge,
pub s3_duration: IntGauge,
pub create_new_task_duration: IntGauge,
pub cancel_create_new_task_duration: IntGauge,
}

impl BatcherMetrics {
Expand Down Expand Up @@ -53,6 +56,15 @@ impl BatcherMetrics {
"queue_size_bytes",
"Accumulated size in bytes of all proofs in the queue"
))?;
let s3_duration = register_int_gauge!(opts!("s3_duration", "S3 Duration"))?;
let create_new_task_duration = register_int_gauge!(opts!(
"create_new_task_duration",
"Create New Task Duration"
))?;
let cancel_create_new_task_duration = register_int_gauge!(opts!(
"cancel_create_new_task_duration",
"Cancel create New Task Duration"
))?;

registry.register(Box::new(open_connections.clone()))?;
registry.register(Box::new(received_proofs.clone()))?;
Expand All @@ -65,6 +77,9 @@ impl BatcherMetrics {
registry.register(Box::new(broken_ws_connections.clone()))?;
registry.register(Box::new(queue_len.clone()))?;
registry.register(Box::new(queue_size_bytes.clone()))?;
registry.register(Box::new(s3_duration.clone()))?;
registry.register(Box::new(create_new_task_duration.clone()))?;
registry.register(Box::new(cancel_create_new_task_duration.clone()))?;

let metrics_route = warp::path!("metrics")
.and(warp::any().map(move || registry.clone()))
Expand All @@ -88,6 +103,9 @@ impl BatcherMetrics {
broken_ws_connections,
queue_len,
queue_size_bytes,
s3_duration,
create_new_task_duration,
cancel_create_new_task_duration,
})
}

Expand Down
8 changes: 7 additions & 1 deletion batcher/aligned-batcher/src/retry/batcher_retryables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,13 @@ pub async fn simulate_create_new_task_retryable(
.gas_price(fee_params.gas_price);
// sends an `eth_call` request to the node
match simulation.call().await {
Ok(_) => Ok(()),
Ok(_) => {
info!(
"Simulation task for: 0x{} succeeded.",
hex::encode(batch_merkle_root)
);
Ok(())
}
Err(ContractError::Revert(err)) => {
// Since transaction was reverted, we don't want to retry with fallback.
warn!("Simulated transaction reverted {:?}", err);
Expand Down
2 changes: 1 addition & 1 deletion batcher/aligned-sdk/src/core/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ pub const CONSTANT_GAS_COST: u128 =
+ BATCHER_SUBMISSION_BASE_GAS_COST;
pub const DEFAULT_MAX_FEE_PER_PROOF: u128 =
ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF * 100_000_000_000; // gas_price = 100 Gwei = 0.0000001 ether (high gas price)
pub const CONNECTION_TIMEOUT: u64 = 5; // 5 secs
pub const CONNECTION_TIMEOUT: u64 = 30; // 30 secs

// % modifiers: (100% is x1, 10% is x0.1, 1000% is x10)
pub const RESPOND_TO_TASK_FEE_LIMIT_PERCENTAGE_MULTIPLIER: u128 = 250; // fee_for_aggregator -> respondToTaskFeeLimit modifier
Expand Down
Loading

0 comments on commit 71af2ab

Please sign in to comment.