Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Commit

Permalink
feat(concurrency): implement execute_chunk (#1953)
Browse files Browse the repository at this point in the history
  • Loading branch information
barak-b-starkware authored Jun 10, 2024
1 parent 441c27f commit 7ea1c6b
Show file tree
Hide file tree
Showing 3 changed files with 149 additions and 37 deletions.
10 changes: 9 additions & 1 deletion crates/blockifier/src/blockifier/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,17 @@ pub struct TransactionExecutorConfig {
pub concurrency_config: ConcurrencyConfig,
}

#[derive(Debug, Default, Clone)]
#[derive(Debug, Clone)]
// `Default` is derived (all fields zero/false) only when the `concurrency`
// feature is off; with the feature on, a manual `Default` impl below supplies
// concurrency-enabled defaults instead.
#[cfg_attr(not(feature = "concurrency"), derive(Default))]
pub struct ConcurrencyConfig {
/// Whether transactions are executed concurrently (chunked) or sequentially.
pub enabled: bool,
/// Number of worker threads spawned per chunk execution.
pub n_workers: usize,
/// Number of transactions processed per chunk.
pub chunk_size: usize,
}

#[cfg(feature = "concurrency")]
impl Default for ConcurrencyConfig {
    /// With the `concurrency` feature enabled, default to concurrent execution
    /// with 4 worker threads over chunks of 64 transactions.
    fn default() -> Self {
        ConcurrencyConfig {
            enabled: true,
            n_workers: 4,
            chunk_size: 64,
        }
    }
}
145 changes: 110 additions & 35 deletions crates/blockifier/src/blockifier/transaction_executor.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
#[cfg(feature = "concurrency")]
use std::sync::Arc;
#[cfg(feature = "concurrency")]
use std::sync::Mutex;

use itertools::FoldWhile::{Continue, Done};
use itertools::Itertools;
use starknet_api::core::ClassHash;
use thiserror::Error;

use crate::blockifier::config::TransactionExecutorConfig;
use crate::bouncer::{Bouncer, BouncerWeights};
#[cfg(feature = "concurrency")]
use crate::concurrency::worker_logic::WorkerExecutor;
use crate::context::BlockContext;
use crate::state::cached_state::{CachedState, CommitmentStateDiff, TransactionalState};
use crate::state::errors::StateError;
Expand Down Expand Up @@ -104,41 +111,6 @@ impl<S: StateReader> TransactionExecutor<S> {
}
}

/// Executes the given transactions on the state maintained by the executor.
/// Stops if and when there is no more room in the block, and returns the executed transactions'
/// results.
pub fn execute_txs(
&mut self,
txs: &[Transaction],
charge_fee: bool,
) -> Vec<TransactionExecutorResult<TransactionExecutionInfo>> {
if !self.config.concurrency_config.enabled {
self.execute_txs_sequentially(txs, charge_fee)
} else {
txs.chunks(self.config.concurrency_config.chunk_size)
.fold_while(Vec::new(), |mut results, chunk| {
let chunk_results = self.execute_chunk(chunk, charge_fee);
if chunk_results.len() < chunk.len() {
// Block is full.
results.extend(chunk_results);
Done(results)
} else {
results.extend(chunk_results);
Continue(results)
}
})
.into_inner()
}
}

/// Concurrent-chunk execution entry point; not yet implemented.
pub fn execute_chunk(
&mut self,
_chunk: &[Transaction],
_charge_fee: bool,
) -> Vec<TransactionExecutorResult<TransactionExecutionInfo>> {
todo!()
}

pub fn execute_txs_sequentially(
&mut self,
txs: &[Transaction],
Expand All @@ -155,6 +127,15 @@ impl<S: StateReader> TransactionExecutor<S> {
results
}

/// Stub compiled when the `concurrency` feature is disabled. `execute_txs`
/// only calls this when `concurrency_config.enabled` is true, which is not
/// expected without the feature — hence the panic.
#[cfg(not(feature = "concurrency"))]
pub fn execute_chunk(
&mut self,
_chunk: &[Transaction],
_charge_fee: bool,
) -> Vec<TransactionExecutorResult<TransactionExecutionInfo>> {
unimplemented!()
}

/// Returns the state diff, a list of contract class hash with the corresponding list of
/// visited segment values and the block weights.
pub fn finalize(
Expand Down Expand Up @@ -188,3 +169,97 @@ impl<S: StateReader> TransactionExecutor<S> {
))
}
}

