Skip to content

Commit

Permalink
feat(block-producer): address mempool review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Mirko-von-Leipzig committed Dec 9, 2024
1 parent 536bcd0 commit 25160ea
Show file tree
Hide file tree
Showing 15 changed files with 1,156 additions and 1,484 deletions.
4 changes: 2 additions & 2 deletions crates/block-producer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ miden-objects = { workspace = true }
miden-processor = { workspace = true }
miden-stdlib = { workspace = true }
miden-tx = { workspace = true }
rand = { version = "0.8.5" }
rand = { version = "0.8" }
serde = { version = "1.0", features = ["derive"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "net", "macros", "sync", "time"] }
Expand All @@ -39,7 +39,7 @@ miden-lib = { workspace = true, features = ["testing"] }
miden-node-test-macro = { path = "../test-macro" }
miden-objects = { workspace = true, features = ["testing"] }
miden-tx = { workspace = true, features = ["testing"] }
pretty_assertions = "1.4.1"
pretty_assertions = "1.4"
rand_chacha = { version = "0.3", default-features = false }
tokio = { workspace = true, features = ["test-util"] }
winterfell = { version = "0.10" }
156 changes: 111 additions & 45 deletions crates/block-producer/src/batch_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@ use crate::{
COMPONENT, SERVER_BUILD_BATCH_FREQUENCY,
};

// FIXME: fix the batch builder tests.
// #[cfg(test)]
// mod tests;

pub mod batch;
pub use batch::TransactionBatch;
use miden_node_utils::formatting::{format_array, format_blake3_digest};
Expand All @@ -23,6 +19,11 @@ use crate::errors::BuildBatchError;
// BATCH BUILDER
// ================================================================================================

