diff --git a/Cargo.lock b/Cargo.lock index 238d381aaae1b9..8c3c2e2a2a4b9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2856,6 +2856,7 @@ dependencies = [ "itertools 0.13.0", "regex", "reqwest 0.11.23", + "serde", "tokio", "url", ] diff --git a/aptos-move/aptos-debugger/Cargo.toml b/aptos-move/aptos-debugger/Cargo.toml index 8e83673767603b..ebed42a28f4f17 100644 --- a/aptos-move/aptos-debugger/Cargo.toml +++ b/aptos-move/aptos-debugger/Cargo.toml @@ -31,6 +31,7 @@ clap = { workspace = true } itertools = { workspace = true } regex = { workspace = true } reqwest = { workspace = true } +serde = { workspace = true } tokio = { workspace = true } url = { workspace = true } diff --git a/aptos-move/aptos-debugger/src/aptos_debugger.rs b/aptos-move/aptos-debugger/src/aptos_debugger.rs index 80428e541b845c..84946acab23a76 100644 --- a/aptos-move/aptos-debugger/src/aptos_debugger.rs +++ b/aptos-move/aptos-debugger/src/aptos_debugger.rs @@ -51,6 +51,14 @@ impl AptosDebugger { )?))) } + pub(crate) async fn get_committed_transactions( + &self, + begin: Version, + limit: u64, + ) -> anyhow::Result<(Vec, Vec)> { + self.debugger.get_committed_transactions(begin, limit).await + } + pub fn execute_transactions_at_version( &self, version: Version, @@ -161,10 +169,7 @@ impl AptosDebugger { repeat_execution_times: u64, concurrency_levels: &[usize], ) -> anyhow::Result> { - let (txns, txn_infos) = self - .debugger - .get_committed_transactions(begin, limit) - .await?; + let (txns, txn_infos) = self.get_committed_transactions(begin, limit).await?; if use_same_block_boundaries { // when going block by block, no need to worry about epoch boundaries diff --git a/aptos-move/aptos-debugger/src/benchmark_past_transactions.rs b/aptos-move/aptos-debugger/src/benchmark_past_transactions.rs new file mode 100644 index 00000000000000..8ce315432f206c --- /dev/null +++ b/aptos-move/aptos-debugger/src/benchmark_past_transactions.rs @@ -0,0 +1,501 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{aptos_debugger::AptosDebugger, common::Opts}; +use anyhow::anyhow; +use aptos_block_executor::txn_provider::default::DefaultTxnProvider; +use aptos_rest_client::Client; +use aptos_types::{ + block_executor::{ + config::{ + BlockExecutorConfig, BlockExecutorConfigFromOnchain, BlockExecutorLocalConfig, + BlockExecutorModuleCacheLocalConfig, + }, + transaction_slice_metadata::TransactionSliceMetadata, + }, + on_chain_config::{FeatureFlag, Features, GasScheduleV2, OnChainConfig}, + state_store::{ + errors::StateviewError, state_key::StateKey, state_storage_usage::StateStorageUsage, + state_value::StateValue, TStateView, + }, + transaction::{ + signature_verified_transaction::{ + into_signature_verified_block, SignatureVerifiedTransaction, + }, + TransactionInfo, Version, + }, +}; +use aptos_validator_interface::DebuggerStateView; +use aptos_vm::{aptos_vm::AptosVMBlockExecutor, VMBlockExecutor}; +use clap::Parser; +use serde::Serialize; +use std::{collections::HashMap, sync::Mutex, time::Instant}; +use url::Url; + +/// Config used by benchmarking. Does not allow any fallbacks or block skips, so that the execution +/// panics if anything goes wrong. +fn block_execution_config(concurrency_level: usize) -> BlockExecutorConfig { + BlockExecutorConfig { + local: BlockExecutorLocalConfig { + concurrency_level, + allow_fallback: false, + discard_failed_blocks: false, + module_cache_config: BlockExecutorModuleCacheLocalConfig::default(), + }, + // For replay, there is no block limit. + onchain: BlockExecutorConfigFromOnchain::new_no_block_limit(), + } +} + +/// Returns the state key for on-chain config type. +fn config_state_key() -> StateKey { + StateKey::resource(T::address(), &T::struct_tag()) + .expect("Constructing state key for on-chain config must succeed") +} + +/// Fetches the config from the storage, and modifies it based on the passed function. Panics if +/// there is a storage error, config does not exist or fails to (de-)serialize. +fn config_override( + debugger_state_view: &DebuggerStateView, + override_func: F, +) -> (StateKey, StateValue) { + let state_key = config_state_key::(); + let state_value = debugger_state_view + .get_state_value(&state_key) + .unwrap_or_else(|err| { + panic!( + "Failed to fetch on-chain config for {:?}: {:?}", + state_key, err + ) + }) + .unwrap_or_else(|| panic!("On-chain config for {:?} must always exist", state_key)); + + let mut config = T::deserialize_into_config(state_value.bytes()) + .expect("On-chain config must be deserializable"); + override_func(&mut config); + let config_bytes = bcs::to_bytes(&config).expect("On-chain config must be serializable"); + + let new_state_value = state_value.map_bytes(|_| Ok(config_bytes.into())).unwrap(); + (state_key, new_state_value) +} + +/// State view used for setting up the benchmarks. Maintains a set which caches execution reads, +/// populated during preparation phase. These reads are used to create a [LocalStateView] later +/// which is used for actual benchmarking. +struct ReadRecorderStateView { + /// Captured read-set. + reads: Mutex>, + /// Remote state view for the specified version. + debugger_state_view: DebuggerStateView, +} + +impl TStateView for ReadRecorderStateView { + type Key = StateKey; + + fn get_state_value(&self, state_key: &Self::Key) -> Result, StateviewError> { + // Check the read-set first. + if let Some(state_value) = self.reads.lock().unwrap().get(state_key) { + return Ok(Some(state_value.clone())); + } + + // We do not allow failures because then benchmarking will not be correct (we miss a read). + // Plus, these failures should not happen when replaying past transactions. + let maybe_state_value = self + .debugger_state_view + .get_state_value(state_key) + .unwrap_or_else(|err| { + panic!("Failed to fetch state value for {:?}: {:?}", state_key, err) + }); + + // Populate the read-set if first access. + let mut reads = self.reads.lock().unwrap(); + if !reads.contains_key(state_key) { + if let Some(state_value) = &maybe_state_value { + reads.insert(state_key.clone(), state_value.clone()); + } + } + drop(reads); + + Ok(maybe_state_value) + } + + fn get_usage(&self) -> Result { + unreachable!("Should not be called when benchmarking") + } +} + +/// Immutable state view used by each [Workload]. +struct LocalStateView { + data: HashMap, +} + +impl TStateView for LocalStateView { + type Key = StateKey; + + fn get_state_value(&self, state_key: &Self::Key) -> Result, StateviewError> { + Ok(self.data.get(state_key).cloned()) + } + + fn get_usage(&self) -> Result { + unreachable!("Should not be called when benchmarking") + } +} + +/// Information for benchmarking a single block of signature-verified transactions. Contains the +/// local state (on top of which transactions should eb applied), and the associated metadata. +struct Workload { + state_view: LocalStateView, + txn_provider: DefaultTxnProvider, + transaction_slice_metadata: TransactionSliceMetadata, +} + +impl Workload { + /// Runs executor on top of the captured state and transactions. Panics if block execution + /// fails. This function is used for the actual benchmarking. + fn run(&self, executor: &AptosVMBlockExecutor, concurrency_level: usize) { + executor + .execute_block_with_config( + &self.txn_provider, + &self.state_view, + block_execution_config(concurrency_level), + self.transaction_slice_metadata, + ) + .unwrap_or_else(|err| { + panic!( + "Block execution should not fail, but returned an error: {:?}", + err + ) + }); + } +} + +struct AptosBenchmarkRunner { + debugger: AptosDebugger, +} + +impl AptosBenchmarkRunner { + /// Creates [ReadRecorderStateView] to generate the workloads for benchmarking. Also, overrides + /// on-chain configs based on the specified environment. + fn state_with_at_version_with_override( + &self, + version: Version, + environment_override: &EnvironmentOverride, + ) -> ReadRecorderStateView { + let debugger_state_view = self.debugger.state_view_at_version(version); + let mut state_override = HashMap::new(); + + // Enable/disable features. + let (features_state_key, features_state_value) = + config_override::(&debugger_state_view, |features| { + for feature in &environment_override.enable_features { + if features.is_enabled(*feature) { + println!("[WARN] Feature {:?} is already enabled", feature) + } + features.enable(*feature); + } + for feature in &environment_override.disable_features { + if !features.is_enabled(*feature) { + println!("[WARN] Feature {:?} is already disabled", feature) + } + features.disable(*feature); + } + }); + state_override.insert(features_state_key, features_state_value); + + // If specified, change gas feature version. With this we can test if there is a regression + // due to extra charging, for instance. We should be careful because it is possible that + // some gas parameters may not be found anymore. + if let Some(gas_feature_version_override) = environment_override.gas_feature_version { + let (gas_schedule_v2_state_key, gas_schedule_v2_state_value) = + config_override::(&debugger_state_view, |gas_schedule_v2| { + gas_schedule_v2.feature_version = gas_feature_version_override; + }); + state_override.insert(gas_schedule_v2_state_key, gas_schedule_v2_state_value); + } + + ReadRecorderStateView { + reads: Mutex::new(state_override), + debugger_state_view, + } + } + + fn generate_and_check_workload( + &self, + begin: Version, + signature_verified_txns: Vec, + txn_infos: &[TransactionInfo], + concurrency_level: usize, + environment_override: &EnvironmentOverride, + ) -> anyhow::Result { + let state_view = self.state_with_at_version_with_override(begin, environment_override); + + let end = begin + signature_verified_txns.len() as Version; + let transaction_slice_metadata = TransactionSliceMetadata::chunk(begin, end); + + // Pre-execute the block to record all reads. Should not fail. + let txn_provider = DefaultTxnProvider::new(signature_verified_txns); + let outputs = AptosVMBlockExecutor::new() + .execute_block_with_config( + &txn_provider, + &state_view, + block_execution_config(concurrency_level), + transaction_slice_metadata, + ) + .map_err(|err| { + anyhow!( + "Failed to execute block when preparing for benchmarking: {:?}", + err + ) + })? + .into_transaction_outputs_forced(); + + // Check that outputs are as expected. + for (idx, (output, txn_info)) in outputs.iter().zip(txn_infos).enumerate() { + let version = begin + idx as Version; + if let Err(err) = output.ensure_match_transaction_info(version, txn_info, None, None) { + // We do not want to fail, because the behaviour may be incompatible due to the + // fact that some configs were overridden. The user can decide how to interpret the + // mismatch. All we care is: "if config would had been different, what the runtime + // be?". An example of such a behaviour is when we run newer transactions with + // older configs, e.g., older gas feature version. + println!( + "[WARN] Output mismatch for transaction {}: {:?}", + version, err + ); + } + + // Check overrides. + let features_state_key = config_state_key::(); + let gas_schedule_v2_state_key = config_state_key::(); + + for (state_key, _) in output.write_set() { + if state_key == &features_state_key { + println!( + "[WARN] Features are being updated by transaction {}", + version + ); + } + + if environment_override.gas_feature_version.is_some() + && state_key == &gas_schedule_v2_state_key + { + println!( + "[WARN] Gas schedule is being updated by transaction {}", + version + ); + } + } + } + + // Create the workload with the state based on the read-set. + let data = state_view.reads.into_inner().unwrap(); + Ok(Workload { + state_view: LocalStateView { data }, + txn_provider, + transaction_slice_metadata, + }) + } + + async fn generate_and_check_workloads( + &self, + begin: Version, + limit: u64, + concurrency_level: usize, + environment_override: &EnvironmentOverride, + ) -> anyhow::Result> { + let (txns, txn_infos) = self + .debugger + .get_committed_transactions(begin, limit) + .await?; + + let mut workloads = vec![]; + let mut curr_version = begin; + let mut curr_block = vec![]; + + let mut num_blocks = 0; + for txn in txns { + let block_size = curr_block.len() as Version; + if txn.is_block_start() && !curr_block.is_empty() { + let i = (curr_version - begin) as usize; + let j = i + block_size as usize; + let txn_infos_slice = &txn_infos[i..j]; + + let workload = self.generate_and_check_workload( + curr_version, + into_signature_verified_block(std::mem::take(&mut curr_block)), + txn_infos_slice, + concurrency_level, + environment_override, + )?; + num_blocks += 1; + println!( + "Block {}: [{}, {}] with {} transactions", + num_blocks, + curr_version, + curr_version + block_size - 1, + block_size + ); + workloads.push(workload); + curr_version += block_size; + } + curr_block.push(txn); + } + if !curr_block.is_empty() { + let i = (curr_version - begin) as usize; + let txn_infos_slice = &txn_infos[i..]; + + let block_size = curr_block.len() as Version; + let workload = self.generate_and_check_workload( + curr_version, + into_signature_verified_block(curr_block), + txn_infos_slice, + concurrency_level, + environment_override, + )?; + num_blocks += 1; + println!( + "Block {}: [{}, {}] with {} transactions", + num_blocks, + curr_version, + curr_version + block_size - 1, + block_size + ); + workloads.push(workload); + } + println!("Workload with {} block generated ... \n", workloads.len()); + + Ok(workloads) + } + + pub async fn benchmark_past_transactions( + &self, + begin: Version, + limit: u64, + repeat_execution_times: usize, + concurrency_levels: &[usize], + environment_override: EnvironmentOverride, + ) -> anyhow::Result<()> { + let workloads = self + .generate_and_check_workloads( + begin, + limit, + *concurrency_levels + .iter() + .max() + .expect("At least one concurrency level must be provided"), + &environment_override, + ) + .await?; + + for concurrency_level in concurrency_levels { + let mut times = vec![]; + for i in 0..repeat_execution_times { + let start_time = Instant::now(); + + let executor = AptosVMBlockExecutor::new(); + for workload in &workloads { + workload.run(&executor, *concurrency_level); + } + + let time = start_time.elapsed().as_millis(); + println!( + "[{}/{}] Execution time for concurrency_level={} is {}ms", + i + 1, + repeat_execution_times, + concurrency_level, + time, + ); + times.push(time); + } + times.sort(); + + println!( + "[Summary] Execution time for concurrency_level={} is {}ms (median)\n", + concurrency_level, + times[repeat_execution_times / 2], + ); + } + + Ok(()) + } +} + +/// Overrides for different environment configs, such as feature flags, etc. +#[derive(Default)] +struct EnvironmentOverride { + enable_features: Vec, + disable_features: Vec, + gas_feature_version: Option, +} + +#[derive(Parser)] +pub struct Command { + #[clap(flatten)] + opts: Opts, + + #[clap(long)] + begin_version: u64, + + #[clap(long)] + limit: u64, + + #[clap(long)] + repeat_execution_times: Option, + + #[clap(flatten)] + environment: Option, +} + +#[derive(Parser)] +struct EnvironmentOpts { + #[clap( + long, + num_args=1.., + value_delimiter = ' ', + )] + enable_features: Vec, + + #[clap( + long, + num_args=1.., + value_delimiter = ' ', + )] + disable_features: Vec, + + #[clap(long)] + gas_feature_version: Option, +} + +impl Command { + pub async fn run(self) -> anyhow::Result<()> { + let debugger = if let Some(rest_endpoint) = self.opts.target.rest_endpoint { + AptosDebugger::rest_client(Client::new(Url::parse(&rest_endpoint)?))? + } else if let Some(db_path) = self.opts.target.db_path { + AptosDebugger::db(db_path)? + } else { + unreachable!("Must provide one target."); + }; + + let environment_override = self + .environment + .map(|opts| EnvironmentOverride { + enable_features: opts.enable_features, + disable_features: opts.disable_features, + gas_feature_version: opts.gas_feature_version, + }) + .unwrap_or_default(); + + let runner = AptosBenchmarkRunner { debugger }; + runner + .benchmark_past_transactions( + self.begin_version, + self.limit, + self.repeat_execution_times.unwrap_or(1), + &self.opts.concurrency_level, + environment_override, + ) + .await?; + + Ok(()) + } +} diff --git a/aptos-move/aptos-debugger/src/common.rs b/aptos-move/aptos-debugger/src/common.rs index 584a040fb59fae..b89863b83ae5b1 100644 --- a/aptos-move/aptos-debugger/src/common.rs +++ b/aptos-move/aptos-debugger/src/common.rs @@ -1,7 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::{execute_past_transactions, execute_pending_block}; +use crate::{benchmark_past_transactions, execute_past_transactions, execute_pending_block}; use anyhow::Result; use clap::Parser; use std::path::PathBuf; @@ -34,6 +34,7 @@ pub struct Opts { #[derive(Parser)] pub enum Command { ExecutePastTransactions(execute_past_transactions::Command), + BenchmarkPastTransactions(benchmark_past_transactions::Command), ExecutePendingBlock(execute_pending_block::Command), } @@ -41,6 +42,7 @@ impl Command { pub async fn run(self) -> Result<()> { match self { Command::ExecutePastTransactions(cmd) => cmd.run().await, + Command::BenchmarkPastTransactions(cmd) => cmd.run().await, Command::ExecutePendingBlock(cmd) => cmd.run().await, } } diff --git a/aptos-move/aptos-debugger/src/lib.rs b/aptos-move/aptos-debugger/src/lib.rs index 1079cd1ab8e384..ed84b14ce0760b 100644 --- a/aptos-move/aptos-debugger/src/lib.rs +++ b/aptos-move/aptos-debugger/src/lib.rs @@ -3,6 +3,7 @@ pub mod aptos_debugger; pub mod bcs_txn_decoder; +mod benchmark_past_transactions; pub mod common; pub mod execute_past_transactions; pub mod execute_pending_block; diff --git a/crates/aptos-debugger/src/main.rs b/crates/aptos-debugger/src/main.rs index 61faf756345282..7288190d86b38f 100644 --- a/crates/aptos-debugger/src/main.rs +++ b/crates/aptos-debugger/src/main.rs @@ -13,7 +13,8 @@ static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; #[tokio::main] async fn main() -> Result<()> { - Logger::new().level(Level::Info).init(); + // TODO: Inline! + Logger::new().level(Level::Error).init(); let _mp = MetricsPusher::start(vec![]); Cmd::parse().run().await diff --git a/crates/aptos-logger/src/metadata.rs b/crates/aptos-logger/src/metadata.rs index aba5b7503ce615..04462bfee4808d 100644 --- a/crates/aptos-logger/src/metadata.rs +++ b/crates/aptos-logger/src/metadata.rs @@ -90,7 +90,7 @@ pub enum Level { } impl Level { - fn from_usize(idx: usize) -> Option { + pub fn from_usize(idx: usize) -> Option { let lvl = match idx { 0 => Level::Error, 1 => Level::Warn,