Skip to content

Commit

Permalink
Merge 4b6f5f4 into a547440
Browse files Browse the repository at this point in the history
  • Loading branch information
uri-99 authored Dec 4, 2024
2 parents a547440 + 4b6f5f4 commit 5bab645
Show file tree
Hide file tree
Showing 13 changed files with 1,749 additions and 683 deletions.
38 changes: 36 additions & 2 deletions batcher/aligned-batcher/src/eth/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use crate::{
retry_function,
},
};
use aligned_sdk::core::constants::{
use aligned_sdk::core::{constants::{
ETHEREUM_CALL_BACKOFF_FACTOR, ETHEREUM_CALL_MAX_RETRIES, ETHEREUM_CALL_MAX_RETRY_DELAY,
ETHEREUM_CALL_MIN_RETRY_DELAY, GAS_PRICE_INCREMENT_PERCENTAGE_PER_ITERATION,
OVERRIDE_GAS_PRICE_PERCENTAGE_MULTIPLIER, PERCENTAGE_DIVIDER,
};
}, errors::BumpError, types::BumpUnit};
use ethers::prelude::*;
use ethers::providers::{Http, Provider};
use log::error;
Expand Down Expand Up @@ -103,6 +103,40 @@ pub async fn get_gas_price(
e.inner()
})
}

pub async fn calculate_bumped_proof_cost(
current_gas_price: U256,
bump_unit: BumpUnit,
amount: U256,
) -> Result<U256, BumpError> {
let new_max_fee = match bump_unit {
BumpUnit::NewMaxFee => {
amount
}
BumpUnit::Wei => {
current_gas_price + amount
}
BumpUnit::Gwei => {
current_gas_price + (amount * U256::from(1_000_000_000))
}
BumpUnit::Eth => {
current_gas_price + (amount * U256::from(1_000_000_000_000_000_000))
}
BumpUnit::Percentage => {
current_gas_price + (current_gas_price * amount / U256::from(PERCENTAGE_DIVIDER))
}
// TODO once Pats pr is done:
// BumpUnit::BatchSize => {

// }
_ => {
warn!("Invalid bump unit: {bump_unit:?}");
return Err(BumpError::InvalidBumpUnit);
}
};
Ok(new_max_fee)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
206 changes: 176 additions & 30 deletions batcher/aligned-batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use ethers::contract::ContractError;
use ethers::signers::Signer;
use retry::batcher_retryables::{
cancel_create_new_task_retryable, create_new_task_retryable, get_user_balance_retryable,
get_user_nonce_from_ethereum_retryable, user_balance_is_unlocked_retryable,
get_user_nonce_from_ethereum_retryable, simulate_create_new_task_retryable,
user_balance_is_unlocked_retryable,
};
use retry::{retry_function, RetryError};
use tokio::time::timeout;
Expand All @@ -30,9 +31,7 @@ use aligned_sdk::core::constants::{
RESPOND_TO_TASK_FEE_LIMIT_PERCENTAGE_MULTIPLIER,
};
use aligned_sdk::core::types::{
ClientMessage, GetNonceResponseMessage, NoncedVerificationData, ProofInvalidReason,
ProvingSystemId, SubmitProofMessage, SubmitProofResponseMessage, VerificationCommitmentBatch,
VerificationData, VerificationDataCommitment,
BumpFeeResponseMessage, BumpUnit, ClientMessage, GetNonceResponseMessage, NoncedVerificationData, ProofInvalidReason, ProvingSystemId, SubmitProofMessage, SubmitProofResponseMessage, VerificationCommitmentBatch, VerificationData, VerificationDataCommitment
};

use aws_sdk_s3::client::Client as S3Client;
Expand All @@ -47,7 +46,7 @@ use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{Mutex, MutexGuard, RwLock};
use tokio_tungstenite::tungstenite::{Error, Message};
use types::batch_queue::{self, BatchQueueEntry, BatchQueueEntryPriority};
use types::errors::BatcherError;
use types::errors::{BatcherError, TransactionSendError};

use crate::config::{ConfigFromYaml, ContractDeploymentOutput};
use crate::telemetry::sender::TelemetrySender;
Expand Down Expand Up @@ -472,6 +471,11 @@ impl Batcher {
.handle_submit_proof_msg(msg, ws_conn_sink)
.await
}
ClientMessage::BumpFee(bump_unit, amout, proof_qty) => {
self.clone()
.handle_bump_fee_msg(bump_unit, amout, proof_qty, ws_conn_sink)
.await
}
}
}