// Separate impl block: the `Send + Sync` bounds on `S` are required so the
// worker executor can be shared across the OS threads spawned in
// `execute_chunk`.
impl<S: StateReader + Send + Sync> TransactionExecutor<S> {
/// Executes the given transactions on the state maintained by the executor.
/// Stops if and when there is no more room in the block, and returns the executed transactions'
/// results.
pub fn execute_txs(
&mut self,
txs: &[Transaction],
charge_fee: bool,
) -> Vec<TransactionExecutorResult<TransactionExecutionInfo>> {
if !self.config.concurrency_config.enabled {
self.execute_txs_sequentially(txs, charge_fee)
} else {
// Execute chunk by chunk; a chunk that returns fewer results than it has
// transactions signals the block filled up mid-chunk, so stop folding.
txs.chunks(self.config.concurrency_config.chunk_size)
.fold_while(Vec::new(), |mut results, chunk| {
let chunk_results = self.execute_chunk(chunk, charge_fee);
if chunk_results.len() < chunk.len() {
// Block is full.
results.extend(chunk_results);
Done(results)
} else {
results.extend(chunk_results);
Continue(results)
}
})
.into_inner()
}
}

/// Executes `chunk` concurrently with `n_workers` threads, applies the
/// committed prefix of the chunk to the block state, and returns the
/// per-transaction results for that prefix.
#[cfg(feature = "concurrency")]
pub fn execute_chunk(
&mut self,
chunk: &[Transaction],
// TODO(barak, 01/08/2024): Make `charge_fee` a parameter of `WorkerExecutor`.
_charge_fee: bool,
) -> Vec<TransactionExecutorResult<TransactionExecutionInfo>> {
// Temporarily move the block state out of `self`; it is restored (with the
// committed transactions applied) before returning.
let block_state = self.block_state.take().expect("The block state should be `Some`.");

let worker_executor = Arc::new(WorkerExecutor::initialize(
block_state,
chunk,
&self.block_context,
Mutex::new(&mut self.bouncer),
));

// No thread pool implementation is needed here since we already have our scheduler. The
// initialized threads below will "busy wait" for new tasks using the `run` method until the
// chunk execution is completed, and then they will be joined together in a for loop.
// TODO(barak, 01/07/2024): Consider using tokio and spawn tasks that will be served by some
// upper level tokio thread pool (Runtime in tokio terminology).
std::thread::scope(|s| {
for _ in 0..self.config.concurrency_config.n_workers {
let worker_executor = Arc::clone(&worker_executor);
s.spawn(move || {
worker_executor.run();
});
}
});
// All worker threads are joined when the scope above ends.

// Collect outputs only for the first `n_committed_txs` transactions; any
// outputs beyond the committed prefix are discarded.
let n_committed_txs = worker_executor.scheduler.get_n_committed_txs();
let tx_execution_results = worker_executor
.execution_outputs
.iter()
.fold_while(Vec::new(), |mut results, execution_output| {
if results.len() >= n_committed_txs {
Done(results)
} else {
let locked_execution_output = execution_output
.lock()
.expect("Failed to lock execution output.")
.take()
.expect("Output must be ready.");
results.push(
locked_execution_output.result.map_err(TransactionExecutorError::from),
);
Continue(results)
}
})
.into_inner();

// After the workers are joined, the `Arc` clones made above have been dropped,
// so `worker_executor` should hold the only strong reference; unwrap it to
// consume the executor and recover the block state.
let block_state_after_commit = Arc::try_unwrap(worker_executor)
.unwrap_or_else(|_| {
panic!(
"To consume the block state, you must have only one strong reference to the \
worker executor factory. Consider dropping objects that hold a reference to \
it."
)
})
.commit_chunk_and_recover_block_state(n_committed_txs);
self.block_state.replace(block_state_after_commit);

tx_execution_results
}
}
31 changes: 30 additions & 1 deletion crates/blockifier/src/concurrency/worker_logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use starknet_api::hash::StarkFelt;
use starknet_api::stark_felt;
use starknet_api::transaction::Fee;

use super::versioned_state::VersionedStateProxy;
use super::versioned_state::{VersionedState, VersionedStateProxy};
use crate::blockifier::transaction_executor::TransactionExecutorError;
use crate::bouncer::Bouncer;
use crate::concurrency::fee_utils::fill_sequencer_balance_reads;
Expand Down Expand Up @@ -64,6 +64,29 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {
WorkerExecutor { scheduler, state, chunk, execution_outputs, block_context, bouncer }
}

// TODO(barak, 01/08/2024): Remove the `new` method or move it to test utils.
/// Builds a `WorkerExecutor` over a fresh versioned view of `state`, with one
/// scheduler slot and one (initially empty) output slot per transaction in `chunk`.
pub fn initialize(
    state: S,
    chunk: &'a [Transaction],
    block_context: &'a BlockContext,
    bouncer: Mutex<&'a mut Bouncer>,
) -> Self {
    let n_txs = chunk.len();
    // Wrap the incoming state so per-transaction versions can be tracked
    // across worker threads.
    let chunk_state = ThreadSafeVersionedState::new(VersionedState::new(state));
    // One mutex-guarded output slot per transaction, filled as executions complete.
    let execution_outputs = (0..n_txs).map(|_| Mutex::new(None)).collect();

    WorkerExecutor {
        scheduler: Scheduler::new(n_txs),
        state: chunk_state,
        chunk,
        execution_outputs,
        block_context,
        bouncer,
    }
}

pub fn run(&self) {
let mut task = Task::NoTask;
loop {
Expand Down Expand Up @@ -267,6 +290,12 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {
}
}

impl<'a, U: UpdatableState> WorkerExecutor<'a, U> {
/// Consumes the executor and delegates to the inner versioned state to apply
/// the first `n_committed_txs` transactions of the chunk, returning the
/// underlying block state.
pub fn commit_chunk_and_recover_block_state(self, n_committed_txs: usize) -> U {
self.state.into_inner_state().commit_chunk_and_recover_block_state(n_committed_txs)
}
}

// Utilities.

fn add_fee_to_sequencer_balance(
Expand Down

0 comments on commit 7ea1c6b

Please sign in to comment.