diff --git a/crates/blockifier/src/concurrency/scheduler.rs b/crates/blockifier/src/concurrency/scheduler.rs index 4a93c5e62c..3e9f88b048 100644 --- a/crates/blockifier/src/concurrency/scheduler.rs +++ b/crates/blockifier/src/concurrency/scheduler.rs @@ -49,36 +49,6 @@ impl Scheduler { self.done_marker.load(Ordering::Acquire) } - /// Checks if all transactions have been executed and validated. - fn check_done(&self) { - let observed_decrease_counter = self.decrease_counter.load(Ordering::Acquire); - - if min( - self.validation_index.load(Ordering::Acquire), - self.execution_index.load(Ordering::Acquire), - ) >= self.chunk_size - && self.n_active_tasks.load(Ordering::Acquire) == 0 - && observed_decrease_counter == self.decrease_counter.load(Ordering::Acquire) - { - self.done_marker.store(true, Ordering::Release); - } - } - - fn safe_decrement_n_active_tasks(&self) { - let previous_n_active_tasks = self.n_active_tasks.fetch_sub(1, Ordering::SeqCst); - assert!(previous_n_active_tasks > 0, "n_active_tasks underflow"); - } - - fn lock_tx_status(&self, tx_index: TxIndex) -> MutexGuard<'_, TransactionStatus> { - self.tx_statuses[tx_index].lock().unwrap_or_else(|error| { - panic!( - "Status of transaction index {} is poisoned. Data: {:?}.", - tx_index, - *error.get_ref() - ) - }) - } - pub fn next_task(&self) -> Task { if self.done() { return Task::Done; @@ -104,20 +74,92 @@ impl Scheduler { Task::NoTask } - // TODO(barak, 01/04/2024): Ensure documentation matches logic. /// Updates the Scheduler that an execution task has been finished and triggers the creation of /// new tasks accordingly: schedules validation for the current and higher transactions, if not /// already scheduled. - pub fn finish_execution(&self) -> Task { - todo!() + pub fn finish_execution(&self, tx_index: TxIndex) { + self.set_executed_status(tx_index); + if self.validation_index.load(Ordering::Acquire) > tx_index { + self.decrease_validation_index(tx_index); + } + self.safe_decrement_n_active_tasks(); + } + + pub fn try_validation_abort(&self, tx_index: TxIndex) -> bool { + let mut status = self.lock_tx_status(tx_index); + if *status == TransactionStatus::Executed { + *status = TransactionStatus::Aborting; + return true; + } + false } - // TODO(barak, 01/04/2024): Ensure documentation matches logic. /// Updates the Scheduler that a validation task has been finished and triggers the creation of /// new tasks in case of failure: schedules validation for higher transactions + re-executes the /// current transaction (if ready). - pub fn finish_validation(&self) -> Task { - todo!() + pub fn finish_validation(&self, tx_index: TxIndex, aborted: bool) -> Option { + if aborted { + self.set_ready_status(tx_index); + if self.execution_index.load(Ordering::Acquire) > tx_index + && self.try_incarnate(tx_index) + { + return Some(Task::ExecutionTask(tx_index)); + } + } + self.safe_decrement_n_active_tasks(); + None + } + + /// Checks if all transactions have been executed and validated. + fn check_done(&self) { + let observed_decrease_counter = self.decrease_counter.load(Ordering::Acquire); + + if min( + self.validation_index.load(Ordering::Acquire), + self.execution_index.load(Ordering::Acquire), + ) >= self.chunk_size + && self.n_active_tasks.load(Ordering::Acquire) == 0 + && observed_decrease_counter == self.decrease_counter.load(Ordering::Acquire) + { + self.done_marker.store(true, Ordering::Release); + } + } + + fn safe_decrement_n_active_tasks(&self) { + let previous_n_active_tasks = self.n_active_tasks.fetch_sub(1, Ordering::SeqCst); + assert!(previous_n_active_tasks > 0, "n_active_tasks underflow"); + } + + fn lock_tx_status(&self, tx_index: TxIndex) -> MutexGuard<'_, TransactionStatus> { + self.tx_statuses[tx_index].lock().unwrap_or_else(|error| { + panic!( + "Status of transaction index {} is poisoned. Data: {:?}.", + tx_index, + *error.get_ref() + ) + }) + } + + fn set_executed_status(&self, tx_index: TxIndex) { + let mut status = self.lock_tx_status(tx_index); + assert_eq!( + *status, + TransactionStatus::Executing, + "Only executing transactions can gain status executed. Transaction {tx_index} is not \ + executing. Transaction status: {status:?}." + ); + *status = TransactionStatus::Executed; + } + + fn set_ready_status(&self, tx_index: TxIndex) { + let mut status = self.lock_tx_status(tx_index); + assert_eq!( + *status, + TransactionStatus::Aborting, + "Only aborting transactions can be re-executed. Transaction {tx_index} is not \ + aborting. Transaction status: {status:?}." + ); + *status = TransactionStatus::ReadyToExecute; } fn decrease_validation_index(&self, target_index: TxIndex) { @@ -137,16 +179,16 @@ impl Scheduler { } /// Updates a transaction's status to `Executing` if it is ready to execute. - fn try_incarnate(&self, tx_index: TxIndex) -> Option { + fn try_incarnate(&self, tx_index: TxIndex) -> bool { if tx_index < self.chunk_size { let mut status = self.lock_tx_status(tx_index); if *status == TransactionStatus::ReadyToExecute { *status = TransactionStatus::Executing; - return Some(tx_index); + return true; } } self.safe_decrement_n_active_tasks(); - None + false } fn next_version_to_validate(&self) -> Option { @@ -175,7 +217,10 @@ impl Scheduler { } self.n_active_tasks.fetch_add(1, Ordering::SeqCst); let index_to_execute = self.execution_index.fetch_add(1, Ordering::SeqCst); - self.try_incarnate(index_to_execute) + if self.try_incarnate(index_to_execute) { + return Some(index_to_execute); + } + None } #[cfg(test)] diff --git a/crates/blockifier/src/concurrency/scheduler_test.rs b/crates/blockifier/src/concurrency/scheduler_test.rs index 77510cc0ae..b774a45b35 100644 --- a/crates/blockifier/src/concurrency/scheduler_test.rs +++ b/crates/blockifier/src/concurrency/scheduler_test.rs @@ -152,20 +152,20 @@ fn test_decrease_execution_index(#[case] target_index: TxIndex, #[case] executio } #[rstest] -#[case::ready_to_execute(0, TransactionStatus::ReadyToExecute, Some(0))] -#[case::executing(0, TransactionStatus::Executing, None)] -#[case::executed(0, TransactionStatus::Executed, None)] -#[case::aborting(0, TransactionStatus::Aborting, None)] -#[case::index_out_of_bounds(DEFAULT_CHUNK_SIZE, TransactionStatus::ReadyToExecute, None)] +#[case::ready_to_execute(0, TransactionStatus::ReadyToExecute, true)] +#[case::executing(0, TransactionStatus::Executing, false)] +#[case::executed(0, TransactionStatus::Executed, false)] +#[case::aborting(0, TransactionStatus::Aborting, false)] +#[case::index_out_of_bounds(DEFAULT_CHUNK_SIZE, TransactionStatus::ReadyToExecute, false)] fn test_try_incarnate( #[case] tx_index: TxIndex, #[case] tx_status: TransactionStatus, - #[case] expected_output: Option, + #[case] expected_output: bool, ) { let scheduler = default_scheduler!(chunk_size: DEFAULT_CHUNK_SIZE, n_active_tasks: 1); scheduler.set_tx_status(tx_index, tx_status); assert_eq!(scheduler.try_incarnate(tx_index), expected_output); - if expected_output.is_some() { + if expected_output { assert_eq!(*scheduler.lock_tx_status(tx_index), TransactionStatus::Executing); assert_eq!(scheduler.n_active_tasks.load(Ordering::Acquire), 1); } else {