Skip to content

Commit

Permalink
Review: handle batch proving worker panic
Browse files Browse the repository at this point in the history
  • Loading branch information
Mirko-von-Leipzig committed Dec 6, 2024
1 parent 860f791 commit 2d3d60d
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 35 deletions.
102 changes: 67 additions & 35 deletions crates/block-producer/src/batch_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,11 @@ impl BatchBuilder {
result = worker_pool.join_next() => {
let mut mempool = mempool.lock().await;
match result {
Err(err) => {
tracing::warn!(%err, "Batch job panic'd.")
// TODO: somehow embed the batch ID into the join error, though this doesn't seem possible?
// mempool.batch_failed(batch_id);
},
Ok(Err((batch_id, err))) => {
Err((batch_id, err)) => {
tracing::warn!(%batch_id, %err, "Batch job failed.");
mempool.batch_failed(batch_id);
},
Ok(Ok((batch_id, batch))) => {
Ok((batch_id, batch)) => {
mempool.batch_proved(batch_id, batch);
}
}
Expand All @@ -121,6 +116,12 @@ struct WorkerPool {
failure_rate: f32,
/// Maximum number of workers allowed.
capacity: NonZeroUsize,
/// Maps spawned tasks to their job ID.
///
/// This allows us to map panic'd tasks to the job ID. Uses [Vec] because the task ID does not
/// implement [Ord]. Given that the expected capacity is relatively low, this has no real
/// impact beyond ergonomics.
task_map: Vec<(tokio::task::Id, BatchJobId)>,
}

impl WorkerPool {
Expand All @@ -132,24 +133,50 @@ impl WorkerPool {
Self {
simulated_proof_time,
failure_rate,
in_progress: JoinSet::new(),
capacity,
in_progress: JoinSet::default(),
task_map: Default::default(),
}
}

/// Returns the next batch proof result.
///
/// Will return pending if there are no jobs in progress (unlike tokio's [JoinSet::join_next]
/// which returns an option).
async fn join_next(&mut self) -> Result<BatchResult, tokio::task::JoinError> {
async fn join_next(&mut self) -> BatchResult {
if self.in_progress.is_empty() {
std::future::pending().await
} else {
self.in_progress
.join_next()
.await
.expect("JoinSet::join_next must be Some as the set is not empty")
return std::future::pending().await;
}

let result = self
.in_progress
.join_next()
.await
.expect("JoinSet::join_next must be Some as the set is not empty")
.map_err(|join_err| {
// Map task ID to job ID as otherwise the caller can't tell which batch failed.
//
// Note that the mapping cleanup happens lower down.
let batch_id = self
.task_map
.iter()
.find(|(task_id, _)| &join_err.id() == task_id)
.expect("Task ID should be in the task map")
.1;

(batch_id, join_err.into())
})
.and_then(|x| x);

// Cleanup task mapping by removing the result's task. This is inefficient but does not
// matter as the capacity is expected to be low.
let job_id = match &result {
Ok((id, _)) => id,
Err((id, _)) => id,
};
self.task_map.retain(|(_, elem_job_id)| elem_job_id != job_id);

result
}

/// Returns `true` if there is a worker available.
Expand All @@ -172,32 +199,37 @@ impl WorkerPool {
return Err(());
}

self.in_progress.spawn({
// Select a random work duration from the given proof range.
let simulated_proof_time =
rand::thread_rng().gen_range(self.simulated_proof_time.clone());
let task_id = self
.in_progress
.spawn({
// Select a random work duration from the given proof range.
let simulated_proof_time =
rand::thread_rng().gen_range(self.simulated_proof_time.clone());

// Randomly fail batches at the configured rate.
//
// Note: Rng::gen rolls between [0, 1.0) for f32, so this works as expected.
let failed = rand::thread_rng().gen::<f32>() < self.failure_rate;
// Randomly fail batches at the configured rate.
//
// Note: Rng::gen rolls between [0, 1.0) for f32, so this works as expected.
let failed = rand::thread_rng().gen::<f32>() < self.failure_rate;

async move {
tracing::debug!("Begin proving batch.");
async move {
tracing::debug!("Begin proving batch.");

let batch = Self::build_batch(transactions).map_err(|err| (id, err))?;
let batch = Self::build_batch(transactions).map_err(|err| (id, err))?;

tokio::time::sleep(simulated_proof_time).await;
if failed {
tracing::debug!("Batch proof failure injected.");
return Err((id, BuildBatchError::InjectedFailure));
}
tokio::time::sleep(simulated_proof_time).await;
if failed {
tracing::debug!("Batch proof failure injected.");
return Err((id, BuildBatchError::InjectedFailure));
}

tracing::debug!("Batch proof completed.");
tracing::debug!("Batch proof completed.");

Ok((id, batch))
}
});
Ok((id, batch))
}
})
.id();

self.task_map.push((task_id, id));

Ok(())
}
Expand Down
3 changes: 3 additions & 0 deletions crates/block-producer/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ pub enum BuildBatchError {

#[error("Nothing actually went wrong, failure was injected on purpose")]
InjectedFailure,

#[error("Batch proving task panic'd")]
JoinError(#[from] tokio::task::JoinError),
}

// Block prover errors
Expand Down

0 comments on commit 2d3d60d

Please sign in to comment.