Skip to content
This repository has been archived by the owner on Aug 21, 2024. It is now read-only.

Commit

Permalink
feat(concurrency): add thread logic method (#1863)
Browse files Browse the repository at this point in the history
  • Loading branch information
noaov1 authored May 8, 2024
1 parent 3a6c5f2 commit 7be4d53
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 4 deletions.
1 change: 1 addition & 0 deletions crates/blockifier/src/concurrency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ pub mod scheduler;
pub mod test_utils;
pub mod versioned_state_proxy;
pub mod versioned_storage;
pub mod worker_logic;

type TxIndex = usize;
9 changes: 5 additions & 4 deletions crates/blockifier/src/concurrency/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ impl Scheduler {
}

/// Returns the done marker.
pub fn done(&self) -> bool {
fn done(&self) -> bool {
self.done_marker.load(Ordering::Acquire)
}

Expand Down Expand Up @@ -97,17 +97,18 @@ impl Scheduler {
/// Updates the Scheduler that a validation task has been finished and triggers the creation of
/// new tasks in case of failure: schedules validation for higher transactions + re-executes the
/// current transaction (if ready).
pub fn finish_validation(&self, tx_index: TxIndex, aborted: bool) -> Option<Task> {
pub fn finish_validation(&self, tx_index: TxIndex, aborted: bool) -> Task {
if aborted {
self.set_ready_status(tx_index);
if self.execution_index.load(Ordering::Acquire) > tx_index
&& self.try_incarnate(tx_index)
{
return Some(Task::ExecutionTask(tx_index));
return Task::ExecutionTask(tx_index);
}
}
self.safe_decrement_n_active_tasks();
None

Task::NoTask
}

/// Checks if all transactions have been executed and validated.
Expand Down
6 changes: 6 additions & 0 deletions crates/blockifier/src/concurrency/versioned_state_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ impl<S: StateReader> ThreadSafeVersionedState<S> {
}
}

impl<S: StateReader> Clone for ThreadSafeVersionedState<S> {
fn clone(&self) -> Self {
ThreadSafeVersionedState(Arc::clone(&self.0))
}
}

pub struct VersionedStateProxy<S: StateReader> {
pub tx_index: TxIndex,
pub state: Arc<Mutex<VersionedState<S>>>,
Expand Down
58 changes: 58 additions & 0 deletions crates/blockifier/src/concurrency/worker_logic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use std::collections::{HashMap, HashSet};
use std::sync::{Arc, Mutex};

use starknet_api::core::ClassHash;

use crate::concurrency::scheduler::{Scheduler, Task};
use crate::concurrency::versioned_state_proxy::ThreadSafeVersionedState;
use crate::concurrency::TxIndex;
use crate::context::BlockContext;
use crate::state::cached_state::StateMaps;
use crate::state::state_api::StateReader;
use crate::transaction::objects::{TransactionExecutionInfo, TransactionExecutionResult};
use crate::transaction::transaction_execution::Transaction;

pub struct ConcurrentExecutionContext<S: StateReader> {
pub scheduler: Scheduler,
pub state: ThreadSafeVersionedState<S>,
pub chunk: Box<[Transaction]>,
pub execution_outputs: Box<[Mutex<ExecutionTaskOutput>]>,
pub block_context: BlockContext,
}

pub struct ExecutionTaskOutput {
pub reads: StateMaps,
pub writes: StateMaps,
pub visited_pcs: HashMap<ClassHash, HashSet<usize>>,
pub result: TransactionExecutionResult<TransactionExecutionInfo>,
}

// TODO(Noa, 15/05/2024): Re-consider the necessity of the Arc (as opposed to a reference), given
// concurrent code.
pub fn run<S: StateReader>(execution_context: Arc<ConcurrentExecutionContext<S>>) {
let scheduler = &execution_context.scheduler;
let mut task = Task::NoTask;
loop {
task = match task {
Task::ExecutionTask(tx_index) => {
execute(&execution_context, tx_index);
Task::NoTask
}
Task::ValidationTask(tx_index) => validate(&execution_context, tx_index),
Task::NoTask => scheduler.next_task(),
Task::Done => break,
};
}
}

fn execute<S: StateReader>(_execution_context: &ConcurrentExecutionContext<S>, _tx_index: TxIndex) {
// TODO(Noa, 15/05/2024): share code with `try_commit`.
todo!();
}

fn validate<S: StateReader>(
_execution_context: &ConcurrentExecutionContext<S>,
_tx_index: TxIndex,
) -> Task {
todo!();
}

0 comments on commit 7be4d53

Please sign in to comment.