Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Commit

Permalink
feat(concurrency): use the bouncer in concurrent execution (#1930)
Browse files Browse the repository at this point in the history
  • Loading branch information
avi-starkware authored Jun 4, 2024
1 parent 79b6ae8 commit 28745a3
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 10 deletions.
67 changes: 60 additions & 7 deletions crates/blockifier/src/concurrency/worker_logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use starknet_api::stark_felt;
use starknet_api::transaction::Fee;

use super::versioned_state::VersionedStateProxy;
use crate::blockifier::transaction_executor::TransactionExecutorError;
use crate::bouncer::Bouncer;
use crate::concurrency::fee_utils::fill_sequencer_balance_reads;
use crate::concurrency::scheduler::{Scheduler, Task};
use crate::concurrency::utils::lock_mutex_in_array;
Expand All @@ -17,8 +19,11 @@ use crate::concurrency::TxIndex;
use crate::context::BlockContext;
use crate::execution::execution_utils::stark_felt_to_felt;
use crate::fee::fee_utils::get_sequencer_balance_keys;
use crate::state::cached_state::{ContractClassMapping, StateMaps, TransactionalState};
use crate::state::cached_state::{
ContractClassMapping, StateChanges, StateMaps, TransactionalState,
};
use crate::state::state_api::{StateReader, UpdatableState};
use crate::transaction::errors::TransactionExecutionError;
use crate::transaction::objects::{TransactionExecutionInfo, TransactionExecutionResult};
use crate::transaction::transaction_execution::Transaction;
use crate::transaction::transactions::ExecutableTransaction;
Expand All @@ -44,18 +49,20 @@ pub struct WorkerExecutor<'a, S: StateReader> {
pub chunk: &'a [Transaction],
pub execution_outputs: Box<[Mutex<Option<ExecutionTaskOutput>>]>,
pub block_context: BlockContext,
pub bouncer: Mutex<&'a mut Bouncer>,
}
impl<'a, S: StateReader> WorkerExecutor<'a, S> {
pub fn new(
state: ThreadSafeVersionedState<S>,
chunk: &'a [Transaction],
block_context: BlockContext,
bouncer: Mutex<&'a mut Bouncer>,
) -> Self {
let scheduler = Scheduler::new(chunk.len());
let execution_outputs =
std::iter::repeat_with(|| Mutex::new(None)).take(chunk.len()).collect();

WorkerExecutor { scheduler, state, chunk, execution_outputs, block_context }
WorkerExecutor { scheduler, state, chunk, execution_outputs, block_context, bouncer }
}

pub fn run(&self) {
Expand Down Expand Up @@ -192,12 +199,57 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {

// Execution is final.
let mut execution_output = lock_mutex_in_array(&self.execution_outputs, tx_index);
let result_tx_info =
let writes = &execution_output.as_ref().expect(EXECUTION_OUTPUTS_UNWRAP_ERROR).writes;
let reads = &execution_output.as_ref().expect(EXECUTION_OUTPUTS_UNWRAP_ERROR).reads;
let tx_state_changes_keys = StateChanges::from(writes.diff(reads)).into_keys();
let tx_result =
&mut execution_output.as_mut().expect(EXECUTION_OUTPUTS_UNWRAP_ERROR).result;

let tx_context = Arc::new(self.block_context.to_tx_context(tx));
if let Ok(tx_info) = result_tx_info.as_mut() {
// TODO(Meshi, 01/06/2024): ask the bouncer if there is enough room for the transaction.
if let Ok(tx_execution_info) = tx_result.as_mut() {
// Ask the bouncer if there is room for the transaction in the block.
let bouncer_result = self.bouncer.lock().expect("Bouncer lock failed.").try_update(
&tx_versioned_state,
&tx_state_changes_keys,
&tx_execution_info.summarize(),
&tx_execution_info.actual_resources,
);
if let Err(error) = bouncer_result {
match error {
TransactionExecutorError::BlockFull => return false,
TransactionExecutorError::TransactionExecutionError(
TransactionExecutionError::TransactionTooLarge,
) => {
// TransactionTooLarge error - revise the execution result, delete writes
// and commit.
// TODO(Avi, 20/6/2024): Move TransactionTooLarge inside execute_raw.
let old_execution_output =
execution_output.take().expect(EXECUTION_OUTPUTS_UNWRAP_ERROR);
tx_versioned_state.delete_writes(
&old_execution_output.writes,
&old_execution_output.contract_classes,
);

*execution_output = Some(ExecutionTaskOutput {
reads: old_execution_output.reads,
writes: StateMaps::default(),
contract_classes: HashMap::default(),
visited_pcs: HashMap::default(),
result: Err(TransactionExecutionError::TransactionTooLarge),
});

// Signal to the scheduler that the execution output has been revised, so
// higher transactions should be re-validated.
self.scheduler.finish_execution_during_commit(tx_index);

return true;
}
_ => {
// TODO(Avi, 01/07/2024): Consider propagating the error.
panic!("Bouncer update failed. {error:?}: {error}");
}
}
}
// Update the sequencer balance (in state + call info).
if tx_context.tx_info.sender_address()
== self.block_context.block_info.sequencer_address
Expand All @@ -220,7 +272,8 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {
)
});

if let Some(fee_transfer_call_info) = tx_info.fee_transfer_call_info.as_mut() {
if let Some(fee_transfer_call_info) = tx_execution_info.fee_transfer_call_info.as_mut()
{
// Fix the transfer call info.
fill_sequencer_balance_reads(
fee_transfer_call_info,
Expand All @@ -231,7 +284,7 @@ impl<'a, S: StateReader> WorkerExecutor<'a, S> {
add_fee_to_sequencer_balance(
tx_context.fee_token_address(),
&mut tx_versioned_state,
tx_info.actual_fee,
tx_execution_info.actual_fee,
&self.block_context,
sequencer_balance_value_low,
sequencer_balance_value_high,
Expand Down
22 changes: 19 additions & 3 deletions crates/blockifier/src/concurrency/worker_logic_test.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::sync::Mutex;

use num_bigint::BigUint;
use starknet_api::core::{ContractAddress, PatriciaKey};
Expand All @@ -9,6 +10,7 @@ use starknet_api::{contract_address, patricia_key, stark_felt};
use super::WorkerExecutor;
use crate::abi::abi_utils::get_fee_token_var_address;
use crate::abi::sierra_types::next_storage_key;
use crate::bouncer::{Bouncer, BouncerConfig};
use crate::concurrency::scheduler::{Task, TransactionStatus};
use crate::concurrency::test_utils::safe_versioned_state_for_testing;
use crate::concurrency::worker_logic::add_fee_to_sequencer_balance;
Expand Down Expand Up @@ -96,7 +98,13 @@ fn test_worker_execute() {
.map(Transaction::AccountTransaction)
.collect::<Vec<Transaction>>();

let worker_executor = WorkerExecutor::new(safe_versioned_state.clone(), &txs, block_context);
let mut bouncer = Bouncer::new(BouncerConfig::default());
let worker_executor = WorkerExecutor::new(
safe_versioned_state.clone(),
&txs,
block_context,
Mutex::new(&mut bouncer),
);

// Creates 3 execution active tasks.
worker_executor.scheduler.next_task();
Expand Down Expand Up @@ -256,7 +264,13 @@ fn test_worker_validate() {
.map(Transaction::AccountTransaction)
.collect::<Vec<Transaction>>();

let worker_executor = WorkerExecutor::new(safe_versioned_state.clone(), &txs, block_context);
let mut bouncer = Bouncer::new(BouncerConfig::default());
let worker_executor = WorkerExecutor::new(
safe_versioned_state.clone(),
&txs,
block_context,
Mutex::new(&mut bouncer),
);

// Creates 2 active tasks.
worker_executor.scheduler.next_task();
Expand Down Expand Up @@ -413,7 +427,9 @@ fn test_deploy_before_declare() {
.map(Transaction::AccountTransaction)
.collect::<Vec<Transaction>>();

let worker_executor = WorkerExecutor::new(safe_versioned_state, &txs, block_context);
let mut bouncer = Bouncer::new(BouncerConfig::default());
let worker_executor =
WorkerExecutor::new(safe_versioned_state, &txs, block_context, Mutex::new(&mut bouncer));

// Creates 2 active tasks.
worker_executor.scheduler.next_task();
Expand Down

0 comments on commit 28745a3

Please sign in to comment.