Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Commit

Permalink
chore(concurrency): add execute method to the thread logic (#1882)
Browse files Browse the repository at this point in the history
  • Loading branch information
noaov1 authored May 12, 2024
1 parent 2431cd6 commit 8da582b
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 49 deletions.
2 changes: 1 addition & 1 deletion crates/blockifier/src/bouncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ macro_rules! impl_checked_sub {

pub type HashMapWrapper = HashMap<String, usize>;

#[derive(Debug, Default, PartialEq, Clone)]
#[derive(Clone, Debug, Default, PartialEq)]
pub struct BouncerConfig {
pub block_max_capacity: BouncerWeights,
pub block_max_capacity_with_keccak: BouncerWeights,
Expand Down
1 change: 1 addition & 0 deletions crates/blockifier/src/concurrency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod fee_utils;
pub mod scheduler;
#[cfg(any(feature = "testing", test))]
pub mod test_utils;
pub mod utils;
pub mod versioned_state_proxy;
pub mod versioned_storage;
pub mod worker_logic;
Expand Down
9 changes: 2 additions & 7 deletions crates/blockifier/src/concurrency/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::cmp::min;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Mutex, MutexGuard};

use crate::concurrency::utils::lock_mutex_in_array;
use crate::concurrency::TxIndex;

#[cfg(test)]
Expand Down Expand Up @@ -128,13 +129,7 @@ impl Scheduler {
}

fn lock_tx_status(&self, tx_index: TxIndex) -> MutexGuard<'_, TransactionStatus> {
self.tx_statuses[tx_index].lock().unwrap_or_else(|error| {
panic!(
"Status of transaction index {} is poisoned. Data: {:?}.",
tx_index,
*error.get_ref()
)
})
lock_mutex_in_array(&self.tx_statuses, tx_index)
}

fn set_executed_status(&self, tx_index: TxIndex) {
Expand Down
2 changes: 1 addition & 1 deletion crates/blockifier/src/concurrency/scheduler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ fn test_lock_tx_status() {
}

#[rstest]
#[should_panic(expected = "Status of transaction index 0 is poisoned. Data: ReadyToExecute.")]
#[should_panic(expected = "Cell of transaction index 0 is poisoned. Data: ReadyToExecute.")]
fn test_lock_tx_status_poisoned() {
let scheduler = Arc::new(Scheduler::new(DEFAULT_CHUNK_SIZE));
let scheduler_clone = scheduler.clone();
Expand Down
10 changes: 10 additions & 0 deletions crates/blockifier/src/concurrency/utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use std::fmt::Debug;
use std::sync::{Mutex, MutexGuard};

use crate::concurrency::TxIndex;

pub fn lock_mutex_in_array<T: Debug>(array: &[Mutex<T>], tx_index: TxIndex) -> MutexGuard<'_, T> {
array[tx_index].lock().unwrap_or_else(|error| {
panic!("Cell of transaction index {} is poisoned. Data: {:?}.", tx_index, *error.get_ref())
})
}
6 changes: 3 additions & 3 deletions crates/blockifier/src/concurrency/versioned_state_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ impl<S: StateReader> VersionedState<S> {
// accessing this function should be protected by a mutex to ensure thread safety.
// TODO: Consider coupling the tx index with the read set to ensure any mismatch between them
// will cause the validation to fail.
fn validate_read_set(&mut self, tx_index: TxIndex, reads: &StateMaps) -> bool {
fn validate_reads(&mut self, tx_index: TxIndex, reads: &StateMaps) -> bool {
// If is the first transaction in the chunk, then the read set is valid. Since it has no
// predecessors, there's nothing to compare it to.
if tx_index == 0 {
Expand Down Expand Up @@ -170,8 +170,8 @@ impl<S: StateReader> VersionedStateProxy<S> {
self.state.lock().expect("Failed to acquire state lock.")
}

pub fn validate_read_set(&self, reads: &StateMaps) -> bool {
self.state().validate_read_set(self.tx_index, reads)
pub fn validate_reads(&self, reads: &StateMaps) -> bool {
self.state().validate_reads(self.tx_index, reads)
}

pub fn apply_writes(&self, writes: &StateMaps, class_hash_to_class: &ContractClassMapping) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ fn test_validate_read_set(
let transactional_state = CachedState::from(safe_versioned_state.pin_version(1));

// Validating tx index 0 always succeeds.
assert!(safe_versioned_state.pin_version(0).validate_read_set(&StateMaps::default()));
assert!(safe_versioned_state.pin_version(0).validate_reads(&StateMaps::default()));

assert!(transactional_state.cache.borrow().initial_reads.storage.is_empty());
transactional_state.get_storage_at(contract_address, storage_key).unwrap();
Expand All @@ -304,7 +304,7 @@ fn test_validate_read_set(
assert!(
safe_versioned_state
.pin_version(1)
.validate_read_set(&transactional_state.cache.borrow().initial_reads)
.validate_reads(&transactional_state.cache.borrow().initial_reads)
);
}

Expand Down
106 changes: 71 additions & 35 deletions crates/blockifier/src/concurrency/worker_logic.rs
Original file line number Diff line number Diff line change
@@ -1,58 +1,94 @@
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};
use std::fmt::Debug;
use std::sync::Mutex;

use starknet_api::core::ClassHash;

use crate::concurrency::scheduler::{Scheduler, Task};
use crate::concurrency::utils::lock_mutex_in_array;
use crate::concurrency::versioned_state_proxy::ThreadSafeVersionedState;
use crate::concurrency::TxIndex;
use crate::context::BlockContext;
use crate::state::cached_state::StateMaps;
use crate::state::cached_state::{CachedState, StateMaps};
use crate::state::state_api::StateReader;
use crate::transaction::objects::{TransactionExecutionInfo, TransactionExecutionResult};
use crate::transaction::transaction_execution::Transaction;
use crate::transaction::transactions::ExecutableTransaction;

pub struct ConcurrentExecutionContext<S: StateReader> {
pub scheduler: Scheduler,
pub state: ThreadSafeVersionedState<S>,
pub chunk: Box<[Transaction]>,
pub execution_outputs: Box<[Mutex<ExecutionTaskOutput>]>,
pub block_context: BlockContext,
}

#[derive(Debug)]
pub struct ExecutionTaskOutput {
pub reads: StateMaps,
pub writes: StateMaps,
pub visited_pcs: HashMap<ClassHash, HashSet<usize>>,
pub result: TransactionExecutionResult<TransactionExecutionInfo>,
}

// TODO(Noa, 15/05/2024): Re-consider the necessity of the Arc (as opposed to a reference), given
// concurrent code.
pub fn run<S: StateReader>(execution_context: Arc<ConcurrentExecutionContext<S>>) {
let scheduler = &execution_context.scheduler;
let mut task = Task::NoTask;
loop {
task = match task {
Task::ExecutionTask(tx_index) => {
execute(&execution_context, tx_index);
Task::NoTask
}
Task::ValidationTask(tx_index) => validate(&execution_context, tx_index),
Task::NoTask => scheduler.next_task(),
Task::Done => break,
};
}
pub struct WorkerExecutor<S: StateReader> {
pub scheduler: Scheduler,
pub state: ThreadSafeVersionedState<S>,
pub chunk: Box<[Transaction]>,
pub execution_outputs: Box<[Mutex<Option<ExecutionTaskOutput>>]>,
pub block_context: BlockContext,
}
impl<S: StateReader> WorkerExecutor<S> {
pub fn run(&self) {
let mut task = Task::NoTask;
loop {
task = match task {
Task::ExecutionTask(tx_index) => {
self.execute(tx_index);
Task::NoTask
}
Task::ValidationTask(tx_index) => self.validate(tx_index),
Task::NoTask => self.scheduler.next_task(),
Task::Done => break,
};
}
}

fn execute<S: StateReader>(_execution_context: &ConcurrentExecutionContext<S>, _tx_index: TxIndex) {
// TODO(Noa, 15/05/2024): share code with `try_commit`.
todo!();
}
fn execute(&self, tx_index: TxIndex) {
self.execute_tx(tx_index);
self.scheduler.finish_execution(tx_index)
}

fn execute_tx(&self, tx_index: TxIndex) {
let tx_versioned_state = self.state.pin_version(tx_index);
let tx = &self.chunk[tx_index];
// TODO(Noa, 15/05/2024): remove the redundant cached state.
let mut tx_state = CachedState::new(tx_versioned_state);
let mut transactional_state = CachedState::create_transactional(&mut tx_state);
let validate = true;
let charge_fee = true;

let execution_result =
tx.execute_raw(&mut transactional_state, &self.block_context, charge_fee, validate);

fn validate<S: StateReader>(
_execution_context: &ConcurrentExecutionContext<S>,
_tx_index: TxIndex,
) -> Task {
todo!();
if execution_result.is_ok() {
let class_hash_to_class = transactional_state.class_hash_to_class.borrow();
// TODO(Noa, 15/05/2024): use `tx_versioned_state` when we add support to transactional
// versioned state.
self.state
.pin_version(tx_index)
.apply_writes(&transactional_state.cache.borrow().writes, &class_hash_to_class);
}

// Write the transaction execution outputs.
let tx_reads_writes = transactional_state.cache.take();
// In case of a failed transaction, we don't record its writes and visited pcs.
let (writes, visited_pcs) = match execution_result {
Ok(_) => (tx_reads_writes.writes, transactional_state.visited_pcs),
Err(_) => (StateMaps::default(), HashMap::default()),
};
let mut execution_output = lock_mutex_in_array(&self.execution_outputs, tx_index);
*execution_output = Some(ExecutionTaskOutput {
reads: tx_reads_writes.initial_reads,
writes,
visited_pcs,
result: execution_result,
});
}

fn validate(&self, _tx_index: TxIndex) -> Task {
todo!();
}
}

0 comments on commit 8da582b

Please sign in to comment.