diff --git a/crates/blockifier/src/concurrency.rs b/crates/blockifier/src/concurrency.rs index 13c9d244f0..37ba6dc68d 100644 --- a/crates/blockifier/src/concurrency.rs +++ b/crates/blockifier/src/concurrency.rs @@ -3,5 +3,6 @@ pub mod scheduler; pub mod test_utils; pub mod versioned_state_proxy; pub mod versioned_storage; +pub mod worker_logic; type TxIndex = usize; diff --git a/crates/blockifier/src/concurrency/scheduler.rs b/crates/blockifier/src/concurrency/scheduler.rs index 3e9f88b048..d8f6a2a485 100644 --- a/crates/blockifier/src/concurrency/scheduler.rs +++ b/crates/blockifier/src/concurrency/scheduler.rs @@ -45,7 +45,7 @@ impl Scheduler { } /// Returns the done marker. - pub fn done(&self) -> bool { + fn done(&self) -> bool { self.done_marker.load(Ordering::Acquire) } @@ -97,17 +97,18 @@ impl Scheduler { /// Updates the Scheduler that a validation task has been finished and triggers the creation of /// new tasks in case of failure: schedules validation for higher transactions + re-executes the /// current transaction (if ready). - pub fn finish_validation(&self, tx_index: TxIndex, aborted: bool) -> Option { + pub fn finish_validation(&self, tx_index: TxIndex, aborted: bool) -> Task { if aborted { self.set_ready_status(tx_index); if self.execution_index.load(Ordering::Acquire) > tx_index && self.try_incarnate(tx_index) { - return Some(Task::ExecutionTask(tx_index)); + return Task::ExecutionTask(tx_index); } } self.safe_decrement_n_active_tasks(); - None + + Task::NoTask } /// Checks if all transactions have been executed and validated. diff --git a/crates/blockifier/src/concurrency/versioned_state_proxy.rs b/crates/blockifier/src/concurrency/versioned_state_proxy.rs index 5de7d36d8f..6584291eda 100644 --- a/crates/blockifier/src/concurrency/versioned_state_proxy.rs +++ b/crates/blockifier/src/concurrency/versioned_state_proxy.rs @@ -154,6 +154,12 @@ impl ThreadSafeVersionedState { } } +impl Clone for ThreadSafeVersionedState { + fn clone(&self) -> Self { + ThreadSafeVersionedState(Arc::clone(&self.0)) + } +} + pub struct VersionedStateProxy { pub tx_index: TxIndex, pub state: Arc>>, diff --git a/crates/blockifier/src/concurrency/worker_logic.rs b/crates/blockifier/src/concurrency/worker_logic.rs new file mode 100644 index 0000000000..75b801ed80 --- /dev/null +++ b/crates/blockifier/src/concurrency/worker_logic.rs @@ -0,0 +1,58 @@ +use std::collections::{HashMap, HashSet}; +use std::sync::{Arc, Mutex}; + +use starknet_api::core::ClassHash; + +use crate::concurrency::scheduler::{Scheduler, Task}; +use crate::concurrency::versioned_state_proxy::ThreadSafeVersionedState; +use crate::concurrency::TxIndex; +use crate::context::BlockContext; +use crate::state::cached_state::StateMaps; +use crate::state::state_api::StateReader; +use crate::transaction::objects::{TransactionExecutionInfo, TransactionExecutionResult}; +use crate::transaction::transaction_execution::Transaction; + +pub struct ConcurrentExecutionContext { + pub scheduler: Scheduler, + pub state: ThreadSafeVersionedState, + pub chunk: Box<[Transaction]>, + pub execution_outputs: Box<[Mutex]>, + pub block_context: BlockContext, +} + +pub struct ExecutionTaskOutput { + pub reads: StateMaps, + pub writes: StateMaps, + pub visited_pcs: HashMap>, + pub result: TransactionExecutionResult, +} + +// TODO(Noa, 15/05/2024): Re-consider the necessity of the Arc (as opposed to a reference), given +// concurrent code. +pub fn run(execution_context: Arc>) { + let scheduler = &execution_context.scheduler; + let mut task = Task::NoTask; + loop { + task = match task { + Task::ExecutionTask(tx_index) => { + execute(&execution_context, tx_index); + Task::NoTask + } + Task::ValidationTask(tx_index) => validate(&execution_context, tx_index), + Task::NoTask => scheduler.next_task(), + Task::Done => break, + }; + } +} + +fn execute(_execution_context: &ConcurrentExecutionContext, _tx_index: TxIndex) { + // TODO(Noa, 15/05/2024): share code with `try_commit`. + todo!(); +} + +fn validate( + _execution_context: &ConcurrentExecutionContext, + _tx_index: TxIndex, +) -> Task { + todo!(); +}