Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid looking for new epoch event in all transactions #15293

Merged
merged 2 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

49 changes: 40 additions & 9 deletions aptos-move/aptos-vm/src/aptos_vm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3080,13 +3080,44 @@ pub(crate) fn fetch_module_metadata_for_struct_tag(
}
}

#[test]
fn vm_thread_safe() {
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

assert_send::<AptosVM>();
assert_sync::<AptosVM>();
assert_send::<MoveVmExt>();
assert_sync::<MoveVmExt>();
#[cfg(test)]
mod tests {
use crate::{move_vm_ext::MoveVmExt, AptosVM};
use aptos_types::{
account_address::AccountAddress,
account_config::{NEW_EPOCH_EVENT_MOVE_TYPE_TAG, NEW_EPOCH_EVENT_V2_MOVE_TYPE_TAG},
contract_event::ContractEvent,
event::EventKey,
};

#[test]
fn vm_thread_safe() {
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

assert_send::<AptosVM>();
assert_sync::<AptosVM>();
assert_send::<MoveVmExt>();
assert_sync::<MoveVmExt>();
}

#[test]
fn should_restart_execution_on_new_epoch() {
let new_epoch_event = ContractEvent::new_v1(
EventKey::new(0, AccountAddress::ONE),
0,
NEW_EPOCH_EVENT_MOVE_TYPE_TAG.clone(),
vec![],
);
let new_epoch_event_v2 =
ContractEvent::new_v2(NEW_EPOCH_EVENT_V2_MOVE_TYPE_TAG.clone(), vec![]);
assert!(AptosVM::should_restart_execution(&[(
new_epoch_event,
None
)]));
assert!(AptosVM::should_restart_execution(&[(
new_epoch_event_v2,
None
)]));
}
}
1 change: 1 addition & 0 deletions execution/executor-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ anyhow = { workspace = true }
aptos-crypto = { workspace = true }
aptos-drop-helper = { workspace = true }
aptos-infallible = { workspace = true }
aptos-metrics-core = { workspace = true }
aptos-scratchpad = { workspace = true }
aptos-secure-net = { workspace = true }
aptos-storage-interface = { workspace = true }
Expand Down
30 changes: 12 additions & 18 deletions execution/executor-types/src/execution_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,7 @@ impl ExecutionOutput {
) -> Self {
if is_block {
// If it's a block, ensure it ends with state checkpoint.
assert!(
next_epoch_state.is_some()
|| to_commit.is_empty() // reconfig suffix
|| to_commit.transactions.last().unwrap().is_non_reconfig_block_ending()
);
assert!(to_commit.is_empty() || to_commit.ends_with_sole_checkpoint());
} else {
// If it's not, there shouldn't be any transaction to be discarded or retried.
assert!(to_discard.is_empty() && to_retry.is_empty());
Expand Down Expand Up @@ -168,31 +164,29 @@ impl Inner {
let aborts = self
.to_commit
.iter()
.flat_map(
|(txn, output, _is_reconfig)| match output.status().status() {
Ok(execution_status) => {
if execution_status.is_success() {
None
} else {
Some(format!("{:?}: {:?}", txn, output.status()))
}
},
Err(_) => None,
.flat_map(|(txn, output)| match output.status().status() {
Ok(execution_status) => {
if execution_status.is_success() {
None
} else {
Some(format!("{:?}: {:?}", txn, output.status()))
}
},
)
Err(_) => None,
})
.collect::<Vec<_>>();

let discards_3 = self
.to_discard
.iter()
.take(3)
.map(|(txn, output, _is_reconfig)| format!("{:?}: {:?}", txn, output.status()))
.map(|(txn, output)| format!("{:?}: {:?}", txn, output.status()))
.collect::<Vec<_>>();
let retries_3 = self
.to_retry
.iter()
.take(3)
.map(|(txn, output, _is_reconfig)| format!("{:?}: {:?}", txn, output.status()))
.map(|(txn, output)| format!("{:?}: {:?}", txn, output.status()))
.collect::<Vec<_>>();

if !aborts.is_empty() || !discards_3.is_empty() || !retries_3.is_empty() {
Expand Down
1 change: 1 addition & 0 deletions execution/executor-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use std::{
mod error;
pub mod execution_output;
mod ledger_update_output;
mod metrics;
pub mod planned;
pub mod state_checkpoint_output;
pub mod state_compute_result;
Expand Down
17 changes: 17 additions & 0 deletions execution/executor-types/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use aptos_metrics_core::{exponential_buckets, register_histogram_vec, HistogramVec};
use once_cell::sync::Lazy;

pub static TIMER: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
// metric name
"aptos_executor_types_timer",
// metric description
"The time spent in seconds.",
&["name"],
exponential_buckets(/*start=*/ 1e-3, /*factor=*/ 2.0, /*count=*/ 20).unwrap(),
)
.unwrap()
});
8 changes: 6 additions & 2 deletions execution/executor-types/src/planned.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Copyright (c) Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::metrics::TIMER;
use aptos_infallible::Mutex;
use aptos_metrics_core::TimerHelper;
use once_cell::sync::OnceCell;
use rayon::ThreadPool;
use std::{ops::Deref, sync::mpsc::Receiver};
Expand Down Expand Up @@ -40,10 +42,12 @@ impl<T> Planned<T> {
}
}

pub fn get(&self) -> &T {
pub fn get(&self, name_for_timer: Option<&str>) -> &T {
if let Some(t) = self.value.get() {
t
} else {
let _timer = name_for_timer.map(|name| TIMER.timer_with(&[name]));

let rx = self.rx.get().expect("Not planned").lock();
if self.value.get().is_none() {
let t = rx.recv().expect("Plan failed.");
Expand All @@ -58,7 +62,7 @@ impl<T> Deref for Planned<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
self.get()
self.get(None)
}
}

Expand Down
6 changes: 5 additions & 1 deletion execution/executor-types/src/state_compute_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ impl StateComputeResult {

pub fn make_chunk_commit_notification(&self) -> ChunkCommitNotification {
ChunkCommitNotification {
subscribable_events: self.execution_output.subscribable_events.clone(),
subscribable_events: self
.execution_output
.subscribable_events
.get(Some("wait_for_subscribable_events"))
.clone(),
committed_transactions: self.execution_output.to_commit.txns().to_vec(),
reconfiguration_occurred: self.execution_output.next_epoch_state.is_some(),
}
Expand Down
57 changes: 30 additions & 27 deletions execution/executor-types/src/transactions_with_output.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,29 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::metrics::TIMER;
use aptos_metrics_core::TimerHelper;
use aptos_types::transaction::{Transaction, TransactionOutput};
use itertools::izip;

#[derive(Debug, Default)]
pub struct TransactionsWithOutput {
pub transactions: Vec<Transaction>,
pub transaction_outputs: Vec<TransactionOutput>,
pub epoch_ending_flags: Vec<bool>,
pub is_reconfig: bool,
}

impl TransactionsWithOutput {
pub fn new(
transactions: Vec<Transaction>,
transaction_outputs: Vec<TransactionOutput>,
epoch_ending_flags: Vec<bool>,
is_reconfig: bool,
) -> Self {
assert_eq!(transactions.len(), transaction_outputs.len());
assert_eq!(transactions.len(), epoch_ending_flags.len());
Self {
transactions,
transaction_outputs,
epoch_ending_flags,
is_reconfig,
}
}

Expand All @@ -32,8 +33,7 @@ impl TransactionsWithOutput {

pub fn new_dummy_success(txns: Vec<Transaction>) -> Self {
let txn_outputs = vec![TransactionOutput::new_empty_success(); txns.len()];
let epoch_ending_flags = vec![false; txns.len()];
Self::new(txns, txn_outputs, epoch_ending_flags)
Self::new(txns, txn_outputs, false)
}

pub fn push(
Expand All @@ -42,9 +42,12 @@ impl TransactionsWithOutput {
transaction_output: TransactionOutput,
is_reconfig: bool,
) {
// can't add more txns after reconfig
assert!(!self.is_reconfig);

self.transactions.push(transaction);
self.transaction_outputs.push(transaction_output);
self.epoch_ending_flags.push(is_reconfig);
self.is_reconfig = is_reconfig;
}

pub fn len(&self) -> usize {
Expand All @@ -64,31 +67,31 @@ impl TransactionsWithOutput {
}

pub fn get_last_checkpoint_index(&self) -> Option<usize> {
if self.is_reconfig {
return Some(self.len() - 1);
}

(0..self.len())
.rev()
.find(|&i| Self::need_checkpoint(&self.transactions[i], self.epoch_ending_flags[i]))
.find(|&i| self.transactions[i].is_non_reconfig_block_ending())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why this is not the last one? for chunks?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, for state sync

}

pub fn need_checkpoint(txn: &Transaction, is_reconfig: bool) -> bool {
if is_reconfig {
return true;
}
match txn {
Transaction::BlockMetadata(_)
| Transaction::BlockMetadataExt(_)
| Transaction::UserTransaction(_)
| Transaction::ValidatorTransaction(_) => false,
Transaction::GenesisTransaction(_)
| Transaction::StateCheckpoint(_)
| Transaction::BlockEpilogue(_) => true,
}
pub fn iter(&self) -> impl Iterator<Item = (&Transaction, &TransactionOutput)> {
izip!(self.transactions.iter(), self.transaction_outputs.iter(),)
}

pub fn iter(&self) -> impl Iterator<Item = (&Transaction, &TransactionOutput, bool)> {
izip!(
self.transactions.iter(),
self.transaction_outputs.iter(),
self.epoch_ending_flags.iter().cloned()
)
pub fn ends_with_sole_checkpoint(&self) -> bool {
let _timer = TIMER.timer_with(&["ends_with_sole_checkpoint"]);
if self.is_reconfig {
!self
.txns()
.iter()
.any(Transaction::is_non_reconfig_block_ending)
} else {
self.txns()
.iter()
.position(Transaction::is_non_reconfig_block_ending)
== Some(self.len() - 1)
}
}
}
15 changes: 15 additions & 0 deletions execution/executor/src/tests/mock_vm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ pub static KEEP_STATUS: Lazy<TransactionStatus> =
pub static DISCARD_STATUS: Lazy<TransactionStatus> =
Lazy::new(|| TransactionStatus::Discard(StatusCode::INSUFFICIENT_BALANCE_FOR_TRANSACTION_FEE));

pub static RETRY_STATUS: Lazy<TransactionStatus> = Lazy::new(|| TransactionStatus::Retry);

pub struct MockVM;

impl VMBlockExecutor for MockVM {
Expand All @@ -76,7 +78,19 @@ impl VMBlockExecutor for MockVM {
let mut output_cache = HashMap::new();
let mut outputs = vec![];

let mut skip_rest = false;
for idx in 0..txn_provider.num_txns() {
if skip_rest {
outputs.push(TransactionOutput::new(
WriteSet::default(),
vec![],
0,
RETRY_STATUS.clone(),
TransactionAuxiliaryData::default(),
));
continue;
}

let txn = txn_provider.get_txn(idx as u32).expect_valid();
if matches!(txn, Transaction::StateCheckpoint(_)) {
outputs.push(TransactionOutput::new(
Expand Down Expand Up @@ -110,6 +124,7 @@ impl VMBlockExecutor for MockVM {
KEEP_STATUS.clone(),
TransactionAuxiliaryData::default(),
));
skip_rest = true;
continue;
}

Expand Down
13 changes: 4 additions & 9 deletions execution/executor/src/types/in_memory_state_calculator_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ impl InMemoryStateCalculatorV2 {

// If there are multiple checkpoints in the chunk, we only calculate the SMT (and its root
// hash) for the last one.
let last_checkpoint_index = execution_output.to_commit.get_last_checkpoint_index();
let last_checkpoint_index = {
let _timer = OTHER_TIMERS.timer_with(&["get_last_checkpoint_index"]);
execution_output.to_commit.get_last_checkpoint_index()
};

Self::calculate_impl(
parent_state,
Expand Down Expand Up @@ -371,14 +374,6 @@ impl InMemoryStateCalculatorV2 {
"Base state is corrupted, updates_since_base is not empty at a checkpoint."
);

for (i, (txn, _txn_out, is_reconfig)) in to_commit.iter().enumerate() {
ensure!(
TransactionsWithOutput::need_checkpoint(txn, is_reconfig) ^ (i != num_txns - 1),
"Checkpoint is allowed iff it's the last txn in the block. index: {i}, num_txns: {num_txns}, is_last: {}, txn: {txn:?}, is_reconfig: {}",
i == num_txns - 1,
is_reconfig,
);
}
Ok(())
}
}
Loading
Loading