Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Commit

Permalink
test(concurrency): test finish execution and finish validation in sch…
Browse files Browse the repository at this point in the history
…eduler (#1801)
  • Loading branch information
avi-starkware authored May 9, 2024
1 parent 83cfc2c commit 2431cd6
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 29 deletions.
14 changes: 0 additions & 14 deletions crates/blockifier/src/concurrency/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ use crate::concurrency::TxIndex;
#[path = "scheduler_test.rs"]
pub mod test;

// TODO(Avi, 01/04/2024): Remove dead_code attribute.
#[allow(dead_code)]
#[derive(Debug, Default)]
pub struct Scheduler {
execution_index: AtomicUsize,
Expand All @@ -27,8 +25,6 @@ pub struct Scheduler {
done_marker: AtomicBool,
}

// TODO(Avi, 01/04/2024): Remove dead_code attribute.
#[allow(dead_code)]
impl Scheduler {
pub fn new(chunk_size: usize) -> Scheduler {
Scheduler {
Expand Down Expand Up @@ -171,14 +167,6 @@ impl Scheduler {
}
}

fn decrease_execution_index(&self, target_index: TxIndex) {
let previous_execution_index =
self.execution_index.fetch_min(target_index, Ordering::SeqCst);
if target_index < previous_execution_index {
self.decrease_counter.fetch_add(1, Ordering::SeqCst);
}
}

/// Updates a transaction's status to `Executing` if it is ready to execute.
fn try_incarnate(&self, tx_index: TxIndex) -> bool {
if tx_index < self.chunk_size {
Expand Down Expand Up @@ -241,8 +229,6 @@ pub enum Task {
Done,
}

// TODO(Barak, 01/04/2024): Remove dead_code attribute.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, PartialEq)]
enum TransactionStatus {
ReadyToExecute,
Expand Down
128 changes: 113 additions & 15 deletions crates/blockifier/src/concurrency/scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,119 @@ fn test_next_task(
assert_eq!(scheduler.n_active_tasks.load(Ordering::Acquire), expected_n_active_tasks);
}

#[rstest]
#[case::happy_flow(TransactionStatus::Executing)]
#[should_panic(expected = "Only executing transactions can gain status executed. Transaction 0 \
is not executing. Transaction status: ReadyToExecute.")]
#[case::wrong_status_ready(TransactionStatus::ReadyToExecute)]
#[should_panic(expected = "Only executing transactions can gain status executed. Transaction 0 \
is not executing. Transaction status: Executed.")]
#[case::wrong_status_executed(TransactionStatus::Executed)]
#[should_panic(expected = "Only executing transactions can gain status executed. Transaction 0 \
is not executing. Transaction status: Aborting.")]
#[case::wrong_status_aborting(TransactionStatus::Aborting)]
fn test_set_executed_status(#[case] tx_status: TransactionStatus) {
let tx_index = 0;
let scheduler = Scheduler::new(DEFAULT_CHUNK_SIZE);
scheduler.set_tx_status(tx_index, tx_status);
// Panic is expected here in negative flows.
scheduler.set_executed_status(tx_index);
assert_eq!(*scheduler.lock_tx_status(tx_index), TransactionStatus::Executed);
}

#[rstest]
#[case::reduces_validation_index(0, 10)]
#[case::does_not_reduce_validation_index(10, 0)]
fn test_finish_execution(#[case] tx_index: TxIndex, #[case] validation_index: TxIndex) {
let n_active_tasks = 1;
let scheduler = default_scheduler!(
chunk_size: DEFAULT_CHUNK_SIZE,
validation_index: validation_index,
n_active_tasks: n_active_tasks,
);
scheduler.set_tx_status(tx_index, TransactionStatus::Executing);
scheduler.finish_execution(tx_index);
assert_eq!(*scheduler.lock_tx_status(tx_index), TransactionStatus::Executed);
assert_eq!(scheduler.validation_index.load(Ordering::Acquire), min(tx_index, validation_index));
assert_eq!(scheduler.n_active_tasks.load(Ordering::Acquire), n_active_tasks - 1);
}

#[rstest]
#[case::happy_flow(TransactionStatus::Aborting)]
#[should_panic(expected = "Only aborting transactions can be re-executed. Transaction 0 is not \
aborting. Transaction status: ReadyToExecute.")]
#[case::wrong_status_ready(TransactionStatus::ReadyToExecute)]
#[should_panic(expected = "Only aborting transactions can be re-executed. Transaction 0 is not \
aborting. Transaction status: Executed.")]
#[case::wrong_status_executed(TransactionStatus::Executed)]
#[should_panic(expected = "Only aborting transactions can be re-executed. Transaction 0 is not \
aborting. Transaction status: Executing.")]
#[case::wrong_status_executing(TransactionStatus::Executing)]
fn test_set_ready_status(#[case] tx_status: TransactionStatus) {
let tx_index = 0;
let scheduler = Scheduler::new(DEFAULT_CHUNK_SIZE);
scheduler.set_tx_status(tx_index, tx_status);
// Panic is expected here in negative flows.
scheduler.set_ready_status(tx_index);
assert_eq!(*scheduler.lock_tx_status(tx_index), TransactionStatus::ReadyToExecute);
}

#[rstest]
#[case::abort_validation(TransactionStatus::Executed)]
#[case::wrong_status_ready(TransactionStatus::ReadyToExecute)]
#[case::wrong_status_executing(TransactionStatus::Executing)]
#[case::wrong_status_aborted(TransactionStatus::Aborting)]
fn test_try_validation_abort(#[case] tx_status: TransactionStatus) {
let tx_index = 0;
let scheduler = Scheduler::new(DEFAULT_CHUNK_SIZE);
scheduler.set_tx_status(tx_index, tx_status);
let result = scheduler.try_validation_abort(tx_index);
assert_eq!(result, tx_status == TransactionStatus::Executed);
if result {
assert_eq!(*scheduler.lock_tx_status(tx_index), TransactionStatus::Aborting);
}
}

#[rstest]
#[case::not_aborted(0, 10, false)]
#[case::returns_execution_task(0, 10, true)]
#[case::does_not_return_execution_task(10, 0, true)]
fn test_finish_validation(
#[case] tx_index: TxIndex,
#[case] execution_index: TxIndex,
#[case] aborted: bool,
) {
let n_active_tasks = 1;
let scheduler = default_scheduler!(
chunk_size: DEFAULT_CHUNK_SIZE,
execution_index: execution_index,
n_active_tasks: n_active_tasks,
);
let tx_status = if aborted { TransactionStatus::Aborting } else { TransactionStatus::Executed };
scheduler.set_tx_status(tx_index, tx_status);
let result = scheduler.finish_validation(tx_index, aborted);
let new_status = scheduler.lock_tx_status(tx_index);
let new_n_active_tasks = scheduler.n_active_tasks.load(Ordering::Acquire);
match aborted {
true => {
if execution_index > tx_index {
assert_eq!(result, Task::ExecutionTask(tx_index));
assert_eq!(*new_status, TransactionStatus::Executing);
assert_eq!(new_n_active_tasks, n_active_tasks);
} else {
assert_eq!(result, Task::NoTask);
assert_eq!(*new_status, TransactionStatus::ReadyToExecute);
assert_eq!(new_n_active_tasks, n_active_tasks - 1);
}
}
false => {
assert_eq!(result, Task::NoTask);
assert_eq!(*new_status, TransactionStatus::Executed);
assert_eq!(new_n_active_tasks, n_active_tasks - 1);
}
}
}

#[rstest]
#[case::target_index_lt_validation_index(1, 3)]
#[case::target_index_eq_validation_index(3, 3)]
Expand All @@ -136,21 +249,6 @@ fn test_decrease_validation_index(
assert_eq!(scheduler.decrease_counter.load(Ordering::Acquire), expected_decrease_counter);
}

#[rstest]
#[case::target_index_lt_execution_index(1, 3)]
#[case::target_index_eq_execution_index(3, 3)]
#[case::target_index_eq_execution_index_eq_zero(0, 0)]
#[case::target_index_gt_execution_index(1, 0)]
fn test_decrease_execution_index(#[case] target_index: TxIndex, #[case] execution_index: TxIndex) {
let scheduler =
default_scheduler!(chunk_size: DEFAULT_CHUNK_SIZE, execution_index: execution_index);
scheduler.decrease_execution_index(target_index);
let expected_execution_index = min(target_index, execution_index);
assert_eq!(scheduler.execution_index.load(Ordering::Acquire), expected_execution_index);
let expected_decrease_counter = if target_index < execution_index { 1 } else { 0 };
assert_eq!(scheduler.decrease_counter.load(Ordering::Acquire), expected_decrease_counter);
}

#[rstest]
#[case::ready_to_execute(0, TransactionStatus::ReadyToExecute, true)]
#[case::executing(0, TransactionStatus::Executing, false)]
Expand Down

0 comments on commit 2431cd6

Please sign in to comment.