From 7ea1c6b59aab9f59f099af020208897df48335c1 Mon Sep 17 00:00:00 2001 From: barak-b-starkware <110763103+barak-b-starkware@users.noreply.github.com> Date: Mon, 10 Jun 2024 09:29:18 +0300 Subject: [PATCH] feat(concurrency): implement execute_chunk (#1953) --- crates/blockifier/src/blockifier/config.rs | 10 +- .../src/blockifier/transaction_executor.rs | 145 +++++++++++++----- .../src/concurrency/worker_logic.rs | 31 +++- 3 files changed, 149 insertions(+), 37 deletions(-) diff --git a/crates/blockifier/src/blockifier/config.rs b/crates/blockifier/src/blockifier/config.rs index 36e1d74a6f..ff4fd33993 100644 --- a/crates/blockifier/src/blockifier/config.rs +++ b/crates/blockifier/src/blockifier/config.rs @@ -3,9 +3,17 @@ pub struct TransactionExecutorConfig { pub concurrency_config: ConcurrencyConfig, } -#[derive(Debug, Default, Clone)] +#[derive(Debug, Clone)] +#[cfg_attr(not(feature = "concurrency"), derive(Default))] pub struct ConcurrencyConfig { pub enabled: bool, pub n_workers: usize, pub chunk_size: usize, } + +#[cfg(feature = "concurrency")] +impl Default for ConcurrencyConfig { + fn default() -> Self { + Self { enabled: true, n_workers: 4, chunk_size: 64 } + } +} diff --git a/crates/blockifier/src/blockifier/transaction_executor.rs b/crates/blockifier/src/blockifier/transaction_executor.rs index 97eb799a95..7a8d395b18 100644 --- a/crates/blockifier/src/blockifier/transaction_executor.rs +++ b/crates/blockifier/src/blockifier/transaction_executor.rs @@ -1,3 +1,8 @@ +#[cfg(feature = "concurrency")] +use std::sync::Arc; +#[cfg(feature = "concurrency")] +use std::sync::Mutex; + use itertools::FoldWhile::{Continue, Done}; use itertools::Itertools; use starknet_api::core::ClassHash; @@ -5,6 +10,8 @@ use thiserror::Error; use crate::blockifier::config::TransactionExecutorConfig; use crate::bouncer::{Bouncer, BouncerWeights}; +#[cfg(feature = "concurrency")] +use crate::concurrency::worker_logic::WorkerExecutor; use crate::context::BlockContext; use crate::state::cached_state::{CachedState, CommitmentStateDiff, TransactionalState}; use crate::state::errors::StateError; @@ -104,41 +111,6 @@ impl TransactionExecutor { } } - /// Executes the given transactions on the state maintained by the executor. - /// Stops if and when there is no more room in the block, and returns the executed transactions' - /// results. - pub fn execute_txs( - &mut self, - txs: &[Transaction], - charge_fee: bool, - ) -> Vec> { - if !self.config.concurrency_config.enabled { - self.execute_txs_sequentially(txs, charge_fee) - } else { - txs.chunks(self.config.concurrency_config.chunk_size) - .fold_while(Vec::new(), |mut results, chunk| { - let chunk_results = self.execute_chunk(chunk, charge_fee); - if chunk_results.len() < chunk.len() { - // Block is full. - results.extend(chunk_results); - Done(results) - } else { - results.extend(chunk_results); - Continue(results) - } - }) - .into_inner() - } - } - - pub fn execute_chunk( - &mut self, - _chunk: &[Transaction], - _charge_fee: bool, - ) -> Vec> { - todo!() - } - pub fn execute_txs_sequentially( &mut self, txs: &[Transaction], @@ -155,6 +127,15 @@ impl TransactionExecutor { results } + #[cfg(not(feature = "concurrency"))] + pub fn execute_chunk( + &mut self, + _chunk: &[Transaction], + _charge_fee: bool, + ) -> Vec> { + unimplemented!() + } + /// Returns the state diff, a list of contract class hash with the corresponding list of /// visited segment values and the block weights. pub fn finalize( @@ -188,3 +169,97 @@ impl TransactionExecutor { )) } } + +impl TransactionExecutor { + /// Executes the given transactions on the state maintained by the executor. + /// Stops if and when there is no more room in the block, and returns the executed transactions' + /// results. + pub fn execute_txs( + &mut self, + txs: &[Transaction], + charge_fee: bool, + ) -> Vec> { + if !self.config.concurrency_config.enabled { + self.execute_txs_sequentially(txs, charge_fee) + } else { + txs.chunks(self.config.concurrency_config.chunk_size) + .fold_while(Vec::new(), |mut results, chunk| { + let chunk_results = self.execute_chunk(chunk, charge_fee); + if chunk_results.len() < chunk.len() { + // Block is full. + results.extend(chunk_results); + Done(results) + } else { + results.extend(chunk_results); + Continue(results) + } + }) + .into_inner() + } + } + + #[cfg(feature = "concurrency")] + pub fn execute_chunk( + &mut self, + chunk: &[Transaction], + // TODO(barak, 01/08/2024): Make `charge_fee` a parameter of `WorkerExecutor`. + _charge_fee: bool, + ) -> Vec> { + let block_state = self.block_state.take().expect("The block state should be `Some`."); + + let worker_executor = Arc::new(WorkerExecutor::initialize( + block_state, + chunk, + &self.block_context, + Mutex::new(&mut self.bouncer), + )); + + // No thread pool implementation is needed here since we already have our scheduler. The + // initialized threads below will "busy wait" for new tasks using the `run` method until the + // chunk execution is completed, and then they will be joined together in a for loop. + // TODO(barak, 01/07/2024): Consider using tokio and spawn tasks that will be served by some + // upper level tokio thread pool (Runtime in tokio terminology). + std::thread::scope(|s| { + for _ in 0..self.config.concurrency_config.n_workers { + let worker_executor = Arc::clone(&worker_executor); + s.spawn(move || { + worker_executor.run(); + }); + } + }); + + let n_committed_txs = worker_executor.scheduler.get_n_committed_txs(); + let tx_execution_results = worker_executor + .execution_outputs + .iter() + .fold_while(Vec::new(), |mut results, execution_output| { + if results.len() >= n_committed_txs { + Done(results) + } else { + let locked_execution_output = execution_output + .lock() + .expect("Failed to lock execution output.") + .take() + .expect("Output must be ready."); + results.push( + locked_execution_output.result.map_err(TransactionExecutorError::from), + ); + Continue(results) + } + }) + .into_inner(); + + let block_state_after_commit = Arc::try_unwrap(worker_executor) + .unwrap_or_else(|_| { + panic!( + "To consume the block state, you must have only one strong reference to the \ + worker executor factory. Consider dropping objects that hold a reference to \ + it." + ) + }) + .commit_chunk_and_recover_block_state(n_committed_txs); + self.block_state.replace(block_state_after_commit); + + tx_execution_results + } +} diff --git a/crates/blockifier/src/concurrency/worker_logic.rs b/crates/blockifier/src/concurrency/worker_logic.rs index 96f3fc0535..207b002466 100644 --- a/crates/blockifier/src/concurrency/worker_logic.rs +++ b/crates/blockifier/src/concurrency/worker_logic.rs @@ -8,7 +8,7 @@ use starknet_api::hash::StarkFelt; use starknet_api::stark_felt; use starknet_api::transaction::Fee; -use super::versioned_state::VersionedStateProxy; +use super::versioned_state::{VersionedState, VersionedStateProxy}; use crate::blockifier::transaction_executor::TransactionExecutorError; use crate::bouncer::Bouncer; use crate::concurrency::fee_utils::fill_sequencer_balance_reads; @@ -64,6 +64,29 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> { WorkerExecutor { scheduler, state, chunk, execution_outputs, block_context, bouncer } } + // TODO(barak, 01/08/2024): Remove the `new` method or move it to test utils. + pub fn initialize( + state: S, + chunk: &'a [Transaction], + block_context: &'a BlockContext, + bouncer: Mutex<&'a mut Bouncer>, + ) -> Self { + let versioned_state = VersionedState::new(state); + let chunk_state = ThreadSafeVersionedState::new(versioned_state); + let scheduler = Scheduler::new(chunk.len()); + let execution_outputs = + std::iter::repeat_with(|| Mutex::new(None)).take(chunk.len()).collect(); + + WorkerExecutor { + scheduler, + state: chunk_state, + chunk, + execution_outputs, + block_context, + bouncer, + } + } + pub fn run(&self) { let mut task = Task::NoTask; loop { @@ -267,6 +290,12 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> { } } +impl<'a, U: UpdatableState> WorkerExecutor<'a, U> { + pub fn commit_chunk_and_recover_block_state(self, n_committed_txs: usize) -> U { + self.state.into_inner_state().commit_chunk_and_recover_block_state(n_committed_txs) + } +} + // Utilities. fn add_fee_to_sequencer_balance(