From be1426c74290da38c0b72d54e841d50490d234c9 Mon Sep 17 00:00:00 2001
From: aldenhu
Date: Thu, 14 Nov 2024 22:43:43 +0000
Subject: [PATCH] Avoid looking for new epoch event in all transactions

---
 Cargo.lock | 1 +
 aptos-move/aptos-vm/src/aptos_vm.rs | 49 ++++++++--
 execution/executor-types/Cargo.toml | 1 +
 .../executor-types/src/execution_output.rs | 30 +++---
 execution/executor-types/src/lib.rs | 1 +
 execution/executor-types/src/metrics.rs | 17 ++++
 execution/executor-types/src/planned.rs | 8 +-
 .../src/state_compute_result.rs | 6 +-
 .../src/transactions_with_output.rs | 57 ++++++-----
 execution/executor/src/tests/mock_vm/mod.rs | 15 +++
 .../types/in_memory_state_calculator_v2.rs | 13 +--
 .../src/workflow/do_get_execution_output.rs | 98 ++++++-------------
 .../executor/src/workflow/do_ledger_update.rs | 7 +-
 13 files changed, 164 insertions(+), 139 deletions(-)
 create mode 100644 execution/executor-types/src/metrics.rs

diff --git a/Cargo.lock b/Cargo.lock
index 0154b689416e5..2cf11dc51314e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1600,6 +1600,7 @@ dependencies = [
  "aptos-crypto",
  "aptos-drop-helper",
  "aptos-infallible",
+ "aptos-metrics-core",
  "aptos-scratchpad",
  "aptos-secure-net",
  "aptos-storage-interface",
diff --git a/aptos-move/aptos-vm/src/aptos_vm.rs b/aptos-move/aptos-vm/src/aptos_vm.rs
index d79ae3db81895..1291ca2223fc2 100644
--- a/aptos-move/aptos-vm/src/aptos_vm.rs
+++ b/aptos-move/aptos-vm/src/aptos_vm.rs
@@ -3080,13 +3080,44 @@ pub(crate) fn fetch_module_metadata_for_struct_tag(
     }
 }
 
-#[test]
-fn vm_thread_safe() {
-    fn assert_send<T: Send>() {}
-    fn assert_sync<T: Sync>() {}
-
-    assert_send::<AptosVM>();
-    assert_sync::<AptosVM>();
-    assert_send::<MoveVmExt>();
-    assert_sync::<MoveVmExt>();
+#[cfg(test)]
+mod tests {
+    use crate::{move_vm_ext::MoveVmExt, AptosVM};
+    use aptos_types::{
+        account_address::AccountAddress,
+        account_config::{NEW_EPOCH_EVENT_MOVE_TYPE_TAG, NEW_EPOCH_EVENT_V2_MOVE_TYPE_TAG},
+        contract_event::ContractEvent,
+        event::EventKey,
+    };
+
+    #[test]
+    fn vm_thread_safe() {
+        fn assert_send<T: Send>() {}
+        fn assert_sync<T: Sync>() {}
+
+        assert_send::<AptosVM>();
+        assert_sync::<AptosVM>();
+        assert_send::<MoveVmExt>();
+        assert_sync::<MoveVmExt>();
+    }
+
+    #[test]
+    fn should_restart_execution_on_new_epoch() {
+        let new_epoch_event = ContractEvent::new_v1(
+            EventKey::new(0, AccountAddress::ONE),
+            0,
+            NEW_EPOCH_EVENT_MOVE_TYPE_TAG.clone(),
+            vec![],
+        );
+        let new_epoch_event_v2 =
+            ContractEvent::new_v2(NEW_EPOCH_EVENT_V2_MOVE_TYPE_TAG.clone(), vec![]);
+        assert!(AptosVM::should_restart_execution(&[(
+            new_epoch_event,
+            None
+        )]));
+        assert!(AptosVM::should_restart_execution(&[(
+            new_epoch_event_v2,
+            None
+        )]));
+    }
 }
diff --git a/execution/executor-types/Cargo.toml b/execution/executor-types/Cargo.toml
index 4a4dbaf987799..f10419375c0fb 100644
--- a/execution/executor-types/Cargo.toml
+++ b/execution/executor-types/Cargo.toml
@@ -17,6 +17,7 @@ anyhow = { workspace = true }
 aptos-crypto = { workspace = true }
 aptos-drop-helper = { workspace = true }
 aptos-infallible = { workspace = true }
+aptos-metrics-core = { workspace = true }
 aptos-scratchpad = { workspace = true }
 aptos-secure-net = { workspace = true }
 aptos-storage-interface = { workspace = true }
diff --git a/execution/executor-types/src/execution_output.rs b/execution/executor-types/src/execution_output.rs
index b9e2ac4806144..fb2d63f7b0897 100644
--- a/execution/executor-types/src/execution_output.rs
+++ b/execution/executor-types/src/execution_output.rs
@@ -38,11 +38,7 @@ impl ExecutionOutput {
     ) -> Self {
         if is_block {
             // If it's a block, ensure it ends with state checkpoint.
-            assert!(
-                next_epoch_state.is_some()
-                    || to_commit.is_empty() // reconfig suffix
-                    || to_commit.transactions.last().unwrap().is_non_reconfig_block_ending()
-            );
+            assert!(to_commit.is_empty() || to_commit.ends_with_sole_checkpoint());
         } else {
             // If it's not, there shouldn't be any transaction to be discarded or retried.
             assert!(to_discard.is_empty() && to_retry.is_empty());
@@ -168,31 +164,29 @@ impl Inner {
         let aborts = self
             .to_commit
             .iter()
-            .flat_map(
-                |(txn, output, _is_reconfig)| match output.status().status() {
-                    Ok(execution_status) => {
-                        if execution_status.is_success() {
-                            None
-                        } else {
-                            Some(format!("{:?}: {:?}", txn, output.status()))
-                        }
-                    },
-                    Err(_) => None,
+            .flat_map(|(txn, output)| match output.status().status() {
+                Ok(execution_status) => {
+                    if execution_status.is_success() {
+                        None
+                    } else {
+                        Some(format!("{:?}: {:?}", txn, output.status()))
+                    }
                 },
-            )
+                Err(_) => None,
+            })
             .collect::<Vec<_>>();
 
         let discards_3 = self
             .to_discard
             .iter()
             .take(3)
-            .map(|(txn, output, _is_reconfig)| format!("{:?}: {:?}", txn, output.status()))
+            .map(|(txn, output)| format!("{:?}: {:?}", txn, output.status()))
             .collect::<Vec<_>>();
         let retries_3 = self
             .to_retry
             .iter()
             .take(3)
-            .map(|(txn, output, _is_reconfig)| format!("{:?}: {:?}", txn, output.status()))
+            .map(|(txn, output)| format!("{:?}: {:?}", txn, output.status()))
             .collect::<Vec<_>>();
 
         if !aborts.is_empty() || !discards_3.is_empty() || !retries_3.is_empty() {
diff --git a/execution/executor-types/src/lib.rs b/execution/executor-types/src/lib.rs
index 48de1b896fa30..fab455b1c7c77 100644
--- a/execution/executor-types/src/lib.rs
+++ b/execution/executor-types/src/lib.rs
@@ -36,6 +36,7 @@ use std::{
 mod error;
 pub mod execution_output;
 mod ledger_update_output;
+mod metrics;
 pub mod planned;
 pub mod state_checkpoint_output;
 pub mod state_compute_result;
diff --git a/execution/executor-types/src/metrics.rs b/execution/executor-types/src/metrics.rs
new file mode 100644
index 0000000000000..d86cfcba3ad31
--- /dev/null
+++ b/execution/executor-types/src/metrics.rs
@@ -0,0 +1,17 @@
+// Copyright (c) Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use aptos_metrics_core::{exponential_buckets, register_histogram_vec, HistogramVec};
+use once_cell::sync::Lazy;
+
+pub static TIMER: Lazy<HistogramVec> = Lazy::new(|| {
+    register_histogram_vec!(
+        // metric name
+        "aptos_executor_types_timer",
+        // metric description
+        "The time spent in seconds.",
+        &["name"],
+        exponential_buckets(/*start=*/ 1e-3, /*factor=*/ 2.0, /*count=*/ 20).unwrap(),
+    )
+    .unwrap()
+});
diff --git a/execution/executor-types/src/planned.rs b/execution/executor-types/src/planned.rs
index e16206ce9b634..54b4145991ae3 100644
--- a/execution/executor-types/src/planned.rs
+++ b/execution/executor-types/src/planned.rs
@@ -1,7 +1,9 @@
 // Copyright (c) Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0
 
+use crate::metrics::TIMER;
 use aptos_infallible::Mutex;
+use aptos_metrics_core::TimerHelper;
 use once_cell::sync::OnceCell;
 use rayon::ThreadPool;
 use std::{ops::Deref, sync::mpsc::Receiver};
@@ -40,10 +42,12 @@ impl Planned {
         }
     }
 
-    pub fn get(&self) -> &T {
+    pub fn get(&self, name_for_timer: Option<&str>) -> &T {
         if let Some(t) = self.value.get() {
             t
         } else {
+            let _timer = name_for_timer.map(|name| TIMER.timer_with(&[name]));
+
             let rx = self.rx.get().expect("Not planned").lock();
             if self.value.get().is_none() {
                 let t = rx.recv().expect("Plan failed.");
@@ -58,7 +62,7 @@ impl Deref for Planned {
     type Target = T;
 
     fn deref(&self) -> &Self::Target {
-        self.get()
+        self.get(None)
     }
 }
diff --git a/execution/executor-types/src/state_compute_result.rs b/execution/executor-types/src/state_compute_result.rs
index 390818228b8da..1d5748570d4cc 100644
--- a/execution/executor-types/src/state_compute_result.rs
+++ b/execution/executor-types/src/state_compute_result.rs
@@ -145,7 +145,11 @@ impl StateComputeResult {
     pub fn make_chunk_commit_notification(&self) -> ChunkCommitNotification {
         ChunkCommitNotification {
-            subscribable_events: self.execution_output.subscribable_events.clone(),
+            subscribable_events: self
+                .execution_output
+                .subscribable_events
+                .get(Some("wait_for_subscribable_events"))
+                .clone(),
             committed_transactions: self.execution_output.to_commit.txns().to_vec(),
             reconfiguration_occurred: self.execution_output.next_epoch_state.is_some(),
         }
diff --git a/execution/executor-types/src/transactions_with_output.rs b/execution/executor-types/src/transactions_with_output.rs
index 54204f6608d96..bd157c35a34e5 100644
--- a/execution/executor-types/src/transactions_with_output.rs
+++ b/execution/executor-types/src/transactions_with_output.rs
@@ -1,6 +1,8 @@
 // Copyright © Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0
 
+use crate::metrics::TIMER;
+use aptos_metrics_core::TimerHelper;
 use aptos_types::transaction::{Transaction, TransactionOutput};
 use itertools::izip;
 
@@ -8,21 +10,20 @@ pub struct TransactionsWithOutput {
     pub transactions: Vec<Transaction>,
     pub transaction_outputs: Vec<TransactionOutput>,
-    pub epoch_ending_flags: Vec<bool>,
+    pub is_reconfig: bool,
 }
 
 impl TransactionsWithOutput {
     pub fn new(
         transactions: Vec<Transaction>,
         transaction_outputs: Vec<TransactionOutput>,
-        epoch_ending_flags: Vec<bool>,
+        is_reconfig: bool,
     ) -> Self {
         assert_eq!(transactions.len(), transaction_outputs.len());
-        assert_eq!(transactions.len(), epoch_ending_flags.len());
         Self {
             transactions,
             transaction_outputs,
-            epoch_ending_flags,
+            is_reconfig,
         }
     }
 
@@ -32,8 +33,7 @@ impl TransactionsWithOutput {
 
     pub fn new_dummy_success(txns: Vec<Transaction>) -> Self {
         let txn_outputs = vec![TransactionOutput::new_empty_success(); txns.len()];
-        let epoch_ending_flags = vec![false; txns.len()];
-        Self::new(txns, txn_outputs, epoch_ending_flags)
+        Self::new(txns, txn_outputs, false)
     }
 
     pub fn push(
@@ -42,9 +42,12 @@ impl TransactionsWithOutput {
         transaction_output: TransactionOutput,
         is_reconfig: bool,
     ) {
+        // can't add more txns after reconfig
+        assert!(!self.is_reconfig);
+
         self.transactions.push(transaction);
         self.transaction_outputs.push(transaction_output);
-        self.epoch_ending_flags.push(is_reconfig);
+        self.is_reconfig = is_reconfig;
     }
 
     pub fn len(&self) -> usize {
@@ -64,31 +67,31 @@ impl TransactionsWithOutput {
     }
 
     pub fn get_last_checkpoint_index(&self) -> Option<usize> {
+        if self.is_reconfig {
+            return Some(self.len() - 1);
+        }
+
         (0..self.len())
             .rev()
-            .find(|&i| Self::need_checkpoint(&self.transactions[i], self.epoch_ending_flags[i]))
+            .find(|&i| self.transactions[i].is_non_reconfig_block_ending())
     }
 
-    pub fn need_checkpoint(txn: &Transaction, is_reconfig: bool) -> bool {
-        if is_reconfig {
-            return true;
-        }
-        match txn {
-            Transaction::BlockMetadata(_)
-            | Transaction::BlockMetadataExt(_)
-            | Transaction::UserTransaction(_)
-            | Transaction::ValidatorTransaction(_) => false,
-            Transaction::GenesisTransaction(_)
-            | Transaction::StateCheckpoint(_)
-            | Transaction::BlockEpilogue(_) => true,
-        }
+    pub fn iter(&self) -> impl Iterator<Item = (&Transaction, &TransactionOutput)> {
+        izip!(self.transactions.iter(), self.transaction_outputs.iter(),)
     }
 
-    pub fn iter(&self) -> impl Iterator<Item = (&Transaction, &TransactionOutput, bool)> {
-        izip!(
-            self.transactions.iter(),
-            self.transaction_outputs.iter(),
-            self.epoch_ending_flags.iter().cloned()
-        )
+    pub fn ends_with_sole_checkpoint(&self) -> bool {
+        let _timer = TIMER.timer_with(&["ends_with_sole_checkpoint"]);
+        if self.is_reconfig {
+            !self
+                .txns()
+                .iter()
+                .any(Transaction::is_non_reconfig_block_ending)
+        } else {
+            self.txns()
+                .iter()
+                .position(Transaction::is_non_reconfig_block_ending)
+                == Some(self.len() - 1)
+        }
     }
 }
diff --git a/execution/executor/src/tests/mock_vm/mod.rs b/execution/executor/src/tests/mock_vm/mod.rs
index 23816899fb286..b1ab87984c7a4 100644
--- a/execution/executor/src/tests/mock_vm/mod.rs
+++ b/execution/executor/src/tests/mock_vm/mod.rs
@@ -57,6 +57,8 @@ pub static KEEP_STATUS: Lazy<TransactionStatus> =
 pub static DISCARD_STATUS: Lazy<TransactionStatus> =
     Lazy::new(|| TransactionStatus::Discard(StatusCode::INSUFFICIENT_BALANCE_FOR_TRANSACTION_FEE));
 
+pub static RETRY_STATUS: Lazy<TransactionStatus> = Lazy::new(|| TransactionStatus::Retry);
+
 pub struct MockVM;
 
 impl VMBlockExecutor for MockVM {
@@ -76,7 +78,19 @@ impl VMBlockExecutor for MockVM {
         let mut output_cache = HashMap::new();
         let mut outputs = vec![];
 
+        let mut skip_rest = false;
         for idx in 0..txn_provider.num_txns() {
+            if skip_rest {
+                outputs.push(TransactionOutput::new(
+                    WriteSet::default(),
+                    vec![],
+                    0,
+                    RETRY_STATUS.clone(),
+                    TransactionAuxiliaryData::default(),
+                ));
+                continue;
+            }
+
             let txn = txn_provider.get_txn(idx as u32).expect_valid();
             if matches!(txn, Transaction::StateCheckpoint(_)) {
                 outputs.push(TransactionOutput::new(
@@ -110,6 +124,7 @@ impl VMBlockExecutor for MockVM {
                     KEEP_STATUS.clone(),
                     TransactionAuxiliaryData::default(),
                 ));
+                skip_rest = true;
                 continue;
             }
 
diff --git a/execution/executor/src/types/in_memory_state_calculator_v2.rs b/execution/executor/src/types/in_memory_state_calculator_v2.rs
index 31c19716fd7a2..1462bbb705aa6 100644
--- a/execution/executor/src/types/in_memory_state_calculator_v2.rs
+++ b/execution/executor/src/types/in_memory_state_calculator_v2.rs
@@ -49,7 +49,10 @@ impl InMemoryStateCalculatorV2 {
 
         // If there are multiple checkpoints in the chunk, we only calculate the SMT (and its root
        // hash) for the last one.
-        let last_checkpoint_index = execution_output.to_commit.get_last_checkpoint_index();
+        let last_checkpoint_index = {
+            let _timer = OTHER_TIMERS.timer_with(&["get_last_checkpoint_index"]);
+            execution_output.to_commit.get_last_checkpoint_index()
+        };
 
         Self::calculate_impl(
             parent_state,
@@ -371,14 +374,6 @@ impl InMemoryStateCalculatorV2 {
             "Base state is corrupted, updates_since_base is not empty at a checkpoint."
         );
 
-        for (i, (txn, _txn_out, is_reconfig)) in to_commit.iter().enumerate() {
-            ensure!(
-                TransactionsWithOutput::need_checkpoint(txn, is_reconfig) ^ (i != num_txns - 1),
-                "Checkpoint is allowed iff it's the last txn in the block. 
index: {i}, num_txns: {num_txns}, is_last: {}, txn: {txn:?}, is_reconfig: {}", - i == num_txns - 1, - is_reconfig, - ); - } Ok(()) } } diff --git a/execution/executor/src/workflow/do_get_execution_output.rs b/execution/executor/src/workflow/do_get_execution_output.rs index 1c4d0ee8663d0..2a7d45795a6ca 100644 --- a/execution/executor/src/workflow/do_get_execution_output.rs +++ b/execution/executor/src/workflow/do_get_execution_output.rs @@ -45,7 +45,7 @@ use aptos_types::{ }; use aptos_vm::VMBlockExecutor; use itertools::Itertools; -use std::{iter, sync::Arc}; +use std::sync::Arc; pub struct DoGetExecutionOutput; @@ -288,51 +288,31 @@ impl Parser { append_state_checkpoint_to_block: Option, ) -> Result { let _timer = OTHER_TIMERS.timer_with(&["parse_raw_output"]); + let is_block = append_state_checkpoint_to_block.is_some(); - // Parse all outputs. - let mut epoch_ending_flags = { - let _timer = OTHER_TIMERS.timer_with(&["parse_raw_output__epoch_ending_flags"]); + // Collect all statuses. + let statuses_for_input_txns = { + let _timer = OTHER_TIMERS.timer_with(&["parse_raw_output__all_statuses"]); transaction_outputs .iter() - .map(TransactionOutput::has_new_epoch_event) + .map(|t| t.status()) + .cloned() .collect_vec() }; // Isolate retries. - let (to_retry, has_reconfig) = { - let _timer = OTHER_TIMERS.timer_with(&["parse_raw_output__retries"]); - Self::extract_retries( - &mut transactions, - &mut transaction_outputs, - &mut epoch_ending_flags, - ) - }; - - // Collect all statuses. - let statuses_for_input_txns = { - let _timer = OTHER_TIMERS.timer_with(&["parse_raw_output__all_statuses"]); - let keeps_and_discards = transaction_outputs.iter().map(|t| t.status()).cloned(); - // Forcibly overwriting statuses for retries, since VM can output otherwise. - let retries = iter::repeat(TransactionStatus::Retry).take(to_retry.len()); - keeps_and_discards.chain(retries).collect() - }; + let (to_retry, has_reconfig) = + Self::extract_retries(&mut transactions, &mut transaction_outputs); // Isolate discards. - let to_discard = { - let _timer = OTHER_TIMERS.timer_with(&["parse_raw_output__discards"]); - Self::extract_discards( - &mut transactions, - &mut transaction_outputs, - &mut epoch_ending_flags, - ) - }; + let to_discard = Self::extract_discards(&mut transactions, &mut transaction_outputs); // The rest is to be committed, attach block epilogue as needed and optionally get next EpochState. let to_commit = { let _timer = OTHER_TIMERS.timer_with(&["parse_raw_output__to_commit"]); let to_commit = - TransactionsWithOutput::new(transactions, transaction_outputs, epoch_ending_flags); + TransactionsWithOutput::new(transactions, transaction_outputs, has_reconfig); Self::maybe_add_block_epilogue( to_commit, has_reconfig, @@ -380,48 +360,34 @@ impl Parser { fn extract_retries( transactions: &mut Vec, transaction_outputs: &mut Vec, - epoch_ending_flags: &mut Vec, ) -> (TransactionsWithOutput, bool) { - // N.B. off-by-1 intentionally, for exclusive index - let new_epoch_marker = epoch_ending_flags - .iter() - .rposition(|f| *f) - .map(|idx| idx + 1); + let _timer = OTHER_TIMERS.timer_with(&["parse_raw_output__retries"]); - let block_gas_limit_marker = transaction_outputs + let last_non_retry = transaction_outputs .iter() - .position(|o| matches!(o.status(), TransactionStatus::Retry)); - - // Transactions after the epoch ending txn are all to be retried. - // Transactions after the txn that exceeded per-block gas limit are also to be retried. 
-        if let Some(pos) = new_epoch_marker {
-            (
-                TransactionsWithOutput::new(
-                    transactions.drain(pos..).collect(),
-                    transaction_outputs.drain(pos..).collect(),
-                    epoch_ending_flags.drain(pos..).collect(),
-                ),
-                true,
-            )
-        } else if let Some(pos) = block_gas_limit_marker {
-            (
-                TransactionsWithOutput::new(
-                    transactions.drain(pos..).collect(),
-                    transaction_outputs.drain(pos..).collect(),
-                    epoch_ending_flags.drain(pos..).collect(),
-                ),
-                false,
-            )
+            .rposition(|t| !t.status().is_retry());
+        let is_reconfig = if let Some(idx) = last_non_retry {
+            transaction_outputs[idx].has_new_epoch_event()
         } else {
-            (TransactionsWithOutput::new_empty(), false)
-        }
+            false
+        };
+
+        let first_retry = last_non_retry.map_or(0, |pos| pos + 1);
+        let to_retry = TransactionsWithOutput::new(
+            transactions.drain(first_retry..).collect(),
+            transaction_outputs.drain(first_retry..).collect(),
+            false, // is_reconfig
+        );
+
+        (to_retry, is_reconfig)
     }
 
     fn extract_discards(
         transactions: &mut Vec<Transaction>,
         transaction_outputs: &mut Vec<TransactionOutput>,
-        epoch_ending_flags: &mut Vec<bool>,
     ) -> TransactionsWithOutput {
+        let _timer = OTHER_TIMERS.timer_with(&["parse_raw_output__discards"]);
+
         let to_discard = {
             let mut res = TransactionsWithOutput::new_empty();
             for idx in 0..transactions.len() {
                 if let TransactionStatus::Discard(_) = transaction_outputs[idx].status() {
                     res.push(
                         transactions[idx].clone(),
                         transaction_outputs[idx].clone(),
-                        epoch_ending_flags[idx],
+                        false,
                     );
                 } else if !res.is_empty() {
                     transactions[idx - res.len()] = transactions[idx].clone();
                     transaction_outputs[idx - res.len()] = transaction_outputs[idx].clone();
-                    epoch_ending_flags[idx - res.len()] = epoch_ending_flags[idx];
                 }
             }
             if !res.is_empty() {
                 let remaining = transactions.len() - res.len();
                 transactions.truncate(remaining);
                 transaction_outputs.truncate(remaining);
-                epoch_ending_flags.truncate(remaining);
             }
             res
         };
 
         // Sanity check transactions with the Discard status:
-        to_discard.iter().for_each(|(t, o, _flag)| {
+        to_discard.iter().for_each(|(t, o)| {
             // In case a new status other than Retry, Keep and Discard is added:
             if !matches!(o.status(), TransactionStatus::Discard(_)) {
                 error!("Status other than Retry, Keep or Discard; Transaction discarded.");
             }
diff --git a/execution/executor/src/workflow/do_ledger_update.rs b/execution/executor/src/workflow/do_ledger_update.rs
index 0ddf54e37c737..0877e619ce75e 100644
--- a/execution/executor/src/workflow/do_ledger_update.rs
+++ b/execution/executor/src/workflow/do_ledger_update.rs
@@ -93,12 +93,7 @@ impl DoLedgerUpdate {
             writeset_hashes
         )
         .map(
-            |(
-                (txn, txn_out, _is_reconfig),
-                state_checkpoint_hash,
-                event_root_hash,
-                write_set_hash,
-            )| {
+            |((txn, txn_out), state_checkpoint_hash, event_root_hash, write_set_hash)| {
                 TransactionInfo::new(
                     txn.hash(),
                     write_set_hash,
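
For illustration only (not part of the patch): a minimal, self-contained Rust sketch of the rule the reworked extract_retries relies on. Because every transaction after a reconfiguration is already marked Retry, only the last non-Retry output needs to be inspected for the new-epoch event, instead of computing a per-transaction epoch_ending_flags vector. The names and types below are simplified stand-ins, not the ones used in aptos-core.

// Sketch with hypothetical stand-ins for TransactionStatus / TransactionOutput.
#[derive(Clone, Copy, PartialEq)]
enum Status {
    Keep,
    Retry,
}

struct Output {
    status: Status,
    has_new_epoch_event: bool,
}

/// Returns (length of the prefix to keep, whether that prefix ends in a reconfiguration).
/// Mirrors the idea behind the new extract_retries: split off the Retry suffix and check
/// only the last kept output for the new-epoch event.
fn split_retries(outputs: &[Output]) -> (usize, bool) {
    match outputs.iter().rposition(|o| o.status != Status::Retry) {
        Some(idx) => (idx + 1, outputs[idx].has_new_epoch_event),
        None => (0, false),
    }
}

fn main() {
    let outputs = [
        Output { status: Status::Keep, has_new_epoch_event: false },
        Output { status: Status::Keep, has_new_epoch_event: true }, // reconfig txn
        Output { status: Status::Retry, has_new_epoch_event: false },
    ];
    assert_eq!(split_retries(&outputs), (2, true));
}

Under this assumption the epoch check costs one lookup per chunk rather than one has_new_epoch_event scan per transaction, which is what the commit title refers to.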