diff --git a/Cargo.lock b/Cargo.lock index 41f1d2707b060..1759f5710e776 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3567,6 +3567,27 @@ dependencies = [ "tokio-retry", ] +[[package]] +name = "aptos-replay-benchmark" +version = "0.1.0" +dependencies = [ + "anyhow", + "aptos-block-executor", + "aptos-logger", + "aptos-move-debugger", + "aptos-push-metrics", + "aptos-rest-client", + "aptos-types", + "aptos-vm", + "bcs 0.1.4", + "claims", + "clap 4.5.21", + "parking_lot 0.12.1", + "serde", + "tokio", + "url", +] + [[package]] name = "aptos-resource-viewer" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index b3c7a1fcfedec..30fa6add1df4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ members = [ "aptos-move/aptos-memory-usage-tracker", "aptos-move/aptos-native-interface", "aptos-move/aptos-release-builder", + "aptos-move/aptos-replay-benchmark", "aptos-move/aptos-resource-viewer", "aptos-move/aptos-sdk-builder", "aptos-move/aptos-transaction-benchmarks", @@ -419,6 +420,7 @@ aptos-push-metrics = { path = "crates/aptos-push-metrics" } aptos-rate-limiter = { path = "crates/aptos-rate-limiter" } aptos-release-builder = { path = "aptos-move/aptos-release-builder" } aptos-reliable-broadcast = { path = "crates/reliable-broadcast" } +aptos-replay-benchmark = { path = "aptos-move/aptos-replay-benchmark" } aptos-resource-viewer = { path = "aptos-move/aptos-resource-viewer" } aptos-rest-client = { path = "crates/aptos-rest-client" } aptos-retrier = { path = "crates/aptos-retrier" } diff --git a/aptos-move/aptos-debugger/src/aptos_debugger.rs b/aptos-move/aptos-debugger/src/aptos_debugger.rs index 2dc191812cc1a..a27281eceb672 100644 --- a/aptos-move/aptos-debugger/src/aptos_debugger.rs +++ b/aptos-move/aptos-debugger/src/aptos_debugger.rs @@ -1,12 +1,8 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use anyhow::{bail, format_err, Result}; -use aptos_block_executor::{ - code_cache_global_manager::AptosModuleCacheManager, - txn_commit_hook::NoOpTransactionCommitHook, - txn_provider::{default::DefaultTxnProvider, TxnProvider}, -}; +use anyhow::{bail, format_err}; +use aptos_block_executor::txn_provider::{default::DefaultTxnProvider, TxnProvider}; use aptos_gas_profiling::{GasProfiler, TransactionGasLog}; use aptos_rest_client::Client; use aptos_types::{ @@ -28,9 +24,7 @@ use aptos_validator_interface::{ AptosValidatorInterface, DBDebuggerInterface, DebuggerStateView, RestDebuggerInterface, }; use aptos_vm::{ - block_executor::{AptosTransactionOutput, AptosVMBlockExecutorWrapper}, - data_cache::AsMoveResolver, - AptosVM, + aptos_vm::AptosVMBlockExecutor, data_cache::AsMoveResolver, AptosVM, VMBlockExecutor, }; use aptos_vm_environment::environment::AptosEnvironment; use aptos_vm_logging::log_schema::AdapterLogSchema; @@ -47,23 +41,31 @@ impl AptosDebugger { Self { debugger } } - pub fn rest_client(rest_client: Client) -> Result { + pub fn rest_client(rest_client: Client) -> anyhow::Result { Ok(Self::new(Arc::new(RestDebuggerInterface::new(rest_client)))) } - pub fn db + Clone>(db_root_path: P) -> Result { + pub fn db + Clone>(db_root_path: P) -> anyhow::Result { Ok(Self::new(Arc::new(DBDebuggerInterface::open( db_root_path, )?))) } + pub async fn get_committed_transactions( + &self, + begin: Version, + limit: u64, + ) -> anyhow::Result<(Vec, Vec)> { + self.debugger.get_committed_transactions(begin, limit).await + } + pub fn execute_transactions_at_version( &self, version: Version, txns: Vec, repeat_execution_times: u64, concurrency_levels: &[usize], - ) -> Result> { + ) -> anyhow::Result> { let sig_verified_txns: Vec = txns.into_iter().map(|x| x.into()).collect::>(); let txn_provider = DefaultTxnProvider::new(sig_verified_txns); @@ -114,7 +116,7 @@ impl AptosDebugger { &self, version: Version, txn: SignedTransaction, - ) -> Result<(VMStatus, VMOutput, TransactionGasLog)> { + ) -> anyhow::Result<(VMStatus, VMOutput, TransactionGasLog)> { let state_view = DebuggerStateView::new(self.debugger.clone(), version); let log_context = AdapterLogSchema::new(state_view.id(), 0); let txn = txn @@ -166,11 +168,8 @@ impl AptosDebugger { use_same_block_boundaries: bool, repeat_execution_times: u64, concurrency_levels: &[usize], - ) -> Result> { - let (txns, txn_infos) = self - .debugger - .get_committed_transactions(begin, limit) - .await?; + ) -> anyhow::Result> { + let (txns, txn_infos) = self.get_committed_transactions(begin, limit).await?; if use_same_block_boundaries { // when going block by block, no need to worry about epoch boundaries @@ -238,7 +237,7 @@ impl AptosDebugger { txns: Vec, repeat_execution_times: u64, concurrency_levels: &[usize], - ) -> Result> { + ) -> anyhow::Result> { let results = self.execute_transactions_at_version( begin, txns, @@ -268,7 +267,7 @@ impl AptosDebugger { repeat_execution_times: u64, concurrency_levels: &[usize], mut txn_infos: Vec, - ) -> Result> { + ) -> anyhow::Result> { let mut ret = vec![]; while limit != 0 { println!( @@ -301,7 +300,7 @@ impl AptosDebugger { txns: Vec, repeat_execution_times: u64, concurrency_levels: &[usize], - ) -> Result> { + ) -> anyhow::Result> { let mut ret = vec![]; let mut cur = vec![]; let mut cur_version = begin; @@ -336,7 +335,7 @@ impl AptosDebugger { &self, account: AccountAddress, seq: u64, - ) -> Result> { + ) -> anyhow::Result> { self.debugger .get_version_by_account_sequence(account, seq) .await @@ -345,7 +344,7 @@ impl AptosDebugger { pub async fn get_committed_transaction_at_version( &self, version: Version, - ) -> Result<(Transaction, TransactionInfo)> { + ) -> anyhow::Result<(Transaction, TransactionInfo)> { let (mut txns, mut info) = self.debugger.get_committed_transactions(version, 1).await?; let txn = txns.pop().expect("there must be exactly 1 txn in the vec"); @@ -434,20 +433,16 @@ fn execute_block_no_limit( state_view: &DebuggerStateView, concurrency_level: usize, ) -> Result, VMStatus> { - AptosVMBlockExecutorWrapper::execute_block::< - _, - NoOpTransactionCommitHook, - DefaultTxnProvider, - >( - txn_provider, - state_view, - &AptosModuleCacheManager::new(), - BlockExecutorConfig { - local: BlockExecutorLocalConfig::default_with_concurrency_level(concurrency_level), - onchain: BlockExecutorConfigFromOnchain::new_no_block_limit(), - }, - TransactionSliceMetadata::unknown(), - None, - ) - .map(BlockOutput::into_transaction_outputs_forced) + let executor = AptosVMBlockExecutor::new(); + executor + .execute_block_with_config( + txn_provider, + state_view, + BlockExecutorConfig { + local: BlockExecutorLocalConfig::default_with_concurrency_level(concurrency_level), + onchain: BlockExecutorConfigFromOnchain::new_no_block_limit(), + }, + TransactionSliceMetadata::unknown(), + ) + .map(BlockOutput::into_transaction_outputs_forced) } diff --git a/aptos-move/aptos-replay-benchmark/Cargo.toml b/aptos-move/aptos-replay-benchmark/Cargo.toml new file mode 100644 index 0000000000000..fa60fd7dabecb --- /dev/null +++ b/aptos-move/aptos-replay-benchmark/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "aptos-replay-benchmark" +version = "0.1.0" +description = "A tool to replay and locally benchmark on-chain transactions." + +# Workspace inherited keys +authors = { workspace = true } +edition = { workspace = true } +homepage = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +repository = { workspace = true } +rust-version = { workspace = true } + +[dependencies] +anyhow = { workspace = true } +aptos-block-executor = { workspace = true } +aptos-logger = { workspace = true } +aptos-move-debugger = { workspace = true } +aptos-push-metrics = { workspace = true } +aptos-rest-client = { workspace = true } +aptos-types = { workspace = true } +aptos-vm = { workspace = true } +bcs = { workspace = true } +clap = { workspace = true } +claims = { workspace = true } +serde = { workspace = true } +parking_lot = { workspace = true } +tokio = { workspace = true } +url = { workspace = true } diff --git a/aptos-move/aptos-replay-benchmark/src/block.rs b/aptos-move/aptos-replay-benchmark/src/block.rs new file mode 100644 index 0000000000000..b6f22ce204470 --- /dev/null +++ b/aptos-move/aptos-replay-benchmark/src/block.rs @@ -0,0 +1,136 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + comparison::Comparison, + state_view::{ReadSet, ReadSetCapturingStateView}, + workload::Workload, +}; +use aptos_types::{ + block_executor::config::{ + BlockExecutorConfig, BlockExecutorConfigFromOnchain, BlockExecutorLocalConfig, + BlockExecutorModuleCacheLocalConfig, + }, + state_store::{state_key::StateKey, state_value::StateValue, StateView}, + transaction::{TransactionOutput, Version}, +}; +use aptos_vm::{aptos_vm::AptosVMBlockExecutor, VMBlockExecutor}; +use std::collections::HashMap; + +/// Config used by benchmarking blocks. +fn block_execution_config(concurrency_level: usize) -> BlockExecutorConfig { + BlockExecutorConfig { + local: BlockExecutorLocalConfig { + concurrency_level, + allow_fallback: true, + discard_failed_blocks: false, + module_cache_config: BlockExecutorModuleCacheLocalConfig::default(), + }, + // For replay, there is no block limit. + onchain: BlockExecutorConfigFromOnchain::new_no_block_limit(), + } +} + +pub struct Block { + inputs: ReadSet, + workload: Workload, + comparisons: Vec, +} + +impl Block { + pub(crate) fn new( + workload: Workload, + state_view: &(impl StateView + Sync), + state_override: HashMap, + ) -> Self { + let onchain_outputs = if state_override.is_empty() { + None + } else { + // Execute transactions with on-chain configs. + let onchain_outputs = + execute_workload(&AptosVMBlockExecutor::new(), &workload, state_view, 1); + + // Check on-chain outputs do not modify state we override. If so, benchmarking results may + // not be correct. + let begin = workload.first_version(); + for (idx, output) in onchain_outputs.iter().enumerate() { + for (state_key, _) in output.write_set() { + if state_override.contains_key(state_key) { + println!( + "Transaction {} writes to overridden state value for {:?}", + begin + idx as Version, + state_key + ); + } + } + } + Some(onchain_outputs) + }; + + // Execute transactions, recording all reads. + let state_view = ReadSetCapturingStateView::new(state_view, state_override); + let outputs = execute_workload(&AptosVMBlockExecutor::new(), &workload, &state_view, 1); + let inputs = state_view.into_read_set(); + + let comparisons = if let Some(onchain_outputs) = onchain_outputs { + let mut comparisons = Vec::with_capacity(onchain_outputs.len()); + for (left, right) in onchain_outputs.into_iter().zip(outputs) { + let comparison = Comparison::diff(left, right); + comparisons.push(comparison); + } + comparisons + } else { + vec![] + }; + + Self { + inputs, + workload, + comparisons, + } + } + + /// Prints the difference in transaction outputs when running with overrides. + pub fn print_diffs(&self) { + let begin = self.workload.first_version(); + + for (idx, comparison) in self.comparisons.iter().enumerate() { + if !comparison.is_ok() { + println!( + "Transaction {} diff:\n {}\n", + begin + idx as Version, + comparison + ); + } + } + } + + /// Executes the workload for benchmarking. + #[inline(always)] + pub(crate) fn run(&self, executor: &AptosVMBlockExecutor, concurrency_level: usize) { + execute_workload(executor, &self.workload, &self.inputs, concurrency_level); + } +} + +#[inline(always)] +fn execute_workload( + executor: &AptosVMBlockExecutor, + workload: &Workload, + state_view: &(impl StateView + Sync), + concurrency_level: usize, +) -> Vec { + executor + .execute_block_with_config( + workload.txn_provider(), + state_view, + block_execution_config(concurrency_level), + workload.transaction_slice_metadata(), + ) + .unwrap_or_else(|err| { + panic!( + "Block execution should not fail, but returned an error: {:?}", + err + ) + }) + .into_transaction_outputs_forced() +} diff --git a/aptos-move/aptos-replay-benchmark/src/comparison.rs b/aptos-move/aptos-replay-benchmark/src/comparison.rs new file mode 100644 index 0000000000000..7a1bb9fa37c04 --- /dev/null +++ b/aptos-move/aptos-replay-benchmark/src/comparison.rs @@ -0,0 +1,230 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_types::{ + contract_event::ContractEvent, + state_store::state_key::StateKey, + transaction::{ExecutionStatus, TransactionOutput}, + write_set::{WriteOp, WriteSet}, +}; +use claims::assert_ok; +use std::collections::BTreeMap; + +struct Write { + state_key: StateKey, + #[allow(dead_code)] + write_op: WriteOp, +} + +impl Write { + fn new(state_key: StateKey, write_op: WriteOp) -> Self { + Self { + state_key, + write_op, + } + } +} + +/// Different parts of [TransactionOutput] that can be different: +/// 1. gas used, +/// 2. status (must be kept since transactions are replayed), +/// 3. events, +/// 4. writes. +/// Note that fine-grained comparison allows for some differences to be okay, e.g., using more gas +/// implies that the fee statement event, the account balance of the fee payer, and the total token +/// supply are different. +enum Diff { + GasUsed { + left: u64, + right: u64, + }, + ExecutionStatus { + left: ExecutionStatus, + right: ExecutionStatus, + }, + Event { + left: Option, + right: Option, + }, + WriteSet { + left: Option, + right: Option, + }, +} + +/// Holds all differences for a pair of transaction outputs. +pub(crate) struct Comparison { + diffs: Vec, +} + +impl Comparison { + /// Given a pair of transaction outputs, computes its diff for gas used, status, events and + /// write sets. + pub(crate) fn diff(left: TransactionOutput, right: TransactionOutput) -> Self { + let (left_write_set, left_events, left_gas_used, left_transaction_status, _) = + left.unpack(); + let (right_write_set, right_events, right_gas_used, right_transaction_status, _) = + right.unpack(); + + let mut diffs = vec![]; + + let left_execution_status = assert_ok!(left_transaction_status.as_kept_status()); + let right_execution_status = assert_ok!(right_transaction_status.as_kept_status()); + if left_execution_status != right_execution_status { + diffs.push(Diff::ExecutionStatus { + left: left_execution_status, + right: right_execution_status, + }); + } + + if left_gas_used != right_gas_used { + diffs.push(Diff::GasUsed { + left: left_gas_used, + right: right_gas_used, + }); + } + + Self::diff_events(&mut diffs, left_events, right_events); + Self::diff_write_sets(&mut diffs, left_write_set, right_write_set); + + Self { diffs } + } + + fn diff_events(diffs: &mut Vec, left: Vec, right: Vec) { + let mut left_ty_tags = BTreeMap::new(); + for (idx, event) in left.iter().enumerate() { + left_ty_tags.insert(event.type_tag().clone(), idx); + } + + let mut right_ty_tags = BTreeMap::new(); + for (idx, event) in right.iter().enumerate() { + right_ty_tags.insert(event.type_tag().clone(), idx); + } + + for (left_ty_tag, left_idx) in left_ty_tags { + if let Some(right_idx) = right_ty_tags.remove(&left_ty_tag) { + let left_data = left[left_idx].event_data(); + let right_data = right[right_idx].event_data(); + if left_data != right_data { + diffs.push(Diff::Event { + left: Some(left[left_idx].clone()), + right: Some(right[right_idx].clone()), + }); + } + } else { + diffs.push(Diff::Event { + left: Some(left[left_idx].clone()), + right: None, + }); + } + } + + for (_, right_idx) in right_ty_tags { + diffs.push(Diff::Event { + left: None, + right: Some(right[right_idx].clone()), + }); + } + } + + fn diff_write_sets(diffs: &mut Vec, left: WriteSet, right: WriteSet) { + let left = left.into_mut().into_inner(); + let mut right = right.into_mut().into_inner(); + + for (left_state_key, left_write_op) in left { + if let Some(right_write_op) = right.remove(&left_state_key) { + if left_write_op != right_write_op { + diffs.push(Diff::WriteSet { + left: Some(Write::new(left_state_key.clone(), left_write_op)), + right: Some(Write::new(left_state_key, right_write_op)), + }); + } + } else { + diffs.push(Diff::WriteSet { + left: Some(Write::new(left_state_key, left_write_op)), + right: None, + }); + } + } + + for (right_state_key, right_write_op) in right { + diffs.push(Diff::WriteSet { + left: None, + right: Some(Write::new(right_state_key, right_write_op)), + }); + } + } + + pub(crate) fn is_ok(&self) -> bool { + self.diffs.is_empty() + } +} + +impl std::fmt::Display for Comparison { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + writeln!(f, " >>>>> ")?; + for diff in &self.diffs { + match diff { + Diff::GasUsed { left, right } => { + writeln!(f, "[gas used] before: {}, after: {}", left, right)?; + }, + Diff::ExecutionStatus { left, right } => { + writeln!( + f, + "[execution status] before: {:?}, after: {:?}", + left, right + )?; + }, + Diff::Event { left, right } => { + let left = left.as_ref(); + let right = right.as_ref(); + + if left.is_none() { + writeln!( + f, + "[event] {} was not emitted before", + right.unwrap().type_tag().to_canonical_string() + )?; + } else if right.is_none() { + writeln!( + f, + "[event] {} is not emitted anymore", + left.unwrap().type_tag().to_canonical_string() + )?; + } else { + writeln!( + f, + "[event] {} has changed its data", + left.unwrap().type_tag().to_canonical_string() + )?; + } + }, + Diff::WriteSet { left, right } => { + let left = left.as_ref(); + let right = right.as_ref(); + + if left.is_none() { + writeln!( + f, + "[write] {:?} was not written to before", + &right.unwrap().state_key + )?; + } else if right.is_none() { + writeln!( + f, + "[write] {:?} is not written to anymore", + &left.unwrap().state_key + )?; + } else { + writeln!( + f, + "[write] {:?} has changed its value", + &left.unwrap().state_key + )?; + } + }, + } + } + writeln!(f, " <<<<< ") + } +} diff --git a/aptos-move/aptos-replay-benchmark/src/generator.rs b/aptos-move/aptos-replay-benchmark/src/generator.rs new file mode 100644 index 0000000000000..e4563848c38f6 --- /dev/null +++ b/aptos-move/aptos-replay-benchmark/src/generator.rs @@ -0,0 +1,138 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{block::Block, overrides::OverrideConfig, workload::Workload}; +use aptos_move_debugger::aptos_debugger::AptosDebugger; +use aptos_types::transaction::{Transaction, Version}; +use std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::Instant, +}; + +pub struct BenchmarkGenerator { + generator: Arc, +} + +impl BenchmarkGenerator { + pub fn new( + debugger: AptosDebugger, + begin_version: Version, + end_version: Version, + override_config: OverrideConfig, + ) -> Self { + let generator = + BenchmarkGeneratorContext::new(debugger, begin_version, end_version, override_config); + Self { + generator: Arc::new(generator), + } + } + + /// Generates a sequence of [Block] for benchmarking. + pub async fn generate_blocks(&self) -> anyhow::Result> { + let limit = self.generator.end_version - self.generator.begin_version + 1; + let (txns, _) = self + .generator + .debugger + .get_committed_transactions(self.generator.begin_version, limit) + .await?; + let txn_blocks = self.generator.partition(txns); + + let num_generated = Arc::new(AtomicU64::new(0)); + let num_blocks = txn_blocks.len(); + + let mut tasks = Vec::with_capacity(num_blocks); + for (begin, txn_block) in txn_blocks { + let task = tokio::task::spawn_blocking({ + let generator = self.generator.clone(); + let num_generated = num_generated.clone(); + move || { + let start_time = Instant::now(); + let block = generator.generate_block(begin, txn_block); + let time = start_time.elapsed().as_secs(); + println!( + "Generated block {}/{} in {}s", + num_generated.fetch_add(1, Ordering::SeqCst) + 1, + num_blocks, + time + ); + block + } + }); + tasks.push(task); + } + + let mut blocks = Vec::with_capacity(tasks.len()); + for task in tasks { + blocks.push(task.await?); + } + + Ok(blocks) + } +} + +struct BenchmarkGeneratorContext { + debugger: AptosDebugger, + begin_version: Version, + end_version: Version, + override_config: OverrideConfig, +} + +impl BenchmarkGeneratorContext { + fn new( + debugger: AptosDebugger, + begin_version: Version, + end_version: Version, + override_config: OverrideConfig, + ) -> Self { + assert!( + begin_version <= end_version, + "Transaction versions are not a valid closed interval: [{}, {}].", + begin_version, + end_version, + ); + + Self { + debugger, + begin_version, + end_version, + override_config, + } + } + + /// Generates a single [Block] for benchmarking. + fn generate_block(&self, begin: Version, txns: Vec) -> Block { + let workload = Workload::new(begin, txns); + + let state_view = self.debugger.state_view_at_version(begin); + let state_override = self.override_config.get_state_override(&state_view); + + let state_view = self.debugger.state_view_at_version(begin); + Block::new(workload, &state_view, state_override) + } + + /// Partitions a sequence of transactions into blocks. + fn partition(&self, txns: Vec) -> Vec<(Version, Vec)> { + let mut begin_versions_and_blocks = Vec::with_capacity(txns.len()); + + let mut curr_begin = self.begin_version; + let mut curr_block = Vec::with_capacity(txns.len()); + + for txn in txns { + if txn.is_block_start() && !curr_block.is_empty() { + let block = std::mem::take(&mut curr_block); + let block_size = block.len(); + begin_versions_and_blocks.push((curr_begin, block)); + curr_begin += block_size as Version; + } + curr_block.push(txn); + } + if !curr_block.is_empty() { + begin_versions_and_blocks.push((curr_begin, curr_block)); + } + + begin_versions_and_blocks + } +} diff --git a/aptos-move/aptos-replay-benchmark/src/lib.rs b/aptos-move/aptos-replay-benchmark/src/lib.rs new file mode 100644 index 0000000000000..36c57785aa0c8 --- /dev/null +++ b/aptos-move/aptos-replay-benchmark/src/lib.rs @@ -0,0 +1,10 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +pub mod block; +mod comparison; +pub mod generator; +pub mod overrides; +pub mod runner; +mod state_view; +mod workload; diff --git a/aptos-move/aptos-replay-benchmark/src/main.rs b/aptos-move/aptos-replay-benchmark/src/main.rs new file mode 100644 index 0000000000000..3113315c5550a --- /dev/null +++ b/aptos-move/aptos-replay-benchmark/src/main.rs @@ -0,0 +1,114 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_logger::{Level, Logger}; +use aptos_move_debugger::aptos_debugger::AptosDebugger; +use aptos_push_metrics::MetricsPusher; +use aptos_replay_benchmark::{ + generator::BenchmarkGenerator, overrides::OverrideConfig, runner::BenchmarkRunner, +}; +use aptos_rest_client::Client; +use aptos_types::{on_chain_config::FeatureFlag, transaction::Version}; +use clap::Parser; +use url::Url; + +#[derive(Parser)] +pub struct Command { + #[clap(long, help = "Logging level, defaults to ERROR")] + log_level: Option, + + #[clap(long, help = "Fullnode's REST API query endpoint")] + rest_endpoint: String, + + #[clap(long, help = "First transaction to include for benchmarking")] + begin_version: Version, + + #[clap(long, help = "Last transaction to include for benchmarking")] + end_version: Version, + + #[clap( + long, + num_args=1.., + value_delimiter = ' ', + help = "Different concurrency levels to benchmark", + )] + concurrency_levels: Vec, + + #[clap( + long, + help = "Number of times to repeat an experiment for each concurrency level" + )] + num_repeats: Option, + + #[clap( + long, + help = "If true, measure time taken to execute each block, and overall time otherwise" + )] + measure_block_time: bool, + + #[clap( + long, + num_args=1.., + value_delimiter = ' ', + help = "List of space-separated feature flags to enable", + )] + enable_features: Vec, + + #[clap( + long, + num_args=1.., + value_delimiter = ' ', + help = "List of space-separated feature flags to disable", + )] + disable_features: Vec, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let command = Command::parse(); + + Logger::new() + .level(command.log_level.unwrap_or(Level::Error)) + .init(); + let _mp = MetricsPusher::start(vec![]); + + // TODO: + // Right now we fetch transactions from debugger, but ideally we need a way to save them + // locally (with corresponding read-sets) so we can use this for CI. + let debugger = AptosDebugger::rest_client(Client::new(Url::parse(&command.rest_endpoint)?))?; + + // TODO: + // Right now, only features can be overridden. In general, this can be allowed for anything: + // 1. Framework code, e.g., to test performance of new natives or compiler, + // 2. Gas schedule, to track the costs of charging gas or tracking limits. + // We probably should support at least these. + let override_config = OverrideConfig::new(command.enable_features, command.disable_features); + + let blocks = BenchmarkGenerator::new( + debugger, + command.begin_version, + command.end_version, + override_config, + ) + .generate_blocks() + .await?; + + for block in &blocks { + block.print_diffs(); + } + + BenchmarkRunner::new( + command.concurrency_levels, + command.num_repeats, + command.measure_block_time, + ) + .measure_execution_time(&blocks); + + Ok(()) +} + +#[test] +fn verify_tool() { + use clap::CommandFactory; + Command::command().debug_assert(); +} diff --git a/aptos-move/aptos-replay-benchmark/src/overrides.rs b/aptos-move/aptos-replay-benchmark/src/overrides.rs new file mode 100644 index 0000000000000..9796a3ee843da --- /dev/null +++ b/aptos-move/aptos-replay-benchmark/src/overrides.rs @@ -0,0 +1,88 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_types::{ + on_chain_config::{FeatureFlag, Features, OnChainConfig}, + state_store::{state_key::StateKey, state_value::StateValue, StateView}, +}; +use serde::Serialize; +use std::collections::HashMap; + +pub struct OverrideConfig { + enable_features: Vec, + disable_features: Vec, +} + +impl OverrideConfig { + pub fn new(enable_features: Vec, disable_features: Vec) -> Self { + assert!( + enable_features + .iter() + .all(|f| !disable_features.contains(f)), + "Enable and disable feature flags cannot overlap" + ); + + Self { + enable_features, + disable_features, + } + } + + pub(crate) fn get_state_override( + &self, + state_view: &impl StateView, + ) -> HashMap { + let mut state_override = HashMap::new(); + + // Enable/disable features. + let (features_state_key, features_state_value) = + config_override::(state_view, |features| { + for feature in &self.enable_features { + if features.is_enabled(*feature) { + println!("[WARN] Feature {:?} is already enabled", feature) + } + features.enable(*feature); + } + for feature in &self.disable_features { + if !features.is_enabled(*feature) { + println!("[WARN] Feature {:?} is already disabled", feature) + } + features.disable(*feature); + } + }); + state_override.insert(features_state_key, features_state_value); + state_override + } +} + +/// Returns the state key for on-chain config type. +fn config_state_key() -> StateKey { + StateKey::resource(T::address(), &T::struct_tag()) + .expect("Constructing state key for on-chain config must succeed") +} + +/// Fetches the config from the storage, and modifies it based on the passed function. Panics if +/// there is a storage error, config does not exist or fails to (de-)serialize. +fn config_override( + state_view: &impl StateView, + override_func: F, +) -> (StateKey, StateValue) { + let state_key = config_state_key::(); + let state_value = state_view + .get_state_value(&state_key) + .unwrap_or_else(|err| { + panic!( + "Failed to fetch on-chain config for {:?}: {:?}", + state_key, err + ) + }) + .unwrap_or_else(|| panic!("On-chain config for {:?} must always exist", state_key)); + + let mut config = T::deserialize_into_config(state_value.bytes()) + .expect("On-chain config must be deserializable"); + override_func(&mut config); + let config_bytes = bcs::to_bytes(&config).expect("On-chain config must be serializable"); + + let new_state_value = state_value.map_bytes(|_| Ok(config_bytes.into())).unwrap(); + (state_key, new_state_value) +} diff --git a/aptos-move/aptos-replay-benchmark/src/runner.rs b/aptos-move/aptos-replay-benchmark/src/runner.rs new file mode 100644 index 0000000000000..fdb5253d2f3e1 --- /dev/null +++ b/aptos-move/aptos-replay-benchmark/src/runner.rs @@ -0,0 +1,118 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::block::Block; +use aptos_vm::{aptos_vm::AptosVMBlockExecutor, VMBlockExecutor}; +use std::time::Instant; + +/// Holds configuration for running the benchmarks and measuring the time taken. +pub struct BenchmarkRunner { + concurrency_levels: Vec, + num_repeats: usize, + measure_block_time: bool, +} + +impl BenchmarkRunner { + pub fn new( + concurrency_levels: Vec, + num_repeats: Option, + measure_block_time: bool, + ) -> Self { + assert!( + !concurrency_levels.is_empty(), + "At least one concurrency level must be provided" + ); + + let default_num_repeats = 3; + let num_repeats = num_repeats.unwrap_or_else(|| { + println!( + "[WARN] Using default number of repeats: {}", + default_num_repeats + ); + default_num_repeats + }); + assert!( + num_repeats >= default_num_repeats, + "Number of times to repeat the benchmark should be at least the default value {}", + default_num_repeats + ); + + Self { + concurrency_levels, + num_repeats, + measure_block_time, + } + } + + // TODO: + // This measures execution time from a cold-start. Ideally, we want to warm-up with executing + // 1-2 blocks prior to selected range, but not timing them. + pub fn measure_execution_time(&self, blocks: &[Block]) { + for concurrency_level in &self.concurrency_levels { + if self.measure_block_time { + self.measure_block_execution_time(blocks, *concurrency_level); + } else { + self.measure_overall_execution_time(blocks, *concurrency_level); + } + } + } + + /// Runs a sequence of blocks, measuring execution time for each block. The median is reported. + fn measure_block_execution_time(&self, blocks: &[Block], concurrency_level: usize) { + let mut times = Vec::with_capacity(blocks.len()); + for _ in blocks { + times.push(Vec::with_capacity(self.num_repeats)); + } + + for i in 0..self.num_repeats { + let executor = AptosVMBlockExecutor::new(); + for (idx, block) in blocks.iter().enumerate() { + let start_time = Instant::now(); + block.run(&executor, concurrency_level); + let time = start_time.elapsed().as_millis(); + println!( + "[{}/{}] Block {} execution time is {}ms", + i + 1, + self.num_repeats, + idx + 1, + time, + ); + times[idx].push(time); + } + } + + for (idx, mut time) in times.into_iter().enumerate() { + time.sort(); + println!( + "Block {} median execution time is {}ms", + idx + 1, + time[self.num_repeats / 2], + ); + } + } + + /// Runs the sequence of blocks, measuring end-to-end execution time. + fn measure_overall_execution_time(&self, blocks: &[Block], concurrency_level: usize) { + let mut times = Vec::with_capacity(self.num_repeats); + for i in 0..self.num_repeats { + let start_time = Instant::now(); + let executor = AptosVMBlockExecutor::new(); + for block in blocks { + block.run(&executor, concurrency_level); + } + let time = start_time.elapsed().as_millis(); + println!( + "[{}/{}] Overall execution time is {}ms", + i + 1, + self.num_repeats, + time, + ); + times.push(time); + } + times.sort(); + println!( + "Overall median execution time is {}ms\n", + times[self.num_repeats / 2], + ); + } +} diff --git a/aptos-move/aptos-replay-benchmark/src/state_view.rs b/aptos-move/aptos-replay-benchmark/src/state_view.rs new file mode 100644 index 0000000000000..47fbfbeb6edfd --- /dev/null +++ b/aptos-move/aptos-replay-benchmark/src/state_view.rs @@ -0,0 +1,82 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_types::state_store::{ + errors::StateviewError, state_key::StateKey, state_storage_usage::StateStorageUsage, + state_value::StateValue, StateView, TStateView, +}; +use parking_lot::Mutex; +use std::collections::HashMap; + +/// Represents the read-set for obtained when executing transactions. +pub(crate) struct ReadSet { + data: HashMap, +} + +impl TStateView for ReadSet { + type Key = StateKey; + + fn get_state_value(&self, state_key: &Self::Key) -> Result, StateviewError> { + Ok(self.data.get(state_key).cloned()) + } + + fn get_usage(&self) -> Result { + unreachable!("Should not be called when benchmarking") + } +} + +/// [StateView] implementation that records all execution reads. Captured reads can be converted +/// into a [ReadSet]. +pub(crate) struct ReadSetCapturingStateView<'s, S> { + captured_reads: Mutex>, + state_view: &'s S, +} + +impl<'s, S: StateView> ReadSetCapturingStateView<'s, S> { + pub(crate) fn new(state_view: &'s S, initial_read_set: HashMap) -> Self { + Self { + captured_reads: Mutex::new(initial_read_set), + state_view, + } + } + + pub(crate) fn into_read_set(self) -> ReadSet { + ReadSet { + data: self.captured_reads.into_inner(), + } + } +} + +impl<'s, S: StateView> TStateView for ReadSetCapturingStateView<'s, S> { + type Key = StateKey; + + fn get_state_value(&self, state_key: &Self::Key) -> Result, StateviewError> { + // Check the read-set first. + if let Some(state_value) = self.captured_reads.lock().get(state_key) { + return Ok(Some(state_value.clone())); + } + + // We do not allow failures because then benchmarking will not be correct (we miss a read). + // Plus, these failures should not happen when replaying past transactions. + let maybe_state_value = self + .state_view + .get_state_value(state_key) + .unwrap_or_else(|err| { + panic!("Failed to fetch state value for {:?}: {:?}", state_key, err) + }); + + // Populate the read-set if first access. + if let Some(state_value) = &maybe_state_value { + let mut captured_reads = self.captured_reads.lock(); + if !captured_reads.contains_key(state_key) { + captured_reads.insert(state_key.clone(), state_value.clone()); + } + } + + Ok(maybe_state_value) + } + + fn get_usage(&self) -> Result { + unreachable!("Should not be called when benchmarking") + } +} diff --git a/aptos-move/aptos-replay-benchmark/src/workload.rs b/aptos-move/aptos-replay-benchmark/src/workload.rs new file mode 100644 index 0000000000000..75d353e742b35 --- /dev/null +++ b/aptos-move/aptos-replay-benchmark/src/workload.rs @@ -0,0 +1,65 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use aptos_block_executor::txn_provider::default::DefaultTxnProvider; +use aptos_types::{ + block_executor::transaction_slice_metadata::TransactionSliceMetadata, + transaction::{ + signature_verified_transaction::{ + into_signature_verified_block, SignatureVerifiedTransaction, + }, + Transaction, Version, + }, +}; + +/// A workload to benchmark. Contains signature verified transactions, and metadata specifying the +/// start and end versions of these transactions. +pub(crate) struct Workload { + txn_provider: DefaultTxnProvider, + transaction_slice_metadata: TransactionSliceMetadata, +} + +impl Workload { + /// Returns a new workload to execute transactions at specified version. + pub(crate) fn new(begin: Version, txns: Vec) -> Self { + assert!(!txns.is_empty()); + + let end = begin + txns.len() as Version; + let transaction_slice_metadata = TransactionSliceMetadata::chunk(begin, end); + + let signature_verified_txns = into_signature_verified_block(txns); + let txn_provider = DefaultTxnProvider::new(signature_verified_txns); + + Workload { + txn_provider, + transaction_slice_metadata, + } + } + + /// Returns the signature verified transactions in the workload. + pub(crate) fn txn_provider(&self) -> &DefaultTxnProvider { + &self.txn_provider + } + + /// Returns transaction metadata corresponding to [begin, end) versions of the workload. + pub(crate) fn transaction_slice_metadata(&self) -> TransactionSliceMetadata { + self.transaction_slice_metadata + } + + /// Returns the first transaction version in the workload. + pub(crate) fn first_version(&self) -> Version { + match &self.transaction_slice_metadata { + TransactionSliceMetadata::Chunk { begin, .. } => *begin, + _ => unreachable!("Transaction slice metadata is always a chunk"), + } + } + + /// Returns the last transaction version in the workload. + #[allow(dead_code)] + pub(crate) fn last_version(&self) -> Version { + match &self.transaction_slice_metadata { + TransactionSliceMetadata::Chunk { end, .. } => *end - 1, + _ => unreachable!("Transaction slice metadata is always a chunk"), + } + } +} diff --git a/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs b/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs index f250de5c60758..81db4cbded0c2 100644 --- a/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs +++ b/aptos-move/aptos-transaction-benchmarks/src/transaction_bench_state.rs @@ -3,11 +3,7 @@ use crate::transactions; use aptos_bitvec::BitVec; -use aptos_block_executor::{ - code_cache_global_manager::AptosModuleCacheManager, - txn_commit_hook::NoOpTransactionCommitHook, - txn_provider::{default::DefaultTxnProvider, TxnProvider}, -}; +use aptos_block_executor::txn_provider::{default::DefaultTxnProvider, TxnProvider}; use aptos_block_partitioner::{ v2::config::PartitionerV2Config, BlockPartitioner, PartitionerConfig, }; @@ -32,15 +28,15 @@ use aptos_types::{ }, ExecutionStatus, Transaction, TransactionOutput, TransactionStatus, }, - vm_status::VMStatus, }; use aptos_vm::{ - block_executor::{AptosTransactionOutput, AptosVMBlockExecutorWrapper}, + aptos_vm::AptosVMBlockExecutor, data_cache::AsMoveResolver, sharded_block_executor::{ local_executor_shard::{LocalExecutorClient, LocalExecutorService}, ShardedBlockExecutor, }, + VMBlockExecutor, }; use proptest::{collection::vec, prelude::Strategy, strategy::ValueTree, test_runner::TestRunner}; use std::{net::SocketAddr, sync::Arc, time::Instant}; @@ -217,20 +213,18 @@ where ) -> (Vec, usize) { let block_size = txn_provider.num_txns(); let timer = Instant::now(); - let output = AptosVMBlockExecutorWrapper::execute_block::< - _, - NoOpTransactionCommitHook, - DefaultTxnProvider, - >( - txn_provider, - self.state_view.as_ref(), - &AptosModuleCacheManager::new(), - BlockExecutorConfig::new_maybe_block_limit(1, maybe_block_gas_limit), - TransactionSliceMetadata::unknown(), - None, - ) - .expect("VM should not fail to start") - .into_transaction_outputs_forced(); + + let executor = AptosVMBlockExecutor::new(); + let output = executor + .execute_block_with_config( + txn_provider, + self.state_view.as_ref(), + BlockExecutorConfig::new_maybe_block_limit(1, maybe_block_gas_limit), + TransactionSliceMetadata::unknown(), + ) + .expect("Sequential block execution should succeed") + .into_transaction_outputs_forced(); + let exec_time = timer.elapsed().as_millis(); (output, block_size * 1000 / exec_time as usize) @@ -263,28 +257,25 @@ where fn execute_benchmark_parallel( &self, txn_provider: &DefaultTxnProvider, - concurrency_level_per_shard: usize, + concurrency_level: usize, maybe_block_gas_limit: Option, ) -> (Vec, usize) { let block_size = txn_provider.num_txns(); let timer = Instant::now(); - let output = AptosVMBlockExecutorWrapper::execute_block::< - _, - NoOpTransactionCommitHook, - DefaultTxnProvider, - >( - txn_provider, - self.state_view.as_ref(), - &AptosModuleCacheManager::new(), - BlockExecutorConfig::new_maybe_block_limit( - concurrency_level_per_shard, - maybe_block_gas_limit, - ), - TransactionSliceMetadata::unknown(), - None, - ) - .expect("VM should not fail to start") - .into_transaction_outputs_forced(); + + let executor = AptosVMBlockExecutor::new(); + let output = executor + .execute_block_with_config( + txn_provider, + self.state_view.as_ref(), + BlockExecutorConfig::new_maybe_block_limit( + concurrency_level, + maybe_block_gas_limit, + ), + TransactionSliceMetadata::unknown(), + ) + .expect("Parallel block execution should succeed") + .into_transaction_outputs_forced(); let exec_time = timer.elapsed().as_millis(); (output, block_size * 1000 / exec_time as usize) diff --git a/aptos-move/aptos-vm/src/aptos_vm.rs b/aptos-move/aptos-vm/src/aptos_vm.rs index f2afca7412a88..14fe50cccba23 100644 --- a/aptos-move/aptos-vm/src/aptos_vm.rs +++ b/aptos-move/aptos-vm/src/aptos_vm.rs @@ -2792,35 +2792,29 @@ pub struct AptosVMBlockExecutor { module_cache_manager: AptosModuleCacheManager, } -impl VMBlockExecutor for AptosVMBlockExecutor { - fn new() -> Self { - Self { - module_cache_manager: AptosModuleCacheManager::new(), - } - } - - fn execute_block( +impl AptosVMBlockExecutor { + pub fn execute_block_with_config( &self, txn_provider: &DefaultTxnProvider, state_view: &(impl StateView + Sync), - onchain_config: BlockExecutorConfigFromOnchain, + config: BlockExecutorConfig, transaction_slice_metadata: TransactionSliceMetadata, ) -> Result, VMStatus> { - fail_point!("move_adapter::execute_block", |_| { + fail_point!("aptos_vm_block_executor::execute_block_with_config", |_| { Err(VMStatus::error( StatusCode::UNKNOWN_INVARIANT_VIOLATION_ERROR, None, )) }); + let log_context = AdapterLogSchema::new(state_view.id(), 0); + let num_txns = txn_provider.num_txns(); info!( log_context, - "Executing block, transaction count: {}", - txn_provider.num_txns() + "Executing block, transaction count: {}", num_txns ); - let count = txn_provider.num_txns(); - let ret = AptosVMBlockExecutorWrapper::execute_block::< + let result = AptosVMBlockExecutorWrapper::execute_block::< _, NoOpTransactionCommitHook, DefaultTxnProvider, @@ -2828,23 +2822,42 @@ impl VMBlockExecutor for AptosVMBlockExecutor { txn_provider, state_view, &self.module_cache_manager, - BlockExecutorConfig { - local: BlockExecutorLocalConfig { - concurrency_level: AptosVM::get_concurrency_level(), - allow_fallback: true, - discard_failed_blocks: AptosVM::get_discard_failed_blocks(), - module_cache_config: BlockExecutorModuleCacheLocalConfig::default(), - }, - onchain: onchain_config, - }, + config, transaction_slice_metadata, None, ); - if ret.is_ok() { + if result.is_ok() { // Record the histogram count for transactions per block. - BLOCK_TRANSACTION_COUNT.observe(count as f64); + BLOCK_TRANSACTION_COUNT.observe(num_txns as f64); } - ret + result + } +} + +impl VMBlockExecutor for AptosVMBlockExecutor { + fn new() -> Self { + Self { + module_cache_manager: AptosModuleCacheManager::new(), + } + } + + fn execute_block( + &self, + txn_provider: &DefaultTxnProvider, + state_view: &(impl StateView + Sync), + onchain_config: BlockExecutorConfigFromOnchain, + transaction_slice_metadata: TransactionSliceMetadata, + ) -> Result, VMStatus> { + let config = BlockExecutorConfig { + local: BlockExecutorLocalConfig { + concurrency_level: AptosVM::get_concurrency_level(), + allow_fallback: true, + discard_failed_blocks: AptosVM::get_discard_failed_blocks(), + module_cache_config: BlockExecutorModuleCacheLocalConfig::default(), + }, + onchain: onchain_config, + }; + self.execute_block_with_config(txn_provider, state_view, config, transaction_slice_metadata) } fn execute_block_sharded>( diff --git a/aptos-move/aptos-vm/src/block_executor/mod.rs b/aptos-move/aptos-vm/src/block_executor/mod.rs index d5c55eeb957c4..168b1bb4a3719 100644 --- a/aptos-move/aptos-vm/src/block_executor/mod.rs +++ b/aptos-move/aptos-vm/src/block_executor/mod.rs @@ -481,7 +481,7 @@ impl< } /// Uses shared thread pool to execute blocks. - pub fn execute_block< + pub(crate) fn execute_block< S: StateView + Sync, L: TransactionCommitHook, TP: TxnProvider + Sync, diff --git a/crates/aptos-logger/src/metadata.rs b/crates/aptos-logger/src/metadata.rs index aba5b7503ce61..2aff4dae55ff1 100644 --- a/crates/aptos-logger/src/metadata.rs +++ b/crates/aptos-logger/src/metadata.rs @@ -3,7 +3,8 @@ // SPDX-License-Identifier: Apache-2.0 use serde::{Deserialize, Serialize}; -use std::{fmt, str::FromStr}; +use std::fmt; +use strum_macros::{EnumString, FromRepr}; /// Associated metadata with every log to identify what kind of log and where it came from #[derive(Clone, Copy, Debug, Serialize, Deserialize)] @@ -60,68 +61,60 @@ impl Metadata { } } -static LOG_LEVEL_NAMES: &[&str] = &["ERROR", "WARN", "INFO", "DEBUG", "TRACE"]; - /// Logging levels, used for stratifying logs, and disabling less important ones for performance reasons #[repr(usize)] -#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Hash, Serialize, Deserialize)] +#[derive( + Copy, + Clone, + PartialEq, + Eq, + PartialOrd, + Ord, + Debug, + Hash, + Serialize, + Deserialize, + FromRepr, + EnumString, +)] #[serde(rename_all = "UPPERCASE")] pub enum Level { /// The "error" level. /// /// Designates very serious errors. + #[strum(ascii_case_insensitive)] Error = 0, /// The "warn" level. /// /// Designates hazardous situations. - Warn, + #[strum(ascii_case_insensitive)] + Warn = 1, /// The "info" level. /// /// Designates useful information. - Info, + #[strum(ascii_case_insensitive)] + Info = 2, /// The "debug" level. /// /// Designates lower priority information. - Debug, + #[strum(ascii_case_insensitive)] + Debug = 3, /// The "trace" level. /// /// Designates very low priority, often extremely verbose, information. - Trace, -} - -impl Level { - fn from_usize(idx: usize) -> Option { - let lvl = match idx { - 0 => Level::Error, - 1 => Level::Warn, - 2 => Level::Info, - 3 => Level::Debug, - 4 => Level::Trace, - _ => return None, - }; - - Some(lvl) - } -} - -/// An error given when no `Level` matches the inputted string -#[derive(Debug)] -pub struct LevelParseError; - -impl FromStr for Level { - type Err = LevelParseError; - - fn from_str(level: &str) -> Result { - LOG_LEVEL_NAMES - .iter() - .position(|name| name.eq_ignore_ascii_case(level)) - .map(|idx| Level::from_usize(idx).unwrap()) - .ok_or(LevelParseError) - } + #[strum(ascii_case_insensitive)] + Trace = 4, } impl fmt::Display for Level { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - fmt.pad(LOG_LEVEL_NAMES[*self as usize]) + let level_str = match self { + Level::Error => "ERROR", + Level::Warn => "WARN", + Level::Info => "INFO", + Level::Debug => "DEBUG", + Level::Trace => "TRACE", + }; + fmt.pad(level_str) } } diff --git a/types/src/write_set.rs b/types/src/write_set.rs index e939b70d1346a..0da5b6b89dcd5 100644 --- a/types/src/write_set.rs +++ b/types/src/write_set.rs @@ -568,6 +568,10 @@ impl WriteSetMut { &mut self.write_set } + pub fn into_inner(self) -> BTreeMap { + self.write_set + } + pub fn squash(mut self, other: Self) -> Result { use btree_map::Entry::*;