diff --git a/crates/blockifier/src/bouncer.rs b/crates/blockifier/src/bouncer.rs index ed05d48f5e..3d940bb4a8 100644 --- a/crates/blockifier/src/bouncer.rs +++ b/crates/blockifier/src/bouncer.rs @@ -36,7 +36,7 @@ macro_rules! impl_checked_sub { pub type HashMapWrapper = HashMap; -#[derive(Debug, Default, PartialEq, Clone)] +#[derive(Clone, Debug, Default, PartialEq)] pub struct BouncerConfig { pub block_max_capacity: BouncerWeights, pub block_max_capacity_with_keccak: BouncerWeights, diff --git a/crates/blockifier/src/concurrency.rs b/crates/blockifier/src/concurrency.rs index f898b46b09..71000f105a 100644 --- a/crates/blockifier/src/concurrency.rs +++ b/crates/blockifier/src/concurrency.rs @@ -2,6 +2,7 @@ pub mod fee_utils; pub mod scheduler; #[cfg(any(feature = "testing", test))] pub mod test_utils; +pub mod utils; pub mod versioned_state_proxy; pub mod versioned_storage; pub mod worker_logic; diff --git a/crates/blockifier/src/concurrency/scheduler.rs b/crates/blockifier/src/concurrency/scheduler.rs index d5ebabc9fa..5a44d76a62 100644 --- a/crates/blockifier/src/concurrency/scheduler.rs +++ b/crates/blockifier/src/concurrency/scheduler.rs @@ -2,6 +2,7 @@ use std::cmp::min; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::{Mutex, MutexGuard}; +use crate::concurrency::utils::lock_mutex_in_array; use crate::concurrency::TxIndex; #[cfg(test)] @@ -128,13 +129,7 @@ impl Scheduler { } fn lock_tx_status(&self, tx_index: TxIndex) -> MutexGuard<'_, TransactionStatus> { - self.tx_statuses[tx_index].lock().unwrap_or_else(|error| { - panic!( - "Status of transaction index {} is poisoned. Data: {:?}.", - tx_index, - *error.get_ref() - ) - }) + lock_mutex_in_array(&self.tx_statuses, tx_index) } fn set_executed_status(&self, tx_index: TxIndex) { diff --git a/crates/blockifier/src/concurrency/scheduler_test.rs b/crates/blockifier/src/concurrency/scheduler_test.rs index 8fa558aa90..1c96f5a671 100644 --- a/crates/blockifier/src/concurrency/scheduler_test.rs +++ b/crates/blockifier/src/concurrency/scheduler_test.rs @@ -66,7 +66,7 @@ fn test_lock_tx_status() { } #[rstest] -#[should_panic(expected = "Status of transaction index 0 is poisoned. Data: ReadyToExecute.")] +#[should_panic(expected = "Cell of transaction index 0 is poisoned. Data: ReadyToExecute.")] fn test_lock_tx_status_poisoned() { let scheduler = Arc::new(Scheduler::new(DEFAULT_CHUNK_SIZE)); let scheduler_clone = scheduler.clone(); diff --git a/crates/blockifier/src/concurrency/utils.rs b/crates/blockifier/src/concurrency/utils.rs new file mode 100644 index 0000000000..4ca2b2eb17 --- /dev/null +++ b/crates/blockifier/src/concurrency/utils.rs @@ -0,0 +1,10 @@ +use std::fmt::Debug; +use std::sync::{Mutex, MutexGuard}; + +use crate::concurrency::TxIndex; + +pub fn lock_mutex_in_array(array: &[Mutex], tx_index: TxIndex) -> MutexGuard<'_, T> { + array[tx_index].lock().unwrap_or_else(|error| { + panic!("Cell of transaction index {} is poisoned. Data: {:?}.", tx_index, *error.get_ref()) + }) +} diff --git a/crates/blockifier/src/concurrency/versioned_state_proxy.rs b/crates/blockifier/src/concurrency/versioned_state_proxy.rs index 6584291eda..692e20ce10 100644 --- a/crates/blockifier/src/concurrency/versioned_state_proxy.rs +++ b/crates/blockifier/src/concurrency/versioned_state_proxy.rs @@ -69,7 +69,7 @@ impl VersionedState { // accessing this function should be protected by a mutex to ensure thread safety. // TODO: Consider coupling the tx index with the read set to ensure any mismatch between them // will cause the validation to fail. - fn validate_read_set(&mut self, tx_index: TxIndex, reads: &StateMaps) -> bool { + fn validate_reads(&mut self, tx_index: TxIndex, reads: &StateMaps) -> bool { // If is the first transaction in the chunk, then the read set is valid. Since it has no // predecessors, there's nothing to compare it to. if tx_index == 0 { @@ -170,8 +170,8 @@ impl VersionedStateProxy { self.state.lock().expect("Failed to acquire state lock.") } - pub fn validate_read_set(&self, reads: &StateMaps) -> bool { - self.state().validate_read_set(self.tx_index, reads) + pub fn validate_reads(&self, reads: &StateMaps) -> bool { + self.state().validate_reads(self.tx_index, reads) } pub fn apply_writes(&self, writes: &StateMaps, class_hash_to_class: &ContractClassMapping) { diff --git a/crates/blockifier/src/concurrency/versioned_state_proxy_test.rs b/crates/blockifier/src/concurrency/versioned_state_proxy_test.rs index 3870d2d9f0..04fafdc996 100644 --- a/crates/blockifier/src/concurrency/versioned_state_proxy_test.rs +++ b/crates/blockifier/src/concurrency/versioned_state_proxy_test.rs @@ -280,7 +280,7 @@ fn test_validate_read_set( let transactional_state = CachedState::from(safe_versioned_state.pin_version(1)); // Validating tx index 0 always succeeds. - assert!(safe_versioned_state.pin_version(0).validate_read_set(&StateMaps::default())); + assert!(safe_versioned_state.pin_version(0).validate_reads(&StateMaps::default())); assert!(transactional_state.cache.borrow().initial_reads.storage.is_empty()); transactional_state.get_storage_at(contract_address, storage_key).unwrap(); @@ -304,7 +304,7 @@ fn test_validate_read_set( assert!( safe_versioned_state .pin_version(1) - .validate_read_set(&transactional_state.cache.borrow().initial_reads) + .validate_reads(&transactional_state.cache.borrow().initial_reads) ); } diff --git a/crates/blockifier/src/concurrency/worker_logic.rs b/crates/blockifier/src/concurrency/worker_logic.rs index 75b801ed80..d536ceffbc 100644 --- a/crates/blockifier/src/concurrency/worker_logic.rs +++ b/crates/blockifier/src/concurrency/worker_logic.rs @@ -1,25 +1,21 @@ use std::collections::{HashMap, HashSet}; -use std::sync::{Arc, Mutex}; +use std::fmt::Debug; +use std::sync::Mutex; use starknet_api::core::ClassHash; use crate::concurrency::scheduler::{Scheduler, Task}; +use crate::concurrency::utils::lock_mutex_in_array; use crate::concurrency::versioned_state_proxy::ThreadSafeVersionedState; use crate::concurrency::TxIndex; use crate::context::BlockContext; -use crate::state::cached_state::StateMaps; +use crate::state::cached_state::{CachedState, StateMaps}; use crate::state::state_api::StateReader; use crate::transaction::objects::{TransactionExecutionInfo, TransactionExecutionResult}; use crate::transaction::transaction_execution::Transaction; +use crate::transaction::transactions::ExecutableTransaction; -pub struct ConcurrentExecutionContext { - pub scheduler: Scheduler, - pub state: ThreadSafeVersionedState, - pub chunk: Box<[Transaction]>, - pub execution_outputs: Box<[Mutex]>, - pub block_context: BlockContext, -} - +#[derive(Debug)] pub struct ExecutionTaskOutput { pub reads: StateMaps, pub writes: StateMaps, @@ -27,32 +23,72 @@ pub struct ExecutionTaskOutput { pub result: TransactionExecutionResult, } -// TODO(Noa, 15/05/2024): Re-consider the necessity of the Arc (as opposed to a reference), given -// concurrent code. -pub fn run(execution_context: Arc>) { - let scheduler = &execution_context.scheduler; - let mut task = Task::NoTask; - loop { - task = match task { - Task::ExecutionTask(tx_index) => { - execute(&execution_context, tx_index); - Task::NoTask - } - Task::ValidationTask(tx_index) => validate(&execution_context, tx_index), - Task::NoTask => scheduler.next_task(), - Task::Done => break, - }; - } +pub struct WorkerExecutor { + pub scheduler: Scheduler, + pub state: ThreadSafeVersionedState, + pub chunk: Box<[Transaction]>, + pub execution_outputs: Box<[Mutex>]>, + pub block_context: BlockContext, } +impl WorkerExecutor { + pub fn run(&self) { + let mut task = Task::NoTask; + loop { + task = match task { + Task::ExecutionTask(tx_index) => { + self.execute(tx_index); + Task::NoTask + } + Task::ValidationTask(tx_index) => self.validate(tx_index), + Task::NoTask => self.scheduler.next_task(), + Task::Done => break, + }; + } + } -fn execute(_execution_context: &ConcurrentExecutionContext, _tx_index: TxIndex) { - // TODO(Noa, 15/05/2024): share code with `try_commit`. - todo!(); -} + fn execute(&self, tx_index: TxIndex) { + self.execute_tx(tx_index); + self.scheduler.finish_execution(tx_index) + } + + fn execute_tx(&self, tx_index: TxIndex) { + let tx_versioned_state = self.state.pin_version(tx_index); + let tx = &self.chunk[tx_index]; + // TODO(Noa, 15/05/2024): remove the redundant cached state. + let mut tx_state = CachedState::new(tx_versioned_state); + let mut transactional_state = CachedState::create_transactional(&mut tx_state); + let validate = true; + let charge_fee = true; + + let execution_result = + tx.execute_raw(&mut transactional_state, &self.block_context, charge_fee, validate); -fn validate( - _execution_context: &ConcurrentExecutionContext, - _tx_index: TxIndex, -) -> Task { - todo!(); + if execution_result.is_ok() { + let class_hash_to_class = transactional_state.class_hash_to_class.borrow(); + // TODO(Noa, 15/05/2024): use `tx_versioned_state` when we add support to transactional + // versioned state. + self.state + .pin_version(tx_index) + .apply_writes(&transactional_state.cache.borrow().writes, &class_hash_to_class); + } + + // Write the transaction execution outputs. + let tx_reads_writes = transactional_state.cache.take(); + // In case of a failed transaction, we don't record its writes and visited pcs. + let (writes, visited_pcs) = match execution_result { + Ok(_) => (tx_reads_writes.writes, transactional_state.visited_pcs), + Err(_) => (StateMaps::default(), HashMap::default()), + }; + let mut execution_output = lock_mutex_in_array(&self.execution_outputs, tx_index); + *execution_output = Some(ExecutionTaskOutput { + reads: tx_reads_writes.initial_reads, + writes, + visited_pcs, + result: execution_result, + }); + } + + fn validate(&self, _tx_index: TxIndex) -> Task { + todo!(); + } }