From 1102b3740837b7857b7263f876c6e5eac9d5b1c3 Mon Sep 17 00:00:00 2001 From: posvyatokum Date: Mon, 20 Nov 2023 15:59:58 +0000 Subject: [PATCH] feat(epoch-sync): adding EpochSyncInfo validation tool (#10146) Adding tool for testing epoch sync implementation. Right now only one subcommand is available `validate-epoch-sync-info`. It constructs `EpochSyncInfo` for every finalised epoch in DB and checks that `EpochSyncInfo` passes validation. Currently we only have one validation (kinda) implemented -- checking of `epoch_sync_data_hash`. As epoch sync is under feature and that feature includes DB upgrade, the tool first creates a snapshot of the DB, and then modifies the config to work with the snapshot. --- Cargo.lock | 16 ++ Cargo.toml | 1 + chain/chain/src/chain.rs | 8 +- chain/chain/src/lib.rs | 2 +- chain/chain/src/test_utils/kv_runtime.rs | 1 + chain/epoch-manager/src/adapter.rs | 11 +- chain/epoch-manager/src/lib.rs | 28 ++- neard/Cargo.toml | 3 +- neard/src/cli.rs | 10 + tools/epoch-sync/Cargo.toml | 24 ++ tools/epoch-sync/src/cli.rs | 306 +++++++++++++++++++++++ tools/epoch-sync/src/lib.rs | 2 + 12 files changed, 404 insertions(+), 8 deletions(-) create mode 100644 tools/epoch-sync/Cargo.toml create mode 100644 tools/epoch-sync/src/cli.rs create mode 100644 tools/epoch-sync/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index e3467d1e76b..9316b653fb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3759,6 +3759,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "near-epoch-sync-tool" +version = "0.0.0" +dependencies = [ + "anyhow", + "clap", + "near-chain", + "near-chain-configs", + "near-epoch-manager", + "near-primitives", + "near-store", + "nearcore", + "tracing", +] + [[package]] name = "near-flat-storage" version = "0.0.0" @@ -4714,6 +4729,7 @@ dependencies = [ "near-crypto", "near-database-tool", "near-dyn-configs", + "near-epoch-sync-tool", "near-flat-storage", "near-fork-network", "near-jsonrpc-primitives", diff --git a/Cargo.toml b/Cargo.toml index 
e5f2b39bbf5..c4515308744 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -199,6 +199,7 @@ nearcore = { path = "nearcore" } near-crypto = { path = "core/crypto" } near-dyn-configs = { path = "core/dyn-configs" } near-epoch-manager = { path = "chain/epoch-manager" } +near-epoch-sync-tool = { path = "tools/epoch-sync"} near-flat-storage = { path = "tools/flat-storage" } near-fork-network = { path = "tools/fork-network" } near-fmt = { path = "utils/fmt" } diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs index 3aded062944..67d45f16dd4 100644 --- a/chain/chain/src/chain.rs +++ b/chain/chain/src/chain.rs @@ -5647,7 +5647,7 @@ impl<'a> ChainUpdate<'a> { .set_ser( DBCol::EpochSyncInfo, last_block_info.epoch_id().as_ref(), - &self.create_epoch_sync_info(last_block_info, next_epoch_first_hash)?, + &self.create_epoch_sync_info(last_block_info, next_epoch_first_hash, None)?, ) .map_err(EpochError::from)?; self.chain_store_update.merge(store_update); @@ -5745,12 +5745,14 @@ impl<'a> ChainUpdate<'a> { /// Data that is necessary to prove Epoch in new Epoch Sync. 
#[cfg(feature = "new_epoch_sync")] - fn create_epoch_sync_info( + pub fn create_epoch_sync_info( &self, last_block_info: &BlockInfo, next_epoch_first_hash: &CryptoHash, + hash_to_prev_hash: Option<&HashMap>, ) -> Result { - let mut all_block_hashes = self.epoch_manager.get_all_epoch_hashes(last_block_info)?; + let mut all_block_hashes = + self.epoch_manager.get_all_epoch_hashes(last_block_info, hash_to_prev_hash)?; all_block_hashes.reverse(); let (headers, headers_to_save) = diff --git a/chain/chain/src/lib.rs b/chain/chain/src/lib.rs index bcf3be21320..51d0ad104d7 100644 --- a/chain/chain/src/lib.rs +++ b/chain/chain/src/lib.rs @@ -1,5 +1,5 @@ pub use block_processing_utils::{BlockProcessingArtifact, DoneApplyChunkCallback}; -pub use chain::{check_known, collect_receipts, Chain, MAX_ORPHAN_SIZE}; +pub use chain::{check_known, collect_receipts, Chain, ChainUpdate, MAX_ORPHAN_SIZE}; pub use doomslug::{Doomslug, DoomslugBlockProductionReadiness, DoomslugThresholdMode}; pub use lightclient::{create_light_client_block_view, get_epoch_block_producers_view}; pub use near_chain_primitives::{self, Error}; diff --git a/chain/chain/src/test_utils/kv_runtime.rs b/chain/chain/src/test_utils/kv_runtime.rs index 06f9f42a0e9..de3a5b7a47c 100644 --- a/chain/chain/src/test_utils/kv_runtime.rs +++ b/chain/chain/src/test_utils/kv_runtime.rs @@ -952,6 +952,7 @@ impl EpochManagerAdapter for MockEpochManager { fn get_all_epoch_hashes( &self, _last_block_info: &BlockInfo, + _hash_to_prev_hash: Option<&HashMap>, ) -> Result, EpochError> { Ok(vec![]) } diff --git a/chain/epoch-manager/src/adapter.rs b/chain/epoch-manager/src/adapter.rs index fccb993fbc5..51e0849ab08 100644 --- a/chain/epoch-manager/src/adapter.rs +++ b/chain/epoch-manager/src/adapter.rs @@ -20,6 +20,8 @@ use near_primitives::version::ProtocolVersion; use near_primitives::views::EpochValidatorInfo; use near_store::{ShardUId, StoreUpdate}; use std::cmp::Ordering; +#[cfg(feature = "new_epoch_sync")] +use 
std::collections::HashMap; use std::sync::Arc; /// A trait that abstracts the interface of the EpochManager. @@ -390,6 +392,7 @@ pub trait EpochManagerAdapter: Send + Sync { fn get_all_epoch_hashes( &self, last_block_info: &BlockInfo, + hash_to_prev_hash: Option<&HashMap>, ) -> Result, EpochError>; } @@ -962,8 +965,14 @@ impl EpochManagerAdapter for EpochManagerHandle { fn get_all_epoch_hashes( &self, last_block_info: &BlockInfo, + hash_to_prev_hash: Option<&HashMap>, ) -> Result, EpochError> { let epoch_manager = self.read(); - epoch_manager.get_all_epoch_hashes(last_block_info) + match hash_to_prev_hash { + None => epoch_manager.get_all_epoch_hashes_from_db(last_block_info), + Some(hash_to_prev_hash) => { + epoch_manager.get_all_epoch_hashes_from_cache(last_block_info, hash_to_prev_hash) + } + } } } diff --git a/chain/epoch-manager/src/lib.rs b/chain/epoch-manager/src/lib.rs index 36c1b251dcd..b2f9bb193d3 100644 --- a/chain/epoch-manager/src/lib.rs +++ b/chain/epoch-manager/src/lib.rs @@ -1859,12 +1859,12 @@ impl EpochManager { } #[cfg(feature = "new_epoch_sync")] - pub fn get_all_epoch_hashes( + pub fn get_all_epoch_hashes_from_db( &self, last_block_info: &BlockInfo, ) -> Result, EpochError> { let _span = - tracing::debug_span!(target: "epoch_manager", "get_all_epoch_hashes", ?last_block_info) + tracing::debug_span!(target: "epoch_manager", "get_all_epoch_hashes_from_db", ?last_block_info) .entered(); let mut result = vec![]; @@ -1891,4 +1891,28 @@ impl EpochManager { Ok(result) } + + #[cfg(feature = "new_epoch_sync")] + fn get_all_epoch_hashes_from_cache( + &self, + last_block_info: &BlockInfo, + hash_to_prev_hash: &HashMap, + ) -> Result, EpochError> { + let _span = + tracing::debug_span!(target: "epoch_manager", "get_all_epoch_hashes_from_cache", ?last_block_info) + .entered(); + + let mut result = vec![]; + let mut current_hash = *last_block_info.hash(); + while current_hash != *last_block_info.epoch_first_block() { + result.push(current_hash); + 
current_hash = *hash_to_prev_hash + .get(&current_hash) + .ok_or(EpochError::MissingBlock(current_hash))?; + } + // First block of an epoch is not covered by the while loop. + result.push(current_hash); + + Ok(result) + } } diff --git a/neard/Cargo.toml b/neard/Cargo.toml index 9c3599433a2..9a719782628 100644 --- a/neard/Cargo.toml +++ b/neard/Cargo.toml @@ -43,6 +43,7 @@ near-config-utils.workspace = true near-crypto.workspace = true near-database-tool.workspace = true near-dyn-configs.workspace = true +near-epoch-sync-tool = { workspace = true, optional = true } near-flat-storage.workspace = true near-fork-network.workspace = true near-jsonrpc-primitives.workspace = true @@ -74,7 +75,7 @@ rosetta_rpc = ["nearcore/rosetta_rpc"] json_rpc = ["nearcore/json_rpc"] protocol_feature_fix_staking_threshold = ["nearcore/protocol_feature_fix_staking_threshold"] serialize_all_state_changes = ["nearcore/serialize_all_state_changes"] -new_epoch_sync = ["nearcore/new_epoch_sync"] +new_epoch_sync = ["nearcore/new_epoch_sync", "dep:near-epoch-sync-tool"] nightly = [ "nightly_protocol", diff --git a/neard/src/cli.rs b/neard/src/cli.rs index 750b681beb4..242729f6248 100644 --- a/neard/src/cli.rs +++ b/neard/src/cli.rs @@ -6,6 +6,8 @@ use near_client::ConfigUpdater; use near_cold_store_tool::ColdStoreCommand; use near_database_tool::commands::DatabaseCommand; use near_dyn_configs::{UpdateableConfigLoader, UpdateableConfigLoaderError, UpdateableConfigs}; +#[cfg(feature = "new_epoch_sync")] +use near_epoch_sync_tool::EpochSyncCommand; use near_flat_storage::commands::FlatStorageCommand; use near_fork_network::cli::ForkNetworkCommand; use near_jsonrpc_primitives::types::light_client::RpcLightClientExecutionProofResponse; @@ -144,6 +146,10 @@ impl NeardCmd { NeardSubCommand::StatePartsDumpCheck(cmd) => { cmd.run()?; } + #[cfg(feature = "new_epoch_sync")] + NeardSubCommand::EpochSync(cmd) => { + cmd.run(&home_dir)?; + } }; Ok(()) } @@ -272,6 +278,10 @@ pub(super) enum NeardSubCommand { /// 
Check completeness of dumped state parts of an epoch StatePartsDumpCheck(StatePartsDumpCheckCommand), + + #[cfg(feature = "new_epoch_sync")] + /// Testing tool for epoch sync + EpochSync(EpochSyncCommand), } #[derive(clap::Parser)] diff --git a/tools/epoch-sync/Cargo.toml b/tools/epoch-sync/Cargo.toml new file mode 100644 index 00000000000..ec2a59bb18a --- /dev/null +++ b/tools/epoch-sync/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "near-epoch-sync-tool" +version.workspace = true +authors.workspace = true +edition.workspace = true +rust-version.workspace = true +repository.workspace = true +license.workspace = true +publish = false + +[lints] +workspace = true + +[dependencies] +anyhow.workspace = true +clap.workspace = true +tracing.workspace = true + +nearcore = { workspace = true, features = ["new_epoch_sync"] } +near-chain.workspace = true +near-chain-configs.workspace = true +near-epoch-manager.workspace = true +near-primitives.workspace = true +near-store.workspace = true diff --git a/tools/epoch-sync/src/cli.rs b/tools/epoch-sync/src/cli.rs new file mode 100644 index 00000000000..1bc0f761abe --- /dev/null +++ b/tools/epoch-sync/src/cli.rs @@ -0,0 +1,306 @@ +use anyhow::Context; +use clap; +use near_chain::{ChainStore, ChainStoreAccess, ChainUpdate, DoomslugThresholdMode}; +use near_epoch_manager::shard_tracker::{ShardTracker, TrackedConfig}; +use near_epoch_manager::EpochManager; +use near_primitives::block::BlockHeader; +use near_primitives::borsh::BorshDeserialize; +use near_primitives::epoch_manager::block_info::BlockInfo; +use near_primitives::epoch_manager::AGGREGATOR_KEY; +use near_primitives::hash::CryptoHash; +use near_store::{checkpoint_hot_storage_and_cleanup_columns, DBCol, NodeStorage}; +use nearcore::NightshadeRuntime; +use std::collections::{HashMap, HashSet}; +use std::path::{Path, PathBuf}; + +#[derive(clap::Parser)] +pub struct EpochSyncCommand { + #[clap(subcommand)] + subcmd: SubCommand, +} + +#[derive(clap::Parser)] 
+#[clap(subcommand_required = true, arg_required_else_help = true)] +enum SubCommand { + /// For every finished epoch construct `EpochSyncInfo` + /// and validate it the same way we would if we received it from a peer. + ValidateEpochSyncInfo(ValidateEpochSyncInfoCmd), +} + +impl EpochSyncCommand { + pub fn run(self, home_dir: &Path) -> anyhow::Result<()> { + let mut near_config = Self::create_snapshot(home_dir)?; + let storage = nearcore::open_storage(&home_dir, &mut near_config)?; + + match self.subcmd { + SubCommand::ValidateEpochSyncInfo(cmd) => cmd.run(&home_dir, &storage, &near_config), + } + } + + fn create_snapshot(home_dir: &Path) -> anyhow::Result { + let mut near_config = nearcore::config::load_config( + &home_dir, + near_chain_configs::GenesisValidationMode::UnsafeFast, + ) + .unwrap_or_else(|e| panic!("Error loading config: {e:#}")); + + let store_path_addition = near_config + .config + .store + .path + .clone() + .unwrap_or(PathBuf::from("data")) + .join("epoch-sync-snapshot"); + let snapshot_path = home_dir.join(store_path_addition.clone()); + + let storage = nearcore::open_storage(&home_dir, &mut near_config)?; + + if snapshot_path.exists() && snapshot_path.is_dir() { + tracing::info!(?snapshot_path, "Found a DB snapshot"); + } else { + tracing::info!(destination = ?snapshot_path, "Creating snapshot of original DB"); + // checkpointing only hot storage, because cold storage will not be changed + checkpoint_hot_storage_and_cleanup_columns( + &storage.get_hot_store(), + &snapshot_path, + None, + )?; + } + + near_config.config.store.path = Some(store_path_addition.join("data")); + + Ok(near_config) + } +} + +#[derive(clap::Parser)] +struct ValidateEpochSyncInfoCmd { + /// If `archive` flag is specified, `BlockInfo` column is assumed to be full and is used for optimisation purposes. + /// Using `BlockInfo` (`--archive` flag) takes around 10 minutes. + /// Using `BlockHeader` takes around 1.5 hours. 
+ #[clap(short, long)] + archive: bool, +} + +impl ValidateEpochSyncInfoCmd { + pub fn run( + &self, + home_dir: &Path, + storage: &NodeStorage, + config: &nearcore::config::NearConfig, + ) -> anyhow::Result<()> { + let store = storage.get_hot_store(); + + let hash_to_prev_hash = if self.archive { + get_hash_to_prev_hash_from_block_info(storage)? + } else { + get_hash_to_prev_hash_from_block_header(storage)? + }; + let epoch_ids = get_all_epoch_ids(storage)?; + + let mut chain_store = + ChainStore::new(store.clone(), config.genesis.config.genesis_height, false); + let header_head_hash = chain_store.header_head()?.last_block_hash; + let hash_to_next_hash = get_hash_to_next_hash(&hash_to_prev_hash, &header_head_hash)?; + + let epoch_manager = + EpochManager::new_arc_handle(storage.get_hot_store(), &config.genesis.config); + let shard_tracker = ShardTracker::new( + TrackedConfig::from_config(&config.client_config), + epoch_manager.clone(), + ); + let runtime = NightshadeRuntime::from_config( + home_dir, + storage.get_hot_store(), + &config, + epoch_manager.clone(), + ); + let chain_update = ChainUpdate::new( + &mut chain_store, + epoch_manager, + shard_tracker, + runtime, + DoomslugThresholdMode::TwoThirds, + config.genesis.config.transaction_validity_period, + ); + + let genesis_hash = store + .get_ser::( + DBCol::BlockHeight, + &config.genesis.config.genesis_height.to_le_bytes(), + )? + .expect("Expect genesis height to be present in BlockHeight column"); + + let mut cur_hash = header_head_hash; + + // Edge case if we exactly at the epoch boundary. + // In this case we cannot create `EpochSyncInfo` for this epoch yet, + // as there is no block header with `epoch_sync_data_hash` for that epoch. 
+ if epoch_ids.contains(&cur_hash) { + cur_hash = hash_to_prev_hash[&cur_hash]; + } + + let mut num_errors = 0; + + while cur_hash != genesis_hash { + tracing::debug!(?cur_hash, "Big loop hash"); + + // epoch ids are the last hashes of some epochs + if epoch_ids.contains(&cur_hash) { + let last_header = store + .get_ser::(DBCol::BlockHeader, cur_hash.as_ref())? + .context("BlockHeader for cur_hash not found")?; + let last_finalized_height = + if *last_header.last_final_block() == CryptoHash::default() { + 0 + } else { + let last_finalized_header = store + .get_ser::( + DBCol::BlockHeader, + last_header.last_final_block().as_ref(), + )? + .context("BlockHeader for cur_hash.last_final_block not found")?; + last_finalized_header.height() + }; + + loop { + let prev_hash = hash_to_prev_hash[&cur_hash]; + if epoch_ids.contains(&prev_hash) { + // prev_hash is the end of previous epoch + // cur_hash is the start of current epoch + break; + } else { + // prev_hash is still in the current epoch + // we descent to it + cur_hash = prev_hash; + } + } + + let first_block_hash = cur_hash; + + let mut last_block_info = BlockInfo::new( + *last_header.hash(), + last_header.height(), + last_finalized_height, + *last_header.last_final_block(), + *last_header.prev_hash(), + last_header.prev_validator_proposals().collect(), + last_header.chunk_mask().to_vec(), + vec![], + last_header.total_supply(), + last_header.latest_protocol_version(), + last_header.raw_timestamp(), + ); + + *last_block_info.epoch_id_mut() = last_header.epoch_id().clone(); + *last_block_info.epoch_first_block_mut() = first_block_hash; + + let next_epoch_first_hash = hash_to_next_hash[last_header.hash()]; + tracing::debug!("Creating EpochSyncInfo from block {:?}", last_header); + + let epoch_sync_info = chain_update.create_epoch_sync_info( + &last_block_info, + &next_epoch_first_hash, + Some(&hash_to_prev_hash), + )?; + + let calculated_epoch_sync_data_hash_result = + epoch_sync_info.calculate_epoch_sync_data_hash(); 
+ let canonical_epoch_sync_data_hash_result = + epoch_sync_info.get_epoch_sync_data_hash(); + + if let (Ok(calculated), Ok(Some(canonical))) = ( + &calculated_epoch_sync_data_hash_result, + &canonical_epoch_sync_data_hash_result, + ) { + if calculated == canonical { + tracing::info!( + "EpochSyncInfo for height {:?} OK", + epoch_sync_info.epoch_info.epoch_height() + ); + continue; + } + } + tracing::error!( + "EpochSyncInfo for height {:?} ERROR {:?} {:?}", + epoch_sync_info.epoch_info.epoch_height(), + calculated_epoch_sync_data_hash_result, + canonical_epoch_sync_data_hash_result + ); + num_errors += 1; + } else { + cur_hash = hash_to_prev_hash[&cur_hash]; + } + } + assert_eq!(num_errors, 0); + Ok(()) + } +} + +/// Creates mapping from `cur_hash` to `prev_hash` by iterating through `BlockInfo` column. +/// Mapping from `cur_hash` to `prev_hash` is unique, as there are no two blocks with the same hash. +/// This means there will not be key collision. Value collision may happen, but it is irrelevant for `HashMap`. +fn get_hash_to_prev_hash_from_block_info( + storage: &NodeStorage, +) -> anyhow::Result> { + let mut hash_to_prev_hash = HashMap::new(); + let store = storage.get_split_store().unwrap_or(storage.get_hot_store()); + for result in store.iter(DBCol::BlockInfo) { + let (_, value) = result?; + let block_info = + BlockInfo::try_from_slice(value.as_ref()).expect("Failed to deser BlockInfo"); + if block_info.hash() != block_info.prev_hash() { + hash_to_prev_hash.insert(*block_info.hash(), *block_info.prev_hash()); + } + } + Ok(hash_to_prev_hash) +} + +/// Creates mapping from `cur_hash` to `prev_hash` by iterating through `BlockHeader` column. +/// Mapping from `cur_hash` to `prev_hash` is unique, as there are no two blocks with the same hash. +/// This means there will not be key collision. Value collision may happen, but it is irrelevant for `HashMap`. 
+fn get_hash_to_prev_hash_from_block_header( + storage: &NodeStorage, +) -> anyhow::Result> { + let mut hash_to_prev_hash = HashMap::new(); + for result in storage.get_hot_store().iter(DBCol::BlockHeader) { + let (_, value) = result?; + let block_header = + BlockHeader::try_from_slice(value.as_ref()).expect("Failed to deser BlockHeader"); + if block_header.hash() != block_header.prev_hash() { + hash_to_prev_hash.insert(*block_header.hash(), *block_header.prev_hash()); + } + } + Ok(hash_to_prev_hash) +} + +/// Creates mapping from `cur_hash` to `next_hash` for the chain ending in `chain_head` +/// by descending through mapping from `cur_hash` to `prev_hash`. +/// Only builds mapping for one chain to avoid key collision due to forks. +fn get_hash_to_next_hash( + hash_to_prev_hash: &HashMap, + chain_head: &CryptoHash, +) -> anyhow::Result> { + let mut hash_to_next_hash = HashMap::new(); + let mut cur_head = *chain_head; + while let Some(prev_hash) = hash_to_prev_hash.get(&cur_head) { + hash_to_next_hash.insert(*prev_hash, cur_head); + cur_head = *prev_hash; + } + Ok(hash_to_next_hash) +} + +/// Get all `EpochId`s by iterating `EpochInfo` column and return them as `HashSet`. +/// This function is used to get hashes of all last epoch blocks as `EpochId` represents last hash of prev prev column. +fn get_all_epoch_ids(storage: &NodeStorage) -> anyhow::Result> { + let mut epoch_ids = HashSet::new(); + for result in storage.get_hot_store().iter(DBCol::EpochInfo) { + let (key, _) = result?; + if key.as_ref() == AGGREGATOR_KEY { + continue; + } + epoch_ids + .insert(CryptoHash::try_from_slice(key.as_ref()).expect("Failed to deser CryptoHash")); + } + Ok(epoch_ids) +} diff --git a/tools/epoch-sync/src/lib.rs b/tools/epoch-sync/src/lib.rs new file mode 100644 index 00000000000..8a0197d948f --- /dev/null +++ b/tools/epoch-sync/src/lib.rs @@ -0,0 +1,2 @@ +pub mod cli; +pub use cli::EpochSyncCommand;