Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat (batcher): dont retry + gas escalator + sequential txs #680

Merged
merged 19 commits into from
Jul 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 24 additions & 74 deletions batcher/aligned-batcher/src/eth/mod.rs
Original file line number Diff line number Diff line change
@@ -1,93 +1,52 @@
use std::iter::repeat;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;

use aligned_sdk::eth::batcher_payment_service::{BatcherPaymentServiceContract, SignatureData};
use aligned_sdk::eth::batcher_payment_service::BatcherPaymentServiceContract;
use ethers::prelude::k256::ecdsa::SigningKey;
use ethers::prelude::*;
use log::{error, info, warn};
use tokio::time::sleep;
use gas_escalator::{Frequency, GeometricGasPrice};

const CREATE_NEW_TASK_MAX_RETRIES: usize = 15;
const CREATE_NEW_TASK_MILLISECS_BETWEEN_RETRIES: u64 = 2000;

use crate::{config::ECDSAConfig, types::errors::BatcherError};
use crate::config::ECDSAConfig;

#[derive(Debug, Clone, EthEvent)]
pub struct BatchVerified {
pub batch_merkle_root: [u8; 32],
}

pub type BatcherPaymentService =
BatcherPaymentServiceContract<SignerMiddleware<Provider<Http>, Wallet<SigningKey>>>;
pub type BatcherPaymentService = BatcherPaymentServiceContract<
SignerMiddleware<GasEscalatorMiddleware<Provider<RetryClient<Http>>>, Wallet<SigningKey>>,
>;

pub fn get_provider(eth_rpc_url: String) -> Result<Provider<Http>, anyhow::Error> {
Provider::<Http>::try_from(eth_rpc_url).map_err(|err| anyhow::anyhow!(err))
}
const MAX_RETRIES: u32 = 15; // Max retries for the retry client. Will only retry on network errors
const INITIAL_BACKOFF: u64 = 1000; // Initial backoff for the retry client in milliseconds, will increase every retry
const GAS_MULTIPLIER: f64 = 1.125; // Multiplier for the gas price for gas escalator
const GAS_ESCALATOR_INTERVAL: u64 = 12; // Time in seconds between gas escalations

