From d4c4fd83d59f1027c5f8d5b4ba5e28105cd4feff Mon Sep 17 00:00:00 2001 From: George Mitenkov Date: Tue, 19 Nov 2024 20:36:09 +0000 Subject: [PATCH 1/4] [move] Replay benchmark tool --- Cargo.lock | 20 + Cargo.toml | 2 + .../aptos-debugger/src/aptos_debugger.rs | 73 ++- aptos-move/aptos-replay-benchmark/Cargo.toml | 29 + aptos-move/aptos-replay-benchmark/src/lib.rs | 530 ++++++++++++++++++ aptos-move/aptos-replay-benchmark/src/main.rs | 93 +++ .../src/transaction_bench_state.rs | 69 +-- aptos-move/aptos-vm/src/aptos_vm.rs | 65 ++- aptos-move/aptos-vm/src/block_executor/mod.rs | 2 +- crates/aptos-logger/src/metadata.rs | 73 ++- 10 files changed, 811 insertions(+), 145 deletions(-) create mode 100644 aptos-move/aptos-replay-benchmark/Cargo.toml create mode 100644 aptos-move/aptos-replay-benchmark/src/lib.rs create mode 100644 aptos-move/aptos-replay-benchmark/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index 41f1d2707b060..d9e8680db97b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3567,6 +3567,26 @@ dependencies = [ "tokio-retry", ] +[[package]] +name = "aptos-replay-benchmark" +version = "0.1.0" +dependencies = [ + "anyhow", + "aptos-block-executor", + "aptos-logger", + "aptos-move-debugger", + "aptos-push-metrics", + "aptos-rest-client", + "aptos-types", + "aptos-validator-interface", + "aptos-vm", + "bcs 0.1.4", + "clap 4.5.21", + "serde", + "tokio", + "url", +] + [[package]] name = "aptos-resource-viewer" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index b3c7a1fcfedec..30fa6add1df4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ members = [ "aptos-move/aptos-memory-usage-tracker", "aptos-move/aptos-native-interface", "aptos-move/aptos-release-builder", + "aptos-move/aptos-replay-benchmark", "aptos-move/aptos-resource-viewer", "aptos-move/aptos-sdk-builder", "aptos-move/aptos-transaction-benchmarks", @@ -419,6 +420,7 @@ aptos-push-metrics = { path = "crates/aptos-push-metrics" } aptos-rate-limiter = { path = "crates/aptos-rate-limiter" } aptos-release-builder = { path = "aptos-move/aptos-release-builder" } aptos-reliable-broadcast = { path = "crates/reliable-broadcast" } +aptos-replay-benchmark = { path = "aptos-move/aptos-replay-benchmark" } aptos-resource-viewer = { path = "aptos-move/aptos-resource-viewer" } aptos-rest-client = { path = "crates/aptos-rest-client" } aptos-retrier = { path = "crates/aptos-retrier" } diff --git a/aptos-move/aptos-debugger/src/aptos_debugger.rs b/aptos-move/aptos-debugger/src/aptos_debugger.rs index 2dc191812cc1a..a27281eceb672 100644 --- a/aptos-move/aptos-debugger/src/aptos_debugger.rs +++ b/aptos-move/aptos-debugger/src/aptos_debugger.rs @@ -1,12 +1,8 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use anyhow::{bail, format_err, Result}; -use aptos_block_executor::{ - code_cache_global_manager::AptosModuleCacheManager, - txn_commit_hook::NoOpTransactionCommitHook, - txn_provider::{default::DefaultTxnProvider, TxnProvider}, -}; +use anyhow::{bail, format_err}; +use aptos_block_executor::txn_provider::{default::DefaultTxnProvider, TxnProvider}; use aptos_gas_profiling::{GasProfiler, TransactionGasLog}; use aptos_rest_client::Client; use aptos_types::{ @@ -28,9 +24,7 @@ use aptos_validator_interface::{ AptosValidatorInterface, DBDebuggerInterface, DebuggerStateView, RestDebuggerInterface, }; use aptos_vm::{ - block_executor::{AptosTransactionOutput, AptosVMBlockExecutorWrapper}, - data_cache::AsMoveResolver, - AptosVM, + aptos_vm::AptosVMBlockExecutor, data_cache::AsMoveResolver, AptosVM, VMBlockExecutor, }; use aptos_vm_environment::environment::AptosEnvironment; use aptos_vm_logging::log_schema::AdapterLogSchema; @@ -47,23 +41,31 @@ impl AptosDebugger { Self { debugger } } - pub fn rest_client(rest_client: Client) -> Result { + pub fn rest_client(rest_client: Client) -> anyhow::Result { Ok(Self::new(Arc::new(RestDebuggerInterface::new(rest_client)))) } - pub fn db + Clone>(db_root_path: P) -> Result { + pub fn db + Clone>(db_root_path: P) -> anyhow::Result { Ok(Self::new(Arc::new(DBDebuggerInterface::open( db_root_path, )?))) } + pub async fn get_committed_transactions( + &self, + begin: Version, + limit: u64, + ) -> anyhow::Result<(Vec, Vec)> { + self.debugger.get_committed_transactions(begin, limit).await + } + pub fn execute_transactions_at_version( &self, version: Version, txns: Vec, repeat_execution_times: u64, concurrency_levels: &[usize], - ) -> Result> { + ) -> anyhow::Result> { let sig_verified_txns: Vec = txns.into_iter().map(|x| x.into()).collect::>(); let txn_provider = DefaultTxnProvider::new(sig_verified_txns); @@ -114,7 +116,7 @@ impl AptosDebugger { &self, version: Version, txn: SignedTransaction, - ) -> Result<(VMStatus, VMOutput, TransactionGasLog)> { + ) -> anyhow::Result<(VMStatus, VMOutput, TransactionGasLog)> { let state_view = DebuggerStateView::new(self.debugger.clone(), version); let log_context = AdapterLogSchema::new(state_view.id(), 0); let txn = txn @@ -166,11 +168,8 @@ impl AptosDebugger { use_same_block_boundaries: bool, repeat_execution_times: u64, concurrency_levels: &[usize], - ) -> Result> { - let (txns, txn_infos) = self - .debugger - .get_committed_transactions(begin, limit) - .await?; + ) -> anyhow::Result> { + let (txns, txn_infos) = self.get_committed_transactions(begin, limit).await?; if use_same_block_boundaries { // when going block by block, no need to worry about epoch boundaries @@ -238,7 +237,7 @@ impl AptosDebugger { txns: Vec, repeat_execution_times: u64, concurrency_levels: &[usize], - ) -> Result> { + ) -> anyhow::Result> { let results = self.execute_transactions_at_version( begin, txns, @@ -268,7 +267,7 @@ impl AptosDebugger { repeat_execution_times: u64, concurrency_levels: &[usize], mut txn_infos: Vec, - ) -> Result> { + ) -> anyhow::Result> { let mut ret = vec![]; while limit != 0 { println!( @@ -301,7 +300,7 @@ impl AptosDebugger { txns: Vec, repeat_execution_times: u64, concurrency_levels: &[usize], - ) -> Result> { + ) -> anyhow::Result> { let mut ret = vec![]; let mut cur = vec![]; let mut cur_version = begin; @@ -336,7 +335,7 @@ impl AptosDebugger { &self, account: AccountAddress, seq: u64, - ) -> Result> { + ) -> anyhow::Result> { self.debugger .get_version_by_account_sequence(account, seq) .await @@ -345,7 +344,7 @@ impl AptosDebugger { pub async fn get_committed_transaction_at_version( &self, version: Version, - ) -> Result<(Transaction, TransactionInfo)> { + ) -> anyhow::Result<(Transaction, TransactionInfo)> { let (mut txns, mut info) = self.debugger.get_committed_transactions(version, 1).await?; let txn = txns.pop().expect("there must be exactly 1 txn in the vec"); @@ -434,20 +433,16 @@ fn execute_block_no_limit( state_view: &DebuggerStateView, concurrency_level: usize, ) -> Result, VMStatus> { - AptosVMBlockExecutorWrapper::execute_block::< - _, - NoOpTransactionCommitHook, - DefaultTxnProvider, - >( - txn_provider, - state_view, - &AptosModuleCacheManager::new(), - BlockExecutorConfig { - local: BlockExecutorLocalConfig::default_with_concurrency_level(concurrency_level), - onchain: BlockExecutorConfigFromOnchain::new_no_block_limit(), - }, - TransactionSliceMetadata::unknown(), - None, - ) - .map(BlockOutput::into_transaction_outputs_forced) + let executor = AptosVMBlockExecutor::new(); + executor + .execute_block_with_config( + txn_provider, + state_view, + BlockExecutorConfig { + local: BlockExecutorLocalConfig::default_with_concurrency_level(concurrency_level), + onchain: BlockExecutorConfigFromOnchain::new_no_block_limit(), + }, + TransactionSliceMetadata::unknown(), + ) + .map(BlockOutput::into_transaction_outputs_forced) } diff --git a/aptos-move/aptos-replay-benchmark/Cargo.toml b/aptos-move/aptos-replay-benchmark/Cargo.toml new file mode 100644 index 0000000000000..b71b86de7a9c1 --- /dev/null +++ b/aptos-move/aptos-replay-benchmark/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "aptos-replay-benchmark" +version = "0.1.0" +description = "A tool to replay and locally benchmark on-chain transactions." + +# Workspace inherited keys +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +repository = { workspace = true } +rust-version = { workspace = true } + +[dependencies] +anyhow = { workspace = true } +aptos-block-executor = { workspace = true } +aptos-logger = { workspace = true } +aptos-move-debugger = { workspace = true } +aptos-push-metrics = { workspace = true } +aptos-rest-client = { workspace = true } +aptos-types = { workspace = true } +aptos-validator-interface = { workspace = true } +aptos-vm = { workspace = true } +bcs = { workspace = true } +clap = { workspace = true } +serde = { workspace = true } +tokio = { workspace = true } +url = { workspace = true } diff --git a/aptos-move/aptos-replay-benchmark/src/lib.rs b/aptos-move/aptos-replay-benchmark/src/lib.rs new file mode 100644 index 0000000000000..a1ccf967c13b8 --- /dev/null +++ b/aptos-move/aptos-replay-benchmark/src/lib.rs @@ -0,0 +1,530 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::anyhow; +use aptos_block_executor::txn_provider::default::DefaultTxnProvider; +use aptos_move_debugger::aptos_debugger::AptosDebugger; +use aptos_types::{ + block_executor::{ + config::{ + BlockExecutorConfig, BlockExecutorConfigFromOnchain, BlockExecutorLocalConfig, + BlockExecutorModuleCacheLocalConfig, + }, + transaction_slice_metadata::TransactionSliceMetadata, + }, + on_chain_config::{FeatureFlag, Features, GasScheduleV2, OnChainConfig}, + state_store::{ + errors::StateviewError, state_key::StateKey, state_storage_usage::StateStorageUsage, + state_value::StateValue, TStateView, + }, + transaction::{ + signature_verified_transaction::{ + into_signature_verified_block, SignatureVerifiedTransaction, + }, + Transaction, TransactionInfo, TransactionOutput, Version, + }, +}; +use aptos_validator_interface::DebuggerStateView; +use aptos_vm::{aptos_vm::AptosVMBlockExecutor, VMBlockExecutor}; +use serde::Serialize; +use std::{collections::HashMap, sync::Mutex, time::Instant}; + +/// Config used by benchmarking. +fn block_execution_config(concurrency_level: usize) -> BlockExecutorConfig { + BlockExecutorConfig { + local: BlockExecutorLocalConfig { + concurrency_level, + allow_fallback: true, + discard_failed_blocks: false, + module_cache_config: BlockExecutorModuleCacheLocalConfig::default(), + }, + // For replay, there is no block limit. + onchain: BlockExecutorConfigFromOnchain::new_no_block_limit(), + } +} + +/// Returns the state key for on-chain config type. +fn config_state_key() -> StateKey { + StateKey::resource(T::address(), &T::struct_tag()) + .expect("Constructing state key for on-chain config must succeed") +} + +/// Fetches the config from the storage, and modifies it based on the passed function. Panics if +/// there is a storage error, config does not exist or fails to (de-)serialize. +fn config_override( + debugger_state_view: &DebuggerStateView, + override_func: F, +) -> (StateKey, StateValue) { + let state_key = config_state_key::(); + let state_value = debugger_state_view + .get_state_value(&state_key) + .unwrap_or_else(|err| { + panic!( + "Failed to fetch on-chain config for {:?}: {:?}", + state_key, err + ) + }) + .unwrap_or_else(|| panic!("On-chain config for {:?} must always exist", state_key)); + + let mut config = T::deserialize_into_config(state_value.bytes()) + .expect("On-chain config must be deserializable"); + override_func(&mut config); + let config_bytes = bcs::to_bytes(&config).expect("On-chain config must be serializable"); + + let new_state_value = state_value.map_bytes(|_| Ok(config_bytes.into())).unwrap(); + (state_key, new_state_value) +} + +/// State view used for setting up the benchmarks. Maintains a set which caches execution reads, +/// populated during block generation phase. These reads are used to create a [BlockReadSet] later, +/// which is used as an input state for the benchmarking. +struct StateViewWithReadSet { + /// Captured read-set. + reads: Mutex>, + /// Remote state view for the specified version. + debugger_state_view: DebuggerStateView, +} + +impl TStateView for StateViewWithReadSet { + type Key = StateKey; + + fn get_state_value(&self, state_key: &Self::Key) -> Result, StateviewError> { + // Check the read-set first. + if let Some(state_value) = self.reads.lock().unwrap().get(state_key) { + return Ok(Some(state_value.clone())); + } + + // We do not allow failures because then benchmarking will not be correct (we miss a read). + // Plus, these failures should not happen when replaying past transactions. + let maybe_state_value = self + .debugger_state_view + .get_state_value(state_key) + .unwrap_or_else(|err| { + panic!("Failed to fetch state value for {:?}: {:?}", state_key, err) + }); + + // Populate the read-set if first access. + let mut reads = self.reads.lock().unwrap(); + if !reads.contains_key(state_key) { + if let Some(state_value) = &maybe_state_value { + reads.insert(state_key.clone(), state_value.clone()); + } + } + drop(reads); + + Ok(maybe_state_value) + } + + fn get_usage(&self) -> Result { + unreachable!("Should not be called when benchmarking") + } +} + +/// Immutable read-set used as an input state for running a block of transactions. +struct BlockReadSet { + data: HashMap, +} + +impl TStateView for BlockReadSet { + type Key = StateKey; + + fn get_state_value(&self, state_key: &Self::Key) -> Result, StateviewError> { + Ok(self.data.get(state_key).cloned()) + } + + fn get_usage(&self) -> Result { + unreachable!("Should not be called when benchmarking") + } +} + +/// A workload to benchmark. Contains signature verified transactions, and metadata specifying the +/// start and end versions of these transactions. +struct Workload { + txn_provider: DefaultTxnProvider, + transaction_slice_metadata: TransactionSliceMetadata, +} + +impl Workload { + /// Returns a new workload to execute transactions at specified version. + fn new(begin: Version, txns: Vec) -> Self { + let end = begin + txns.len() as Version; + let transaction_slice_metadata = TransactionSliceMetadata::chunk(begin, end); + + let signature_verified_txns = into_signature_verified_block(txns); + let txn_provider = DefaultTxnProvider::new(signature_verified_txns); + + Workload { + txn_provider, + transaction_slice_metadata, + } + } + + /// Returns the first transaction version in the workload. + fn first_version(&self) -> Version { + match &self.transaction_slice_metadata { + TransactionSliceMetadata::Chunk { begin, .. } => *begin, + _ => unreachable!("Transaction slice metadata is always a chunk"), + } + } + + /// Returns the last transaction version in the workload. + fn last_version(&self) -> Version { + match &self.transaction_slice_metadata { + TransactionSliceMetadata::Chunk { end, .. } => *end - 1, + _ => unreachable!("Transaction slice metadata is always a chunk"), + } + } +} + +/// Captures information for benchmarking a single block of signature-verified transactions. +struct Block { + /// Pre-execution state view to run the workload. + input: BlockReadSet, + /// Expected outputs of execution. + output: Vec, + /// Workload containing transactions to execute. + workload: Workload, +} + +impl Block { + /// Executes the workload using the provided executor and at specified concurrency level. + #[inline(always)] + fn execute(&self, executor: &AptosVMBlockExecutor, concurrency_level: usize) { + executor + .execute_block_with_config( + &self.workload.txn_provider, + &self.input, + block_execution_config(concurrency_level), + self.workload.transaction_slice_metadata, + ) + .unwrap_or_else(|err| { + panic!( + "Block execution should not fail, but returned an error: {:?}", + err + ) + }); + } + + /// Generates a new [Block] for the specified workload. During the generation, the workload is + /// pre-executed to determine the read-set, and the expected outputs. + fn generate( + workload: Workload, + state_view: StateViewWithReadSet, + concurrency_level: usize, + ) -> anyhow::Result { + let executor = AptosVMBlockExecutor::new(); + let output = executor + .execute_block_with_config( + &workload.txn_provider, + &state_view, + block_execution_config(concurrency_level), + workload.transaction_slice_metadata, + ) + .map_err(|err| anyhow!("Failed to generate block for benchmarking: {:?}", err))? + .into_transaction_outputs_forced(); + + let data = state_view.reads.into_inner().unwrap(); + Ok(Block { + input: BlockReadSet { data }, + output, + workload, + }) + } +} + +/// Represents a closed interval for transaction versions. +pub struct ClosedInterval { + begin: Version, + end: Version, +} + +impl ClosedInterval { + pub fn new(begin: Version, end: Version) -> Self { + assert!( + begin <= end, + "Transaction versions are not a valid closed interval: [{}, {}].", + begin, + end, + ); + Self { begin, end } + } +} + +/// Overrides for different environment configs, such as feature flags, etc. +#[derive(Debug)] +pub struct EnvironmentOverride { + enable_features: Vec, + disable_features: Vec, + gas_feature_version: Option, +} + +impl EnvironmentOverride { + pub fn new( + enable_features: Vec, + disable_features: Vec, + gas_feature_version: Option, + ) -> Self { + assert!( + enable_features + .iter() + .all(|f| !disable_features.contains(f)), + "Enable and disable feature flags cannot overlap" + ); + + Self { + enable_features, + disable_features, + gas_feature_version, + } + } + + fn generate_state_override( + &self, + debugger_state_view: &DebuggerStateView, + ) -> HashMap { + let mut state_override = HashMap::new(); + + // Enable/disable features. + let (features_state_key, features_state_value) = + config_override::(debugger_state_view, |features| { + for feature in &self.enable_features { + if features.is_enabled(*feature) { + println!("[WARN] Feature {:?} is already enabled", feature) + } + features.enable(*feature); + } + for feature in &self.disable_features { + if !features.is_enabled(*feature) { + println!("[WARN] Feature {:?} is already disabled", feature) + } + features.disable(*feature); + } + }); + state_override.insert(features_state_key, features_state_value); + + // Override gas feature version. + if let Some(gas_feature_version_override) = self.gas_feature_version { + let (gas_schedule_v2_state_key, gas_schedule_v2_state_value) = + config_override::(debugger_state_view, |gas_schedule_v2| { + gas_schedule_v2.feature_version = gas_feature_version_override; + }); + state_override.insert(gas_schedule_v2_state_key, gas_schedule_v2_state_value); + } + + state_override + } + + /// Checks if the output of the transaction contains writes to overridden configs. If so, + /// warnings are logged to stdout. + fn ensure_overrides_do_not_conflict(&self, version: Version, output: &TransactionOutput) { + let features_state_key = config_state_key::(); + let gas_schedule_v2_state_key = config_state_key::(); + + for (state_key, _) in output.write_set() { + if state_key == &features_state_key { + println!( + "[WARN] Features are being updated by transaction {}", + version + ); + } + + if self.gas_feature_version.is_some() && state_key == &gas_schedule_v2_state_key { + println!( + "[WARN] Gas schedule V2 is being updated by transaction {}", + version + ); + } + } + } +} + +pub struct AptosBenchmarkRunner { + /// Used to fetch transactions and transaction infos from the DB or REST endpoint. + debugger: AptosDebugger, + /// Specifies the closed interval of transaction versions to execute. + versions: ClosedInterval, + /// Different concurrency levels to benchmark. + concurrency_levels: Vec, + /// Number of times benchmark is repeated for each concurrency level. + num_repeats: usize, + /// Specifies how to override execution environment configs for each benchmark. + environment_override: EnvironmentOverride, +} + +impl AptosBenchmarkRunner { + pub fn new( + debugger: AptosDebugger, + versions: ClosedInterval, + concurrency_levels: Vec, + num_repeats: Option, + environment_override: EnvironmentOverride, + ) -> Self { + assert!( + !concurrency_levels.is_empty(), + "At least one concurrency level must be provided" + ); + + let default_num_repeats = 3; + let num_repeats = num_repeats.unwrap_or_else(|| { + println!( + "[WARN] Using default number of repeats: {}", + default_num_repeats + ); + default_num_repeats + }); + assert!( + num_repeats >= default_num_repeats, + "Number of times to repeat the benchmark should be at least the default value {}", + default_num_repeats + ); + + Self { + debugger, + versions, + concurrency_levels, + num_repeats, + environment_override, + } + } + + /// Creates [StateViewWithReadSet] to generate the workloads for benchmarking. Also, overrides + /// on-chain configs based on the specified environment. + fn state_with_at_version_with_override(&self, version: Version) -> StateViewWithReadSet { + let debugger_state_view = self.debugger.state_view_at_version(version); + let state_override = self + .environment_override + .generate_state_override(&debugger_state_view); + + StateViewWithReadSet { + reads: Mutex::new(state_override), + debugger_state_view, + } + } + + /// Generates a single [Block] for benchmarking. + fn generate_block(&self, begin: Version, txns: Vec) -> anyhow::Result { + // To generate blocks, run with maximum concurrency specified. + let concurrency_level = *self + .concurrency_levels + .iter() + .max() + .expect("At least one concurrency level must be provided"); + + let workload = Workload::new(begin, txns); + let state_view = self.state_with_at_version_with_override(begin); + + Block::generate(workload, state_view, concurrency_level) + } + + /// Generates a sequence of [Block]s for benchmarking. Block execution boundaries correspond to + /// the real boundaries on-chain. + async fn generate_blocks(&self, txns: Vec) -> anyhow::Result> { + let mut blocks = Vec::with_capacity(txns.len()); + + let mut curr_block = Vec::with_capacity(txns.len()); + let mut curr_version = self.versions.begin; + + for txn in txns { + if txn.is_block_start() && !curr_block.is_empty() { + let block_size = curr_block.len() as Version; + blocks.push(self.generate_block(curr_version, std::mem::take(&mut curr_block))?); + curr_version += block_size; + } + curr_block.push(txn); + } + + if !curr_block.is_empty() { + blocks.push(self.generate_block(curr_version, curr_block)?); + } + + Ok(blocks) + } + + /// Checks generated [Block]s against the on-chain data: + /// - Outputs should match on-chain transaction infos. + /// - Outputs should not write to overridden configs. + fn check_blocks(&self, blocks: &[Block], txn_infos: &[TransactionInfo]) -> anyhow::Result<()> { + for (idx, (output, txn_info)) in blocks + .iter() + .flat_map(|b| &b.output) + .zip(txn_infos) + .enumerate() + { + let version = self.versions.begin + idx as Version; + if let Err(err) = output.ensure_match_transaction_info(version, txn_info, None, None) { + println!("[WARN] Output mismatch: {:?}", err); + } + + self.environment_override + .ensure_overrides_do_not_conflict(version, output); + } + Ok(()) + } + + /// Logs different statistics about [Block]s: + fn analyze_blocks(&self, blocks: &[Block]) { + for (idx, block) in blocks.iter().enumerate() { + let num_txns = block.workload.txn_provider.get_txns().len(); + println!( + "Block {}: versions [{}, {}] with {} transactions", + idx + 1, + block.workload.first_version(), + block.workload.last_version(), + num_txns + ) + } + } + + /// The main entrypoint for benchmarking: for specified concurrency levels, replays blocks of + /// transactions and measures the overall time taken. Note that during execution each block + /// runs on its own state, so the execution time does not take into account block commit. + pub async fn benchmark_past_transactions(&self) -> anyhow::Result<()> { + let limit = self.versions.end - self.versions.begin + 1; + let (txns, txn_infos) = self + .debugger + .get_committed_transactions(self.versions.begin, limit) + .await?; + + println!("Generating blocks for benchmarking ..."); + let blocks = self.generate_blocks(txns).await?; + + println!("Checking generated blocks ..."); + self.check_blocks(&blocks, &txn_infos)?; + + println!("Analyzing {} generated blocks ...", blocks.len()); + self.analyze_blocks(&blocks); + + println!("Benchmarking ... \n"); + + for concurrency_level in &self.concurrency_levels { + println!("Concurrency level: {}", concurrency_level); + let mut times = Vec::with_capacity(self.num_repeats); + + for i in 0..self.num_repeats { + let start_time = Instant::now(); + + let executor = AptosVMBlockExecutor::new(); + for block in &blocks { + block.execute(&executor, *concurrency_level); + } + + let time = start_time.elapsed().as_millis(); + println!( + "[{}/{}] Execution time is {}ms", + i + 1, + self.num_repeats, + time, + ); + times.push(time); + } + times.sort(); + + println!( + "Median execution time is {}ms\n", + times[self.num_repeats / 2], + ); + } + + Ok(()) + } +} diff --git a/aptos-move/aptos-replay-benchmark/src/main.rs b/aptos-move/aptos-replay-benchmark/src/main.rs new file mode 100644 index 0000000000000..c73d7ed2ef57c --- /dev/null +++ b/aptos-move/aptos-replay-benchmark/src/main.rs @@ -0,0 +1,93 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_logger::{Level, Logger}; +use aptos_move_debugger::aptos_debugger::AptosDebugger; +use aptos_push_metrics::MetricsPusher; +use aptos_replay_benchmark::{AptosBenchmarkRunner, ClosedInterval, EnvironmentOverride}; +use aptos_rest_client::Client; +use aptos_types::on_chain_config::FeatureFlag; +use clap::Parser; +use url::Url; + +#[derive(Parser)] +pub struct Command { + #[clap(long, help = "Logging level, defaults to ERROR")] + log_level: Option, + + #[clap(long, help = "Fullnode's REST API query endpoint")] + rest_endpoint: String, + + #[clap( + long, + num_args=1.., + value_delimiter = ' ', + help = "Different concurrency levels to benchmark", + )] + concurrency_levels: Vec, + + #[clap( + long, + help = "Number of times to repeat an experiment for each concurrency level" + )] + num_repeats: Option, + + #[clap(long, help = "First transaction to include for benchmarking")] + begin_version: u64, + + #[clap(long, help = "Last transaction to include for benchmarking")] + end_version: u64, + + #[clap( + long, + num_args=1.., + value_delimiter = ' ', + help = "List of space-separated feature flags to enable", + )] + enable_features: Vec, + + #[clap( + long, + num_args=1.., + value_delimiter = ' ', + help = "List of space-separated feature flags to disable", + )] + disable_features: Vec, + + #[clap(long, help = "If specified, used as the gas feature version")] + gas_feature_version: Option, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let command = Command::parse(); + + let level = command.log_level.unwrap_or(Level::Error); + Logger::new().level(level).init(); + let _mp = MetricsPusher::start(vec![]); + + let debugger = AptosDebugger::rest_client(Client::new(Url::parse(&command.rest_endpoint)?))?; + let versions = ClosedInterval::new(command.begin_version, command.end_version); + let environment_override = EnvironmentOverride::new( + command.enable_features, + command.disable_features, + command.gas_feature_version, + ); + + let runner = AptosBenchmarkRunner::new( + debugger, + versions, + command.concurrency_levels, + command.num_repeats, + environment_override, + ); + runner.benchmark_past_transactions().await?; + + Ok(()) +} + +#[test] +fn verify_tool() { + use clap::CommandFactory; + Command::command().debug_assert(); +} diff --git a/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs b/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs index f250de5c60758..81db4cbded0c2 100644 --- a/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs +++ b/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs @@ -3,11 +3,7 @@ use crate::transactions; use aptos_bitvec::BitVec; -use aptos_block_executor::{ - code_cache_global_manager::AptosModuleCacheManager, - txn_commit_hook::NoOpTransactionCommitHook, - txn_provider::{default::DefaultTxnProvider, TxnProvider}, -}; +use aptos_block_executor::txn_provider::{default::DefaultTxnProvider, TxnProvider}; use aptos_block_partitioner::{ v2::config::PartitionerV2Config, BlockPartitioner, PartitionerConfig, }; @@ -32,15 +28,15 @@ use aptos_types::{ }, ExecutionStatus, Transaction, TransactionOutput, TransactionStatus, }, - vm_status::VMStatus, }; use aptos_vm::{ - block_executor::{AptosTransactionOutput, AptosVMBlockExecutorWrapper}, + aptos_vm::AptosVMBlockExecutor, data_cache::AsMoveResolver, sharded_block_executor::{ local_executor_shard::{LocalExecutorClient, LocalExecutorService}, ShardedBlockExecutor, }, + VMBlockExecutor, }; use proptest::{collection::vec, prelude::Strategy, strategy::ValueTree, test_runner::TestRunner}; use std::{net::SocketAddr, sync::Arc, time::Instant}; @@ -217,20 +213,18 @@ where ) -> (Vec, usize) { let block_size = txn_provider.num_txns(); let timer = Instant::now(); - let output = AptosVMBlockExecutorWrapper::execute_block::< - _, - NoOpTransactionCommitHook, - DefaultTxnProvider, - >( - txn_provider, - self.state_view.as_ref(), - &AptosModuleCacheManager::new(), - BlockExecutorConfig::new_maybe_block_limit(1, maybe_block_gas_limit), - TransactionSliceMetadata::unknown(), - None, - ) - .expect("VM should not fail to start") - .into_transaction_outputs_forced(); + + let executor = AptosVMBlockExecutor::new(); + let output = executor + .execute_block_with_config( + txn_provider, + self.state_view.as_ref(), + BlockExecutorConfig::new_maybe_block_limit(1, maybe_block_gas_limit), + TransactionSliceMetadata::unknown(), + ) + .expect("Sequential block execution should succeed") + .into_transaction_outputs_forced(); + let exec_time = timer.elapsed().as_millis(); (output, block_size * 1000 / exec_time as usize) @@ -263,28 +257,25 @@ where fn execute_benchmark_parallel( &self, txn_provider: &DefaultTxnProvider, - concurrency_level_per_shard: usize, + concurrency_level: usize, maybe_block_gas_limit: Option, ) -> (Vec, usize) { let block_size = txn_provider.num_txns(); let timer = Instant::now(); - let output = AptosVMBlockExecutorWrapper::execute_block::< - _, - NoOpTransactionCommitHook, - DefaultTxnProvider, - >( - txn_provider, - self.state_view.as_ref(), - &AptosModuleCacheManager::new(), - BlockExecutorConfig::new_maybe_block_limit( - concurrency_level_per_shard, - maybe_block_gas_limit, - ), - TransactionSliceMetadata::unknown(), - None, - ) - .expect("VM should not fail to start") - .into_transaction_outputs_forced(); + + let executor = AptosVMBlockExecutor::new(); + let output = executor + .execute_block_with_config( + txn_provider, + self.state_view.as_ref(), + BlockExecutorConfig::new_maybe_block_limit( + concurrency_level, + maybe_block_gas_limit, + ), + TransactionSliceMetadata::unknown(), + ) + .expect("Parallel block execution should succeed") + .into_transaction_outputs_forced(); let exec_time = timer.elapsed().as_millis(); (output, block_size * 1000 / exec_time as usize) diff --git a/aptos-move/aptos-vm/src/aptos_vm.rs b/aptos-move/aptos-vm/src/aptos_vm.rs index f2afca7412a88..14fe50cccba23 100644 --- a/aptos-move/aptos-vm/src/aptos_vm.rs +++ b/aptos-move/aptos-vm/src/aptos_vm.rs @@ -2792,35 +2792,29 @@ pub struct AptosVMBlockExecutor { module_cache_manager: AptosModuleCacheManager, } -impl VMBlockExecutor for AptosVMBlockExecutor { - fn new() -> Self { - Self { - module_cache_manager: AptosModuleCacheManager::new(), - } - } - - fn execute_block( +impl AptosVMBlockExecutor { + pub fn execute_block_with_config( &self, txn_provider: &DefaultTxnProvider, state_view: &(impl StateView + Sync), - onchain_config: BlockExecutorConfigFromOnchain, + config: BlockExecutorConfig, transaction_slice_metadata: TransactionSliceMetadata, ) -> Result, VMStatus> { - fail_point!("move_adapter::execute_block", |_| { + fail_point!("aptos_vm_block_executor::execute_block_with_config", |_| { Err(VMStatus::error( StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR, None, )) }); + let log_context = AdapterLogSchema::new(state_view.id(), 0); + let num_txns = txn_provider.num_txns(); info!( log_context, - "Executing block, transaction count: {}", - txn_provider.num_txns() + "Executing block, transaction count: {}", num_txns ); - let count = txn_provider.num_txns(); - let ret = AptosVMBlockExecutorWrapper::execute_block::< + let result = AptosVMBlockExecutorWrapper::execute_block::< _, NoOpTransactionCommitHook, DefaultTxnProvider, @@ -2828,23 +2822,42 @@ impl VMBlockExecutor for AptosVMBlockExecutor { txn_provider, state_view, &self.module_cache_manager, - BlockExecutorConfig { - local: BlockExecutorLocalConfig { - concurrency_level: AptosVM::get_concurrency_level(), - allow_fallback: true, - discard_failed_blocks: AptosVM::get_discard_failed_blocks(), - module_cache_config: BlockExecutorModuleCacheLocalConfig::default(), - }, - onchain: onchain_config, - }, + config, transaction_slice_metadata, None, ); - if ret.is_ok() { + if result.is_ok() { // Record the histogram count for transactions per block. - BLOCK_TRANSACTION_COUNT.observe(count as f64); + BLOCK_TRANSACTION_COUNT.observe(num_txns as f64); } - ret + result + } +} + +impl VMBlockExecutor for AptosVMBlockExecutor { + fn new() -> Self { + Self { + module_cache_manager: AptosModuleCacheManager::new(), + } + } + + fn execute_block( + &self, + txn_provider: &DefaultTxnProvider, + state_view: &(impl StateView + Sync), + onchain_config: BlockExecutorConfigFromOnchain, + transaction_slice_metadata: TransactionSliceMetadata, + ) -> Result, VMStatus> { + let config = BlockExecutorConfig { + local: BlockExecutorLocalConfig { + concurrency_level: AptosVM::get_concurrency_level(), + allow_fallback: true, + discard_failed_blocks: AptosVM::get_discard_failed_blocks(), + module_cache_config: BlockExecutorModuleCacheLocalConfig::default(), + }, + onchain: onchain_config, + }; + self.execute_block_with_config(txn_provider, state_view, config, transaction_slice_metadata) } fn execute_block_sharded>( diff --git a/aptos-move/aptos-vm/src/block_executor/mod.rs b/aptos-move/aptos-vm/src/block_executor/mod.rs index d5c55eeb957c4..168b1bb4a3719 100644 --- a/aptos-move/aptos-vm/src/block_executor/mod.rs +++ b/aptos-move/aptos-vm/src/block_executor/mod.rs @@ -481,7 +481,7 @@ impl< } /// Uses shared thread pool to execute blocks. - pub fn execute_block< + pub(crate) fn execute_block< S: StateView + Sync, L: TransactionCommitHook, TP: TxnProvider + Sync, diff --git a/crates/aptos-logger/src/metadata.rs b/crates/aptos-logger/src/metadata.rs index aba5b7503ce61..2aff4dae55ff1 100644 --- a/crates/aptos-logger/src/metadata.rs +++ b/crates/aptos-logger/src/metadata.rs @@ -3,7 +3,8 @@ // SPDX-License-Identifier: Apache-2.0 use serde::{Deserialize, Serialize}; -use std::{fmt, str::FromStr}; +use std::fmt; +use strum_macros::{EnumString, FromRepr}; /// Associated metadata with every log to identify what kind of log and where it came from #[derive(Clone, Copy, Debug, Serialize, Deserialize)] @@ -60,68 +61,60 @@ impl Metadata { } } -static LOG_LEVEL_NAMES: &[&str] = &["ERROR", "WARN", "INFO", "DEBUG", "TRACE"]; - /// Logging levels, used for stratifying logs, and disabling less important ones for performance reasons #[repr(usize)] -#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Serialize, Deserialize)] +#[derive( + Copy, + Clone, + PartialEq, + Eq, + PartialOrd, + Ord, + Debug, + Hash, + Serialize, + Deserialize, + FromRepr, + EnumString, +)] #[serde(rename_all = "UPPERCASE")] pub enum Level { /// The "error" level. /// /// Designates very serious errors. + #[strum(ascii_case_insensitive)] Error = 0, /// The "warn" level. /// /// Designates hazardous situations. - Warn, + #[strum(ascii_case_insensitive)] + Warn = 1, /// The "info" level. /// /// Designates useful information. - Info, + #[strum(ascii_case_insensitive)] + Info = 2, /// The "debug" level. /// /// Designates lower priority information. - Debug, + #[strum(ascii_case_insensitive)] + Debug = 3, /// The "trace" level. /// /// Designates very low priority, often extremely verbose, information. - Trace, -} - -impl Level { - fn from_usize(idx: usize) -> Option { - let lvl = match idx { - 0 => Level::Error, - 1 => Level::Warn, - 2 => Level::Info, - 3 => Level::Debug, - 4 => Level::Trace, - _ => return None, - }; - - Some(lvl) - } -} - -/// An error given when no `Level` matches the inputted string -#[derive(Debug)] -pub struct LevelParseError; - -impl FromStr for Level { - type Err = LevelParseError; - - fn from_str(level: &str) -> Result { - LOG_LEVEL_NAMES - .iter() - .position(|name| name.eq_ignore_ascii_case(level)) - .map(|idx| Level::from_usize(idx).unwrap()) - .ok_or(LevelParseError) - } + #[strum(ascii_case_insensitive)] + Trace = 4, } impl fmt::Display for Level { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.pad(LOG_LEVEL_NAMES[*self as usize]) + let level_str = match self { + Level::Error => "ERROR", + Level::Warn => "WARN", + Level::Info => "INFO", + Level::Debug => "DEBUG", + Level::Trace => "TRACE", + }; + fmt.pad(level_str) } } From 289aa44399c532f9deb0cc7c47aa8595fc3ff226 Mon Sep 17 00:00:00 2001 From: George Mitenkov Date: Sun, 24 Nov 2024 15:45:57 +0000 Subject: [PATCH 2/4] [refactor] Split between files --- Cargo.lock | 1 + aptos-move/aptos-replay-benchmark/Cargo.toml | 1 + .../aptos-replay-benchmark/src/block.rs | 115 ++++ .../aptos-replay-benchmark/src/generator.rs | 86 +++ aptos-move/aptos-replay-benchmark/src/lib.rs | 533 +----------------- aptos-move/aptos-replay-benchmark/src/main.rs | 56 +- .../aptos-replay-benchmark/src/overrides.rs | 88 +++ .../aptos-replay-benchmark/src/runner.rs | 113 ++++ .../aptos-replay-benchmark/src/state_view.rs | 82 +++ .../aptos-replay-benchmark/src/workload.rs | 65 +++ 10 files changed, 589 insertions(+), 551 deletions(-) create mode 100644 aptos-move/aptos-replay-benchmark/src/block.rs create mode 100644 aptos-move/aptos-replay-benchmark/src/generator.rs create mode 100644 aptos-move/aptos-replay-benchmark/src/overrides.rs create mode 100644 aptos-move/aptos-replay-benchmark/src/runner.rs create mode 100644 aptos-move/aptos-replay-benchmark/src/state_view.rs create mode 100644 aptos-move/aptos-replay-benchmark/src/workload.rs diff --git a/Cargo.lock b/Cargo.lock index d9e8680db97b0..f0648a1d786d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3582,6 +3582,7 @@ dependencies = [ "aptos-vm", "bcs 0.1.4", "clap 4.5.21", + "parking_lot 0.12.1", "serde", "tokio", "url", diff --git a/aptos-move/aptos-replay-benchmark/Cargo.toml b/aptos-move/aptos-replay-benchmark/Cargo.toml index b71b86de7a9c1..757e9f23da382 100644 --- a/aptos-move/aptos-replay-benchmark/Cargo.toml +++ b/aptos-move/aptos-replay-benchmark/Cargo.toml @@ -25,5 +25,6 @@ aptos-vm = { workspace = true } bcs = { workspace = true } clap = { workspace = true } serde = { workspace = true } +parking_lot = { workspace = true } tokio = { workspace = true } url = { workspace = true } diff --git a/aptos-move/aptos-replay-benchmark/src/block.rs b/aptos-move/aptos-replay-benchmark/src/block.rs new file mode 100644 index 0000000000000..19a57beac0231 --- /dev/null +++ b/aptos-move/aptos-replay-benchmark/src/block.rs @@ -0,0 +1,115 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + state_view::{ReadSet, ReadSetCapturingStateView}, + workload::Workload, +}; +use anyhow::bail; +use aptos_types::{ + block_executor::config::{ + BlockExecutorConfig, BlockExecutorConfigFromOnchain, BlockExecutorLocalConfig, + BlockExecutorModuleCacheLocalConfig, + }, + state_store::{state_key::StateKey, state_value::StateValue, StateView}, + transaction::{TransactionOutput, Version}, +}; +use aptos_vm::{aptos_vm::AptosVMBlockExecutor, VMBlockExecutor}; +use std::collections::HashMap; + +/// Config used by benchmarking blocks. +fn block_execution_config(concurrency_level: usize) -> BlockExecutorConfig { + BlockExecutorConfig { + local: BlockExecutorLocalConfig { + concurrency_level, + allow_fallback: true, + discard_failed_blocks: false, + module_cache_config: BlockExecutorModuleCacheLocalConfig::default(), + }, + // For replay, there is no block limit. + onchain: BlockExecutorConfigFromOnchain::new_no_block_limit(), + } +} + +pub struct Block { + inputs: ReadSet, + workload: Workload, +} + +impl Block { + pub(crate) fn new( + workload: Workload, + state_view: &(impl StateView + Sync), + state_override: HashMap, + concurrency_level: usize, + ) -> anyhow::Result { + // Execute transactions with on-chain configs. + let onchain_outputs = execute_workload( + &AptosVMBlockExecutor::new(), + &workload, + state_view, + concurrency_level, + ); + + // Check on-chain outputs do not modify state we override. If so, benchmarking results may + // not be correct. + let begin = workload.first_version(); + for (idx, output) in onchain_outputs.iter().enumerate() { + for (state_key, _) in output.write_set() { + if state_override.contains_key(state_key) { + bail!( + "Transaction {} writes to overridden state value for {:?}", + begin + idx as Version, + state_key + ); + } + } + } + + // Execute transactions, recording all reads. + let state_view = ReadSetCapturingStateView::new(state_view, state_override); + let _outputs = execute_workload( + &AptosVMBlockExecutor::new(), + &workload, + &state_view, + concurrency_level, + ); + let inputs = state_view.into_read_set(); + + // Check on-chain outputs against new outputs. We want to ensure that changes are minimal + // so that overrides do not change execution flow too much. + // Run analysis. + // TODO + + Ok(Self { inputs, workload }) + } + + /// Executes the workload for benchmarking. + #[inline(always)] + pub(crate) fn run(&self, executor: &AptosVMBlockExecutor, concurrency_level: usize) { + execute_workload(executor, &self.workload, &self.inputs, concurrency_level); + } +} + +#[inline(always)] +fn execute_workload( + executor: &AptosVMBlockExecutor, + workload: &Workload, + state_view: &(impl StateView + Sync), + concurrency_level: usize, +) -> Vec { + executor + .execute_block_with_config( + workload.txn_provider(), + state_view, + block_execution_config(concurrency_level), + workload.transaction_slice_metadata(), + ) + .unwrap_or_else(|err| { + panic!( + "Block execution should not fail, but returned an error: {:?}", + err + ) + }) + .into_transaction_outputs_forced() +} diff --git a/aptos-move/aptos-replay-benchmark/src/generator.rs b/aptos-move/aptos-replay-benchmark/src/generator.rs new file mode 100644 index 0000000000000..635e920beef08 --- /dev/null +++ b/aptos-move/aptos-replay-benchmark/src/generator.rs @@ -0,0 +1,86 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{block::Block, overrides::OverrideConfig, workload::Workload}; +use aptos_move_debugger::aptos_debugger::AptosDebugger; +use aptos_types::transaction::{Transaction, Version}; + +pub struct BenchmarkGenerator { + debugger: AptosDebugger, + begin_version: Version, + end_version: Version, + override_config: OverrideConfig, +} + +impl BenchmarkGenerator { + pub fn new( + debugger: AptosDebugger, + begin_version: Version, + end_version: Version, + override_config: OverrideConfig, + ) -> Self { + assert!( + begin_version <= end_version, + "Transaction versions are not a valid closed interval: [{}, {}].", + begin_version, + end_version, + ); + + Self { + debugger, + begin_version, + end_version, + override_config, + } + } + + /// Generates a sequence of [Block] for benchmarking. + pub async fn generate_blocks(&self) -> anyhow::Result> { + let limit = self.end_version - self.begin_version + 1; + let (txns, _) = self + .debugger + .get_committed_transactions(self.begin_version, limit) + .await?; + let txn_blocks = self.partition(txns); + + let mut blocks = vec![]; + for (begin, txn_block) in txn_blocks { + blocks.push(self.generate_block(begin, txn_block)?); + } + Ok(blocks) + } + + /// Generates a single [Block] for benchmarking. + fn generate_block(&self, begin: Version, txns: Vec) -> anyhow::Result { + let workload = Workload::new(begin, txns); + + let state_view = self.debugger.state_view_at_version(begin); + let state_override = self.override_config.get_state_override(&state_view); + + let state_view = self.debugger.state_view_at_version(begin); + Block::new(workload, &state_view, state_override, 32) + } + + /// Partitions a sequence of transactions into blocks. + fn partition(&self, txns: Vec) -> Vec<(Version, Vec)> { + let mut begin_versions_and_blocks = Vec::with_capacity(txns.len()); + + let mut curr_begin = self.begin_version; + let mut curr_block = Vec::with_capacity(txns.len()); + + for txn in txns { + if txn.is_block_start() && !curr_block.is_empty() { + let block = std::mem::take(&mut curr_block); + let block_size = block.len(); + begin_versions_and_blocks.push((curr_begin, block)); + curr_begin += block_size as Version; + } + curr_block.push(txn); + } + if !curr_block.is_empty() { + begin_versions_and_blocks.push((curr_begin, curr_block)); + } + + begin_versions_and_blocks + } +} diff --git a/aptos-move/aptos-replay-benchmark/src/lib.rs b/aptos-move/aptos-replay-benchmark/src/lib.rs index a1ccf967c13b8..c29da841c780d 100644 --- a/aptos-move/aptos-replay-benchmark/src/lib.rs +++ b/aptos-move/aptos-replay-benchmark/src/lib.rs @@ -1,530 +1,9 @@ // Copyright (c) Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use anyhow::anyhow; -use aptos_block_executor::txn_provider::default::DefaultTxnProvider; -use aptos_move_debugger::aptos_debugger::AptosDebugger; -use aptos_types::{ - block_executor::{ - config::{ - BlockExecutorConfig, BlockExecutorConfigFromOnchain, BlockExecutorLocalConfig, - BlockExecutorModuleCacheLocalConfig, - }, - transaction_slice_metadata::TransactionSliceMetadata, - }, - on_chain_config::{FeatureFlag, Features, GasScheduleV2, OnChainConfig}, - state_store::{ - errors::StateviewError, state_key::StateKey, state_storage_usage::StateStorageUsage, - state_value::StateValue, TStateView, - }, - transaction::{ - signature_verified_transaction::{ - into_signature_verified_block, SignatureVerifiedTransaction, - }, - Transaction, TransactionInfo, TransactionOutput, Version, - }, -}; -use aptos_validator_interface::DebuggerStateView; -use aptos_vm::{aptos_vm::AptosVMBlockExecutor, VMBlockExecutor}; -use serde::Serialize; -use std::{collections::HashMap, sync::Mutex, time::Instant}; - -/// Config used by benchmarking. -fn block_execution_config(concurrency_level: usize) -> BlockExecutorConfig { - BlockExecutorConfig { - local: BlockExecutorLocalConfig { - concurrency_level, - allow_fallback: true, - discard_failed_blocks: false, - module_cache_config: BlockExecutorModuleCacheLocalConfig::default(), - }, - // For replay, there is no block limit. - onchain: BlockExecutorConfigFromOnchain::new_no_block_limit(), - } -} - -/// Returns the state key for on-chain config type. -fn config_state_key() -> StateKey { - StateKey::resource(T::address(), &T::struct_tag()) - .expect("Constructing state key for on-chain config must succeed") -} - -/// Fetches the config from the storage, and modifies it based on the passed function. Panics if -/// there is a storage error, config does not exist or fails to (de-)serialize. -fn config_override( - debugger_state_view: &DebuggerStateView, - override_func: F, -) -> (StateKey, StateValue) { - let state_key = config_state_key::(); - let state_value = debugger_state_view - .get_state_value(&state_key) - .unwrap_or_else(|err| { - panic!( - "Failed to fetch on-chain config for {:?}: {:?}", - state_key, err - ) - }) - .unwrap_or_else(|| panic!("On-chain config for {:?} must always exist", state_key)); - - let mut config = T::deserialize_into_config(state_value.bytes()) - .expect("On-chain config must be deserializable"); - override_func(&mut config); - let config_bytes = bcs::to_bytes(&config).expect("On-chain config must be serializable"); - - let new_state_value = state_value.map_bytes(|_| Ok(config_bytes.into())).unwrap(); - (state_key, new_state_value) -} - -/// State view used for setting up the benchmarks. Maintains a set which caches execution reads, -/// populated during block generation phase. These reads are used to create a [BlockReadSet] later, -/// which is used as an input state for the benchmarking. -struct StateViewWithReadSet { - /// Captured read-set. - reads: Mutex>, - /// Remote state view for the specified version. - debugger_state_view: DebuggerStateView, -} - -impl TStateView for StateViewWithReadSet { - type Key = StateKey; - - fn get_state_value(&self, state_key: &Self::Key) -> Result, StateviewError> { - // Check the read-set first. - if let Some(state_value) = self.reads.lock().unwrap().get(state_key) { - return Ok(Some(state_value.clone())); - } - - // We do not allow failures because then benchmarking will not be correct (we miss a read). - // Plus, these failures should not happen when replaying past transactions. - let maybe_state_value = self - .debugger_state_view - .get_state_value(state_key) - .unwrap_or_else(|err| { - panic!("Failed to fetch state value for {:?}: {:?}", state_key, err) - }); - - // Populate the read-set if first access. - let mut reads = self.reads.lock().unwrap(); - if !reads.contains_key(state_key) { - if let Some(state_value) = &maybe_state_value { - reads.insert(state_key.clone(), state_value.clone()); - } - } - drop(reads); - - Ok(maybe_state_value) - } - - fn get_usage(&self) -> Result { - unreachable!("Should not be called when benchmarking") - } -} - -/// Immutable read-set used as an input state for running a block of transactions. -struct BlockReadSet { - data: HashMap, -} - -impl TStateView for BlockReadSet { - type Key = StateKey; - - fn get_state_value(&self, state_key: &Self::Key) -> Result, StateviewError> { - Ok(self.data.get(state_key).cloned()) - } - - fn get_usage(&self) -> Result { - unreachable!("Should not be called when benchmarking") - } -} - -/// A workload to benchmark. Contains signature verified transactions, and metadata specifying the -/// start and end versions of these transactions. -struct Workload { - txn_provider: DefaultTxnProvider, - transaction_slice_metadata: TransactionSliceMetadata, -} - -impl Workload { - /// Returns a new workload to execute transactions at specified version. - fn new(begin: Version, txns: Vec) -> Self { - let end = begin + txns.len() as Version; - let transaction_slice_metadata = TransactionSliceMetadata::chunk(begin, end); - - let signature_verified_txns = into_signature_verified_block(txns); - let txn_provider = DefaultTxnProvider::new(signature_verified_txns); - - Workload { - txn_provider, - transaction_slice_metadata, - } - } - - /// Returns the first transaction version in the workload. - fn first_version(&self) -> Version { - match &self.transaction_slice_metadata { - TransactionSliceMetadata::Chunk { begin, .. } => *begin, - _ => unreachable!("Transaction slice metadata is always a chunk"), - } - } - - /// Returns the last transaction version in the workload. - fn last_version(&self) -> Version { - match &self.transaction_slice_metadata { - TransactionSliceMetadata::Chunk { end, .. } => *end - 1, - _ => unreachable!("Transaction slice metadata is always a chunk"), - } - } -} - -/// Captures information for benchmarking a single block of signature-verified transactions. -struct Block { - /// Pre-execution state view to run the workload. - input: BlockReadSet, - /// Expected outputs of execution. - output: Vec, - /// Workload containing transactions to execute. - workload: Workload, -} - -impl Block { - /// Executes the workload using the provided executor and at specified concurrency level. - #[inline(always)] - fn execute(&self, executor: &AptosVMBlockExecutor, concurrency_level: usize) { - executor - .execute_block_with_config( - &self.workload.txn_provider, - &self.input, - block_execution_config(concurrency_level), - self.workload.transaction_slice_metadata, - ) - .unwrap_or_else(|err| { - panic!( - "Block execution should not fail, but returned an error: {:?}", - err - ) - }); - } - - /// Generates a new [Block] for the specified workload. During the generation, the workload is - /// pre-executed to determine the read-set, and the expected outputs. - fn generate( - workload: Workload, - state_view: StateViewWithReadSet, - concurrency_level: usize, - ) -> anyhow::Result { - let executor = AptosVMBlockExecutor::new(); - let output = executor - .execute_block_with_config( - &workload.txn_provider, - &state_view, - block_execution_config(concurrency_level), - workload.transaction_slice_metadata, - ) - .map_err(|err| anyhow!("Failed to generate block for benchmarking: {:?}", err))? - .into_transaction_outputs_forced(); - - let data = state_view.reads.into_inner().unwrap(); - Ok(Block { - input: BlockReadSet { data }, - output, - workload, - }) - } -} - -/// Represents a closed interval for transaction versions. -pub struct ClosedInterval { - begin: Version, - end: Version, -} - -impl ClosedInterval { - pub fn new(begin: Version, end: Version) -> Self { - assert!( - begin <= end, - "Transaction versions are not a valid closed interval: [{}, {}].", - begin, - end, - ); - Self { begin, end } - } -} - -/// Overrides for different environment configs, such as feature flags, etc. -#[derive(Debug)] -pub struct EnvironmentOverride { - enable_features: Vec, - disable_features: Vec, - gas_feature_version: Option, -} - -impl EnvironmentOverride { - pub fn new( - enable_features: Vec, - disable_features: Vec, - gas_feature_version: Option, - ) -> Self { - assert!( - enable_features - .iter() - .all(|f| !disable_features.contains(f)), - "Enable and disable feature flags cannot overlap" - ); - - Self { - enable_features, - disable_features, - gas_feature_version, - } - } - - fn generate_state_override( - &self, - debugger_state_view: &DebuggerStateView, - ) -> HashMap { - let mut state_override = HashMap::new(); - - // Enable/disable features. - let (features_state_key, features_state_value) = - config_override::(debugger_state_view, |features| { - for feature in &self.enable_features { - if features.is_enabled(*feature) { - println!("[WARN] Feature {:?} is already enabled", feature) - } - features.enable(*feature); - } - for feature in &self.disable_features { - if !features.is_enabled(*feature) { - println!("[WARN] Feature {:?} is already disabled", feature) - } - features.disable(*feature); - } - }); - state_override.insert(features_state_key, features_state_value); - - // Override gas feature version. - if let Some(gas_feature_version_override) = self.gas_feature_version { - let (gas_schedule_v2_state_key, gas_schedule_v2_state_value) = - config_override::(debugger_state_view, |gas_schedule_v2| { - gas_schedule_v2.feature_version = gas_feature_version_override; - }); - state_override.insert(gas_schedule_v2_state_key, gas_schedule_v2_state_value); - } - - state_override - } - - /// Checks if the output of the transaction contains writes to overridden configs. If so, - /// warnings are logged to stdout. - fn ensure_overrides_do_not_conflict(&self, version: Version, output: &TransactionOutput) { - let features_state_key = config_state_key::(); - let gas_schedule_v2_state_key = config_state_key::(); - - for (state_key, _) in output.write_set() { - if state_key == &features_state_key { - println!( - "[WARN] Features are being updated by transaction {}", - version - ); - } - - if self.gas_feature_version.is_some() && state_key == &gas_schedule_v2_state_key { - println!( - "[WARN] Gas schedule V2 is being updated by transaction {}", - version - ); - } - } - } -} - -pub struct AptosBenchmarkRunner { - /// Used to fetch transactions and transaction infos from the DB or REST endpoint. - debugger: AptosDebugger, - /// Specifies the closed interval of transaction versions to execute. - versions: ClosedInterval, - /// Different concurrency levels to benchmark. - concurrency_levels: Vec, - /// Number of times benchmark is repeated for each concurrency level. - num_repeats: usize, - /// Specifies how to override execution environment configs for each benchmark. - environment_override: EnvironmentOverride, -} - -impl AptosBenchmarkRunner { - pub fn new( - debugger: AptosDebugger, - versions: ClosedInterval, - concurrency_levels: Vec, - num_repeats: Option, - environment_override: EnvironmentOverride, - ) -> Self { - assert!( - !concurrency_levels.is_empty(), - "At least one concurrency level must be provided" - ); - - let default_num_repeats = 3; - let num_repeats = num_repeats.unwrap_or_else(|| { - println!( - "[WARN] Using default number of repeats: {}", - default_num_repeats - ); - default_num_repeats - }); - assert!( - num_repeats >= default_num_repeats, - "Number of times to repeat the benchmark should be at least the default value {}", - default_num_repeats - ); - - Self { - debugger, - versions, - concurrency_levels, - num_repeats, - environment_override, - } - } - - /// Creates [StateViewWithReadSet] to generate the workloads for benchmarking. Also, overrides - /// on-chain configs based on the specified environment. - fn state_with_at_version_with_override(&self, version: Version) -> StateViewWithReadSet { - let debugger_state_view = self.debugger.state_view_at_version(version); - let state_override = self - .environment_override - .generate_state_override(&debugger_state_view); - - StateViewWithReadSet { - reads: Mutex::new(state_override), - debugger_state_view, - } - } - - /// Generates a single [Block] for benchmarking. - fn generate_block(&self, begin: Version, txns: Vec) -> anyhow::Result { - // To generate blocks, run with maximum concurrency specified. - let concurrency_level = *self - .concurrency_levels - .iter() - .max() - .expect("At least one concurrency level must be provided"); - - let workload = Workload::new(begin, txns); - let state_view = self.state_with_at_version_with_override(begin); - - Block::generate(workload, state_view, concurrency_level) - } - - /// Generates a sequence of [Block]s for benchmarking. Block execution boundaries correspond to - /// the real boundaries on-chain. - async fn generate_blocks(&self, txns: Vec) -> anyhow::Result> { - let mut blocks = Vec::with_capacity(txns.len()); - - let mut curr_block = Vec::with_capacity(txns.len()); - let mut curr_version = self.versions.begin; - - for txn in txns { - if txn.is_block_start() && !curr_block.is_empty() { - let block_size = curr_block.len() as Version; - blocks.push(self.generate_block(curr_version, std::mem::take(&mut curr_block))?); - curr_version += block_size; - } - curr_block.push(txn); - } - - if !curr_block.is_empty() { - blocks.push(self.generate_block(curr_version, curr_block)?); - } - - Ok(blocks) - } - - /// Checks generated [Block]s against the on-chain data: - /// - Outputs should match on-chain transaction infos. - /// - Outputs should not write to overridden configs. - fn check_blocks(&self, blocks: &[Block], txn_infos: &[TransactionInfo]) -> anyhow::Result<()> { - for (idx, (output, txn_info)) in blocks - .iter() - .flat_map(|b| &b.output) - .zip(txn_infos) - .enumerate() - { - let version = self.versions.begin + idx as Version; - if let Err(err) = output.ensure_match_transaction_info(version, txn_info, None, None) { - println!("[WARN] Output mismatch: {:?}", err); - } - - self.environment_override - .ensure_overrides_do_not_conflict(version, output); - } - Ok(()) - } - - /// Logs different statistics about [Block]s: - fn analyze_blocks(&self, blocks: &[Block]) { - for (idx, block) in blocks.iter().enumerate() { - let num_txns = block.workload.txn_provider.get_txns().len(); - println!( - "Block {}: versions [{}, {}] with {} transactions", - idx + 1, - block.workload.first_version(), - block.workload.last_version(), - num_txns - ) - } - } - - /// The main entrypoint for benchmarking: for specified concurrency levels, replays blocks of - /// transactions and measures the overall time taken. Note that during execution each block - /// runs on its own state, so the execution time does not take into account block commit. - pub async fn benchmark_past_transactions(&self) -> anyhow::Result<()> { - let limit = self.versions.end - self.versions.begin + 1; - let (txns, txn_infos) = self - .debugger - .get_committed_transactions(self.versions.begin, limit) - .await?; - - println!("Generating blocks for benchmarking ..."); - let blocks = self.generate_blocks(txns).await?; - - println!("Checking generated blocks ..."); - self.check_blocks(&blocks, &txn_infos)?; - - println!("Analyzing {} generated blocks ...", blocks.len()); - self.analyze_blocks(&blocks); - - println!("Benchmarking ... \n"); - - for concurrency_level in &self.concurrency_levels { - println!("Concurrency level: {}", concurrency_level); - let mut times = Vec::with_capacity(self.num_repeats); - - for i in 0..self.num_repeats { - let start_time = Instant::now(); - - let executor = AptosVMBlockExecutor::new(); - for block in &blocks { - block.execute(&executor, *concurrency_level); - } - - let time = start_time.elapsed().as_millis(); - println!( - "[{}/{}] Execution time is {}ms", - i + 1, - self.num_repeats, - time, - ); - times.push(time); - } - times.sort(); - - println!( - "Median execution time is {}ms\n", - times[self.num_repeats / 2], - ); - } - - Ok(()) - } -} +pub mod block; +pub mod generator; +pub mod overrides; +pub mod runner; +mod state_view; +mod workload; diff --git a/aptos-move/aptos-replay-benchmark/src/main.rs b/aptos-move/aptos-replay-benchmark/src/main.rs index c73d7ed2ef57c..0b0e77e91396a 100644 --- a/aptos-move/aptos-replay-benchmark/src/main.rs +++ b/aptos-move/aptos-replay-benchmark/src/main.rs @@ -4,9 +4,11 @@ use aptos_logger::{Level, Logger}; use aptos_move_debugger::aptos_debugger::AptosDebugger; use aptos_push_metrics::MetricsPusher; -use aptos_replay_benchmark::{AptosBenchmarkRunner, ClosedInterval, EnvironmentOverride}; +use aptos_replay_benchmark::{ + generator::BenchmarkGenerator, overrides::OverrideConfig, runner::BenchmarkRunner, +}; use aptos_rest_client::Client; -use aptos_types::on_chain_config::FeatureFlag; +use aptos_types::{on_chain_config::FeatureFlag, transaction::Version}; use clap::Parser; use url::Url; @@ -18,6 +20,12 @@ pub struct Command { #[clap(long, help = "Fullnode's REST API query endpoint")] rest_endpoint: String, + #[clap(long, help = "First transaction to include for benchmarking")] + begin_version: Version, + + #[clap(long, help = "Last transaction to include for benchmarking")] + end_version: Version, + #[clap( long, num_args=1.., @@ -32,11 +40,11 @@ pub struct Command { )] num_repeats: Option, - #[clap(long, help = "First transaction to include for benchmarking")] - begin_version: u64, - - #[clap(long, help = "Last transaction to include for benchmarking")] - end_version: u64, + #[clap( + long, + help = "If true, measure time taken to execute each block, and overall time otherwise" + )] + measure_block_time: bool, #[clap( long, @@ -53,35 +61,35 @@ pub struct Command { help = "List of space-separated feature flags to disable", )] disable_features: Vec, - - #[clap(long, help = "If specified, used as the gas feature version")] - gas_feature_version: Option, } #[tokio::main] async fn main() -> anyhow::Result<()> { let command = Command::parse(); - let level = command.log_level.unwrap_or(Level::Error); - Logger::new().level(level).init(); + Logger::new() + .level(command.log_level.unwrap_or(Level::Error)) + .init(); let _mp = MetricsPusher::start(vec![]); let debugger = AptosDebugger::rest_client(Client::new(Url::parse(&command.rest_endpoint)?))?; - let versions = ClosedInterval::new(command.begin_version, command.end_version); - let environment_override = EnvironmentOverride::new( - command.enable_features, - command.disable_features, - command.gas_feature_version, - ); - - let runner = AptosBenchmarkRunner::new( + let override_config = OverrideConfig::new(command.enable_features, command.disable_features); + + let blocks = BenchmarkGenerator::new( debugger, - versions, + command.begin_version, + command.end_version, + override_config, + ) + .generate_blocks() + .await?; + + BenchmarkRunner::new( command.concurrency_levels, command.num_repeats, - environment_override, - ); - runner.benchmark_past_transactions().await?; + command.measure_block_time, + ) + .measure_execution_time(&blocks); Ok(()) } diff --git a/aptos-move/aptos-replay-benchmark/src/overrides.rs b/aptos-move/aptos-replay-benchmark/src/overrides.rs new file mode 100644 index 0000000000000..9796a3ee843da --- /dev/null +++ b/aptos-move/aptos-replay-benchmark/src/overrides.rs @@ -0,0 +1,88 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_types::{ + on_chain_config::{FeatureFlag, Features, OnChainConfig}, + state_store::{state_key::StateKey, state_value::StateValue, StateView}, +}; +use serde::Serialize; +use std::collections::HashMap; + +pub struct OverrideConfig { + enable_features: Vec, + disable_features: Vec, +} + +impl OverrideConfig { + pub fn new(enable_features: Vec, disable_features: Vec) -> Self { + assert!( + enable_features + .iter() + .all(|f| !disable_features.contains(f)), + "Enable and disable feature flags cannot overlap" + ); + + Self { + enable_features, + disable_features, + } + } + + pub(crate) fn get_state_override( + &self, + state_view: &impl StateView, + ) -> HashMap { + let mut state_override = HashMap::new(); + + // Enable/disable features. + let (features_state_key, features_state_value) = + config_override::(state_view, |features| { + for feature in &self.enable_features { + if features.is_enabled(*feature) { + println!("[WARN] Feature {:?} is already enabled", feature) + } + features.enable(*feature); + } + for feature in &self.disable_features { + if !features.is_enabled(*feature) { + println!("[WARN] Feature {:?} is already disabled", feature) + } + features.disable(*feature); + } + }); + state_override.insert(features_state_key, features_state_value); + state_override + } +} + +/// Returns the state key for on-chain config type. +fn config_state_key() -> StateKey { + StateKey::resource(T::address(), &T::struct_tag()) + .expect("Constructing state key for on-chain config must succeed") +} + +/// Fetches the config from the storage, and modifies it based on the passed function. Panics if +/// there is a storage error, config does not exist or fails to (de-)serialize. +fn config_override( + state_view: &impl StateView, + override_func: F, +) -> (StateKey, StateValue) { + let state_key = config_state_key::(); + let state_value = state_view + .get_state_value(&state_key) + .unwrap_or_else(|err| { + panic!( + "Failed to fetch on-chain config for {:?}: {:?}", + state_key, err + ) + }) + .unwrap_or_else(|| panic!("On-chain config for {:?} must always exist", state_key)); + + let mut config = T::deserialize_into_config(state_value.bytes()) + .expect("On-chain config must be deserializable"); + override_func(&mut config); + let config_bytes = bcs::to_bytes(&config).expect("On-chain config must be serializable"); + + let new_state_value = state_value.map_bytes(|_| Ok(config_bytes.into())).unwrap(); + (state_key, new_state_value) +} diff --git a/aptos-move/aptos-replay-benchmark/src/runner.rs b/aptos-move/aptos-replay-benchmark/src/runner.rs new file mode 100644 index 0000000000000..152e0d27aebfd --- /dev/null +++ b/aptos-move/aptos-replay-benchmark/src/runner.rs @@ -0,0 +1,113 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::block::Block; +use aptos_vm::{aptos_vm::AptosVMBlockExecutor, VMBlockExecutor}; +use std::time::Instant; + +pub struct BenchmarkRunner { + concurrency_levels: Vec, + num_repeats: usize, + measure_block_time: bool, +} + +impl BenchmarkRunner { + pub fn new( + concurrency_levels: Vec, + num_repeats: Option, + measure_block_time: bool, + ) -> Self { + assert!( + !concurrency_levels.is_empty(), + "At least one concurrency level must be provided" + ); + + let default_num_repeats = 3; + let num_repeats = num_repeats.unwrap_or_else(|| { + println!( + "[WARN] Using default number of repeats: {}", + default_num_repeats + ); + default_num_repeats + }); + assert!( + num_repeats >= default_num_repeats, + "Number of times to repeat the benchmark should be at least the default value {}", + default_num_repeats + ); + + Self { + concurrency_levels, + num_repeats, + measure_block_time, + } + } + + pub fn measure_execution_time(&self, blocks: &[Block]) { + for concurrency_level in &self.concurrency_levels { + if self.measure_block_time { + self.measure_block_execution_time(blocks, *concurrency_level); + } else { + self.measure_overall_execution_time(blocks, *concurrency_level); + } + } + } + + fn measure_block_execution_time(&self, blocks: &[Block], concurrency_level: usize) { + let mut times = Vec::with_capacity(blocks.len()); + for _ in blocks { + times.push(Vec::with_capacity(self.num_repeats)); + } + + for i in 0..self.num_repeats { + let executor = AptosVMBlockExecutor::new(); + for (idx, block) in blocks.iter().enumerate() { + let start_time = Instant::now(); + block.run(&executor, concurrency_level); + let time = start_time.elapsed().as_millis(); + + println!( + "[{}/{}] Block {} execution time is {}ms", + i + 1, + self.num_repeats, + idx + 1, + time, + ); + times[idx].push(time); + } + } + + for (idx, mut time) in times.into_iter().enumerate() { + time.sort(); + println!( + "Block {} median execution time is {}ms\n", + idx + 1, + time[self.num_repeats / 2], + ); + } + } + + fn measure_overall_execution_time(&self, blocks: &[Block], concurrency_level: usize) { + let mut times = Vec::with_capacity(self.num_repeats); + for i in 0..self.num_repeats { + let start_time = Instant::now(); + let executor = AptosVMBlockExecutor::new(); + for block in blocks { + block.run(&executor, concurrency_level); + } + let time = start_time.elapsed().as_millis(); + println!( + "[{}/{}] Overall execution time is {}ms", + i + 1, + self.num_repeats, + time, + ); + times.push(time); + } + times.sort(); + println!( + "Overall median execution time is {}ms\n", + times[self.num_repeats / 2], + ); + } +} diff --git a/aptos-move/aptos-replay-benchmark/src/state_view.rs b/aptos-move/aptos-replay-benchmark/src/state_view.rs new file mode 100644 index 0000000000000..47fbfbeb6edfd --- /dev/null +++ b/aptos-move/aptos-replay-benchmark/src/state_view.rs @@ -0,0 +1,82 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_types::state_store::{ + errors::StateviewError, state_key::StateKey, state_storage_usage::StateStorageUsage, + state_value::StateValue, StateView, TStateView, +}; +use parking_lot::Mutex; +use std::collections::HashMap; + +/// Represents the read-set for obtained when executing transactions. +pub(crate) struct ReadSet { + data: HashMap, +} + +impl TStateView for ReadSet { + type Key = StateKey; + + fn get_state_value(&self, state_key: &Self::Key) -> Result, StateviewError> { + Ok(self.data.get(state_key).cloned()) + } + + fn get_usage(&self) -> Result { + unreachable!("Should not be called when benchmarking") + } +} + +/// [StateView] implementation that records all execution reads. Captured reads can be converted +/// into a [ReadSet]. +pub(crate) struct ReadSetCapturingStateView<'s, S> { + captured_reads: Mutex>, + state_view: &'s S, +} + +impl<'s, S: StateView> ReadSetCapturingStateView<'s, S> { + pub(crate) fn new(state_view: &'s S, initial_read_set: HashMap) -> Self { + Self { + captured_reads: Mutex::new(initial_read_set), + state_view, + } + } + + pub(crate) fn into_read_set(self) -> ReadSet { + ReadSet { + data: self.captured_reads.into_inner(), + } + } +} + +impl<'s, S: StateView> TStateView for ReadSetCapturingStateView<'s, S> { + type Key = StateKey; + + fn get_state_value(&self, state_key: &Self::Key) -> Result, StateviewError> { + // Check the read-set first. + if let Some(state_value) = self.captured_reads.lock().get(state_key) { + return Ok(Some(state_value.clone())); + } + + // We do not allow failures because then benchmarking will not be correct (we miss a read). + // Plus, these failures should not happen when replaying past transactions. + let maybe_state_value = self + .state_view + .get_state_value(state_key) + .unwrap_or_else(|err| { + panic!("Failed to fetch state value for {:?}: {:?}", state_key, err) + }); + + // Populate the read-set if first access. + if let Some(state_value) = &maybe_state_value { + let mut captured_reads = self.captured_reads.lock(); + if !captured_reads.contains_key(state_key) { + captured_reads.insert(state_key.clone(), state_value.clone()); + } + } + + Ok(maybe_state_value) + } + + fn get_usage(&self) -> Result { + unreachable!("Should not be called when benchmarking") + } +} diff --git a/aptos-move/aptos-replay-benchmark/src/workload.rs b/aptos-move/aptos-replay-benchmark/src/workload.rs new file mode 100644 index 0000000000000..75d353e742b35 --- /dev/null +++ b/aptos-move/aptos-replay-benchmark/src/workload.rs @@ -0,0 +1,65 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_block_executor::txn_provider::default::DefaultTxnProvider; +use aptos_types::{ + block_executor::transaction_slice_metadata::TransactionSliceMetadata, + transaction::{ + signature_verified_transaction::{ + into_signature_verified_block, SignatureVerifiedTransaction, + }, + Transaction, Version, + }, +}; + +/// A workload to benchmark. Contains signature verified transactions, and metadata specifying the +/// start and end versions of these transactions. +pub(crate) struct Workload { + txn_provider: DefaultTxnProvider, + transaction_slice_metadata: TransactionSliceMetadata, +} + +impl Workload { + /// Returns a new workload to execute transactions at specified version. + pub(crate) fn new(begin: Version, txns: Vec) -> Self { + assert!(!txns.is_empty()); + + let end = begin + txns.len() as Version; + let transaction_slice_metadata = TransactionSliceMetadata::chunk(begin, end); + + let signature_verified_txns = into_signature_verified_block(txns); + let txn_provider = DefaultTxnProvider::new(signature_verified_txns); + + Workload { + txn_provider, + transaction_slice_metadata, + } + } + + /// Returns the signature verified transactions in the workload. + pub(crate) fn txn_provider(&self) -> &DefaultTxnProvider { + &self.txn_provider + } + + /// Returns transaction metadata corresponding to [begin, end) versions of the workload. + pub(crate) fn transaction_slice_metadata(&self) -> TransactionSliceMetadata { + self.transaction_slice_metadata + } + + /// Returns the first transaction version in the workload. + pub(crate) fn first_version(&self) -> Version { + match &self.transaction_slice_metadata { + TransactionSliceMetadata::Chunk { begin, .. } => *begin, + _ => unreachable!("Transaction slice metadata is always a chunk"), + } + } + + /// Returns the last transaction version in the workload. + #[allow(dead_code)] + pub(crate) fn last_version(&self) -> Version { + match &self.transaction_slice_metadata { + TransactionSliceMetadata::Chunk { end, .. } => *end - 1, + _ => unreachable!("Transaction slice metadata is always a chunk"), + } + } +} From acd2d9c892965a5dd30e4d98cb3ac56cc59fc707 Mon Sep 17 00:00:00 2001 From: George Mitenkov Date: Sun, 24 Nov 2024 17:05:46 +0000 Subject: [PATCH 3/4] add fine-grained comparisons --- Cargo.lock | 2 + aptos-move/aptos-replay-benchmark/Cargo.toml | 2 + .../aptos-replay-benchmark/src/block.rs | 83 ++++--- .../aptos-replay-benchmark/src/comparison.rs | 235 ++++++++++++++++++ .../aptos-replay-benchmark/src/generator.rs | 2 +- aptos-move/aptos-replay-benchmark/src/lib.rs | 1 + aptos-move/aptos-replay-benchmark/src/main.rs | 4 + types/src/write_set.rs | 4 + 8 files changed, 301 insertions(+), 32 deletions(-) create mode 100644 aptos-move/aptos-replay-benchmark/src/comparison.rs diff --git a/Cargo.lock b/Cargo.lock index f0648a1d786d3..e9a76450cb515 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3581,7 +3581,9 @@ dependencies = [ "aptos-validator-interface", "aptos-vm", "bcs 0.1.4", + "claims", "clap 4.5.21", + "move-core-types", "parking_lot 0.12.1", "serde", "tokio", diff --git a/aptos-move/aptos-replay-benchmark/Cargo.toml b/aptos-move/aptos-replay-benchmark/Cargo.toml index 757e9f23da382..fbcba83ed1fd4 100644 --- a/aptos-move/aptos-replay-benchmark/Cargo.toml +++ b/aptos-move/aptos-replay-benchmark/Cargo.toml @@ -24,6 +24,8 @@ aptos-validator-interface = { workspace = true } aptos-vm = { workspace = true } bcs = { workspace = true } clap = { workspace = true } +claims = { workspace = true } +move-core-types = { workspace = true } serde = { workspace = true } parking_lot = { workspace = true } tokio = { workspace = true } diff --git a/aptos-move/aptos-replay-benchmark/src/block.rs b/aptos-move/aptos-replay-benchmark/src/block.rs index 19a57beac0231..72f8f82c1dce1 100644 --- a/aptos-move/aptos-replay-benchmark/src/block.rs +++ b/aptos-move/aptos-replay-benchmark/src/block.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ + comparison::Comparison, state_view::{ReadSet, ReadSetCapturingStateView}, workload::Workload, }; @@ -34,6 +35,7 @@ fn block_execution_config(concurrency_level: usize) -> BlockExecutorConfig { pub struct Block { inputs: ReadSet, workload: Workload, + comparisons: Vec, } impl Block { @@ -41,47 +43,66 @@ impl Block { workload: Workload, state_view: &(impl StateView + Sync), state_override: HashMap, - concurrency_level: usize, ) -> anyhow::Result { - // Execute transactions with on-chain configs. - let onchain_outputs = execute_workload( - &AptosVMBlockExecutor::new(), - &workload, - state_view, - concurrency_level, - ); + let onchain_outputs = if state_override.is_empty() { + None + } else { + // Execute transactions with on-chain configs. + let onchain_outputs = + execute_workload(&AptosVMBlockExecutor::new(), &workload, state_view, 1); - // Check on-chain outputs do not modify state we override. If so, benchmarking results may - // not be correct. - let begin = workload.first_version(); - for (idx, output) in onchain_outputs.iter().enumerate() { - for (state_key, _) in output.write_set() { - if state_override.contains_key(state_key) { - bail!( - "Transaction {} writes to overridden state value for {:?}", - begin + idx as Version, - state_key - ); + // Check on-chain outputs do not modify state we override. If so, benchmarking results may + // not be correct. + let begin = workload.first_version(); + for (idx, output) in onchain_outputs.iter().enumerate() { + for (state_key, _) in output.write_set() { + if state_override.contains_key(state_key) { + bail!( + "Transaction {} writes to overridden state value for {:?}", + begin + idx as Version, + state_key + ); + } } } - } + Some(onchain_outputs) + }; // Execute transactions, recording all reads. let state_view = ReadSetCapturingStateView::new(state_view, state_override); - let _outputs = execute_workload( - &AptosVMBlockExecutor::new(), - &workload, - &state_view, - concurrency_level, - ); + let outputs = execute_workload(&AptosVMBlockExecutor::new(), &workload, &state_view, 1); let inputs = state_view.into_read_set(); - // Check on-chain outputs against new outputs. We want to ensure that changes are minimal - // so that overrides do not change execution flow too much. - // Run analysis. - // TODO + let comparisons = if let Some(onchain_outputs) = onchain_outputs { + let mut comparisons = Vec::with_capacity(onchain_outputs.len()); + for (left, right) in onchain_outputs.into_iter().zip(outputs) { + let comparison = Comparison::diff(left, right); + comparisons.push(comparison); + } + comparisons + } else { + vec![] + }; - Ok(Self { inputs, workload }) + Ok(Self { + inputs, + workload, + comparisons, + }) + } + + pub fn print_diffs(&self) { + let begin = self.workload.first_version(); + + for (idx, comparison) in self.comparisons.iter().enumerate() { + if !comparison.is_ok() { + println!( + "Transaction {} diff:\n {}\n", + begin + idx as Version, + comparison + ); + } + } } /// Executes the workload for benchmarking. diff --git a/aptos-move/aptos-replay-benchmark/src/comparison.rs b/aptos-move/aptos-replay-benchmark/src/comparison.rs new file mode 100644 index 0000000000000..b319723490cdc --- /dev/null +++ b/aptos-move/aptos-replay-benchmark/src/comparison.rs @@ -0,0 +1,235 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_types::{ + contract_event::ContractEvent, + state_store::state_key::StateKey, + transaction::{ExecutionStatus, TransactionOutput}, + write_set::{WriteOp, WriteSet}, +}; +use claims::assert_ok; +use std::collections::BTreeMap; + +struct Write { + state_key: StateKey, + #[allow(dead_code)] + write_op: WriteOp, +} + +impl Write { + fn new(state_key: StateKey, write_op: WriteOp) -> Self { + Self { + state_key, + write_op, + } + } +} + +enum Diff { + GasUsed { + left: u64, + right: u64, + }, + ExecutionStatus { + left: ExecutionStatus, + right: ExecutionStatus, + }, + Event { + left: Option, + right: Option, + }, + WriteSet { + left: Option, + right: Option, + }, +} + +pub(crate) struct Comparison { + diffs: Vec, +} + +impl Comparison { + pub(crate) fn diff(left: TransactionOutput, right: TransactionOutput) -> Self { + let (left_write_set, left_events, left_gas_used, left_transaction_status, _) = + left.unpack(); + let (right_write_set, right_events, right_gas_used, right_transaction_status, _) = + right.unpack(); + + let mut diffs = vec![]; + + let left_execution_status = assert_ok!(left_transaction_status.as_kept_status()); + let right_execution_status = assert_ok!(right_transaction_status.as_kept_status()); + if left_execution_status != right_execution_status { + diffs.push(Diff::ExecutionStatus { + left: left_execution_status, + right: right_execution_status, + }); + } + + if left_gas_used != right_gas_used { + diffs.push(Diff::GasUsed { + left: left_gas_used, + right: right_gas_used, + }); + } + + Self::diff_events(&mut diffs, left_events, right_events); + Self::diff_write_sets(&mut diffs, left_write_set, right_write_set); + + Self { diffs } + } + + fn diff_events(diffs: &mut Vec, left: Vec, right: Vec) { + let mut left_ty_tags = BTreeMap::new(); + for (idx, event) in left.iter().enumerate() { + left_ty_tags.insert(event.type_tag().clone(), idx); + } + + let mut right_ty_tags = BTreeMap::new(); + for (idx, event) in right.iter().enumerate() { + right_ty_tags.insert(event.type_tag().clone(), idx); + } + + for (left_ty_tag, left_idx) in left_ty_tags { + if let Some(right_idx) = right_ty_tags.remove(&left_ty_tag) { + let left_data = left[left_idx].event_data(); + let right_data = right[right_idx].event_data(); + if left_data != right_data { + diffs.push(Diff::Event { + left: Some(left[left_idx].clone()), + right: Some(right[right_idx].clone()), + }); + } + } else { + diffs.push(Diff::Event { + left: Some(left[left_idx].clone()), + right: None, + }); + } + } + + for (_, right_idx) in right_ty_tags { + diffs.push(Diff::Event { + left: None, + right: Some(right[right_idx].clone()), + }); + } + } + + fn diff_write_sets(diffs: &mut Vec, left: WriteSet, right: WriteSet) { + let left = left.into_mut().into_inner(); + let mut right = right.into_mut().into_inner(); + + for (left_state_key, left_write_op) in left { + if let Some(right_write_op) = right.remove(&left_state_key) { + if left_write_op != right_write_op { + diffs.push(Diff::WriteSet { + left: Some(Write::new(left_state_key.clone(), left_write_op)), + right: Some(Write::new(left_state_key, right_write_op)), + }); + } + } else { + diffs.push(Diff::WriteSet { + left: Some(Write::new(left_state_key, left_write_op)), + right: None, + }); + } + } + + for (right_state_key, right_write_op) in right { + diffs.push(Diff::WriteSet { + left: None, + right: Some(Write::new(right_state_key, right_write_op)), + }); + } + } + + pub(crate) fn is_ok(&self) -> bool { + self.diffs.is_empty() + } +} + +impl std::fmt::Display for Comparison { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let mut gas_used_diffs = false; + let mut total_left_gas_used = 0; + let mut total_right_gas_used = 0; + + writeln!(f, " >>>>> ")?; + for diff in &self.diffs { + match diff { + Diff::GasUsed { left, right } => { + total_left_gas_used += left; + total_right_gas_used += right; + gas_used_diffs = true; + + writeln!(f, "[gas used] before: {}, after: {}", left, right)?; + }, + Diff::ExecutionStatus { left, right } => { + writeln!( + f, + "[execution status] before: {:?}, after: {:?}", + left, right + )?; + }, + Diff::Event { left, right } => { + let left = left.as_ref(); + let right = right.as_ref(); + + if left.is_none() { + writeln!( + f, + "[event] {} was not emitted before", + right.unwrap().type_tag().to_canonical_string() + )?; + } else if right.is_none() { + writeln!( + f, + "[event] {} is not emitted anymore", + left.unwrap().type_tag().to_canonical_string() + )?; + } else { + writeln!( + f, + "[event] {} has changed its data", + left.unwrap().type_tag().to_canonical_string() + )?; + } + }, + Diff::WriteSet { left, right } => { + let left = left.as_ref(); + let right = right.as_ref(); + + if left.is_none() { + writeln!( + f, + "[write] {:?} was not written to before", + &right.unwrap().state_key + )?; + } else if right.is_none() { + writeln!( + f, + "[write] {:?} is not written to anymore", + &left.unwrap().state_key + )?; + } else { + writeln!( + f, + "[write] {:?} has changed its value", + &left.unwrap().state_key + )?; + } + }, + } + } + + if gas_used_diffs { + writeln!( + f, + "[total gas used] before: {}, after: {}", + total_left_gas_used, total_right_gas_used + )?; + } + writeln!(f, " <<<<< ") + } +} diff --git a/aptos-move/aptos-replay-benchmark/src/generator.rs b/aptos-move/aptos-replay-benchmark/src/generator.rs index 635e920beef08..910072cce1821 100644 --- a/aptos-move/aptos-replay-benchmark/src/generator.rs +++ b/aptos-move/aptos-replay-benchmark/src/generator.rs @@ -58,7 +58,7 @@ impl BenchmarkGenerator { let state_override = self.override_config.get_state_override(&state_view); let state_view = self.debugger.state_view_at_version(begin); - Block::new(workload, &state_view, state_override, 32) + Block::new(workload, &state_view, state_override) } /// Partitions a sequence of transactions into blocks. diff --git a/aptos-move/aptos-replay-benchmark/src/lib.rs b/aptos-move/aptos-replay-benchmark/src/lib.rs index c29da841c780d..36c57785aa0c8 100644 --- a/aptos-move/aptos-replay-benchmark/src/lib.rs +++ b/aptos-move/aptos-replay-benchmark/src/lib.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 pub mod block; +mod comparison; pub mod generator; pub mod overrides; pub mod runner; diff --git a/aptos-move/aptos-replay-benchmark/src/main.rs b/aptos-move/aptos-replay-benchmark/src/main.rs index 0b0e77e91396a..b1bfb6abf6b25 100644 --- a/aptos-move/aptos-replay-benchmark/src/main.rs +++ b/aptos-move/aptos-replay-benchmark/src/main.rs @@ -84,6 +84,10 @@ async fn main() -> anyhow::Result<()> { .generate_blocks() .await?; + for block in &blocks { + block.print_diffs(); + } + BenchmarkRunner::new( command.concurrency_levels, command.num_repeats, diff --git a/types/src/write_set.rs b/types/src/write_set.rs index e939b70d1346a..0da5b6b89dcd5 100644 --- a/types/src/write_set.rs +++ b/types/src/write_set.rs @@ -568,6 +568,10 @@ impl WriteSetMut { &mut self.write_set } + pub fn into_inner(self) -> BTreeMap { + self.write_set + } + pub fn squash(mut self, other: Self) -> Result { use btree_map::Entry::*; From 85f4f11016522733f8a5dda034aec06a37e991a1 Mon Sep 17 00:00:00 2001 From: George Mitenkov Date: Sun, 24 Nov 2024 17:13:44 +0000 Subject: [PATCH 4/4] make generation faster --- Cargo.lock | 2 - aptos-move/aptos-replay-benchmark/Cargo.toml | 2 - .../aptos-replay-benchmark/src/block.rs | 10 +-- .../aptos-replay-benchmark/src/comparison.rs | 27 +++--- .../aptos-replay-benchmark/src/generator.rs | 90 +++++++++++++++---- aptos-move/aptos-replay-benchmark/src/main.rs | 9 ++ .../aptos-replay-benchmark/src/runner.rs | 9 +- 7 files changed, 103 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e9a76450cb515..1759f5710e776 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3578,12 +3578,10 @@ dependencies = [ "aptos-push-metrics", "aptos-rest-client", "aptos-types", - "aptos-validator-interface", "aptos-vm", "bcs 0.1.4", "claims", "clap 4.5.21", - "move-core-types", "parking_lot 0.12.1", "serde", "tokio", diff --git a/aptos-move/aptos-replay-benchmark/Cargo.toml b/aptos-move/aptos-replay-benchmark/Cargo.toml index fbcba83ed1fd4..fa60fd7dabecb 100644 --- a/aptos-move/aptos-replay-benchmark/Cargo.toml +++ b/aptos-move/aptos-replay-benchmark/Cargo.toml @@ -20,12 +20,10 @@ aptos-move-debugger = { workspace = true } aptos-push-metrics = { workspace = true } aptos-rest-client = { workspace = true } aptos-types = { workspace = true } -aptos-validator-interface = { workspace = true } aptos-vm = { workspace = true } bcs = { workspace = true } clap = { workspace = true } claims = { workspace = true } -move-core-types = { workspace = true } serde = { workspace = true } parking_lot = { workspace = true } tokio = { workspace = true } diff --git a/aptos-move/aptos-replay-benchmark/src/block.rs b/aptos-move/aptos-replay-benchmark/src/block.rs index 72f8f82c1dce1..b6f22ce204470 100644 --- a/aptos-move/aptos-replay-benchmark/src/block.rs +++ b/aptos-move/aptos-replay-benchmark/src/block.rs @@ -6,7 +6,6 @@ use crate::{ state_view::{ReadSet, ReadSetCapturingStateView}, workload::Workload, }; -use anyhow::bail; use aptos_types::{ block_executor::config::{ BlockExecutorConfig, BlockExecutorConfigFromOnchain, BlockExecutorLocalConfig, @@ -43,7 +42,7 @@ impl Block { workload: Workload, state_view: &(impl StateView + Sync), state_override: HashMap, - ) -> anyhow::Result { + ) -> Self { let onchain_outputs = if state_override.is_empty() { None } else { @@ -57,7 +56,7 @@ impl Block { for (idx, output) in onchain_outputs.iter().enumerate() { for (state_key, _) in output.write_set() { if state_override.contains_key(state_key) { - bail!( + println!( "Transaction {} writes to overridden state value for {:?}", begin + idx as Version, state_key @@ -84,13 +83,14 @@ impl Block { vec![] }; - Ok(Self { + Self { inputs, workload, comparisons, - }) + } } + /// Prints the difference in transaction outputs when running with overrides. pub fn print_diffs(&self) { let begin = self.workload.first_version(); diff --git a/aptos-move/aptos-replay-benchmark/src/comparison.rs b/aptos-move/aptos-replay-benchmark/src/comparison.rs index b319723490cdc..7a1bb9fa37c04 100644 --- a/aptos-move/aptos-replay-benchmark/src/comparison.rs +++ b/aptos-move/aptos-replay-benchmark/src/comparison.rs @@ -25,6 +25,14 @@ impl Write { } } +/// Different parts of [TransactionOutput] that can be different: +/// 1. gas used, +/// 2. status (must be kept since transactions are replayed), +/// 3. events, +/// 4. writes. +/// Note that fine-grained comparison allows for some differences to be okay, e.g., using more gas +/// implies that the fee statement event, the account balance of the fee payer, and the total token +/// supply are different. enum Diff { GasUsed { left: u64, @@ -44,11 +52,14 @@ enum Diff { }, } +/// Holds all differences for a pair of transaction outputs. pub(crate) struct Comparison { diffs: Vec, } impl Comparison { + /// Given a pair of transaction outputs, computes its diff for gas used, status, events and + /// write sets. pub(crate) fn diff(left: TransactionOutput, right: TransactionOutput) -> Self { let (left_write_set, left_events, left_gas_used, left_transaction_status, _) = left.unpack(); @@ -151,18 +162,10 @@ impl Comparison { impl std::fmt::Display for Comparison { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut gas_used_diffs = false; - let mut total_left_gas_used = 0; - let mut total_right_gas_used = 0; - writeln!(f, " >>>>> ")?; for diff in &self.diffs { match diff { Diff::GasUsed { left, right } => { - total_left_gas_used += left; - total_right_gas_used += right; - gas_used_diffs = true; - writeln!(f, "[gas used] before: {}, after: {}", left, right)?; }, Diff::ExecutionStatus { left, right } => { @@ -222,14 +225,6 @@ impl std::fmt::Display for Comparison { }, } } - - if gas_used_diffs { - writeln!( - f, - "[total gas used] before: {}, after: {}", - total_left_gas_used, total_right_gas_used - )?; - } writeln!(f, " <<<<< ") } } diff --git a/aptos-move/aptos-replay-benchmark/src/generator.rs b/aptos-move/aptos-replay-benchmark/src/generator.rs index 910072cce1821..e4563848c38f6 100644 --- a/aptos-move/aptos-replay-benchmark/src/generator.rs +++ b/aptos-move/aptos-replay-benchmark/src/generator.rs @@ -4,16 +4,84 @@ use crate::{block::Block, overrides::OverrideConfig, workload::Workload}; use aptos_move_debugger::aptos_debugger::AptosDebugger; use aptos_types::transaction::{Transaction, Version}; +use std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::Instant, +}; pub struct BenchmarkGenerator { + generator: Arc, +} + +impl BenchmarkGenerator { + pub fn new( + debugger: AptosDebugger, + begin_version: Version, + end_version: Version, + override_config: OverrideConfig, + ) -> Self { + let generator = + BenchmarkGeneratorContext::new(debugger, begin_version, end_version, override_config); + Self { + generator: Arc::new(generator), + } + } + + /// Generates a sequence of [Block] for benchmarking. + pub async fn generate_blocks(&self) -> anyhow::Result> { + let limit = self.generator.end_version - self.generator.begin_version + 1; + let (txns, _) = self + .generator + .debugger + .get_committed_transactions(self.generator.begin_version, limit) + .await?; + let txn_blocks = self.generator.partition(txns); + + let num_generated = Arc::new(AtomicU64::new(0)); + let num_blocks = txn_blocks.len(); + + let mut tasks = Vec::with_capacity(num_blocks); + for (begin, txn_block) in txn_blocks { + let task = tokio::task::spawn_blocking({ + let generator = self.generator.clone(); + let num_generated = num_generated.clone(); + move || { + let start_time = Instant::now(); + let block = generator.generate_block(begin, txn_block); + let time = start_time.elapsed().as_secs(); + println!( + "Generated block {}/{} in {}s", + num_generated.fetch_add(1, Ordering::SeqCst) + 1, + num_blocks, + time + ); + block + } + }); + tasks.push(task); + } + + let mut blocks = Vec::with_capacity(tasks.len()); + for task in tasks { + blocks.push(task.await?); + } + + Ok(blocks) + } +} + +struct BenchmarkGeneratorContext { debugger: AptosDebugger, begin_version: Version, end_version: Version, override_config: OverrideConfig, } -impl BenchmarkGenerator { - pub fn new( +impl BenchmarkGeneratorContext { + fn new( debugger: AptosDebugger, begin_version: Version, end_version: Version, @@ -34,24 +102,8 @@ impl BenchmarkGenerator { } } - /// Generates a sequence of [Block] for benchmarking. - pub async fn generate_blocks(&self) -> anyhow::Result> { - let limit = self.end_version - self.begin_version + 1; - let (txns, _) = self - .debugger - .get_committed_transactions(self.begin_version, limit) - .await?; - let txn_blocks = self.partition(txns); - - let mut blocks = vec![]; - for (begin, txn_block) in txn_blocks { - blocks.push(self.generate_block(begin, txn_block)?); - } - Ok(blocks) - } - /// Generates a single [Block] for benchmarking. - fn generate_block(&self, begin: Version, txns: Vec) -> anyhow::Result { + fn generate_block(&self, begin: Version, txns: Vec) -> Block { let workload = Workload::new(begin, txns); let state_view = self.debugger.state_view_at_version(begin); diff --git a/aptos-move/aptos-replay-benchmark/src/main.rs b/aptos-move/aptos-replay-benchmark/src/main.rs index b1bfb6abf6b25..3113315c5550a 100644 --- a/aptos-move/aptos-replay-benchmark/src/main.rs +++ b/aptos-move/aptos-replay-benchmark/src/main.rs @@ -72,7 +72,16 @@ async fn main() -> anyhow::Result<()> { .init(); let _mp = MetricsPusher::start(vec![]); + // TODO: + // Right now we fetch transactions from debugger, but ideally we need a way to save them + // locally (with corresponding read-sets) so we can use this for CI. let debugger = AptosDebugger::rest_client(Client::new(Url::parse(&command.rest_endpoint)?))?; + + // TODO: + // Right now, only features can be overridden. In general, this can be allowed for anything: + // 1. Framework code, e.g., to test performance of new natives or compiler, + // 2. Gas schedule, to track the costs of charging gas or tracking limits. + // We probably should support at least these. let override_config = OverrideConfig::new(command.enable_features, command.disable_features); let blocks = BenchmarkGenerator::new( diff --git a/aptos-move/aptos-replay-benchmark/src/runner.rs b/aptos-move/aptos-replay-benchmark/src/runner.rs index 152e0d27aebfd..fdb5253d2f3e1 100644 --- a/aptos-move/aptos-replay-benchmark/src/runner.rs +++ b/aptos-move/aptos-replay-benchmark/src/runner.rs @@ -5,6 +5,7 @@ use crate::block::Block; use aptos_vm::{aptos_vm::AptosVMBlockExecutor, VMBlockExecutor}; use std::time::Instant; +/// Holds configuration for running the benchmarks and measuring the time taken. pub struct BenchmarkRunner { concurrency_levels: Vec, num_repeats: usize, @@ -43,6 +44,9 @@ impl BenchmarkRunner { } } + // TODO: + // This measures execution time from a cold-start. Ideally, we want to warm-up with executing + // 1-2 blocks prior to selected range, but not timing them. pub fn measure_execution_time(&self, blocks: &[Block]) { for concurrency_level in &self.concurrency_levels { if self.measure_block_time { @@ -53,6 +57,7 @@ impl BenchmarkRunner { } } + /// Runs a sequence of blocks, measuring execution time for each block. The median is reported. fn measure_block_execution_time(&self, blocks: &[Block], concurrency_level: usize) { let mut times = Vec::with_capacity(blocks.len()); for _ in blocks { @@ -65,7 +70,6 @@ impl BenchmarkRunner { let start_time = Instant::now(); block.run(&executor, concurrency_level); let time = start_time.elapsed().as_millis(); - println!( "[{}/{}] Block {} execution time is {}ms", i + 1, @@ -80,13 +84,14 @@ impl BenchmarkRunner { for (idx, mut time) in times.into_iter().enumerate() { time.sort(); println!( - "Block {} median execution time is {}ms\n", + "Block {} median execution time is {}ms", idx + 1, time[self.num_repeats / 2], ); } } + /// Runs the sequence of blocks, measuring end-to-end execution time. fn measure_overall_execution_time(&self, blocks: &[Block], concurrency_level: usize) { let mut times = Vec::with_capacity(self.num_repeats); for i in 0..self.num_repeats {