diff --git a/crates/block-producer/Cargo.toml b/crates/block-producer/Cargo.toml index 4a93628b..52c898f8 100644 --- a/crates/block-producer/Cargo.toml +++ b/crates/block-producer/Cargo.toml @@ -24,7 +24,7 @@ miden-objects = { workspace = true } miden-processor = { workspace = true } miden-stdlib = { workspace = true } miden-tx = { workspace = true } -rand = { version = "0.8.5" } +rand = { version = "0.8" } serde = { version = "1.0", features = ["derive"] } thiserror = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "net", "macros", "sync", "time"] } @@ -39,7 +39,7 @@ miden-lib = { workspace = true, features = ["testing"] } miden-node-test-macro = { path = "../test-macro" } miden-objects = { workspace = true, features = ["testing"] } miden-tx = { workspace = true, features = ["testing"] } -pretty_assertions = "1.4.1" +pretty_assertions = "1.4" rand_chacha = { version = "0.3", default-features = false } tokio = { workspace = true, features = ["test-util"] } winterfell = { version = "0.10" } diff --git a/crates/block-producer/src/batch_builder/mod.rs b/crates/block-producer/src/batch_builder/mod.rs index 30070b58..e6399ae9 100644 --- a/crates/block-producer/src/batch_builder/mod.rs +++ b/crates/block-producer/src/batch_builder/mod.rs @@ -10,10 +10,6 @@ use crate::{ COMPONENT, SERVER_BUILD_BATCH_FREQUENCY, }; -// FIXME: fix the batch builder tests. -// #[cfg(test)] -// mod tests; - pub mod batch; pub use batch::TransactionBatch; use miden_node_utils::formatting::{format_array, format_blake3_digest}; @@ -23,6 +19,11 @@ use crate::errors::BuildBatchError; // BATCH BUILDER // ================================================================================================ +/// Builds [TransactionBatch] from sets of transactions. +/// +/// Transaction sets are pulled from the [Mempool] at a configurable interval, and passed to a pool +/// of provers for proof generation. Proving is currently unimplemented and is instead simulated via +/// the given proof time and failure rate. pub struct BatchBuilder { pub batch_interval: Duration, pub workers: NonZeroUsize, @@ -62,12 +63,13 @@ impl BatchBuilder { let mut interval = tokio::time::interval(self.batch_interval); interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay); - let mut inflight = WorkerPool::new(self.simulated_proof_time, self.failure_rate); + let mut worker_pool = + WorkerPool::new(self.workers, self.simulated_proof_time, self.failure_rate); loop { tokio::select! { _ = interval.tick() => { - if inflight.len() >= self.workers.get() { + if !worker_pool.has_capacity() { tracing::info!("All batch workers occupied."); continue; } @@ -80,21 +82,16 @@ impl BatchBuilder { continue; }; - inflight.spawn(batch_id, transactions); + worker_pool.spawn(batch_id, transactions).expect("Worker capacity was checked"); }, - result = inflight.join_next() => { + result = worker_pool.join_next() => { let mut mempool = mempool.lock().await; match result { - Err(err) => { - tracing::warn!(%err, "Batch job panic'd.") - // TODO: somehow embed the batch ID into the join error, though this doesn't seem possible? 
-                            // mempool.batch_failed(batch_id);
-                        },
-                        Ok(Err((batch_id, err))) => {
+                        Err((batch_id, err)) => {
                             tracing::warn!(%batch_id, %err, "Batch job failed.");
                             mempool.batch_failed(batch_id);
                         },
-                        Ok(Ok((batch_id, batch))) => {
+                        Ok((batch_id, batch)) => {
                             mempool.batch_proved(batch_id, batch);
                         }
                     }
@@ -109,63 +106,132 @@ impl BatchBuilder {

 type BatchResult = Result<(BatchJobId, TransactionBatch), (BatchJobId, BuildBatchError)>;

-/// Wrapper around tokio's JoinSet that remains pending if the set is empty,
+/// Represents a pool of batch provers.
+///
+/// Effectively a wrapper around tokio's JoinSet that remains pending if the set is empty,
 /// instead of returning None.
 struct WorkerPool {
     in_progress: JoinSet<BatchResult>,
     simulated_proof_time: Range<Duration>,
     failure_rate: f32,
+    /// Maximum number of workers allowed.
+    capacity: NonZeroUsize,
+    /// Maps spawned tasks to their job ID.
+    ///
+    /// This allows us to map panic'd tasks to the job ID. Uses [Vec] because the task ID does not
+    /// implement [Ord]. Given that the expected capacity is relatively low, this has no real
+    /// impact beyond ergonomics.
+    task_map: Vec<(tokio::task::Id, BatchJobId)>,
 }

 impl WorkerPool {
-    fn new(simulated_proof_time: Range<Duration>, failure_rate: f32) -> Self {
+    fn new(
+        capacity: NonZeroUsize,
+        simulated_proof_time: Range<Duration>,
+        failure_rate: f32,
+    ) -> Self {
         Self {
             simulated_proof_time,
             failure_rate,
-            in_progress: JoinSet::new(),
+            capacity,
+            in_progress: JoinSet::default(),
+            task_map: Default::default(),
         }
     }

-    async fn join_next(&mut self) -> Result<BatchResult, tokio::task::JoinError> {
+    /// Returns the next batch proof result.
+    ///
+    /// Remains pending if there are no jobs in progress (unlike tokio's [JoinSet::join_next],
+    /// which would return `None`).
+    async fn join_next(&mut self) -> BatchResult {
         if self.in_progress.is_empty() {
-            std::future::pending().await
-        } else {
-            // Cannot be None as its not empty.
-            self.in_progress.join_next().await.unwrap()
+            return std::future::pending().await;
         }
+
+        let result = self
+            .in_progress
+            .join_next()
+            .await
+            .expect("JoinSet::join_next must be Some as the set is not empty")
+            .map_err(|join_err| {
+                // Map the task ID to its job ID as otherwise the caller can't tell which batch
+                // failed.
+                //
+                // Note that the mapping cleanup happens further down.
+                let batch_id = self
+                    .task_map
+                    .iter()
+                    .find(|(task_id, _)| &join_err.id() == task_id)
+                    .expect("Task ID should be in the task map")
+                    .1;
+
+                (batch_id, join_err.into())
+            })
+            .and_then(|x| x);
+
+        // Clean up the task mapping by removing the result's task. This is inefficient but does
+        // not matter as the capacity is expected to be low.
+        let job_id = match &result {
+            Ok((id, _)) => id,
+            Err((id, _)) => id,
+        };
+        self.task_map.retain(|(_, elem_job_id)| elem_job_id != job_id);
+
+        result
     }

-    fn len(&self) -> usize {
-        self.in_progress.len()
+    /// Returns `true` if there is a worker available.
+    fn has_capacity(&self) -> bool {
+        self.in_progress.len() < self.capacity.get()
     }

-    fn spawn(&mut self, id: BatchJobId, transactions: Vec<AuthenticatedTransaction>) {
-        self.in_progress.spawn({
-            // Select a random work duration from the given proof range.
-            let simulated_proof_time =
-                rand::thread_rng().gen_range(self.simulated_proof_time.clone());
+    /// Spawns a new batch proving task on the worker pool.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if no workers are available; availability can be checked beforehand using
+    /// [has_capacity](Self::has_capacity).
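// The task-to-job mapping above is the interesting part of this change: a `JoinError` only carries
// a task ID, so without the map a panicked prover could not be tied back to its batch. The
// following is a minimal, stand-alone sketch of that technique (not part of this patch); it
// assumes a recent tokio with task IDs and the `macros`/`rt-multi-thread` features, and the
// `JobId` type is illustrative only.
use tokio::task::JoinSet;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct JobId(u64);

#[tokio::main]
async fn main() {
    let mut in_progress = JoinSet::new();
    let mut task_map: Vec<(tokio::task::Id, JobId)> = Vec::new();

    for job in [JobId(1), JobId(2)] {
        // `JoinSet::spawn` returns an `AbortHandle`; its `id()` matches `JoinError::id()`.
        let task_id = in_progress
            .spawn(async move {
                if job == JobId(2) {
                    panic!("injected failure");
                }
                job
            })
            .id();
        task_map.push((task_id, job));
    }

    while let Some(result) = in_progress.join_next().await {
        match result {
            Ok(job) => println!("{job:?} completed"),
            Err(join_err) => {
                // Map the panicked task back to its job, as `WorkerPool::join_next` does above.
                let failed = task_map
                    .iter()
                    .find(|(task_id, _)| task_id == &join_err.id())
                    .map(|(_, job)| *job);
                println!("{failed:?} panicked: {join_err}");
            },
        }
    }
}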
+ fn spawn( + &mut self, + id: BatchJobId, + transactions: Vec, + ) -> Result<(), ()> { + if !self.has_capacity() { + return Err(()); + } + + let task_id = self + .in_progress + .spawn({ + // Select a random work duration from the given proof range. + let simulated_proof_time = + rand::thread_rng().gen_range(self.simulated_proof_time.clone()); + + // Randomly fail batches at the configured rate. + // + // Note: Rng::gen rolls between [0, 1.0) for f32, so this works as expected. + let failed = rand::thread_rng().gen::() < self.failure_rate; - // Randomly fail batches at the configured rate. - // - // Note: Rng::gen rolls between [0, 1.0) for f32, so this works as expected. - let failed = rand::thread_rng().gen::() < self.failure_rate; + async move { + tracing::debug!("Begin proving batch."); - async move { - tracing::debug!("Begin proving batch."); + let batch = Self::build_batch(transactions).map_err(|err| (id, err))?; - let batch = Self::build_batch(transactions).map_err(|err| (id, err))?; + tokio::time::sleep(simulated_proof_time).await; + if failed { + tracing::debug!("Batch proof failure injected."); + return Err((id, BuildBatchError::InjectedFailure)); + } + + tracing::debug!("Batch proof completed."); - tokio::time::sleep(simulated_proof_time).await; - if failed { - tracing::debug!("Batch proof failure injected."); - return Err((id, BuildBatchError::InjectedFailure)); + Ok((id, batch)) } + }) + .id(); - tracing::debug!("Batch proof completed."); + self.task_map.push((task_id, id)); - Ok((id, batch)) - } - }); + Ok(()) } #[instrument(target = "miden-block-producer", skip_all, err, fields(batch_id))] diff --git a/crates/block-producer/src/batch_builder/tests/mod.rs b/crates/block-producer/src/batch_builder/tests/mod.rs deleted file mode 100644 index 46b63f70..00000000 --- a/crates/block-producer/src/batch_builder/tests/mod.rs +++ /dev/null @@ -1,329 +0,0 @@ -use std::iter; - -use assert_matches::assert_matches; -use miden_objects::{crypto::merkle::Mmr, Digest}; -use tokio::sync::RwLock; - -use super::*; -use crate::{ - block_builder::DefaultBlockBuilder, - errors::BuildBlockError, - test_utils::{ - note::mock_note, MockPrivateAccount, MockProvenTxBuilder, MockStoreSuccessBuilder, - }, -}; -// STRUCTS -// ================================================================================================ - -#[derive(Default)] -struct BlockBuilderSuccess { - batch_groups: SharedRwVec>, - num_empty_batches_received: Arc>, -} - -#[async_trait] -impl BlockBuilder for BlockBuilderSuccess { - async fn build_block(&self, batches: &[TransactionBatch]) -> Result<(), BuildBlockError> { - if batches.is_empty() { - *self.num_empty_batches_received.write().await += 1; - } else { - self.batch_groups.write().await.push(batches.to_vec()); - } - - Ok(()) - } -} - -#[derive(Default)] -struct BlockBuilderFailure; - -#[async_trait] -impl BlockBuilder for BlockBuilderFailure { - async fn build_block(&self, _batches: &[TransactionBatch]) -> Result<(), BuildBlockError> { - Err(BuildBlockError::TooManyBatchesInBlock(0)) - } -} - -// TESTS -// ================================================================================================ - -/// Tests that the number of batches in a block doesn't exceed `max_batches_per_block` -#[tokio::test] -#[miden_node_test_macro::enable_logging] -async fn test_block_size_doesnt_exceed_limit() { - let block_frequency = Duration::from_millis(20); - let max_batches_per_block = 2; - - let store = Arc::new(MockStoreSuccessBuilder::from_accounts(iter::empty()).build()); - let 
block_builder = Arc::new(BlockBuilderSuccess::default()); - - let batch_builder = Arc::new(DefaultBatchBuilder::new( - store, - block_builder.clone(), - DefaultBatchBuilderOptions { block_frequency, max_batches_per_block }, - )); - - // Add 3 batches in internal queue (remember: 2 batches/block) - { - let mut batch_group = - vec![dummy_tx_batch(0, 2), dummy_tx_batch(10, 2), dummy_tx_batch(20, 2)]; - - batch_builder.ready_batches.write().await.append(&mut batch_group); - } - - // start batch builder - tokio::spawn(batch_builder.run()); - - // Wait for 2 blocks to be produced - time::sleep(block_frequency * 3).await; - - // Ensure the block builder received 2 batches of the expected size - { - let batch_groups = block_builder.batch_groups.read().await; - - assert_eq!(batch_groups.len(), 2); - assert_eq!(batch_groups[0].len(), max_batches_per_block); - assert_eq!(batch_groups[1].len(), 1); - } -} - -/// Tests that `BlockBuilder::build_block()` is still called when there are no transactions -#[tokio::test] -#[miden_node_test_macro::enable_logging] -async fn test_build_block_called_when_no_batches() { - let block_frequency = Duration::from_millis(20); - let max_batches_per_block = 2; - - let store = Arc::new(MockStoreSuccessBuilder::from_accounts(iter::empty()).build()); - let block_builder = Arc::new(BlockBuilderSuccess::default()); - - let batch_builder = Arc::new(DefaultBatchBuilder::new( - store, - block_builder.clone(), - DefaultBatchBuilderOptions { block_frequency, max_batches_per_block }, - )); - - // start batch builder - tokio::spawn(batch_builder.run()); - - // Wait for at least 1 block to be produced - time::sleep(block_frequency * 2).await; - - // Ensure the block builder received at least 1 empty batch Note: we check `> 0` instead of an - // exact number to make the test flaky in case timings change in the implementation - assert!(*block_builder.num_empty_batches_received.read().await > 0); -} - -/// Tests that if `BlockBuilder::build_block()` fails, then batches are added back on the queue -#[tokio::test] -#[miden_node_test_macro::enable_logging] -async fn test_batches_added_back_to_queue_on_block_build_failure() { - let block_frequency = Duration::from_millis(20); - let max_batches_per_block = 2; - - let store = Arc::new(MockStoreSuccessBuilder::from_accounts(iter::empty()).build()); - let block_builder = Arc::new(BlockBuilderFailure); - - let batch_builder = Arc::new(DefaultBatchBuilder::new( - store, - block_builder.clone(), - DefaultBatchBuilderOptions { block_frequency, max_batches_per_block }, - )); - - let internal_ready_batches = batch_builder.ready_batches.clone(); - - // Add 3 batches in internal queue - { - let mut batch_group = - vec![dummy_tx_batch(0, 2), dummy_tx_batch(10, 2), dummy_tx_batch(20, 2)]; - - batch_builder.ready_batches.write().await.append(&mut batch_group); - } - - // start batch builder - tokio::spawn(batch_builder.run()); - - // Wait for 2 blocks to failed to be produced - time::sleep(block_frequency * 2 + (block_frequency / 2)).await; - - // Ensure the transaction batches are all still on the queue - assert_eq!(internal_ready_batches.read().await.len(), 3); -} - -#[tokio::test] -async fn test_batch_builder_find_dangling_notes() { - let store = Arc::new(MockStoreSuccessBuilder::from_accounts(iter::empty()).build()); - let block_builder = Arc::new(BlockBuilderSuccess::default()); - - let batch_builder = Arc::new(DefaultBatchBuilder::new( - store, - block_builder, - DefaultBatchBuilderOptions { - block_frequency: Duration::from_millis(20), - 
max_batches_per_block: 2, - }, - )); - - // An account with 5 states so that we can simulate running 2 transactions against it. - let account = MockPrivateAccount::<3>::from(1); - - let note_1 = mock_note(1); - let note_2 = mock_note(2); - let tx1 = MockProvenTxBuilder::with_account(account.id, account.states[0], account.states[1]) - .output_notes(vec![OutputNote::Full(note_1.clone())]) - .build(); - let tx2 = MockProvenTxBuilder::with_account(account.id, account.states[1], account.states[2]) - .unauthenticated_notes(vec![note_1.clone()]) - .output_notes(vec![OutputNote::Full(note_2.clone())]) - .build(); - - let txs = vec![tx1, tx2]; - - let dangling_notes = batch_builder.find_dangling_notes(&txs).await; - assert_eq!(dangling_notes, vec![], "Note must be presented in the same batch"); - - batch_builder.build_batch(txs.clone()).await.unwrap(); - - let dangling_notes = batch_builder.find_dangling_notes(&txs).await; - assert_eq!(dangling_notes, vec![], "Note must be presented in the same batch"); - - let note_3 = mock_note(3); - - let tx1 = MockProvenTxBuilder::with_account(account.id, account.states[0], account.states[1]) - .unauthenticated_notes(vec![note_2.clone()]) - .build(); - let tx2 = MockProvenTxBuilder::with_account(account.id, account.states[1], account.states[2]) - .unauthenticated_notes(vec![note_3.clone()]) - .build(); - - let txs = vec![tx1, tx2]; - - let dangling_notes = batch_builder.find_dangling_notes(&txs).await; - assert_eq!( - dangling_notes, - vec![note_3.id()], - "Only one dangling node must be found before block is built" - ); - - batch_builder.try_build_block().await; - - let dangling_notes = batch_builder.find_dangling_notes(&txs).await; - assert_eq!( - dangling_notes, - vec![note_2.id(), note_3.id()], - "Two dangling notes must be found after block is built" - ); -} - -#[tokio::test] -async fn test_block_builder_no_missing_notes() { - let account_1: MockPrivateAccount<3> = MockPrivateAccount::from(1); - let account_2: MockPrivateAccount<3> = MockPrivateAccount::from(2); - let store = Arc::new( - MockStoreSuccessBuilder::from_accounts( - [account_1, account_2].iter().map(|account| (account.id, account.states[0])), - ) - .build(), - ); - let block_builder = Arc::new(DefaultBlockBuilder::new(Arc::clone(&store), Arc::clone(&store))); - let batch_builder = Arc::new(DefaultBatchBuilder::new( - store, - Arc::clone(&block_builder), - DefaultBatchBuilderOptions { - block_frequency: Duration::from_millis(20), - max_batches_per_block: 2, - }, - )); - - let note_1 = mock_note(1); - let note_2 = mock_note(2); - - let tx1 = MockProvenTxBuilder::with_account_index(1) - .output_notes(vec![OutputNote::Full(note_1.clone())]) - .build(); - - let tx2 = MockProvenTxBuilder::with_account_index(2) - .unauthenticated_notes(vec![note_1.clone()]) - .output_notes(vec![OutputNote::Full(note_2.clone())]) - .build(); - - let txs = vec![tx1, tx2]; - - batch_builder.build_batch(txs.clone()).await.unwrap(); - - let build_block_result = batch_builder - .block_builder - .build_block(&batch_builder.ready_batches.read().await) - .await; - assert_matches!(build_block_result, Ok(())); -} - -#[tokio::test] -async fn test_block_builder_fails_if_notes_are_missing() { - let accounts: Vec<_> = (1..=4).map(MockPrivateAccount::<3>::from).collect(); - let notes: Vec<_> = (1..=6).map(mock_note).collect(); - // We require mmr for the note authentication to succeed. - // - // We also need two blocks worth of mmr because the mock store skips genesis. 
- let mut mmr = Mmr::new(); - mmr.add(Digest::new([1u32.into(), 2u32.into(), 3u32.into(), 4u32.into()])); - mmr.add(Digest::new([1u32.into(), 2u32.into(), 3u32.into(), 4u32.into()])); - - let store = Arc::new( - MockStoreSuccessBuilder::from_accounts( - accounts.iter().map(|account| (account.id, account.states[0])), - ) - .initial_notes([vec![OutputNote::Full(notes[0].clone())]].iter()) - .initial_chain_mmr(mmr) - .build(), - ); - let block_builder = Arc::new(DefaultBlockBuilder::new(Arc::clone(&store), Arc::clone(&store))); - let batch_builder = Arc::new(DefaultBatchBuilder::new( - store, - Arc::clone(&block_builder), - DefaultBatchBuilderOptions { - block_frequency: Duration::from_millis(20), - max_batches_per_block: 2, - }, - )); - - let tx1 = MockProvenTxBuilder::with_account_index(1) - .output_notes(vec![OutputNote::Full(notes[1].clone())]) - .build(); - - let tx2 = MockProvenTxBuilder::with_account_index(2) - .unauthenticated_notes(vec![notes[0].clone()]) - .output_notes(vec![OutputNote::Full(notes[2].clone()), OutputNote::Full(notes[3].clone())]) - .build(); - - let tx3 = MockProvenTxBuilder::with_account_index(3) - .unauthenticated_notes(notes.iter().skip(1).cloned().collect()) - .build(); - - let txs = vec![tx1, tx2, tx3]; - - let batch = TransactionBatch::new(txs.clone(), Default::default()).unwrap(); - let build_block_result = batch_builder.block_builder.build_block(&[batch]).await; - - let mut expected_missing_notes = vec![notes[4].id(), notes[5].id()]; - expected_missing_notes.sort(); - - assert_matches!( - build_block_result, - Err(BuildBlockError::UnauthenticatedNotesNotFound(actual_missing_notes)) => { - assert_eq!(actual_missing_notes, expected_missing_notes); - } - ); -} - -// HELPERS -// ================================================================================================ - -fn dummy_tx_batch(starting_account_index: u32, num_txs_in_batch: usize) -> TransactionBatch { - let txs = (0..num_txs_in_batch) - .map(|index| { - MockProvenTxBuilder::with_account_index(starting_account_index + index as u32).build() - }) - .collect(); - TransactionBatch::new(txs, Default::default()).unwrap() -} diff --git a/crates/block-producer/src/block_builder/mod.rs b/crates/block-producer/src/block_builder/mod.rs index a8c017c7..bb739131 100644 --- a/crates/block-producer/src/block_builder/mod.rs +++ b/crates/block-producer/src/block_builder/mod.rs @@ -23,10 +23,6 @@ pub(crate) mod prover; use self::prover::{block_witness::BlockWitness, BlockProver}; -// FIXME: reimplement the tests. -// #[cfg(test)] -// mod tests; - // BLOCK BUILDER // ================================================================================================= diff --git a/crates/block-producer/src/block_builder/prover/block_witness.rs b/crates/block-producer/src/block_builder/prover/block_witness.rs index af837339..10e37c00 100644 --- a/crates/block-producer/src/block_builder/prover/block_witness.rs +++ b/crates/block-producer/src/block_builder/prover/block_witness.rs @@ -7,7 +7,7 @@ use miden_objects::{ notes::Nullifier, transaction::TransactionId, vm::{AdviceInputs, StackInputs}, - BlockHeader, Digest, Felt, BLOCK_NOTE_TREE_DEPTH, ZERO, + BlockHeader, Digest, Felt, BLOCK_NOTE_TREE_DEPTH, MAX_BATCHES_PER_BLOCK, ZERO, }; use crate::{ @@ -35,6 +35,9 @@ impl BlockWitness { mut block_inputs: BlockInputs, batches: &[TransactionBatch], ) -> Result<(Self, Vec), BuildBlockError> { + // This limit should be enforced by the mempool. 
+ assert!(batches.len() <= MAX_BATCHES_PER_BLOCK); + Self::validate_nullifiers(&block_inputs, batches)?; let batch_created_notes_roots = batches diff --git a/crates/block-producer/src/block_builder/tests.rs b/crates/block-producer/src/block_builder/tests.rs deleted file mode 100644 index d8fbc356..00000000 --- a/crates/block-producer/src/block_builder/tests.rs +++ /dev/null @@ -1,83 +0,0 @@ -// block builder tests (higher level) -// `apply_block()` is called - -use std::sync::Arc; - -use assert_matches::assert_matches; -use miden_objects::{ - accounts::{account_id::testing::ACCOUNT_ID_OFF_CHAIN_SENDER, AccountId}, - Digest, Felt, -}; - -use crate::{ - batch_builder::TransactionBatch, - block_builder::{BlockBuilder, BuildBlockError, DefaultBlockBuilder}, - test_utils::{MockProvenTxBuilder, MockStoreFailure, MockStoreSuccessBuilder}, -}; - -/// Tests that `build_block()` succeeds when the transaction batches are not empty -#[tokio::test] -#[miden_node_test_macro::enable_logging] -async fn test_apply_block_called_nonempty_batches() { - let account_id = AccountId::new_unchecked(Felt::new(ACCOUNT_ID_OFF_CHAIN_SENDER)); - let account_initial_hash: Digest = - [Felt::new(1u64), Felt::new(1u64), Felt::new(1u64), Felt::new(1u64)].into(); - let store = Arc::new( - MockStoreSuccessBuilder::from_accounts(std::iter::once((account_id, account_initial_hash))) - .build(), - ); - - let block_builder = DefaultBlockBuilder::new(store.clone(), store.clone()); - - let batches: Vec = { - let batch_1 = { - let tx = MockProvenTxBuilder::with_account( - account_id, - account_initial_hash, - [Felt::new(2u64), Felt::new(2u64), Felt::new(2u64), Felt::new(2u64)].into(), - ) - .build(); - - TransactionBatch::new(vec![tx], Default::default()).unwrap() - }; - - vec![batch_1] - }; - block_builder.build_block(&batches).await.unwrap(); - - // Ensure that the store's `apply_block()` was called - assert_eq!(*store.num_apply_block_called.read().await, 1); -} - -/// Tests that `build_block()` succeeds when the transaction batches are empty -#[tokio::test] -#[miden_node_test_macro::enable_logging] -async fn test_apply_block_called_empty_batches() { - let account_id = AccountId::new_unchecked(Felt::new(ACCOUNT_ID_OFF_CHAIN_SENDER)); - let account_hash: Digest = - [Felt::new(1u64), Felt::new(1u64), Felt::new(1u64), Felt::new(1u64)].into(); - let store = Arc::new( - MockStoreSuccessBuilder::from_accounts(std::iter::once((account_id, account_hash))).build(), - ); - - let block_builder = DefaultBlockBuilder::new(store.clone(), store.clone()); - - block_builder.build_block(&Vec::new()).await.unwrap(); - - // Ensure that the store's `apply_block()` was called - assert_eq!(*store.num_apply_block_called.read().await, 1); -} - -/// Tests that `build_block()` fails when `get_block_inputs()` fails -#[tokio::test] -#[miden_node_test_macro::enable_logging] -async fn test_build_block_failure() { - let store = Arc::new(MockStoreFailure); - - let block_builder = DefaultBlockBuilder::new(store.clone(), store.clone()); - - let result = block_builder.build_block(&Vec::new()).await; - - // Ensure that the store's `apply_block()` was called - assert_matches!(result, Err(BuildBlockError::GetBlockInputsFailed(_))); -} diff --git a/crates/block-producer/src/errors.rs b/crates/block-producer/src/errors.rs index 937bbefe..b2e9b13c 100644 --- a/crates/block-producer/src/errors.rs +++ b/crates/block-producer/src/errors.rs @@ -25,6 +25,13 @@ pub enum BlockProducerError { /// A block-producer task panic'd. 
#[error("error joining {task} task")] JoinError { task: &'static str, source: JoinError }, + + /// A block-producer task reported a transport error. + #[error("task {task} had a transport error")] + TonicTransportError { + task: &'static str, + source: tonic::transport::Error, + }, } // Transaction verification errors @@ -132,6 +139,9 @@ pub enum BuildBatchError { #[error("Nothing actually went wrong, failure was injected on purpose")] InjectedFailure, + + #[error("Batch proving task panic'd")] + JoinError(#[from] tokio::task::JoinError), } // Block prover errors diff --git a/crates/block-producer/src/mempool/batch_graph.rs b/crates/block-producer/src/mempool/batch_graph.rs index c489cddd..62b4c568 100644 --- a/crates/block-producer/src/mempool/batch_graph.rs +++ b/crates/block-producer/src/mempool/batch_graph.rs @@ -3,7 +3,7 @@ use std::collections::{BTreeMap, BTreeSet}; use miden_objects::transaction::TransactionId; use super::{ - dependency_graph::{DependencyGraph, GraphError}, + graph::{DependencyGraph, GraphError}, BatchJobId, BlockBudget, BudgetStatus, }; use crate::batch_builder::batch::TransactionBatch; @@ -135,7 +135,7 @@ impl BatchGraph { /// /// # Returns /// - /// Returns all removes batches and their transactions. + /// Returns all removed batches and their transactions. /// /// # Errors /// @@ -185,6 +185,8 @@ impl BatchGraph { &mut self, batch_ids: BTreeSet, ) -> Result, GraphError> { + // This clone could be elided by moving this call to the end. This would lose the atomic + // property of this method though its unclear what value (if any) that has. self.inner.prune_processed(batch_ids.clone())?; let mut transactions = Vec::new(); @@ -224,13 +226,17 @@ impl BatchGraph { while let Some(batch_id) = self.inner.roots().first().copied() { // SAFETY: Since it was a root batch, it must definitely have a processed batch // associated with it. - let batch = self.inner.get(&batch_id).expect("root should be in graph").clone(); + let batch = self.inner.get(&batch_id).expect("root should be in graph"); // Adhere to block's budget. - if budget.check_then_subtract(&batch) == BudgetStatus::Exceeded { + if budget.check_then_subtract(batch) == BudgetStatus::Exceeded { break; } + // Clone is required to avoid multiple borrows of self. We delay this clone until after + // the budget check, which is why this looks so out of place. + let batch = batch.clone(); + // SAFETY: This is definitely a root since we just selected it from the set of roots. self.inner.process_root(batch_id).expect("root should be processed"); diff --git a/crates/block-producer/src/mempool/dependency_graph.rs b/crates/block-producer/src/mempool/dependency_graph.rs deleted file mode 100644 index 5fb2766a..00000000 --- a/crates/block-producer/src/mempool/dependency_graph.rs +++ /dev/null @@ -1,1000 +0,0 @@ -use std::{ - collections::{BTreeMap, BTreeSet}, - fmt::{Debug, Display}, -}; - -// DEPENDENCY GRAPH -// ================================================================================================ - -/// A dependency graph structure where nodes are inserted, and then made available for processing -/// once all parent nodes have been processed. -/// -/// Forms the basis of our transaction and batch dependency graphs. 
-/// -/// # Node lifecycle -/// ```text -/// │ -/// │ -/// insert_pending│ -/// ┌─────▼─────┐ -/// │ pending │────┐ -/// └─────┬─────┘ │ -/// │ │ -/// promote_pending│ │ -/// ┌─────▼─────┐ │ -/// ┌──────────► in queue │────│ -/// │ └─────┬─────┘ │ -/// revert_processed│ │ │ -/// │ process_root│ │ -/// │ ┌─────▼─────┐ │ -/// └──────────┼ processed │────│ -/// └─────┬─────┘ │ -/// │ │ -/// prune_processed│ │purge_subgraphs -/// ┌─────▼─────┐ │ -/// │ ◄────┘ -/// └───────────┘ -/// ``` -#[derive(Clone, PartialEq, Eq)] -pub struct DependencyGraph { - /// Node's who's data is still pending. - pending: BTreeSet, - - /// Each node's data. - vertices: BTreeMap, - - /// Each node's parents. This is redundant with `children`, - /// but we require both for efficient lookups. - parents: BTreeMap>, - - /// Each node's children. This is redundant with `parents`, - /// but we require both for efficient lookups. - children: BTreeMap>, - - /// Nodes that are available to process next. - /// - /// Effectively this is the set of nodes which are - /// unprocessed and whose parent's _are_ all processed. - roots: BTreeSet, - - /// Set of nodes that are already processed. - processed: BTreeSet, -} - -impl Debug for DependencyGraph -where - K: Debug, -{ - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("DependencyGraph") - .field("pending", &self.pending) - .field("vertices", &self.vertices.keys()) - .field("processed", &self.processed) - .field("roots", &self.roots) - .field("parents", &self.parents) - .field("children", &self.children) - .finish() - } -} - -#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] -pub enum GraphError { - #[error("Node {0} already exists")] - DuplicateKey(K), - - #[error("Parents not found: {0:?}")] - MissingParents(BTreeSet), - - #[error("Nodes not found: {0:?}")] - UnknownNodes(BTreeSet), - - #[error("Nodes were not yet processed: {0:?}")] - UnprocessedNodes(BTreeSet), - - #[error("Nodes would be left dangling: {0:?}")] - DanglingNodes(BTreeSet), - - #[error("Node {0} is not a root node")] - InvalidRootNode(K), - - #[error("Node {0} is not a pending node")] - InvalidPendingNode(K), -} - -/// This cannot be derived without enforcing `Default` bounds on K and V. -impl Default for DependencyGraph { - fn default() -> Self { - Self { - vertices: Default::default(), - pending: Default::default(), - parents: Default::default(), - children: Default::default(), - roots: Default::default(), - processed: Default::default(), - } - } -} - -impl DependencyGraph { - /// Inserts a new pending node into the graph. - /// - /// # Errors - /// - /// Errors if the node already exists, or if any of the parents are not part of the graph. - /// - /// This method is atomic. - pub fn insert_pending(&mut self, key: K, parents: BTreeSet) -> Result<(), GraphError> { - if self.contains(&key) { - return Err(GraphError::DuplicateKey(key)); - } - - let missing_parents = parents - .iter() - .filter(|parent| !self.contains(parent)) - .copied() - .collect::>(); - if !missing_parents.is_empty() { - return Err(GraphError::MissingParents(missing_parents)); - } - - // Inform parents of their new child. - for parent in &parents { - self.children.entry(*parent).or_default().insert(key); - } - self.pending.insert(key); - self.parents.insert(key, parents); - self.children.insert(key, Default::default()); - - Ok(()) - } - - /// Promotes a pending node, associating it with the provided value and allowing it to be - /// considered for processing. 
- /// - /// # Errors - /// - /// Errors if the given node is not pending. - /// - /// This method is atomic. - pub fn promote_pending(&mut self, key: K, value: V) -> Result<(), GraphError> { - if !self.pending.remove(&key) { - return Err(GraphError::InvalidPendingNode(key)); - } - - self.vertices.insert(key, value); - self.try_make_root(key); - - Ok(()) - } - - /// Reverts the nodes __and their descendents__, requeueing them for processing. - /// - /// Descendents which are pending remain unchanged. - /// - /// # Errors - /// - /// Returns an error if any of the given nodes: - /// - /// - are not part of the graph, or - /// - were not previously processed - /// - /// This method is atomic. - pub fn revert_subgraphs(&mut self, keys: BTreeSet) -> Result<(), GraphError> { - let missing_nodes = keys - .iter() - .filter(|key| !self.vertices.contains_key(key)) - .copied() - .collect::>(); - if !missing_nodes.is_empty() { - return Err(GraphError::UnknownNodes(missing_nodes)); - } - let unprocessed = keys.difference(&self.processed).copied().collect::>(); - if !unprocessed.is_empty() { - return Err(GraphError::UnprocessedNodes(unprocessed)); - } - - let mut reverted = BTreeSet::new(); - let mut to_revert = keys.clone(); - - while let Some(key) = to_revert.pop_first() { - self.processed.remove(&key); - - let unprocessed_children = self - .children - .get(&key) - .map(|children| children.difference(&reverted)) - .into_iter() - .flatten() - // We should not revert children which are pending. - .filter(|child| self.vertices.contains_key(child)) - .copied(); - - to_revert.extend(unprocessed_children); - - reverted.insert(key); - } - - // Only the original keys and the current roots need to be considered as roots. - // - // The children of the input keys are disqualified by definition (they're descendents), - // and current roots must be re-evaluated since their parents may have been requeued. - std::mem::take(&mut self.roots) - .into_iter() - .chain(keys) - .for_each(|key| self.try_make_root(key)); - - Ok(()) - } - - /// Removes a set of previously processed nodes from the graph. - /// - /// This is used to bound the size of the graph by removing nodes once they are no longer - /// required. - /// - /// # Errors - /// - /// Errors if - /// - any node is unknown - /// - any node is __not__ processed - /// - any parent node would be left unpruned - /// - /// The last point implies that all parents of the given nodes must either be part of the set, - /// or already been pruned. - /// - /// This method is atomic. - pub fn prune_processed(&mut self, keys: BTreeSet) -> Result, GraphError> { - let missing_nodes = - keys.iter().filter(|key| !self.contains(key)).copied().collect::>(); - if !missing_nodes.is_empty() { - return Err(GraphError::UnknownNodes(missing_nodes)); - } - - let unprocessed = keys.difference(&self.processed).copied().collect::>(); - if !unprocessed.is_empty() { - return Err(GraphError::UnprocessedNodes(unprocessed)); - } - - // No parent may be left dangling i.e. all parents must be part of this prune set. 
- let dangling = keys - .iter() - .flat_map(|key| self.parents.get(key)) - .flatten() - .filter(|parent| !keys.contains(parent)) - .copied() - .collect::>(); - if !dangling.is_empty() { - return Err(GraphError::DanglingNodes(dangling)); - } - - let mut pruned = Vec::with_capacity(keys.len()); - - for key in keys { - let value = self.vertices.remove(&key).expect("Checked in precondition"); - pruned.push(value); - self.processed.remove(&key); - self.parents.remove(&key); - - let children = self.children.remove(&key).unwrap_or_default(); - - // Remove edges from children to this node. - for child in children { - if let Some(child) = self.parents.get_mut(&child) { - child.remove(&key); - } - } - } - - Ok(pruned) - } - - /// Removes the set of nodes __and all descendents__ from the graph, returning all removed - /// nodes. This __includes__ pending nodes. - /// - /// # Returns - /// - /// All nodes removed. - /// - /// # Errors - /// - /// Returns an error if any of the given nodes does not exist. - /// - /// This method is atomic. - pub fn purge_subgraphs(&mut self, keys: BTreeSet) -> Result, GraphError> { - let missing_nodes = - keys.iter().filter(|key| !self.contains(key)).copied().collect::>(); - if !missing_nodes.is_empty() { - return Err(GraphError::UnknownNodes(missing_nodes)); - } - - let visited = keys.clone(); - let mut to_remove = keys; - let mut removed = BTreeSet::new(); - - while let Some(key) = to_remove.pop_first() { - self.vertices.remove(&key); - self.pending.remove(&key); - removed.insert(key); - - self.processed.remove(&key); - self.roots.remove(&key); - - // Children must also be purged. Take care not to visit them twice which is - // possible since children can have multiple purged parents. - let unvisited_children = self.children.remove(&key).unwrap_or_default(); - let unvisited_children = unvisited_children.difference(&visited); - to_remove.extend(unvisited_children); - - // Inform parents that this child no longer exists. - let parents = self.parents.remove(&key).unwrap_or_default(); - for parent in parents { - if let Some(parent) = self.children.get_mut(&parent) { - parent.remove(&key); - } - } - } - - Ok(removed) - } - - /// Adds the node to the `roots` list _IFF_ all of its parents are processed. - /// - /// # SAFETY - /// - /// This method assumes the node exists. Caller is responsible for ensuring this is true. - fn try_make_root(&mut self, key: K) { - if self.pending.contains(&key) { - return; - } - debug_assert!( - self.vertices.contains_key(&key), - "Potential root {key} must exist in the graph" - ); - debug_assert!( - !self.processed.contains(&key), - "Potential root {key} cannot already be processed" - ); - - let all_parents_processed = self - .parents - .get(&key) - .into_iter() - .flatten() - .all(|parent| self.processed.contains(parent)); - - if all_parents_processed { - self.roots.insert(key); - } - } - - /// Returns the set of nodes that are ready for processing. - /// - /// Nodes can be selected from here and marked as processed using [`Self::process_root`]. - pub fn roots(&self) -> &BTreeSet { - &self.roots - } - - /// Marks a root node as processed, removing it from the roots list. - /// - /// The node's children are [evaluated](Self::try_make_root) as possible roots. - /// - /// # Error - /// - /// Errors if the node is not in the roots list. - /// - /// This method is atomic. 
- pub fn process_root(&mut self, key: K) -> Result<(), GraphError> { - if !self.roots.remove(&key) { - return Err(GraphError::InvalidRootNode(key)); - } - - self.processed.insert(key); - - self.children - .get(&key) - .cloned() - .unwrap_or_default() - .into_iter() - .for_each(|child| self.try_make_root(child)); - - Ok(()) - } - - /// Returns the value of a node. - pub fn get(&self, key: &K) -> Option<&V> { - self.vertices.get(key) - } - - /// Returns the parents of the node, or [None] if the node does not exist. - pub fn parents(&self, key: &K) -> Option<&BTreeSet> { - self.parents.get(key) - } - - /// Returns true if the node exists, in either the pending or non-pending sets. - fn contains(&self, key: &K) -> bool { - self.pending.contains(key) || self.vertices.contains_key(key) - } -} - -// TESTS -// ================================================================================================ - -#[cfg(test)] -mod tests { - use super::*; - - // TEST UTILITIES - // ================================================================================================ - - /// Simplified graph variant where a node's key always equals its value. This is done to make - /// generating test values simpler. - type TestGraph = DependencyGraph; - - impl TestGraph { - /// Alias for inserting a node with no parents. - fn insert_with_no_parents(&mut self, node: u32) -> Result<(), GraphError> { - self.insert_with_parents(node, Default::default()) - } - - /// Alias for inserting a node with a single parent. - fn insert_with_parent(&mut self, node: u32, parent: u32) -> Result<(), GraphError> { - self.insert_with_parents(node, [parent].into()) - } - - /// Alias for inserting a node with multiple parents. - fn insert_with_parents( - &mut self, - node: u32, - parents: BTreeSet, - ) -> Result<(), GraphError> { - self.insert_pending(node, parents) - } - - /// Alias for promoting nodes with the same value as the key. - fn promote(&mut self, nodes: impl IntoIterator) -> Result<(), GraphError> { - for node in nodes { - self.promote_pending(node, node)?; - } - Ok(()) - } - - /// Promotes all nodes in the pending list with value=key. - fn promote_all(&mut self) { - // SAFETY: these are definitely pending nodes. - self.promote(self.pending.clone()).unwrap(); - } - - /// Calls process_root until all nodes have been processed. - fn process_all(&mut self) { - while let Some(root) = self.roots().first().copied() { - // SAFETY: this is definitely a root since we just took it from there :) - self.process_root(root).unwrap(); - } - } - } - - // PROMOTE TESTS - // ================================================================================================ - - #[test] - fn promoted_nodes_are_considered_for_root() { - //! Ensure that a promoted node is added to the root list if all parents are already - //! processed. - let parent_a = 1; - let parent_b = 2; - let child_a = 3; - let child_b = 4; - let child_c = 5; - - let mut uut = TestGraph::default(); - uut.insert_with_no_parents(parent_a).unwrap(); - uut.insert_with_no_parents(parent_b).unwrap(); - uut.promote_all(); - - // Only process one parent so that some children remain unrootable. - uut.process_root(parent_a).unwrap(); - - uut.insert_with_parent(child_a, parent_a).unwrap(); - uut.insert_with_parent(child_b, parent_b).unwrap(); - uut.insert_with_parents(child_c, [parent_a, parent_b].into()).unwrap(); - - uut.promote_all(); - - // Only child_a should be added (in addition to the parents), since the other children - // are dependent on parent_b which is incomplete. 
- let expected_roots = [parent_b, child_a].into(); - - assert_eq!(uut.roots, expected_roots); - } - - #[test] - fn pending_nodes_are_not_considered_for_root() { - //! Ensure that an unpromoted node is _not_ added to the root list even if all parents are - //! already processed. - let parent_a = 1; - let parent_b = 2; - let child_a = 3; - let child_b = 4; - let child_c = 5; - - let mut uut = TestGraph::default(); - uut.insert_with_no_parents(parent_a).unwrap(); - uut.insert_with_no_parents(parent_b).unwrap(); - uut.promote_all(); - uut.process_all(); - - uut.insert_with_parent(child_a, parent_a).unwrap(); - uut.insert_with_parent(child_b, parent_b).unwrap(); - uut.insert_with_parents(child_c, [parent_a, parent_b].into()).unwrap(); - - uut.promote([child_b]).unwrap(); - - // Only child b is valid as it was promoted. - let expected = [child_b].into(); - - assert_eq!(uut.roots, expected); - } - - #[test] - fn promoted_nodes_are_moved() { - let mut uut = TestGraph::default(); - uut.insert_with_no_parents(123).unwrap(); - - assert!(uut.pending.contains(&123)); - assert!(!uut.vertices.contains_key(&123)); - - uut.promote_pending(123, 123).unwrap(); - - assert!(!uut.pending.contains(&123)); - assert!(uut.vertices.contains_key(&123)); - } - - #[test] - fn promote_rejects_already_promoted_nodes() { - let mut uut = TestGraph::default(); - uut.insert_with_no_parents(123).unwrap(); - uut.promote_all(); - - let err = uut.promote_pending(123, 123).unwrap_err(); - let expected = GraphError::InvalidPendingNode(123); - assert_eq!(err, expected); - } - - #[test] - fn promote_rejects_unknown_nodes() { - let err = TestGraph::default().promote_pending(123, 123).unwrap_err(); - let expected = GraphError::InvalidPendingNode(123); - assert_eq!(err, expected); - } - - // INSERT TESTS - // ================================================================================================ - - #[test] - fn insert_with_known_parents_succeeds() { - let parent_a = 10; - let parent_b = 20; - let grandfather = 123; - let uncle = 222; - - let mut uut = TestGraph::default(); - uut.insert_with_no_parents(grandfather).unwrap(); - uut.insert_with_no_parents(parent_a).unwrap(); - uut.insert_with_parent(parent_b, grandfather).unwrap(); - uut.insert_with_parent(uncle, grandfather).unwrap(); - uut.insert_with_parents(1, [parent_a, parent_b].into()).unwrap(); - } - - #[test] - fn insert_duplicate_is_rejected() { - //! Ensure that inserting a duplicate node - //! - results in an error, and - //! - does not mutate the state (atomicity) - const KEY: u32 = 123; - let mut uut = TestGraph::default(); - uut.insert_with_no_parents(KEY).unwrap(); - - let err = uut.insert_with_no_parents(KEY).unwrap_err(); - let expected = GraphError::DuplicateKey(KEY); - assert_eq!(err, expected); - - let mut atomic_reference = TestGraph::default(); - atomic_reference.insert_with_no_parents(KEY).unwrap(); - assert_eq!(uut, atomic_reference); - } - - #[test] - fn insert_with_all_parents_missing_is_rejected() { - //! Ensure that inserting a node with unknown parents - //! - results in an error, and - //! - does not mutate the state (atomicity) - const MISSING: [u32; 4] = [1, 2, 3, 4]; - let mut uut = TestGraph::default(); - - let err = uut.insert_with_parents(0xABC, MISSING.into()).unwrap_err(); - let expected = GraphError::MissingParents(MISSING.into()); - assert_eq!(err, expected); - - let atomic_reference = TestGraph::default(); - assert_eq!(uut, atomic_reference); - } - - #[test] - fn insert_with_some_parents_missing_is_rejected() { - //! 
Ensure that inserting a node with unknown parents - //! - results in an error, and - //! - does not mutate the state (atomicity) - const MISSING: u32 = 123; - let mut uut = TestGraph::default(); - - uut.insert_with_no_parents(1).unwrap(); - uut.insert_with_no_parents(2).unwrap(); - uut.insert_with_no_parents(3).unwrap(); - - let atomic_reference = uut.clone(); - - let err = uut.insert_with_parents(0xABC, [1, 2, 3, MISSING].into()).unwrap_err(); - let expected = GraphError::MissingParents([MISSING].into()); - assert_eq!(err, expected); - assert_eq!(uut, atomic_reference); - } - - // REVERT TESTS - // ================================================================================================ - - #[test] - fn reverting_unprocessed_nodes_is_rejected() { - let mut uut = TestGraph::default(); - uut.insert_with_no_parents(1).unwrap(); - uut.insert_with_no_parents(2).unwrap(); - uut.insert_with_no_parents(3).unwrap(); - uut.promote_all(); - uut.process_root(1).unwrap(); - - let err = uut.revert_subgraphs([1, 2, 3].into()).unwrap_err(); - let expected = GraphError::UnprocessedNodes([2, 3].into()); - - assert_eq!(err, expected); - } - - #[test] - fn reverting_unknown_nodes_is_rejected() { - let err = TestGraph::default().revert_subgraphs([1].into()).unwrap_err(); - let expected = GraphError::UnknownNodes([1].into()); - assert_eq!(err, expected); - } - - #[test] - fn reverting_resets_the_entire_subgraph() { - //! Reverting should reset the state to before any of the nodes where processed. - let grandparent = 1; - let parent_a = 2; - let parent_b = 3; - let child_a = 4; - let child_b = 5; - let child_c = 6; - - let disjoint = 7; - - let mut uut = TestGraph::default(); - uut.insert_with_no_parents(grandparent).unwrap(); - uut.insert_with_no_parents(disjoint).unwrap(); - uut.insert_with_parent(parent_a, grandparent).unwrap(); - uut.insert_with_parent(parent_b, grandparent).unwrap(); - uut.insert_with_parent(child_a, parent_a).unwrap(); - uut.insert_with_parent(child_b, parent_b).unwrap(); - uut.insert_with_parents(child_c, [parent_a, parent_b].into()).unwrap(); - - uut.promote([disjoint, grandparent, parent_a, parent_b, child_a, child_c]) - .unwrap(); - uut.process_root(disjoint).unwrap(); - - let reference = uut.clone(); - - uut.process_all(); - uut.revert_subgraphs([grandparent].into()).unwrap(); - - assert_eq!(uut, reference); - } - - #[test] - fn reverting_reevaluates_roots() { - //! Node reverting from processed to unprocessed should cause the root nodes to be - //! re-evaluated. Only nodes with all parents processed should remain in the set. - let disjoint_parent = 1; - let disjoint_child = 2; - - let parent_a = 3; - let parent_b = 4; - let child_a = 5; - let child_b = 6; - - let partially_disjoin_child = 7; - - let mut uut = TestGraph::default(); - // This pair of nodes should not be impacted by the reverted subgraph. - uut.insert_with_no_parents(disjoint_parent).unwrap(); - uut.insert_with_parent(disjoint_child, disjoint_parent).unwrap(); - - uut.insert_with_no_parents(parent_a).unwrap(); - uut.insert_with_no_parents(parent_b).unwrap(); - uut.insert_with_parent(child_a, parent_a).unwrap(); - uut.insert_with_parent(child_b, parent_b).unwrap(); - uut.insert_with_parents(partially_disjoin_child, [disjoint_parent, parent_a].into()) - .unwrap(); - - // Since we are reverting the other parents, we expect the roots to match the current state. 
- uut.promote_all(); - uut.process_root(disjoint_parent).unwrap(); - let reference = uut.roots().clone(); - - uut.process_root(parent_a).unwrap(); - uut.process_root(parent_b).unwrap(); - uut.revert_subgraphs([parent_a, parent_b].into()).unwrap(); - - assert_eq!(uut.roots(), &reference); - } - - // PRUNING TESTS - // ================================================================================================ - - #[test] - fn pruned_nodes_are_nonextant() { - //! Checks that processed and then pruned nodes behave as if they never existed in the - //! graph. We test this by comparing it to a reference graph created without these ancestor - //! nodes. - let ancestor_a = 1; - let ancestor_b = 2; - - let child_a = 3; - let child_b = 4; - let child_both = 5; - - let mut uut = TestGraph::default(); - uut.insert_with_no_parents(ancestor_a).unwrap(); - uut.insert_with_no_parents(ancestor_b).unwrap(); - uut.insert_with_parent(child_a, ancestor_a).unwrap(); - uut.insert_with_parent(child_b, ancestor_b).unwrap(); - uut.insert_with_parents(child_both, [ancestor_a, ancestor_b].into()).unwrap(); - uut.promote_all(); - - uut.process_root(ancestor_a).unwrap(); - uut.process_root(ancestor_b).unwrap(); - uut.prune_processed([ancestor_a, ancestor_b].into()).unwrap(); - - let mut reference = TestGraph::default(); - reference.insert_with_no_parents(child_a).unwrap(); - reference.insert_with_no_parents(child_b).unwrap(); - reference.insert_with_no_parents(child_both).unwrap(); - reference.promote_all(); - - assert_eq!(uut, reference); - } - - #[test] - fn pruning_unknown_nodes_is_rejected() { - let err = TestGraph::default().prune_processed([1].into()).unwrap_err(); - let expected = GraphError::UnknownNodes([1].into()); - assert_eq!(err, expected); - } - - #[test] - fn pruning_unprocessed_nodes_is_rejected() { - let mut uut = TestGraph::default(); - uut.insert_with_no_parents(1).unwrap(); - uut.promote_all(); - - let err = uut.prune_processed([1].into()).unwrap_err(); - let expected = GraphError::UnprocessedNodes([1].into()); - assert_eq!(err, expected); - } - - #[test] - fn pruning_cannot_leave_parents_dangling() { - //! Pruning processed nodes must always prune all parent nodes as well. No parent node may - //! be left behind. - let dangling = 1; - let pruned = 2; - let mut uut = TestGraph::default(); - uut.insert_with_no_parents(dangling).unwrap(); - uut.insert_with_parent(pruned, dangling).unwrap(); - uut.promote_all(); - uut.process_all(); - - let err = uut.prune_processed([pruned].into()).unwrap_err(); - let expected = GraphError::DanglingNodes([dangling].into()); - assert_eq!(err, expected); - } - - // PURGING TESTS - // ================================================================================================ - - #[test] - fn purging_subgraph_handles_internal_nodes() { - //! Purging a subgraph should correctly handle nodes already deleted within that subgraph. - //! - //! This is a concern for errors as we are deleting parts of the subgraph while we are - //! iterating through the nodes to purge. This means its likely a node will already have - //! been deleted before processing it as an input. - //! - //! We can force this to occur by re-ordering the inputs relative to the actual dependency - //! order. This means this test is a bit weaker because it relies on implementation details. - - let ancestor_a = 1; - let ancestor_b = 2; - let parent_a = 3; - let parent_b = 4; - let child_a = 5; - // This should be purged prior to parent_a. 
Relies on the fact that we are iterating over a - // btree which is ordered by value. - let child_b = 0; - let child_c = 6; - - let mut uut = TestGraph::default(); - uut.insert_with_no_parents(ancestor_a).unwrap(); - uut.insert_with_no_parents(ancestor_b).unwrap(); - uut.insert_with_parent(parent_a, ancestor_a).unwrap(); - uut.insert_with_parent(parent_b, ancestor_b).unwrap(); - uut.insert_with_parents(child_a, [ancestor_a, parent_a].into()).unwrap(); - uut.insert_with_parents(child_b, [parent_a, parent_b].into()).unwrap(); - uut.insert_with_parent(child_c, parent_b).unwrap(); - - uut.purge_subgraphs([child_b, parent_a].into()).unwrap(); - - let mut reference = TestGraph::default(); - reference.insert_with_no_parents(ancestor_a).unwrap(); - reference.insert_with_no_parents(ancestor_b).unwrap(); - reference.insert_with_parent(parent_b, ancestor_b).unwrap(); - reference.insert_with_parent(child_c, parent_b).unwrap(); - - assert_eq!(uut, reference); - } - - #[test] - fn purging_removes_all_descendents() { - let ancestor_a = 1; - let ancestor_b = 2; - let parent_a = 3; - let parent_b = 4; - let child_a = 5; - let child_b = 6; - let child_c = 7; - - let mut uut = TestGraph::default(); - uut.insert_with_no_parents(ancestor_a).unwrap(); - uut.insert_with_no_parents(ancestor_b).unwrap(); - uut.insert_with_parent(parent_a, ancestor_a).unwrap(); - uut.insert_with_parent(parent_b, ancestor_b).unwrap(); - uut.insert_with_parents(child_a, [ancestor_a, parent_a].into()).unwrap(); - uut.insert_with_parents(child_b, [parent_a, parent_b].into()).unwrap(); - uut.insert_with_parent(child_c, parent_b).unwrap(); - - uut.purge_subgraphs([parent_a].into()).unwrap(); - - let mut reference = TestGraph::default(); - reference.insert_with_no_parents(ancestor_a).unwrap(); - reference.insert_with_no_parents(ancestor_b).unwrap(); - reference.insert_with_parent(parent_b, ancestor_b).unwrap(); - reference.insert_with_parent(child_c, parent_b).unwrap(); - - assert_eq!(uut, reference); - } - - // PROCESSING TESTS - // ================================================================================================ - - #[test] - fn process_root_evaluates_children_as_roots() { - let parent_a = 1; - let parent_b = 2; - let child_a = 3; - let child_b = 4; - let child_c = 5; - - let mut uut = TestGraph::default(); - uut.insert_with_no_parents(parent_a).unwrap(); - uut.insert_with_no_parents(parent_b).unwrap(); - uut.insert_with_parent(child_a, parent_a).unwrap(); - uut.insert_with_parent(child_b, parent_b).unwrap(); - uut.insert_with_parents(child_c, [parent_a, parent_b].into()).unwrap(); - uut.promote_all(); - - uut.process_root(parent_a).unwrap(); - assert_eq!(uut.roots(), &[parent_b, child_a].into()); - } - - #[test] - fn process_root_rejects_non_root_node() { - let mut uut = TestGraph::default(); - uut.insert_with_no_parents(1).unwrap(); - uut.insert_with_parent(2, 1).unwrap(); - uut.promote_all(); - - let err = uut.process_root(2).unwrap_err(); - let expected = GraphError::InvalidRootNode(2); - assert_eq!(err, expected); - } - - #[test] - fn process_root_cannot_reprocess_same_node() { - let mut uut = TestGraph::default(); - uut.insert_with_no_parents(1).unwrap(); - uut.promote_all(); - uut.process_root(1).unwrap(); - - let err = uut.process_root(1).unwrap_err(); - let expected = GraphError::InvalidRootNode(1); - assert_eq!(err, expected); - } - - #[test] - fn processing_a_queue_graph() { - //! Creates a queue graph and ensures that nodes processed in FIFO order. 
-        let nodes = (0..10).collect::<Vec<u32>>();
-
-        let mut uut = TestGraph::default();
-        uut.insert_with_no_parents(nodes[0]).unwrap();
-        for pairs in nodes.windows(2) {
-            let (parent, id) = (pairs[0], pairs[1]);
-            uut.insert_with_parent(id, parent).unwrap();
-        }
-        uut.promote_all();
-
-        let mut ordered_roots = Vec::<u32>::new();
-        for _ in &nodes {
-            let current_roots = uut.roots().clone();
-            ordered_roots.extend(&current_roots);
-
-            for root in current_roots {
-                uut.process_root(root).unwrap();
-            }
-        }
-
-        assert_eq!(ordered_roots, nodes);
-    }
-
-    #[test]
-    fn processing_and_root_tracking() {
-        //! Creates a somewhat arbitrarily connected graph and ensures that roots are tracked as
-        //! expected as they are processed.
-        let ancestor_a = 1;
-        let ancestor_b = 2;
-        let parent_a = 3;
-        let parent_b = 4;
-        let child_a = 5;
-        let child_b = 6;
-        let child_c = 7;
-
-        let mut uut = TestGraph::default();
-        uut.insert_with_no_parents(ancestor_a).unwrap();
-        uut.insert_with_no_parents(ancestor_b).unwrap();
-        uut.insert_with_parent(parent_a, ancestor_a).unwrap();
-        uut.insert_with_parent(parent_b, ancestor_b).unwrap();
-        uut.insert_with_parents(child_a, [ancestor_a, parent_a].into()).unwrap();
-        uut.insert_with_parents(child_b, [parent_a, parent_b].into()).unwrap();
-        uut.insert_with_parent(child_c, parent_b).unwrap();
-        uut.promote_all();
-
-        assert_eq!(uut.roots(), &[ancestor_a, ancestor_b].into());
-
-        uut.process_root(ancestor_a).unwrap();
-        assert_eq!(uut.roots(), &[ancestor_b, parent_a].into());
-
-        uut.process_root(ancestor_b).unwrap();
-        assert_eq!(uut.roots(), &[parent_a, parent_b].into());
-
-        uut.process_root(parent_a).unwrap();
-        assert_eq!(uut.roots(), &[parent_b, child_a].into());
-
-        uut.process_root(parent_b).unwrap();
-        assert_eq!(uut.roots(), &[child_a, child_b, child_c].into());
-
-        uut.process_root(child_a).unwrap();
-        assert_eq!(uut.roots(), &[child_b, child_c].into());
-
-        uut.process_root(child_b).unwrap();
-        assert_eq!(uut.roots(), &[child_c].into());
-
-        uut.process_root(child_c).unwrap();
-        assert!(uut.roots().is_empty());
-    }
-}
diff --git a/crates/block-producer/src/mempool/graph/mod.rs b/crates/block-producer/src/mempool/graph/mod.rs
new file mode 100644
index 00000000..d9199909
--- /dev/null
+++ b/crates/block-producer/src/mempool/graph/mod.rs
@@ -0,0 +1,419 @@
+use std::{
+    collections::{BTreeMap, BTreeSet},
+    fmt::{Debug, Display},
+};
+
+#[cfg(test)]
+mod tests;
+
+// DEPENDENCY GRAPH
+// ================================================================================================
+
+/// A dependency graph structure where nodes are inserted, and then made available for processing
+/// once all parent nodes have been processed.
+///
+/// Forms the basis of our transaction and batch dependency graphs.
+///
+/// # Node lifecycle
+/// ```text
+///                                    │
+///                                    │
+///                      insert_pending│
+///                              ┌─────▼─────┐
+///                              │  pending  │────┐
+///                              └─────┬─────┘    │
+///                                    │          │
+///                     promote_pending│          │
+///                              ┌─────▼─────┐    │
+///                   ┌──────────►  in queue │────│
+///                   │          └─────┬─────┘    │
+///   revert_processed│                │          │
+///                   │    process_root│          │
+///                   │          ┌─────▼─────┐    │
+///                   └──────────┼ processed │────│
+///                              └─────┬─────┘    │
+///                                    │          │
+///                     prune_processed│          │purge_subgraphs
+///                              ┌─────▼─────┐    │
+///                              │           ◄────┘
+///                              └───────────┘
+/// ```
+#[derive(Clone, PartialEq, Eq)]
+pub struct DependencyGraph<K, V> {
+    /// Nodes whose data is still pending.
+    pending: BTreeSet<K>,
+
+    /// Each node's data.
+    vertices: BTreeMap<K, V>,
+
+    /// Each node's parents. This is redundant with `children`,
+    /// but we require both for efficient lookups.
+ parents: BTreeMap>, + + /// Each node's children. This is redundant with `parents`, + /// but we require both for efficient lookups. + children: BTreeMap>, + + /// Nodes that are available to process next. + /// + /// Effectively this is the set of nodes which are + /// unprocessed and whose parent's _are_ all processed. + roots: BTreeSet, + + /// Set of nodes that are already processed. + processed: BTreeSet, +} + +impl Debug for DependencyGraph +where + K: Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DependencyGraph") + .field("pending", &self.pending) + .field("vertices", &self.vertices.keys()) + .field("processed", &self.processed) + .field("roots", &self.roots) + .field("parents", &self.parents) + .field("children", &self.children) + .finish() + } +} + +#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] +pub enum GraphError { + #[error("Node {0} already exists")] + DuplicateKey(K), + + #[error("Parents not found: {0:?}")] + MissingParents(BTreeSet), + + #[error("Nodes not found: {0:?}")] + UnknownNodes(BTreeSet), + + #[error("Nodes were not yet processed: {0:?}")] + UnprocessedNodes(BTreeSet), + + #[error("Nodes would be left dangling: {0:?}")] + DanglingNodes(BTreeSet), + + #[error("Node {0} is not a root node")] + InvalidRootNode(K), + + #[error("Node {0} is not a pending node")] + InvalidPendingNode(K), +} + +/// This cannot be derived without enforcing `Default` bounds on K and V. +impl Default for DependencyGraph { + fn default() -> Self { + Self { + vertices: Default::default(), + pending: Default::default(), + parents: Default::default(), + children: Default::default(), + roots: Default::default(), + processed: Default::default(), + } + } +} + +impl DependencyGraph { + /// Inserts a new pending node into the graph. + /// + /// # Errors + /// + /// Errors if the node already exists, or if any of the parents are not part of the graph. + /// + /// This method is atomic. + pub fn insert_pending(&mut self, key: K, parents: BTreeSet) -> Result<(), GraphError> { + if self.contains(&key) { + return Err(GraphError::DuplicateKey(key)); + } + + let missing_parents = parents + .iter() + .filter(|parent| !self.contains(parent)) + .copied() + .collect::>(); + if !missing_parents.is_empty() { + return Err(GraphError::MissingParents(missing_parents)); + } + + // Inform parents of their new child. + for parent in &parents { + self.children.entry(*parent).or_default().insert(key); + } + self.pending.insert(key); + self.parents.insert(key, parents); + self.children.insert(key, Default::default()); + + Ok(()) + } + + /// Promotes a pending node, associating it with the provided value and allowing it to be + /// considered for processing. + /// + /// # Errors + /// + /// Errors if the given node is not pending. + /// + /// This method is atomic. + pub fn promote_pending(&mut self, key: K, value: V) -> Result<(), GraphError> { + if !self.pending.remove(&key) { + return Err(GraphError::InvalidPendingNode(key)); + } + + self.vertices.insert(key, value); + self.try_make_root(key); + + Ok(()) + } + + /// Reverts the nodes __and their descendents__, requeueing them for processing. + /// + /// Descendents which are pending remain unchanged. + /// + /// # Errors + /// + /// Returns an error if any of the given nodes: + /// + /// - are not part of the graph, or + /// - were not previously processed + /// + /// This method is atomic. 
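+    ///
+    /// # Example
+    ///
+    /// A minimal sketch (illustrative only; `u32` keys and values as in the unit tests):
+    ///
+    /// ```text
+    /// let mut graph = DependencyGraph::<u32, u32>::default();
+    /// graph.insert_pending(1, BTreeSet::new()).unwrap();
+    /// graph.insert_pending(2, [1].into()).unwrap();
+    /// graph.promote_pending(1, 1).unwrap();
+    /// graph.promote_pending(2, 2).unwrap();
+    /// graph.process_root(1).unwrap();
+    /// graph.process_root(2).unwrap();
+    ///
+    /// // Reverting node 1 also reverts its processed descendent 2; node 1 becomes a root
+    /// // again, while node 2 must wait for its parent to be re-processed.
+    /// graph.revert_subgraphs([1].into()).unwrap();
+    /// assert_eq!(graph.roots(), &[1].into());
+    /// ```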
+ pub fn revert_subgraphs(&mut self, keys: BTreeSet) -> Result<(), GraphError> { + let missing_nodes = keys + .iter() + .filter(|key| !self.vertices.contains_key(key)) + .copied() + .collect::>(); + if !missing_nodes.is_empty() { + return Err(GraphError::UnknownNodes(missing_nodes)); + } + let unprocessed = keys.difference(&self.processed).copied().collect::>(); + if !unprocessed.is_empty() { + return Err(GraphError::UnprocessedNodes(unprocessed)); + } + + let mut reverted = BTreeSet::new(); + let mut to_revert = keys.clone(); + + while let Some(key) = to_revert.pop_first() { + self.processed.remove(&key); + + let unprocessed_children = self + .children + .get(&key) + .map(|children| children.difference(&reverted)) + .into_iter() + .flatten() + // We should not revert children which are pending. + .filter(|child| self.vertices.contains_key(child)) + .copied(); + + to_revert.extend(unprocessed_children); + + reverted.insert(key); + } + + // Only the original keys and the current roots need to be considered as roots. + // + // The children of the input keys are disqualified by definition (they're descendents), + // and current roots must be re-evaluated since their parents may have been requeued. + std::mem::take(&mut self.roots) + .into_iter() + .chain(keys) + .for_each(|key| self.try_make_root(key)); + + Ok(()) + } + + /// Removes a set of previously processed nodes from the graph. + /// + /// This is used to bound the size of the graph by removing nodes once they are no longer + /// required. + /// + /// # Errors + /// + /// Errors if + /// - any node is unknown + /// - any node is __not__ processed + /// - any parent node would be left unpruned + /// + /// The last point implies that all parents of the given nodes must either be part of the set, + /// or already been pruned. + /// + /// This method is atomic. + pub fn prune_processed(&mut self, keys: BTreeSet) -> Result, GraphError> { + let missing_nodes = + keys.iter().filter(|key| !self.contains(key)).copied().collect::>(); + if !missing_nodes.is_empty() { + return Err(GraphError::UnknownNodes(missing_nodes)); + } + + let unprocessed = keys.difference(&self.processed).copied().collect::>(); + if !unprocessed.is_empty() { + return Err(GraphError::UnprocessedNodes(unprocessed)); + } + + // No parent may be left dangling i.e. all parents must be part of this prune set. + let dangling = keys + .iter() + .flat_map(|key| self.parents.get(key)) + .flatten() + .filter(|parent| !keys.contains(parent)) + .copied() + .collect::>(); + if !dangling.is_empty() { + return Err(GraphError::DanglingNodes(dangling)); + } + + let mut pruned = Vec::with_capacity(keys.len()); + + for key in keys { + let value = self.vertices.remove(&key).expect("Checked in precondition"); + pruned.push(value); + self.processed.remove(&key); + self.parents.remove(&key); + + let children = self.children.remove(&key).unwrap_or_default(); + + // Remove edges from children to this node. + for child in children { + if let Some(child) = self.parents.get_mut(&child) { + child.remove(&key); + } + } + } + + Ok(pruned) + } + + /// Removes the set of nodes __and all descendents__ from the graph, returning all removed + /// nodes. This __includes__ pending nodes. + /// + /// # Returns + /// + /// All nodes removed. + /// + /// # Errors + /// + /// Returns an error if any of the given nodes does not exist. + /// + /// This method is atomic. 
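+    ///
+    /// # Example
+    ///
+    /// A minimal sketch (illustrative only; `u32` keys and values as in the unit tests):
+    ///
+    /// ```text
+    /// let mut graph = DependencyGraph::<u32, u32>::default();
+    /// graph.insert_pending(1, BTreeSet::new()).unwrap();
+    /// graph.insert_pending(2, [1].into()).unwrap();
+    /// graph.promote_pending(1, 1).unwrap();
+    ///
+    /// // Node 2 is still pending, but purging node 1 removes it as well.
+    /// let removed = graph.purge_subgraphs([1].into()).unwrap();
+    /// assert_eq!(removed, BTreeSet::from([1, 2]));
+    /// ```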
+ pub fn purge_subgraphs(&mut self, keys: BTreeSet) -> Result, GraphError> { + let missing_nodes = + keys.iter().filter(|key| !self.contains(key)).copied().collect::>(); + if !missing_nodes.is_empty() { + return Err(GraphError::UnknownNodes(missing_nodes)); + } + + let visited = keys.clone(); + let mut to_remove = keys; + let mut removed = BTreeSet::new(); + + while let Some(key) = to_remove.pop_first() { + self.vertices.remove(&key); + self.pending.remove(&key); + removed.insert(key); + + self.processed.remove(&key); + self.roots.remove(&key); + + // Children must also be purged. Take care not to visit them twice which is + // possible since children can have multiple purged parents. + let unvisited_children = self.children.remove(&key).unwrap_or_default(); + let unvisited_children = unvisited_children.difference(&visited); + to_remove.extend(unvisited_children); + + // Inform parents that this child no longer exists. + let parents = self.parents.remove(&key).unwrap_or_default(); + for parent in parents { + if let Some(parent) = self.children.get_mut(&parent) { + parent.remove(&key); + } + } + } + + Ok(removed) + } + + /// Adds the node to the `roots` list _IFF_ all of its parents are processed. + /// + /// # SAFETY + /// + /// This method assumes the node exists. Caller is responsible for ensuring this is true. + fn try_make_root(&mut self, key: K) { + if self.pending.contains(&key) { + return; + } + debug_assert!( + self.vertices.contains_key(&key), + "Potential root {key} must exist in the graph" + ); + debug_assert!( + !self.processed.contains(&key), + "Potential root {key} cannot already be processed" + ); + + let all_parents_processed = self + .parents + .get(&key) + .into_iter() + .flatten() + .all(|parent| self.processed.contains(parent)); + + if all_parents_processed { + self.roots.insert(key); + } + } + + /// Returns the set of nodes that are ready for processing. + /// + /// Nodes can be selected from here and marked as processed using [`Self::process_root`]. + pub fn roots(&self) -> &BTreeSet { + &self.roots + } + + /// Marks a root node as processed, removing it from the roots list. + /// + /// The node's children are [evaluated](Self::try_make_root) as possible roots. + /// + /// # Error + /// + /// Errors if the node is not in the roots list. + /// + /// This method is atomic. + pub fn process_root(&mut self, key: K) -> Result<(), GraphError> { + if !self.roots.remove(&key) { + return Err(GraphError::InvalidRootNode(key)); + } + + self.processed.insert(key); + + self.children + .get(&key) + .cloned() + .unwrap_or_default() + .into_iter() + .for_each(|child| self.try_make_root(child)); + + Ok(()) + } + + /// Returns the value of a node. + pub fn get(&self, key: &K) -> Option<&V> { + self.vertices.get(key) + } + + /// Returns the parents of the node, or [None] if the node does not exist. + pub fn parents(&self, key: &K) -> Option<&BTreeSet> { + self.parents.get(key) + } + + /// Returns true if the node exists, in either the pending or non-pending sets. 
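+    ///
+    /// For example (illustrative sketch): a node that has been inserted but not yet promoted is
+    /// reported by this method, even though [Self::get] still returns [None] for it.
+    ///
+    /// ```text
+    /// let mut graph = DependencyGraph::<u32, u32>::default();
+    /// graph.insert_pending(1, BTreeSet::new()).unwrap();
+    /// assert!(graph.contains(&1));
+    /// assert!(graph.get(&1).is_none());
+    /// ```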
+ fn contains(&self, key: &K) -> bool { + self.pending.contains(key) || self.vertices.contains_key(key) + } +} diff --git a/crates/block-producer/src/mempool/graph/tests.rs b/crates/block-producer/src/mempool/graph/tests.rs new file mode 100644 index 00000000..acee736f --- /dev/null +++ b/crates/block-producer/src/mempool/graph/tests.rs @@ -0,0 +1,577 @@ +use super::*; + +// TEST UTILITIES +// ================================================================================================ + +/// Simplified graph variant where a node's key always equals its value. This is done to make +/// generating test values simpler. +type TestGraph = DependencyGraph; + +impl TestGraph { + /// Alias for inserting a node with no parents. + fn insert_with_no_parents(&mut self, node: u32) -> Result<(), GraphError> { + self.insert_with_parents(node, Default::default()) + } + + /// Alias for inserting a node with a single parent. + fn insert_with_parent(&mut self, node: u32, parent: u32) -> Result<(), GraphError> { + self.insert_with_parents(node, [parent].into()) + } + + /// Alias for inserting a node with multiple parents. + fn insert_with_parents( + &mut self, + node: u32, + parents: BTreeSet, + ) -> Result<(), GraphError> { + self.insert_pending(node, parents) + } + + /// Alias for promoting nodes with the same value as the key. + fn promote(&mut self, nodes: impl IntoIterator) -> Result<(), GraphError> { + for node in nodes { + self.promote_pending(node, node)?; + } + Ok(()) + } + + /// Promotes all nodes in the pending list with value=key. + fn promote_all(&mut self) { + // SAFETY: these are definitely pending nodes. + self.promote(self.pending.clone()).unwrap(); + } + + /// Calls process_root until all nodes have been processed. + fn process_all(&mut self) { + while let Some(root) = self.roots().first().copied() { + // SAFETY: this is definitely a root since we just took it from there :) + self.process_root(root).unwrap(); + } + } +} + +// PROMOTE TESTS +// ================================================================================================ + +#[test] +fn promoted_nodes_are_considered_for_root() { + //! Ensure that a promoted node is added to the root list if all parents are already + //! processed. + let parent_a = 1; + let parent_b = 2; + let child_a = 3; + let child_b = 4; + let child_c = 5; + + let mut uut = TestGraph::default(); + uut.insert_with_no_parents(parent_a).unwrap(); + uut.insert_with_no_parents(parent_b).unwrap(); + uut.promote_all(); + + // Only process one parent so that some children remain unrootable. + uut.process_root(parent_a).unwrap(); + + uut.insert_with_parent(child_a, parent_a).unwrap(); + uut.insert_with_parent(child_b, parent_b).unwrap(); + uut.insert_with_parents(child_c, [parent_a, parent_b].into()).unwrap(); + + uut.promote_all(); + + // Only child_a should be added (in addition to the parents), since the other children + // are dependent on parent_b which is incomplete. + let expected_roots = [parent_b, child_a].into(); + + assert_eq!(uut.roots, expected_roots); +} + +#[test] +fn pending_nodes_are_not_considered_for_root() { + //! Ensure that an unpromoted node is _not_ added to the root list even if all parents are + //! already processed. 
+ let parent_a = 1; + let parent_b = 2; + let child_a = 3; + let child_b = 4; + let child_c = 5; + + let mut uut = TestGraph::default(); + uut.insert_with_no_parents(parent_a).unwrap(); + uut.insert_with_no_parents(parent_b).unwrap(); + uut.promote_all(); + uut.process_all(); + + uut.insert_with_parent(child_a, parent_a).unwrap(); + uut.insert_with_parent(child_b, parent_b).unwrap(); + uut.insert_with_parents(child_c, [parent_a, parent_b].into()).unwrap(); + + uut.promote([child_b]).unwrap(); + + // Only child b is valid as it was promoted. + let expected = [child_b].into(); + + assert_eq!(uut.roots, expected); +} + +#[test] +fn promoted_nodes_are_moved() { + let mut uut = TestGraph::default(); + uut.insert_with_no_parents(123).unwrap(); + + assert!(uut.pending.contains(&123)); + assert!(!uut.vertices.contains_key(&123)); + + uut.promote_pending(123, 123).unwrap(); + + assert!(!uut.pending.contains(&123)); + assert!(uut.vertices.contains_key(&123)); +} + +#[test] +fn promote_rejects_already_promoted_nodes() { + let mut uut = TestGraph::default(); + uut.insert_with_no_parents(123).unwrap(); + uut.promote_all(); + + let err = uut.promote_pending(123, 123).unwrap_err(); + let expected = GraphError::InvalidPendingNode(123); + assert_eq!(err, expected); +} + +#[test] +fn promote_rejects_unknown_nodes() { + let err = TestGraph::default().promote_pending(123, 123).unwrap_err(); + let expected = GraphError::InvalidPendingNode(123); + assert_eq!(err, expected); +} + +// INSERT TESTS +// ================================================================================================ + +#[test] +fn insert_with_known_parents_succeeds() { + let parent_a = 10; + let parent_b = 20; + let grandfather = 123; + let uncle = 222; + + let mut uut = TestGraph::default(); + uut.insert_with_no_parents(grandfather).unwrap(); + uut.insert_with_no_parents(parent_a).unwrap(); + uut.insert_with_parent(parent_b, grandfather).unwrap(); + uut.insert_with_parent(uncle, grandfather).unwrap(); + uut.insert_with_parents(1, [parent_a, parent_b].into()).unwrap(); +} + +#[test] +fn insert_duplicate_is_rejected() { + //! Ensure that inserting a duplicate node + //! - results in an error, and + //! - does not mutate the state (atomicity) + const KEY: u32 = 123; + let mut uut = TestGraph::default(); + uut.insert_with_no_parents(KEY).unwrap(); + + let err = uut.insert_with_no_parents(KEY).unwrap_err(); + let expected = GraphError::DuplicateKey(KEY); + assert_eq!(err, expected); + + let mut atomic_reference = TestGraph::default(); + atomic_reference.insert_with_no_parents(KEY).unwrap(); + assert_eq!(uut, atomic_reference); +} + +#[test] +fn insert_with_all_parents_missing_is_rejected() { + //! Ensure that inserting a node with unknown parents + //! - results in an error, and + //! - does not mutate the state (atomicity) + const MISSING: [u32; 4] = [1, 2, 3, 4]; + let mut uut = TestGraph::default(); + + let err = uut.insert_with_parents(0xABC, MISSING.into()).unwrap_err(); + let expected = GraphError::MissingParents(MISSING.into()); + assert_eq!(err, expected); + + let atomic_reference = TestGraph::default(); + assert_eq!(uut, atomic_reference); +} + +#[test] +fn insert_with_some_parents_missing_is_rejected() { + //! Ensure that inserting a node with unknown parents + //! - results in an error, and + //! 
- does not mutate the state (atomicity) + const MISSING: u32 = 123; + let mut uut = TestGraph::default(); + + uut.insert_with_no_parents(1).unwrap(); + uut.insert_with_no_parents(2).unwrap(); + uut.insert_with_no_parents(3).unwrap(); + + let atomic_reference = uut.clone(); + + let err = uut.insert_with_parents(0xABC, [1, 2, 3, MISSING].into()).unwrap_err(); + let expected = GraphError::MissingParents([MISSING].into()); + assert_eq!(err, expected); + assert_eq!(uut, atomic_reference); +} + +// REVERT TESTS +// ================================================================================================ + +#[test] +fn reverting_unprocessed_nodes_is_rejected() { + let mut uut = TestGraph::default(); + uut.insert_with_no_parents(1).unwrap(); + uut.insert_with_no_parents(2).unwrap(); + uut.insert_with_no_parents(3).unwrap(); + uut.promote_all(); + uut.process_root(1).unwrap(); + + let err = uut.revert_subgraphs([1, 2, 3].into()).unwrap_err(); + let expected = GraphError::UnprocessedNodes([2, 3].into()); + + assert_eq!(err, expected); +} + +#[test] +fn reverting_unknown_nodes_is_rejected() { + let err = TestGraph::default().revert_subgraphs([1].into()).unwrap_err(); + let expected = GraphError::UnknownNodes([1].into()); + assert_eq!(err, expected); +} + +#[test] +fn reverting_resets_the_entire_subgraph() { + //! Reverting should reset the state to before any of the nodes where processed. + let grandparent = 1; + let parent_a = 2; + let parent_b = 3; + let child_a = 4; + let child_b = 5; + let child_c = 6; + + let disjoint = 7; + + let mut uut = TestGraph::default(); + uut.insert_with_no_parents(grandparent).unwrap(); + uut.insert_with_no_parents(disjoint).unwrap(); + uut.insert_with_parent(parent_a, grandparent).unwrap(); + uut.insert_with_parent(parent_b, grandparent).unwrap(); + uut.insert_with_parent(child_a, parent_a).unwrap(); + uut.insert_with_parent(child_b, parent_b).unwrap(); + uut.insert_with_parents(child_c, [parent_a, parent_b].into()).unwrap(); + + uut.promote([disjoint, grandparent, parent_a, parent_b, child_a, child_c]) + .unwrap(); + uut.process_root(disjoint).unwrap(); + + let reference = uut.clone(); + + uut.process_all(); + uut.revert_subgraphs([grandparent].into()).unwrap(); + + assert_eq!(uut, reference); +} + +#[test] +fn reverting_reevaluates_roots() { + //! Node reverting from processed to unprocessed should cause the root nodes to be + //! re-evaluated. Only nodes with all parents processed should remain in the set. + let disjoint_parent = 1; + let disjoint_child = 2; + + let parent_a = 3; + let parent_b = 4; + let child_a = 5; + let child_b = 6; + + let partially_disjoin_child = 7; + + let mut uut = TestGraph::default(); + // This pair of nodes should not be impacted by the reverted subgraph. + uut.insert_with_no_parents(disjoint_parent).unwrap(); + uut.insert_with_parent(disjoint_child, disjoint_parent).unwrap(); + + uut.insert_with_no_parents(parent_a).unwrap(); + uut.insert_with_no_parents(parent_b).unwrap(); + uut.insert_with_parent(child_a, parent_a).unwrap(); + uut.insert_with_parent(child_b, parent_b).unwrap(); + uut.insert_with_parents(partially_disjoin_child, [disjoint_parent, parent_a].into()) + .unwrap(); + + // Since we are reverting the other parents, we expect the roots to match the current state. 
+ uut.promote_all(); + uut.process_root(disjoint_parent).unwrap(); + let reference = uut.roots().clone(); + + uut.process_root(parent_a).unwrap(); + uut.process_root(parent_b).unwrap(); + uut.revert_subgraphs([parent_a, parent_b].into()).unwrap(); + + assert_eq!(uut.roots(), &reference); +} + +// PRUNING TESTS +// ================================================================================================ + +#[test] +fn pruned_nodes_are_nonextant() { + //! Checks that processed and then pruned nodes behave as if they never existed in the + //! graph. We test this by comparing it to a reference graph created without these ancestor + //! nodes. + let ancestor_a = 1; + let ancestor_b = 2; + + let child_a = 3; + let child_b = 4; + let child_both = 5; + + let mut uut = TestGraph::default(); + uut.insert_with_no_parents(ancestor_a).unwrap(); + uut.insert_with_no_parents(ancestor_b).unwrap(); + uut.insert_with_parent(child_a, ancestor_a).unwrap(); + uut.insert_with_parent(child_b, ancestor_b).unwrap(); + uut.insert_with_parents(child_both, [ancestor_a, ancestor_b].into()).unwrap(); + uut.promote_all(); + + uut.process_root(ancestor_a).unwrap(); + uut.process_root(ancestor_b).unwrap(); + uut.prune_processed([ancestor_a, ancestor_b].into()).unwrap(); + + let mut reference = TestGraph::default(); + reference.insert_with_no_parents(child_a).unwrap(); + reference.insert_with_no_parents(child_b).unwrap(); + reference.insert_with_no_parents(child_both).unwrap(); + reference.promote_all(); + + assert_eq!(uut, reference); +} + +#[test] +fn pruning_unknown_nodes_is_rejected() { + let err = TestGraph::default().prune_processed([1].into()).unwrap_err(); + let expected = GraphError::UnknownNodes([1].into()); + assert_eq!(err, expected); +} + +#[test] +fn pruning_unprocessed_nodes_is_rejected() { + let mut uut = TestGraph::default(); + uut.insert_with_no_parents(1).unwrap(); + uut.promote_all(); + + let err = uut.prune_processed([1].into()).unwrap_err(); + let expected = GraphError::UnprocessedNodes([1].into()); + assert_eq!(err, expected); +} + +#[test] +fn pruning_cannot_leave_parents_dangling() { + //! Pruning processed nodes must always prune all parent nodes as well. No parent node may + //! be left behind. + let dangling = 1; + let pruned = 2; + let mut uut = TestGraph::default(); + uut.insert_with_no_parents(dangling).unwrap(); + uut.insert_with_parent(pruned, dangling).unwrap(); + uut.promote_all(); + uut.process_all(); + + let err = uut.prune_processed([pruned].into()).unwrap_err(); + let expected = GraphError::DanglingNodes([dangling].into()); + assert_eq!(err, expected); +} + +// PURGING TESTS +// ================================================================================================ + +#[test] +fn purging_subgraph_handles_internal_nodes() { + //! Purging a subgraph should correctly handle nodes already deleted within that subgraph. + //! + //! This is a concern for errors as we are deleting parts of the subgraph while we are + //! iterating through the nodes to purge. This means its likely a node will already have + //! been deleted before processing it as an input. + //! + //! We can force this to occur by re-ordering the inputs relative to the actual dependency + //! order. This means this test is a bit weaker because it relies on implementation details. + + let ancestor_a = 1; + let ancestor_b = 2; + let parent_a = 3; + let parent_b = 4; + let child_a = 5; + // This should be purged prior to parent_a. 
Relies on the fact that we are iterating over a + // btree which is ordered by value. + let child_b = 0; + let child_c = 6; + + let mut uut = TestGraph::default(); + uut.insert_with_no_parents(ancestor_a).unwrap(); + uut.insert_with_no_parents(ancestor_b).unwrap(); + uut.insert_with_parent(parent_a, ancestor_a).unwrap(); + uut.insert_with_parent(parent_b, ancestor_b).unwrap(); + uut.insert_with_parents(child_a, [ancestor_a, parent_a].into()).unwrap(); + uut.insert_with_parents(child_b, [parent_a, parent_b].into()).unwrap(); + uut.insert_with_parent(child_c, parent_b).unwrap(); + + uut.purge_subgraphs([child_b, parent_a].into()).unwrap(); + + let mut reference = TestGraph::default(); + reference.insert_with_no_parents(ancestor_a).unwrap(); + reference.insert_with_no_parents(ancestor_b).unwrap(); + reference.insert_with_parent(parent_b, ancestor_b).unwrap(); + reference.insert_with_parent(child_c, parent_b).unwrap(); + + assert_eq!(uut, reference); +} + +#[test] +fn purging_removes_all_descendents() { + let ancestor_a = 1; + let ancestor_b = 2; + let parent_a = 3; + let parent_b = 4; + let child_a = 5; + let child_b = 6; + let child_c = 7; + + let mut uut = TestGraph::default(); + uut.insert_with_no_parents(ancestor_a).unwrap(); + uut.insert_with_no_parents(ancestor_b).unwrap(); + uut.insert_with_parent(parent_a, ancestor_a).unwrap(); + uut.insert_with_parent(parent_b, ancestor_b).unwrap(); + uut.insert_with_parents(child_a, [ancestor_a, parent_a].into()).unwrap(); + uut.insert_with_parents(child_b, [parent_a, parent_b].into()).unwrap(); + uut.insert_with_parent(child_c, parent_b).unwrap(); + + uut.purge_subgraphs([parent_a].into()).unwrap(); + + let mut reference = TestGraph::default(); + reference.insert_with_no_parents(ancestor_a).unwrap(); + reference.insert_with_no_parents(ancestor_b).unwrap(); + reference.insert_with_parent(parent_b, ancestor_b).unwrap(); + reference.insert_with_parent(child_c, parent_b).unwrap(); + + assert_eq!(uut, reference); +} + +// PROCESSING TESTS +// ================================================================================================ + +#[test] +fn process_root_evaluates_children_as_roots() { + let parent_a = 1; + let parent_b = 2; + let child_a = 3; + let child_b = 4; + let child_c = 5; + + let mut uut = TestGraph::default(); + uut.insert_with_no_parents(parent_a).unwrap(); + uut.insert_with_no_parents(parent_b).unwrap(); + uut.insert_with_parent(child_a, parent_a).unwrap(); + uut.insert_with_parent(child_b, parent_b).unwrap(); + uut.insert_with_parents(child_c, [parent_a, parent_b].into()).unwrap(); + uut.promote_all(); + + uut.process_root(parent_a).unwrap(); + assert_eq!(uut.roots(), &[parent_b, child_a].into()); +} + +#[test] +fn process_root_rejects_non_root_node() { + let mut uut = TestGraph::default(); + uut.insert_with_no_parents(1).unwrap(); + uut.insert_with_parent(2, 1).unwrap(); + uut.promote_all(); + + let err = uut.process_root(2).unwrap_err(); + let expected = GraphError::InvalidRootNode(2); + assert_eq!(err, expected); +} + +#[test] +fn process_root_cannot_reprocess_same_node() { + let mut uut = TestGraph::default(); + uut.insert_with_no_parents(1).unwrap(); + uut.promote_all(); + uut.process_root(1).unwrap(); + + let err = uut.process_root(1).unwrap_err(); + let expected = GraphError::InvalidRootNode(1); + assert_eq!(err, expected); +} + +#[test] +fn processing_a_queue_graph() { + //! Creates a queue graph and ensures that nodes processed in FIFO order. 
+    let nodes = (0..10).collect::<Vec<u32>>();
+
+    let mut uut = TestGraph::default();
+    uut.insert_with_no_parents(nodes[0]).unwrap();
+    for pairs in nodes.windows(2) {
+        let (parent, id) = (pairs[0], pairs[1]);
+        uut.insert_with_parent(id, parent).unwrap();
+    }
+    uut.promote_all();
+
+    let mut ordered_roots = Vec::<u32>::new();
+    for _ in &nodes {
+        let current_roots = uut.roots().clone();
+        ordered_roots.extend(&current_roots);
+
+        for root in current_roots {
+            uut.process_root(root).unwrap();
+        }
+    }
+
+    assert_eq!(ordered_roots, nodes);
+}
+
+#[test]
+fn processing_and_root_tracking() {
+    //! Creates a somewhat arbitrarily connected graph and ensures that roots are tracked as
+    //! expected as they are processed.
+    let ancestor_a = 1;
+    let ancestor_b = 2;
+    let parent_a = 3;
+    let parent_b = 4;
+    let child_a = 5;
+    let child_b = 6;
+    let child_c = 7;
+
+    let mut uut = TestGraph::default();
+    uut.insert_with_no_parents(ancestor_a).unwrap();
+    uut.insert_with_no_parents(ancestor_b).unwrap();
+    uut.insert_with_parent(parent_a, ancestor_a).unwrap();
+    uut.insert_with_parent(parent_b, ancestor_b).unwrap();
+    uut.insert_with_parents(child_a, [ancestor_a, parent_a].into()).unwrap();
+    uut.insert_with_parents(child_b, [parent_a, parent_b].into()).unwrap();
+    uut.insert_with_parent(child_c, parent_b).unwrap();
+    uut.promote_all();
+
+    assert_eq!(uut.roots(), &[ancestor_a, ancestor_b].into());
+
+    uut.process_root(ancestor_a).unwrap();
+    assert_eq!(uut.roots(), &[ancestor_b, parent_a].into());
+
+    uut.process_root(ancestor_b).unwrap();
+    assert_eq!(uut.roots(), &[parent_a, parent_b].into());
+
+    uut.process_root(parent_a).unwrap();
+    assert_eq!(uut.roots(), &[parent_b, child_a].into());
+
+    uut.process_root(parent_b).unwrap();
+    assert_eq!(uut.roots(), &[child_a, child_b, child_c].into());
+
+    uut.process_root(child_a).unwrap();
+    assert_eq!(uut.roots(), &[child_b, child_c].into());
+
+    uut.process_root(child_b).unwrap();
+    assert_eq!(uut.roots(), &[child_c].into());
+
+    uut.process_root(child_c).unwrap();
+    assert!(uut.roots().is_empty());
+}
diff --git a/crates/block-producer/src/mempool/mod.rs b/crates/block-producer/src/mempool/mod.rs
index 3ebc9862..00e96e2a 100644
--- a/crates/block-producer/src/mempool/mod.rs
+++ b/crates/block-producer/src/mempool/mod.rs
@@ -18,7 +18,7 @@ use crate::{
 };
 
 mod batch_graph;
-mod dependency_graph;
+mod graph;
 mod inflight_state;
 mod transaction_graph;
 
diff --git a/crates/block-producer/src/mempool/transaction_graph.rs b/crates/block-producer/src/mempool/transaction_graph.rs
index ced50cdd..5709bfde 100644
--- a/crates/block-producer/src/mempool/transaction_graph.rs
+++ b/crates/block-producer/src/mempool/transaction_graph.rs
@@ -3,7 +3,7 @@ use std::collections::BTreeSet;
 use miden_objects::transaction::TransactionId;
 
 use super::{
-    dependency_graph::{DependencyGraph, GraphError},
+    graph::{DependencyGraph, GraphError},
     BatchBudget, BudgetStatus,
 };
 use crate::domain::transaction::AuthenticatedTransaction;
diff --git a/crates/block-producer/src/server/mod.rs b/crates/block-producer/src/server/mod.rs
index 7f79d7fd..08e72137 100644
--- a/crates/block-producer/src/server/mod.rs
+++ b/crates/block-producer/src/server/mod.rs
@@ -106,27 +106,30 @@ impl BlockProducer {
         // any complete or fail, we can shutdown the rest (somewhat) gracefully.
         let mut tasks = tokio::task::JoinSet::new();
 
-        // TODO: improve the error situationship.
let batch_builder_id = tasks .spawn({ let mempool = mempool.clone(); - async { batch_builder.run(mempool).await } + async { + batch_builder.run(mempool).await; + Ok(()) + } }) .id(); let block_builder_id = tasks .spawn({ let mempool = mempool.clone(); - async { block_builder.run(mempool).await } - }) - .id(); - let rpc_id = tasks - .spawn(async move { - BlockProducerRpcServer::new(mempool, store) - .serve(rpc_listener) - .await - .expect("block-producer failed") + async { + block_builder.run(mempool).await; + Ok(()) + } }) .id(); + let rpc_id = + tasks + .spawn(async move { + BlockProducerRpcServer::new(mempool, store).serve(rpc_listener).await + }) + .id(); let task_ids = HashMap::from([ (batch_builder_id, "batch-builder"), @@ -140,7 +143,7 @@ impl BlockProducer { let task_result = tasks.join_next_with_id().await.unwrap(); let task_id = match &task_result { - Ok((id, ())) => *id, + Ok((id, _)) => *id, Err(err) => err.id(), }; let task = task_ids.get(&task_id).unwrap_or(&"unknown"); @@ -150,7 +153,11 @@ impl BlockProducer { task_result .map_err(|source| BlockProducerError::JoinError { task, source }) - .map(|(_, ())| Err(BlockProducerError::TaskFailedSuccesfully { task }))? + .map(|(_, result)| match result { + Ok(_) => Err(BlockProducerError::TaskFailedSuccesfully { task }), + Err(source) => Err(BlockProducerError::TonicTransportError { task, source }), + }) + .and_then(|x| x) } } diff --git a/crates/block-producer/src/store/mod.rs b/crates/block-producer/src/store/mod.rs index 2b34d75d..a6af90e0 100644 --- a/crates/block-producer/src/store/mod.rs +++ b/crates/block-producer/src/store/mod.rs @@ -155,7 +155,7 @@ impl DefaultStore { /// Returns the latest block's header from the store. pub async fn latest_header(&self) -> Result { - // TODO: fixup the errors types. + // TODO: Consolidate the error types returned by the store (and its trait). let response = self .store .clone()