/// Builds [TransactionBatch] from sets of transactions.
///
/// Transaction sets are pulled from the [Mempool] at a configurable interval, and passed to a pool
/// of provers for proof generation. Proving is currently unimplemented and is instead simulated via
/// the given proof time and failure rate.
pub struct BatchBuilder {
pub batch_interval: Duration,
pub workers: NonZeroUsize,
Expand Down Expand Up @@ -62,12 +63,13 @@ impl BatchBuilder {
let mut interval = tokio::time::interval(self.batch_interval);
interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);

let mut inflight = WorkerPool::new(self.simulated_proof_time, self.failure_rate);
let mut worker_pool =
WorkerPool::new(self.workers, self.simulated_proof_time, self.failure_rate);

loop {
tokio::select! {
_ = interval.tick() => {
if inflight.len() >= self.workers.get() {
if !worker_pool.has_capacity() {
tracing::info!("All batch workers occupied.");
continue;
}
Expand All @@ -80,21 +82,16 @@ impl BatchBuilder {
continue;
};

inflight.spawn(batch_id, transactions);
worker_pool.spawn(batch_id, transactions).expect("Worker capacity was checked");
},
result = inflight.join_next() => {
result = worker_pool.join_next() => {
let mut mempool = mempool.lock().await;
match result {
Err(err) => {
tracing::warn!(%err, "Batch job panic'd.")
// TODO: somehow embed the batch ID into the join error, though this doesn't seem possible?
// mempool.batch_failed(batch_id);
},
Ok(Err((batch_id, err))) => {
Err((batch_id, err)) => {
tracing::warn!(%batch_id, %err, "Batch job failed.");
mempool.batch_failed(batch_id);
},
Ok(Ok((batch_id, batch))) => {
Ok((batch_id, batch)) => {
mempool.batch_proved(batch_id, batch);
}
}
Expand All @@ -109,63 +106,132 @@ impl BatchBuilder {

type BatchResult = Result<(BatchJobId, TransactionBatch), (BatchJobId, BuildBatchError)>;

/// Wrapper around tokio's JoinSet that remains pending if the set is empty,
/// Represents a pool of batch provers.
///
/// Effectively a wrapper around tokio's JoinSet that remains pending if the set is empty,
/// instead of returning None.
struct WorkerPool {
in_progress: JoinSet<BatchResult>,
simulated_proof_time: Range<Duration>,
failure_rate: f32,
/// Maximum number of workers allowed.
capacity: NonZeroUsize,
/// Maps spawned tasks to their job ID.
///
/// This allows us to map panic'd tasks to the job ID. Uses [Vec] because the task ID does not
/// implement [Ord]. Given that the expected capacity is relatively low, this has no real
/// impact beyond ergonomics.
task_map: Vec<(tokio::task::Id, BatchJobId)>,
}

impl WorkerPool {
fn new(simulated_proof_time: Range<Duration>, failure_rate: f32) -> Self {
fn new(
capacity: NonZeroUsize,
simulated_proof_time: Range<Duration>,
failure_rate: f32,
) -> Self {
Self {
simulated_proof_time,
failure_rate,
in_progress: JoinSet::new(),
capacity,
in_progress: JoinSet::default(),
task_map: Default::default(),
}
}

async fn join_next(&mut self) -> Result<BatchResult, tokio::task::JoinError> {
/// Returns the next batch proof result.
///
/// Will return pending if there are no jobs in progress (unlike tokio's [JoinSet::join_next]
/// which returns an option).
async fn join_next(&mut self) -> BatchResult {
if self.in_progress.is_empty() {
std::future::pending().await
} else {
// Cannot be None as its not empty.
self.in_progress.join_next().await.unwrap()
return std::future::pending().await;
}

let result = self
.in_progress
.join_next()
.await
.expect("JoinSet::join_next must be Some as the set is not empty")
.map_err(|join_err| {
// Map task ID to job ID as otherwise the caller can't tell which batch failed.
//
// Note that the mapping cleanup happens lower down.
let batch_id = self
.task_map
.iter()
.find(|(task_id, _)| &join_err.id() == task_id)
.expect("Task ID should be in the task map")
.1;

(batch_id, join_err.into())
})
.and_then(|x| x);

// Cleanup task mapping by removing the result's task. This is inefficient but does not
// matter as the capacity is expected to be low.
let job_id = match &result {
Ok((id, _)) => id,
Err((id, _)) => id,
};
self.task_map.retain(|(_, elem_job_id)| elem_job_id != job_id);

result
}

fn len(&self) -> usize {
self.in_progress.len()
/// Returns `true` if there is a worker available.
fn has_capacity(&self) -> bool {
self.in_progress.len() < self.capacity.get()
}

fn spawn(&mut self, id: BatchJobId, transactions: Vec<AuthenticatedTransaction>) {
self.in_progress.spawn({
// Select a random work duration from the given proof range.
let simulated_proof_time =
rand::thread_rng().gen_range(self.simulated_proof_time.clone());
/// Spawns a new batch proving task on the worker pool.
///
/// # Errors
///
/// Returns an error if no workers are available which can be checked using
/// [has_capacity](Self::has_capacity).
fn spawn(
&mut self,
id: BatchJobId,
transactions: Vec<AuthenticatedTransaction>,
) -> Result<(), ()> {
if !self.has_capacity() {
return Err(());
}

let task_id = self
.in_progress
.spawn({
// Select a random work duration from the given proof range.
let simulated_proof_time =
rand::thread_rng().gen_range(self.simulated_proof_time.clone());

// Randomly fail batches at the configured rate.
//
// Note: Rng::gen rolls between [0, 1.0) for f32, so this works as expected.
let failed = rand::thread_rng().gen::<f32>() < self.failure_rate;

// Randomly fail batches at the configured rate.
//
// Note: Rng::gen rolls between [0, 1.0) for f32, so this works as expected.
let failed = rand::thread_rng().gen::<f32>() < self.failure_rate;
async move {
tracing::debug!("Begin proving batch.");

async move {
tracing::debug!("Begin proving batch.");
let batch = Self::build_batch(transactions).map_err(|err| (id, err))?;

let batch = Self::build_batch(transactions).map_err(|err| (id, err))?;
tokio::time::sleep(simulated_proof_time).await;
if failed {
tracing::debug!("Batch proof failure injected.");
return Err((id, BuildBatchError::InjectedFailure));
}

tracing::debug!("Batch proof completed.");

tokio::time::sleep(simulated_proof_time).await;
if failed {
tracing::debug!("Batch proof failure injected.");
return Err((id, BuildBatchError::InjectedFailure));
Ok((id, batch))
}
})
.id();

tracing::debug!("Batch proof completed.");
self.task_map.push((task_id, id));

Ok((id, batch))
}
});
Ok(())
}

#[instrument(target = "miden-block-producer", skip_all, err, fields(batch_id))]
Expand Down
Loading

0 comments on commit 25160ea

Please sign in to comment.