Expand Down Expand Up @@ -782,6 +786,9 @@ impl Batcher {

// In this case, the message might be a replacement one. If it is valid,
// we replace the old entry with the new from the replacement message.

// TODO DISABLE THIS FUNCTIONALITY
// IT IS BEING REPLACED WITH BumpFee()
if expected_nonce > msg_nonce {
info!("Possible replacement message received: Expected nonce {expected_nonce:?} - message nonce: {msg_nonce:?}");
self.handle_replacement_message(
Expand Down Expand Up @@ -832,6 +839,84 @@ impl Batcher {
Ok(())
}


// TODO add signature
async fn handle_bump_fee_msg(
bump_unit: BumpUnit,
amount: U256,
proof_qty: usize,
signature: Signature, //TODO add this
signature_content: Vec<u8>, //TODO add this
ws_conn_sink: WsMessageSink,
) -> Result<(), Error> {

// /// The signature of the message is verified, and when it correct, the
// /// recovered address from the signature is returned.
// pub fn verify_signature(&self) -> Result<Address, VerifySignatureError> {
// // Recovers the address from the signed data
// let recovered = self.signature.recover_typed_data(&self.verification_data)?;

// let hashed_data = self.verification_data.encode_eip712()?;

// self.signature.verify(hashed_data, recovered)?;
// Ok(recovered)
// }

info!("Verifying Bump Fee signature...");
let Ok(addr) = verify_bump_signature(signature, signature_content) else {
error!("Signature verification error");
send_message(
ws_conn_sink.clone(),
BumpFeeResponseMessage::InvalidSignature,
)
.await;
self.metrics.user_error(&["invalid_bump_fee_signature", ""]);
return Ok(());
};
info!("Bump Fee signature verified");


let from_proof_nonce = self.get_first_proof();
let to_proof_nonce = from_proof_nonce + proof_qty; // exclusive

let batch_state_lock = self.batch_state.lock().await;
let batch_queue_copy = batch_state_lock.batch_queue.clone();

// TODO check if user has enough funds
while let Some((entry, _)) = batch_state_lock.batch_queue.peek() {
let entry_nonce = entry.nonced_verification_data.nonce;
if entry_nonce >= from_proof_nonce && entry_nonce < to_proof_nonce {
if let Err(bumped_fee) = calculate_bumped_proof_cost(entry.nonced_verification_data.max_fee, bump_unit, amount) {
send_message(
ws_conn_sink.clone(),
BumpFeeResponseMessage::InvalidBumpUnit,
)
.await;
self.metrics.user_error(&["invalid_bump_fee_unit", ""]);
return Ok(());
}
let new_entry = entry.clone();
new_entry.nonced_verification_data.max_fee = bumped_fee;

batch_state_lock.update_user_total_fees_in_queue_of_replacement_message(sender_address, entry_nonce.clone(), bumped_fee).await?;
batch_state_lock.replace_entry(entry, new_entry);
}
};
return;

}

fn verify_bump_signature(
&self,
signature: Signature,
bump_unit: BumpUnit,
amount: U256,
proof_qty: usize,
) -> bool {
// TODO
true
}

async fn is_verifier_disabled(&self, verifier: ProvingSystemId) -> bool {
let disabled_verifiers = self.disabled_verifiers.lock().await;
zk_utils::is_verifier_disabled(*disabled_verifiers, verifier)
Expand Down Expand Up @@ -1115,12 +1200,13 @@ impl Batcher {
/// an empty batch, even if the block interval has been reached.
/// Once the batch meets the conditions for submission, the finalized batch is then passed to the
/// `finalize_batch` function.
/// This function doesn't remove the proofs from the queue.
async fn is_batch_ready(
&self,
block_number: u64,
gas_price: U256,
) -> Option<Vec<BatchQueueEntry>> {
let mut batch_state_lock = self.batch_state.lock().await;
let batch_state_lock = self.batch_state.lock().await;
let current_batch_len = batch_state_lock.batch_queue.len();
let last_uploaded_batch_block_lock = self.last_uploaded_batch_block.lock().await;

Expand Down Expand Up @@ -1152,7 +1238,7 @@ impl Batcher {
// Set the batch posting flag to true
*batch_posting = true;
let batch_queue_copy = batch_state_lock.batch_queue.clone();
let (resulting_batch_queue, finalized_batch) = batch_queue::try_build_batch(
let finalized_batch = batch_queue::try_build_batch(
batch_queue_copy,
gas_price,
self.max_batch_byte_size,
Expand All @@ -1172,7 +1258,26 @@ impl Batcher {
})
.ok()?;

batch_state_lock.batch_queue = resulting_batch_queue;
Some(finalized_batch)
}

/// Takes the submitted proofs and removes them from the queue.
/// This function should be called only AFTER the submission was confirmed onchain
async fn remove_proofs_from_queue(
&self,
finalized_batch: Vec<BatchQueueEntry>,
) -> Result<(), BatcherError> {
info!("Removing proofs from queue...");
let mut batch_state_lock = self.batch_state.lock().await;

finalized_batch.iter().for_each(|entry| {
if batch_state_lock.batch_queue.remove(entry).is_none() {
// If this happens, we have a bug in our code
error!("Some proofs were not found in the queue. This should not happen.");
}
});

// now we calculate the new user_states
let new_user_states = // proofs, max_fee_limit, total_fees_in_queue
batch_state_lock.calculate_new_user_states_data();

Expand All @@ -1188,17 +1293,33 @@ impl Batcher {
// informative error.

// Now we update the user states related to the batch (proof count in batch and min fee in batch)
batch_state_lock.update_user_proof_count(addr, *proof_count)?;
batch_state_lock.update_user_max_fee_limit(addr, *max_fee_limit)?;
batch_state_lock.update_user_total_fees_in_queue(addr, *total_fees_in_queue)?;
batch_state_lock
.update_user_proof_count(addr, *proof_count)
.ok_or(BatcherError::QueueRemoveError(
"Could not update_user_proof_count".into(),
))?;
batch_state_lock
.update_user_max_fee_limit(addr, *max_fee_limit)
.ok_or(BatcherError::QueueRemoveError(
"Could not update_user_max_fee_limit".into(),
))?;
batch_state_lock
.update_user_total_fees_in_queue(addr, *total_fees_in_queue)
.ok_or(BatcherError::QueueRemoveError(
"Could not update_user_total_fees_in_queue".into(),
))?;
}

Some(finalized_batch)
Ok(())
}

/// Takes the finalized batch as input and builds the merkle tree, posts verification data batch
/// to s3, creates new task in Aligned contract and sends responses to all clients that added proofs
/// to the batch. The last uploaded batch block is updated once the task is created in Aligned.
/// Takes the finalized batch as input and:
/// builds the merkle tree
/// posts verification data batch to s3
/// creates new task in Aligned contract
/// removes the proofs from the queue, once they are succesfully submitted on-chain
/// sends responses to all clients that added proofs to the batch.
/// The last uploaded batch block is updated once the task is created in Aligned.
async fn finalize_batch(
&self,
block_number: u64,
Expand Down Expand Up @@ -1256,6 +1377,7 @@ impl Batcher {
warn!("Failed to initialize task trace on telemetry: {:?}", e);
}

// Here we submit the batch on-chain
if let Err(e) = self
.submit_batch(
&batch_bytes,
Expand All @@ -1274,27 +1396,30 @@ impl Batcher {
{
warn!("Failed to send task status to telemetry: {:?}", e);
}
for entry in finalized_batch.into_iter() {
if let Some(ws_sink) = entry.messaging_sink {
let merkle_root = hex::encode(batch_merkle_tree.root);
send_message(
ws_sink.clone(),
SubmitProofResponseMessage::CreateNewTaskError(
merkle_root,
format!("{:?}", e),
),
)
.await
} else {
warn!("Websocket sink was found empty. This should only happen in tests");

// decide if i want to flush the queue:
match e {
BatcherError::TransactionSendError(
TransactionSendError::SubmissionInsufficientBalance,
) => {
// TODO calling remove_proofs_from_queue here is a better solution, flushing only the failed batch
// this would also need a message sent to the clients
self.flush_queue_and_clear_nonce_cache().await;
}
_ => {
// Add more cases here if we want in the future
}
}

self.flush_queue_and_clear_nonce_cache().await;

return Err(e);
};

// Once the submit is succesfull, we remove the submitted proofs from the queue
// TODO handle error case:
if let Err(e) = self.remove_proofs_from_queue(finalized_batch.clone()).await {
error!("Unexpected error while updating queue: {:?}", e);
}

connection::send_batch_inclusion_data_responses(finalized_batch, &batch_merkle_tree).await
}

Expand Down Expand Up @@ -1479,6 +1604,27 @@ impl Batcher {
proof_submitters: Vec<Address>,
fee_params: CreateNewTaskFeeParams,
) -> Result<TransactionReceipt, BatcherError> {
// First, we simulate the tx
retry_function(
|| {
simulate_create_new_task_retryable(
batch_merkle_root,
batch_data_pointer.clone(),
proof_submitters.clone(),
fee_params.clone(),
&self.payment_service,
&self.payment_service_fallback,
)
},
ETHEREUM_CALL_MIN_RETRY_DELAY,
ETHEREUM_CALL_BACKOFF_FACTOR,
ETHEREUM_CALL_MAX_RETRIES,
ETHEREUM_CALL_MAX_RETRY_DELAY,
)
.await
.map_err(|e| e.inner())?;

// Then, we send the real tx
let result = retry_function(
|| {
create_new_task_retryable(
Expand Down
Loading

0 comments on commit 5bab645

Please sign in to comment.