Skip to content

Commit

Permalink
Avoid looking for new epoch event in all transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse committed Nov 19, 2024
1 parent 8f02541 commit be1426c
Show file tree
Hide file tree
Showing 13 changed files with 164 additions and 139 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 40 additions & 9 deletions aptos-move/aptos-vm/src/aptos_vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3080,13 +3080,44 @@ pub(crate) fn fetch_module_metadata_for_struct_tag(
}
}

#[test]
fn vm_thread_safe() {
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

assert_send::<AptosVM>();
assert_sync::<AptosVM>();
assert_send::<MoveVmExt>();
assert_sync::<MoveVmExt>();
#[cfg(test)]
mod tests {
use crate::{move_vm_ext::MoveVmExt, AptosVM};
use aptos_types::{
account_address::AccountAddress,
account_config::{NEW_EPOCH_EVENT_MOVE_TYPE_TAG, NEW_EPOCH_EVENT_V2_MOVE_TYPE_TAG},
contract_event::ContractEvent,
event::EventKey,
};

#[test]
fn vm_thread_safe() {
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

assert_send::<AptosVM>();
assert_sync::<AptosVM>();
assert_send::<MoveVmExt>();
assert_sync::<MoveVmExt>();
}

#[test]
fn should_restart_execution_on_new_epoch() {
let new_epoch_event = ContractEvent::new_v1(
EventKey::new(0, AccountAddress::ONE),
0,
NEW_EPOCH_EVENT_MOVE_TYPE_TAG.clone(),
vec![],
);
let new_epoch_event_v2 =
ContractEvent::new_v2(NEW_EPOCH_EVENT_V2_MOVE_TYPE_TAG.clone(), vec![]);
assert!(AptosVM::should_restart_execution(&[(
new_epoch_event,
None
)]));
assert!(AptosVM::should_restart_execution(&[(
new_epoch_event_v2,
None
)]));
}
}
1 change: 1 addition & 0 deletions execution/executor-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ anyhow = { workspace = true }
aptos-crypto = { workspace = true }
aptos-drop-helper = { workspace = true }
aptos-infallible = { workspace = true }
aptos-metrics-core = { workspace = true }
aptos-scratchpad = { workspace = true }
aptos-secure-net = { workspace = true }
aptos-storage-interface = { workspace = true }
Expand Down
30 changes: 12 additions & 18 deletions execution/executor-types/src/execution_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@ impl ExecutionOutput {
) -> Self {
if is_block {
// If it's a block, ensure it ends with state checkpoint.
assert!(
next_epoch_state.is_some()
|| to_commit.is_empty() // reconfig suffix
|| to_commit.transactions.last().unwrap().is_non_reconfig_block_ending()
);
assert!(to_commit.is_empty() || to_commit.ends_with_sole_checkpoint());
} else {
// If it's not, there shouldn't be any transaction to be discarded or retried.
assert!(to_discard.is_empty() && to_retry.is_empty());
Expand Down Expand Up @@ -168,31 +164,29 @@ impl Inner {
let aborts = self
.to_commit
.iter()
.flat_map(
|(txn, output, _is_reconfig)| match output.status().status() {
Ok(execution_status) => {
if execution_status.is_success() {
None
} else {
Some(format!("{:?}: {:?}", txn, output.status()))
}
},
Err(_) => None,
.flat_map(|(txn, output)| match output.status().status() {
Ok(execution_status) => {
if execution_status.is_success() {
None
} else {
Some(format!("{:?}: {:?}", txn, output.status()))
}
},
)
Err(_) => None,
})
.collect::<Vec<_>>();

let discards_3 = self
.to_discard
.iter()
.take(3)
.map(|(txn, output, _is_reconfig)| format!("{:?}: {:?}", txn, output.status()))
.map(|(txn, output)| format!("{:?}: {:?}", txn, output.status()))
.collect::<Vec<_>>();
let retries_3 = self
.to_retry
.iter()
.take(3)
.map(|(txn, output, _is_reconfig)| format!("{:?}: {:?}", txn, output.status()))
.map(|(txn, output)| format!("{:?}: {:?}", txn, output.status()))
.collect::<Vec<_>>();

if !aborts.is_empty() || !discards_3.is_empty() || !retries_3.is_empty() {
Expand Down
1 change: 1 addition & 0 deletions execution/executor-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use std::{
mod error;
pub mod execution_output;
mod ledger_update_output;
mod metrics;
pub mod planned;
pub mod state_checkpoint_output;
pub mod state_compute_result;
Expand Down
17 changes: 17 additions & 0 deletions execution/executor-types/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use aptos_metrics_core::{exponential_buckets, register_histogram_vec, HistogramVec};
use once_cell::sync::Lazy;

pub static TIMER: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
// metric name
"aptos_executor_types_timer",
// metric description
"The time spent in seconds.",
&["name"],
exponential_buckets(/*start=*/ 1e-3, /*factor=*/ 2.0, /*count=*/ 20).unwrap(),
)
.unwrap()
});
8 changes: 6 additions & 2 deletions execution/executor-types/src/planned.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::metrics::TIMER;
use aptos_infallible::Mutex;
use aptos_metrics_core::TimerHelper;
use once_cell::sync::OnceCell;
use rayon::ThreadPool;
use std::{ops::Deref, sync::mpsc::Receiver};
Expand Down Expand Up @@ -40,10 +42,12 @@ impl<T> Planned<T> {
}
}

pub fn get(&self) -> &T {
pub fn get(&self, name_for_timer: Option<&str>) -> &T {
if let Some(t) = self.value.get() {
t
} else {
let _timer = name_for_timer.map(|name| TIMER.timer_with(&[name]));

let rx = self.rx.get().expect("Not planned").lock();
if self.value.get().is_none() {
let t = rx.recv().expect("Plan failed.");
Expand All @@ -58,7 +62,7 @@ impl<T> Deref for Planned<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
self.get()
self.get(None)
}
}

Expand Down
6 changes: 5 additions & 1 deletion execution/executor-types/src/state_compute_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ impl StateComputeResult {

pub fn make_chunk_commit_notification(&self) -> ChunkCommitNotification {
ChunkCommitNotification {
subscribable_events: self.execution_output.subscribable_events.clone(),
subscribable_events: self
.execution_output
.subscribable_events
.get(Some("wait_for_subscribable_events"))
.clone(),
committed_transactions: self.execution_output.to_commit.txns().to_vec(),
reconfiguration_occurred: self.execution_output.next_epoch_state.is_some(),
}
Expand Down
57 changes: 30 additions & 27 deletions execution/executor-types/src/transactions_with_output.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,29 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::metrics::TIMER;
use aptos_metrics_core::TimerHelper;
use aptos_types::transaction::{Transaction, TransactionOutput};
use itertools::izip;

#[derive(Debug, Default)]
pub struct TransactionsWithOutput {
pub transactions: Vec<Transaction>,
pub transaction_outputs: Vec<TransactionOutput>,
pub epoch_ending_flags: Vec<bool>,
pub is_reconfig: bool,
}

impl TransactionsWithOutput {
pub fn new(
transactions: Vec<Transaction>,
transaction_outputs: Vec<TransactionOutput>,
epoch_ending_flags: Vec<bool>,
is_reconfig: bool,
) -> Self {
assert_eq!(transactions.len(), transaction_outputs.len());
assert_eq!(transactions.len(), epoch_ending_flags.len());
Self {
transactions,
transaction_outputs,
epoch_ending_flags,
is_reconfig,
}
}

Expand All @@ -32,8 +33,7 @@ impl TransactionsWithOutput {

pub fn new_dummy_success(txns: Vec<Transaction>) -> Self {
let txn_outputs = vec![TransactionOutput::new_empty_success(); txns.len()];
let epoch_ending_flags = vec![false; txns.len()];
Self::new(txns, txn_outputs, epoch_ending_flags)
Self::new(txns, txn_outputs, false)
}

pub fn push(
Expand All @@ -42,9 +42,12 @@ impl TransactionsWithOutput {
transaction_output: TransactionOutput,
is_reconfig: bool,
) {
// can't add more txns after reconfig
assert!(!self.is_reconfig);

self.transactions.push(transaction);
self.transaction_outputs.push(transaction_output);
self.epoch_ending_flags.push(is_reconfig);
self.is_reconfig = is_reconfig;
}

pub fn len(&self) -> usize {
Expand All @@ -64,31 +67,31 @@ impl TransactionsWithOutput {
}

pub fn get_last_checkpoint_index(&self) -> Option<usize> {
if self.is_reconfig {
return Some(self.len() - 1);
}

(0..self.len())
.rev()
.find(|&i| Self::need_checkpoint(&self.transactions[i], self.epoch_ending_flags[i]))
.find(|&i| self.transactions[i].is_non_reconfig_block_ending())
}

pub fn need_checkpoint(txn: &Transaction, is_reconfig: bool) -> bool {
if is_reconfig {
return true;
}
match txn {
Transaction::BlockMetadata(_)
| Transaction::BlockMetadataExt(_)
| Transaction::UserTransaction(_)
| Transaction::ValidatorTransaction(_) => false,
Transaction::GenesisTransaction(_)
| Transaction::StateCheckpoint(_)
| Transaction::BlockEpilogue(_) => true,
}
pub fn iter(&self) -> impl Iterator<Item = (&Transaction, &TransactionOutput)> {
izip!(self.transactions.iter(), self.transaction_outputs.iter(),)
}

pub fn iter(&self) -> impl Iterator<Item = (&Transaction, &TransactionOutput, bool)> {
izip!(
self.transactions.iter(),
self.transaction_outputs.iter(),
self.epoch_ending_flags.iter().cloned()
)
pub fn ends_with_sole_checkpoint(&self) -> bool {
let _timer = TIMER.timer_with(&["ends_with_sole_checkpoint"]);
if self.is_reconfig {
!self
.txns()
.iter()
.any(Transaction::is_non_reconfig_block_ending)
} else {
self.txns()
.iter()
.position(Transaction::is_non_reconfig_block_ending)
== Some(self.len() - 1)
}
}
}
15 changes: 15 additions & 0 deletions execution/executor/src/tests/mock_vm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ pub static KEEP_STATUS: Lazy<TransactionStatus> =
pub static DISCARD_STATUS: Lazy<TransactionStatus> =
Lazy::new(|| TransactionStatus::Discard(StatusCode::INSUFFICIENT_BALANCE_FOR_TRANSACTION_FEE));

pub static RETRY_STATUS: Lazy<TransactionStatus> = Lazy::new(|| TransactionStatus::Retry);

pub struct MockVM;

impl VMBlockExecutor for MockVM {
Expand All @@ -76,7 +78,19 @@ impl VMBlockExecutor for MockVM {
let mut output_cache = HashMap::new();
let mut outputs = vec![];

let mut skip_rest = false;
for idx in 0..txn_provider.num_txns() {
if skip_rest {
outputs.push(TransactionOutput::new(
WriteSet::default(),
vec![],
0,
RETRY_STATUS.clone(),
TransactionAuxiliaryData::default(),
));
continue;
}

let txn = txn_provider.get_txn(idx as u32).expect_valid();
if matches!(txn, Transaction::StateCheckpoint(_)) {
outputs.push(TransactionOutput::new(
Expand Down Expand Up @@ -110,6 +124,7 @@ impl VMBlockExecutor for MockVM {
KEEP_STATUS.clone(),
TransactionAuxiliaryData::default(),
));
skip_rest = true;
continue;
}

Expand Down
13 changes: 4 additions & 9 deletions execution/executor/src/types/in_memory_state_calculator_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ impl InMemoryStateCalculatorV2 {

// If there are multiple checkpoints in the chunk, we only calculate the SMT (and its root
// hash) for the last one.
let last_checkpoint_index = execution_output.to_commit.get_last_checkpoint_index();
let last_checkpoint_index = {
let _timer = OTHER_TIMERS.timer_with(&["get_last_checkpoint_index"]);
execution_output.to_commit.get_last_checkpoint_index()
};

Self::calculate_impl(
parent_state,
Expand Down Expand Up @@ -371,14 +374,6 @@ impl InMemoryStateCalculatorV2 {
"Base state is corrupted, updates_since_base is not empty at a checkpoint."
);

for (i, (txn, _txn_out, is_reconfig)) in to_commit.iter().enumerate() {
ensure!(
TransactionsWithOutput::need_checkpoint(txn, is_reconfig) ^ (i != num_txns - 1),
"Checkpoint is allowed iff it's the last txn in the block. index: {i}, num_txns: {num_txns}, is_last: {}, txn: {txn:?}, is_reconfig: {}",
i == num_txns - 1,
is_reconfig,
);
}
Ok(())
}
}
Loading

0 comments on commit be1426c

Please sign in to comment.