pub async fn create_new_task(
payment_service: &BatcherPaymentService,
batch_merkle_root: [u8; 32],
batch_data_pointer: String,
leaves: Vec<[u8; 32]>,
signatures: Vec<SignatureData>,
gas_for_aggregator: U256,
gas_per_proof: U256,
) -> Result<TransactionReceipt, BatcherError> {
// pad leaves to next power of 2
let padded_leaves = pad_leaves(leaves);
pub fn get_provider(eth_rpc_url: String) -> Result<Provider<RetryClient<Http>>, anyhow::Error> {
let provider = Http::from_str(eth_rpc_url.as_str())
.map_err(|e| anyhow::Error::msg(format!("Failed to create provider: {}", e)))?;

let call = payment_service.create_new_task(
batch_merkle_root,
batch_data_pointer,
padded_leaves,
signatures,
gas_for_aggregator,
gas_per_proof,
let client = RetryClient::new(
provider,
Box::<ethers::providers::HttpRateLimitRetryPolicy>::default(),
MAX_RETRIES,
INITIAL_BACKOFF,
);

// If there was a pending transaction from a previously sent batch, the `call.send()` will
// fail because of the nonce not being updated. We should retry sending and not returning an error
// immediatly.
info!("Creating task for: {:x?}", batch_merkle_root);

for i in 0..CREATE_NEW_TASK_MAX_RETRIES {
match call.send().await {
Ok(pending_tx) => match pending_tx.await {
Ok(Some(receipt)) => return Ok(receipt),
Ok(None) => return Err(BatcherError::ReceiptNotFoundError),
Err(_) => return Err(BatcherError::TransactionSendError),
},
Err(error) => {
if i != CREATE_NEW_TASK_MAX_RETRIES - 1 {
warn!(
"Error when trying to create a task: {}\n Retrying ...",
error
);
} else {
error!("Error when trying to create a task on last retry. Batch task {:x?} will be lost", batch_merkle_root);
return Err(BatcherError::TaskCreationError(error.to_string()));
}
}
};

sleep(Duration::from_millis(
CREATE_NEW_TASK_MILLISECS_BETWEEN_RETRIES,
))
.await;
}

Err(BatcherError::MaxRetriesReachedError)
Ok(Provider::<RetryClient<Http>>::new(client))
}

pub async fn get_batcher_payment_service(
provider: Provider<Http>,
provider: Provider<RetryClient<Http>>,
ecdsa_config: ECDSAConfig,
contract_address: String,
) -> Result<BatcherPaymentService, anyhow::Error> {
let chain_id = provider.get_chainid().await?;

let escalator = GeometricGasPrice::new(GAS_MULTIPLIER, GAS_ESCALATOR_INTERVAL, None::<u64>);

let provider = GasEscalatorMiddleware::new(provider, escalator, Frequency::PerBlock);

// get private key from keystore
let wallet = Wallet::decrypt_keystore(
&ecdsa_config.private_key_store_path,
Expand All @@ -102,12 +61,3 @@ pub async fn get_batcher_payment_service(

Ok(service_manager)
}

fn pad_leaves(leaves: Vec<[u8; 32]>) -> Vec<[u8; 32]> {
let leaves_len = leaves.len();
let last_leaf = leaves[leaves_len - 1];
leaves
.into_iter()
.chain(repeat(last_leaf).take(leaves_len.next_power_of_two() - leaves_len))
.collect()
}
132 changes: 93 additions & 39 deletions batcher/aligned-batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use serde::Serialize;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::env;
use std::iter::repeat;
use std::net::SocketAddr;
use std::sync::Arc;

Expand All @@ -20,7 +21,7 @@ use aws_sdk_s3::client::Client as S3Client;
use eth::BatcherPaymentService;
use ethers::prelude::{Middleware, Provider};
use ethers::providers::Ws;
use ethers::types::{Address, Signature, U256};
use ethers::types::{Address, Signature, TransactionReceipt, U256};
use futures_util::stream::{self, SplitSink};
use futures_util::{future, SinkExt, StreamExt, TryStreamExt};
use lambdaworks_crypto::merkle_tree::merkle::MerkleTree;
Expand Down Expand Up @@ -232,36 +233,34 @@ impl Batcher {

let nonce = U256::from_big_endian(client_msg.verification_data.nonce.as_slice());
let nonced_verification_data = client_msg.verification_data;
if nonced_verification_data.verification_data.proof.len() <= self.max_proof_size {
// When pre-verification is enabled, batcher will verify proofs for faster feedback with clients
if self.pre_verification_is_enabled
&& !zk_utils::verify(&nonced_verification_data.verification_data)
{
error!("Invalid proof detected. Verification failed.");
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidProof)
.await;
return Ok(()); // Send error message to the client and return
}

// Doing nonce verification after proof verification to avoid unnecessary nonce increment
if !self.check_nonce_and_increment(addr, nonce).await {
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidNonce)
.await;
return Ok(()); // Send error message to the client and return
}

self.add_to_batch(
nonced_verification_data,
ws_conn_sink.clone(),
client_msg.signature,
)
.await;
} else {
error!("Proof is too large");
if nonced_verification_data.verification_data.proof.len() > self.max_proof_size {
error!("Proof size exceeds the maximum allowed size.");
send_message(ws_conn_sink.clone(), ValidityResponseMessage::ProofTooLarge)
.await;
return Ok(());
}

// When pre-verification is enabled, batcher will verify proofs for faster feedback with clients
if self.pre_verification_is_enabled
&& !zk_utils::verify(&nonced_verification_data.verification_data)
{
error!("Invalid proof detected. Verification failed.");
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidProof).await;
return Ok(()); // Send error message to the client and return
};
}

// Doing nonce verification after proof verification to avoid unnecessary nonce increment
if !self.check_nonce_and_increment(addr, nonce).await {
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidNonce).await;
return Ok(()); // Send error message to the client and return
}

self.add_to_batch(
nonced_verification_data,
ws_conn_sink.clone(),
client_msg.signature,
)
.await;

info!("Verification data message handled");

Expand Down Expand Up @@ -551,7 +550,6 @@ impl Batcher {
info!("Batch sent to S3 with name: {}", file_name);

info!("Uploading batch to contract");
let payment_service = &self.payment_service;
let batch_data_pointer = "https://".to_owned() + &self.s3_bucket_name + "/" + &file_name;

let num_proofs_in_batch = leaves.len();
Expand All @@ -566,19 +564,75 @@ impl Batcher {
.map(|(i, signature)| SignatureData::new(signature, nonces[i]))
.collect();

eth::create_new_task(
payment_service,
*batch_merkle_root,
match self
.create_new_task(
*batch_merkle_root,
batch_data_pointer,
leaves,
signatures,
AGGREGATOR_COST.into(),
gas_per_proof.into(),
)
.await
{
Ok(_) => {
info!("Batch verification task created on Aligned contract");
Ok(())
}
Err(e) => {
error!(
"Failed to send batch to contract, batch will be lost: {:?}",
e
);

Err(e)
}
}
}

async fn create_new_task(
&self,
batch_merkle_root: [u8; 32],
batch_data_pointer: String,
leaves: Vec<[u8; 32]>,
signatures: Vec<SignatureData>,
gas_for_aggregator: U256,
gas_per_proof: U256,
) -> Result<TransactionReceipt, BatcherError> {
// pad leaves to next power of 2
let padded_leaves = Self::pad_leaves(leaves);

let call = self.payment_service.create_new_task(
batch_merkle_root,
batch_data_pointer,
leaves,
padded_leaves,
signatures,
AGGREGATOR_COST.into(), // FIXME(uri): This value should be read from aligned_layer/contracts/script/deploy/config/devnet/batcher-payment-service.devnet.config.json
gas_per_proof.into(), //FIXME(uri): This value should be read from aligned_layer/contracts/script/deploy/config/devnet/batcher-payment-service.devnet.config.json
)
.await?;
gas_for_aggregator,
gas_per_proof,
);

info!("Batch verification task created on Aligned contract");
Ok(())
info!("Creating task for: {}", hex::encode(batch_merkle_root));

let pending_tx = call
.send()
.await
.map_err(|e| BatcherError::TaskCreationError(e.to_string()))?;

let receipt = pending_tx
.await
.map_err(|_| BatcherError::TransactionSendError)?
.ok_or(BatcherError::ReceiptNotFoundError)?;

Ok(receipt)
}

fn pad_leaves(leaves: Vec<[u8; 32]>) -> Vec<[u8; 32]> {
let leaves_len = leaves.len();
let last_leaf = leaves[leaves_len - 1];
leaves
.into_iter()
.chain(repeat(last_leaf).take(leaves_len.next_power_of_two() - leaves_len))
.collect()
}

/// Only relevant for testing and for users to easily use Aligned
Expand Down

Large diffs are not rendered by default.

Loading