diff --git a/crates/blockifier/src/concurrency/scheduler.rs b/crates/blockifier/src/concurrency/scheduler.rs index 6ed5e65817..4a93c5e62c 100644 --- a/crates/blockifier/src/concurrency/scheduler.rs +++ b/crates/blockifier/src/concurrency/scheduler.rs @@ -1,6 +1,6 @@ use std::cmp::min; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::Mutex; +use std::sync::{Mutex, MutexGuard}; use crate::concurrency::TxIndex; @@ -20,6 +20,7 @@ pub struct Scheduler { decrease_counter: AtomicUsize, n_active_tasks: AtomicUsize, chunk_size: usize, + // TODO(Avi, 15/05/2024): Consider using RwLock instead of Mutex. tx_statuses: Box<[Mutex]>, /// Updated by the `check_done` procedure, providing a cheap way for all threads to exit their /// main loops. @@ -68,6 +69,16 @@ impl Scheduler { assert!(previous_n_active_tasks > 0, "n_active_tasks underflow"); } + fn lock_tx_status(&self, tx_index: TxIndex) -> MutexGuard<'_, TransactionStatus> { + self.tx_statuses[tx_index].lock().unwrap_or_else(|error| { + panic!( + "Status of transaction index {} is poisoned. Data: {:?}.", + tx_index, + *error.get_ref() + ) + }) + } + pub fn next_task(&self) -> Task { if self.done() { return Task::Done; @@ -128,8 +139,11 @@ impl Scheduler { /// Updates a transaction's status to `Executing` if it is ready to execute. fn try_incarnate(&self, tx_index: TxIndex) -> Option { if tx_index < self.chunk_size { - // TODO(barak, 01/04/2024): complete try_incarnate logic. - return Some(tx_index); + let mut status = self.lock_tx_status(tx_index); + if *status == TransactionStatus::ReadyToExecute { + *status = TransactionStatus::Executing; + return Some(tx_index); + } } self.safe_decrement_n_active_tasks(); None @@ -144,8 +158,10 @@ impl Scheduler { self.n_active_tasks.fetch_add(1, Ordering::SeqCst); let index_to_validate = self.validation_index.fetch_add(1, Ordering::SeqCst); if index_to_validate < self.chunk_size { - // TODO(barak, 01/04/2024): complete next_version_to_validate logic. - return Some(index_to_validate); + let status = self.lock_tx_status(index_to_validate); + if *status == TransactionStatus::Executed { + return Some(index_to_validate); + } } self.safe_decrement_n_active_tasks(); None @@ -161,6 +177,14 @@ impl Scheduler { let index_to_execute = self.execution_index.fetch_add(1, Ordering::SeqCst); self.try_incarnate(index_to_execute) } + + #[cfg(test)] + fn set_tx_status(&self, tx_index: TxIndex, status: TransactionStatus) { + if tx_index < self.chunk_size { + let mut tx_status = self.lock_tx_status(tx_index); + *tx_status = status; + } + } } #[derive(Debug, PartialEq)] diff --git a/crates/blockifier/src/concurrency/scheduler_test.rs b/crates/blockifier/src/concurrency/scheduler_test.rs index d821c361b0..77510cc0ae 100644 --- a/crates/blockifier/src/concurrency/scheduler_test.rs +++ b/crates/blockifier/src/concurrency/scheduler_test.rs @@ -1,5 +1,6 @@ use std::cmp::min; use std::sync::atomic::Ordering; +use std::sync::Arc; use pretty_assertions::assert_eq; use rstest::rstest; @@ -31,8 +32,8 @@ fn test_new(#[values(0, 1, 32)] chunk_size: usize) { #[case::execution_incomplete(DEFAULT_CHUNK_SIZE-1, DEFAULT_CHUNK_SIZE+1, 0, false)] #[case::validation_incomplete(DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE-1, 0, false)] fn test_check_done( - #[case] execution_index: usize, - #[case] validation_index: usize, + #[case] execution_index: TxIndex, + #[case] validation_index: TxIndex, #[case] n_active_tasks: usize, #[case] expected: bool, ) { @@ -47,119 +48,174 @@ fn test_check_done( } #[rstest] -#[case::happy_flow(1, 0)] +#[case::no_panic(1)] #[should_panic(expected = "n_active_tasks underflow")] -#[case::underflow(0, 0)] -fn test_safe_decrement_n_active_tasks( - #[case] n_active_tasks: usize, - #[case] expected_n_active_tasks: usize, -) { +#[case::underflow_panic(0)] +fn test_safe_decrement_n_active_tasks(#[case] n_active_tasks: usize) { let scheduler = default_scheduler!(chunk_size: DEFAULT_CHUNK_SIZE, n_active_tasks: n_active_tasks); scheduler.safe_decrement_n_active_tasks(); - assert_eq!(scheduler.n_active_tasks.load(Ordering::Acquire), expected_n_active_tasks); + assert_eq!(scheduler.n_active_tasks.load(Ordering::Acquire), n_active_tasks - 1); } #[rstest] -#[case::done(DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE, true, Task::Done, 0)] -#[case::no_task(DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE, false, Task::NoTask, 0)] -#[case::execution_task(0, 0, false, Task::ExecutionTask(0), 1)] -#[case::validation_task(1, 0, false, Task::ValidationTask(0), 1)] +fn test_lock_tx_status() { + let scheduler = Scheduler::new(DEFAULT_CHUNK_SIZE); + let status = scheduler.lock_tx_status(0); + assert_eq!(*status, TransactionStatus::ReadyToExecute); +} + +#[rstest] +#[should_panic(expected = "Status of transaction index 0 is poisoned. Data: ReadyToExecute.")] +fn test_lock_tx_status_poisoned() { + let scheduler = Arc::new(Scheduler::new(DEFAULT_CHUNK_SIZE)); + let scheduler_clone = scheduler.clone(); + let handle = std::thread::spawn(move || { + let _guard = scheduler_clone.lock_tx_status(0); + panic!("Intentional panic to poison the mutex") + }); + handle.join().expect_err("Thread did not panic as expected"); + // The panic is expected here. + let _guard = scheduler.lock_tx_status(0); +} + +#[rstest] +#[case::done(DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE, TransactionStatus::Executed, Task::Done)] +#[case::no_task(DEFAULT_CHUNK_SIZE, DEFAULT_CHUNK_SIZE, TransactionStatus::Executed, Task::NoTask)] +#[case::no_task_as_validation_index_not_executed( + DEFAULT_CHUNK_SIZE, + 0, + TransactionStatus::ReadyToExecute, + Task::NoTask +)] +#[case::execution_task(0, 0, TransactionStatus::ReadyToExecute, Task::ExecutionTask(0))] +#[case::execution_task_as_validation_index_not_executed( + 1, + 0, + TransactionStatus::ReadyToExecute, + Task::ExecutionTask(1) +)] +#[case::validation_task(1, 0, TransactionStatus::Executed, Task::ValidationTask(0))] fn test_next_task( - #[case] execution_index: usize, - #[case] validation_index: usize, - #[case] done_marker: bool, + #[case] execution_index: TxIndex, + #[case] validation_index: TxIndex, + #[case] validation_index_status: TransactionStatus, #[case] expected_next_task: Task, - #[case] expected_n_active_tasks: usize, ) { let scheduler = default_scheduler!( chunk_size: DEFAULT_CHUNK_SIZE, execution_index: execution_index, validation_index: validation_index, - done_marker: done_marker, + done_marker: expected_next_task == Task::Done, ); + scheduler.set_tx_status(validation_index, validation_index_status); let next_task = scheduler.next_task(); assert_eq!(next_task, expected_next_task); + let expected_n_active_tasks = match expected_next_task { + Task::Done | Task::NoTask => 0, + _ => 1, + }; assert_eq!(scheduler.n_active_tasks.load(Ordering::Acquire), expected_n_active_tasks); } #[rstest] -#[case::target_index_lt_validation_index(1, 3, 1)] -#[case::target_index_eq_validation_index(3, 3, 0)] -#[case::target_index_eq_validation_index_eq_zero(0, 0, 0)] -#[case::target_index_gt_validation_index(1, 0, 0)] +#[case::target_index_lt_validation_index(1, 3)] +#[case::target_index_eq_validation_index(3, 3)] +#[case::target_index_eq_validation_index_eq_zero(0, 0)] +#[case::target_index_gt_validation_index(1, 0)] fn test_decrease_validation_index( #[case] target_index: TxIndex, - #[case] validation_index: usize, - #[case] expected_decrease_counter: usize, + #[case] validation_index: TxIndex, ) { let scheduler = default_scheduler!(chunk_size: DEFAULT_CHUNK_SIZE, validation_index: validation_index); scheduler.decrease_validation_index(target_index); let expected_validation_index = min(target_index, validation_index); assert_eq!(scheduler.validation_index.load(Ordering::Acquire), expected_validation_index); + let expected_decrease_counter = if target_index < validation_index { 1 } else { 0 }; assert_eq!(scheduler.decrease_counter.load(Ordering::Acquire), expected_decrease_counter); } #[rstest] -#[case::target_index_lt_execution_index(1, 3, 1)] -#[case::target_index_eq_execution_index(3, 3, 0)] -#[case::target_index_eq_execution_index_eq_zero(0, 0, 0)] -#[case::target_index_gt_execution_index(1, 0, 0)] -fn test_decrease_execution_index( - #[case] target_index: TxIndex, - #[case] execution_index: usize, - #[case] expected_decrease_counter: usize, -) { +#[case::target_index_lt_execution_index(1, 3)] +#[case::target_index_eq_execution_index(3, 3)] +#[case::target_index_eq_execution_index_eq_zero(0, 0)] +#[case::target_index_gt_execution_index(1, 0)] +fn test_decrease_execution_index(#[case] target_index: TxIndex, #[case] execution_index: TxIndex) { let scheduler = default_scheduler!(chunk_size: DEFAULT_CHUNK_SIZE, execution_index: execution_index); scheduler.decrease_execution_index(target_index); let expected_execution_index = min(target_index, execution_index); assert_eq!(scheduler.execution_index.load(Ordering::Acquire), expected_execution_index); + let expected_decrease_counter = if target_index < execution_index { 1 } else { 0 }; assert_eq!(scheduler.decrease_counter.load(Ordering::Acquire), expected_decrease_counter); } #[rstest] -#[case::from_ready_to_execute_to_executing(0, Some(0), 1)] -#[case::index_out_of_bounds(DEFAULT_CHUNK_SIZE, None, 0)] +#[case::ready_to_execute(0, TransactionStatus::ReadyToExecute, Some(0))] +#[case::executing(0, TransactionStatus::Executing, None)] +#[case::executed(0, TransactionStatus::Executed, None)] +#[case::aborting(0, TransactionStatus::Aborting, None)] +#[case::index_out_of_bounds(DEFAULT_CHUNK_SIZE, TransactionStatus::ReadyToExecute, None)] fn test_try_incarnate( - #[case] tx_index: usize, - #[case] expected_output: Option, - #[case] expected_n_active_tasks: usize, + #[case] tx_index: TxIndex, + #[case] tx_status: TransactionStatus, + #[case] expected_output: Option, ) { let scheduler = default_scheduler!(chunk_size: DEFAULT_CHUNK_SIZE, n_active_tasks: 1); + scheduler.set_tx_status(tx_index, tx_status); assert_eq!(scheduler.try_incarnate(tx_index), expected_output); - assert_eq!(scheduler.n_active_tasks.load(Ordering::Acquire), expected_n_active_tasks); + if expected_output.is_some() { + assert_eq!(*scheduler.lock_tx_status(tx_index), TransactionStatus::Executing); + assert_eq!(scheduler.n_active_tasks.load(Ordering::Acquire), 1); + } else { + assert_eq!(scheduler.n_active_tasks.load(Ordering::Acquire), 0); + if tx_index < DEFAULT_CHUNK_SIZE { + assert_eq!(*scheduler.lock_tx_status(tx_index), tx_status); + } + } } #[rstest] -#[case::some(1, Some(1), 2, 1)] -#[case::none(DEFAULT_CHUNK_SIZE, None, DEFAULT_CHUNK_SIZE, 0)] +#[case::ready_to_execute(1, TransactionStatus::ReadyToExecute, None)] +#[case::executing(1, TransactionStatus::Executing, None)] +#[case::executed(1, TransactionStatus::Executed, Some(1))] +#[case::aborting(1, TransactionStatus::Aborting, None)] +#[case::index_out_of_bounds(DEFAULT_CHUNK_SIZE, TransactionStatus::ReadyToExecute, None)] fn test_next_version_to_validate( - #[case] validation_index: usize, - #[case] expected_output: Option, - #[case] expected_validation_index: usize, - #[case] expected_n_active_tasks: usize, + #[case] validation_index: TxIndex, + #[case] tx_status: TransactionStatus, + #[case] expected_output: Option, ) { let scheduler = default_scheduler!(chunk_size: DEFAULT_CHUNK_SIZE, validation_index: validation_index); + scheduler.set_tx_status(validation_index, tx_status); assert_eq!(scheduler.next_version_to_validate(), expected_output); + let expected_validation_index = + if validation_index < DEFAULT_CHUNK_SIZE { validation_index + 1 } else { validation_index }; assert_eq!(scheduler.validation_index.load(Ordering::Acquire), expected_validation_index); + let expected_n_active_tasks = if expected_output.is_some() { 1 } else { 0 }; assert_eq!(scheduler.n_active_tasks.load(Ordering::Acquire), expected_n_active_tasks); } #[rstest] -#[case::some(1, Some(1), 2, 1)] -#[case::none(DEFAULT_CHUNK_SIZE, None, DEFAULT_CHUNK_SIZE, 0)] +#[case::ready_to_execute(1, TransactionStatus::ReadyToExecute, Some(1))] +#[case::executing(1, TransactionStatus::Executing, None)] +#[case::executed(1, TransactionStatus::Executed, None)] +#[case::aborting(1, TransactionStatus::Aborting, None)] +#[case::index_out_of_bounds(DEFAULT_CHUNK_SIZE, TransactionStatus::ReadyToExecute, None)] fn test_next_version_to_execute( - #[case] execution_index: usize, - #[case] expected_output: Option, - #[case] expected_execution_index: usize, - #[case] expected_n_active_tasks: usize, + #[case] execution_index: TxIndex, + #[case] tx_status: TransactionStatus, + #[case] expected_output: Option, ) { let scheduler = default_scheduler!(chunk_size: DEFAULT_CHUNK_SIZE, execution_index: execution_index); + scheduler.set_tx_status(execution_index, tx_status); assert_eq!(scheduler.next_version_to_execute(), expected_output); + let expected_execution_index = + if execution_index < DEFAULT_CHUNK_SIZE { execution_index + 1 } else { execution_index }; assert_eq!(scheduler.execution_index.load(Ordering::Acquire), expected_execution_index); + let expected_n_active_tasks = if expected_output.is_some() { 1 } else { 0 }; assert_eq!(scheduler.n_active_tasks.load(Ordering::Acquire), expected_n_active_tasks); }