diff --git a/chain/chain-primitives/Cargo.toml b/chain/chain-primitives/Cargo.toml
index cd86f5811c8..cb92c3075ca 100644
--- a/chain/chain-primitives/Cargo.toml
+++ b/chain/chain-primitives/Cargo.toml
@@ -19,3 +19,6 @@
 tracing.workspace = true
 near-primitives.workspace = true
 near-crypto.workspace = true
+
+[features]
+new_epoch_sync = []
diff --git a/chain/chain-primitives/src/error.rs b/chain/chain-primitives/src/error.rs
index 069ca56f5a2..14f2e9a482b 100644
--- a/chain/chain-primitives/src/error.rs
+++ b/chain/chain-primitives/src/error.rs
@@ -429,3 +429,16 @@ pub enum BlockKnownError {
     #[error("already known in invalid blocks")]
     KnownAsInvalid,
 }
+
+#[cfg(feature = "new_epoch_sync")]
+pub mod epoch_sync {
+    #[derive(thiserror::Error, std::fmt::Debug)]
+    pub enum EpochSyncInfoError {
+        #[error(transparent)]
+        EpochSyncInfoErr(#[from] near_primitives::errors::epoch_sync::EpochSyncInfoError),
+        #[error(transparent)]
+        IOErr(#[from] std::io::Error),
+        #[error(transparent)]
+        ChainErr(#[from] crate::Error),
+    }
+}
diff --git a/chain/chain/Cargo.toml b/chain/chain/Cargo.toml
index 6a932def6c0..19f392bcfe0 100644
--- a/chain/chain/Cargo.toml
+++ b/chain/chain/Cargo.toml
@@ -54,7 +54,7 @@
 byzantine_asserts = []
 expensive_tests = []
 test_features = []
 no_cache = ["near-store/no_cache"]
-new_epoch_sync = ["near-store/new_epoch_sync", "near-primitives/new_epoch_sync", "near-epoch-manager/new_epoch_sync"]
+new_epoch_sync = ["near-store/new_epoch_sync", "near-primitives/new_epoch_sync", "near-epoch-manager/new_epoch_sync", "near-chain-primitives/new_epoch_sync"]
 protocol_feature_reject_blocks_with_outdated_protocol_version = [
     "near-primitives/protocol_feature_reject_blocks_with_outdated_protocol_version",
diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs
index 67d45f16dd4..fc5f4d9543a 100644
--- a/chain/chain/src/chain.rs
+++ b/chain/chain/src/chain.rs
@@ -31,6 +31,8 @@ use crossbeam_channel::{unbounded, Receiver, Sender};
 use itertools::Itertools;
 use lru::LruCache;
 use near_chain_configs::StateSplitConfig;
+#[cfg(feature = "new_epoch_sync")]
+use near_chain_primitives::error::epoch_sync::EpochSyncInfoError;
 use near_chain_primitives::error::{BlockKnownError, Error, LogTransientStorageError};
 use near_epoch_manager::shard_tracker::ShardTracker;
 use near_epoch_manager::types::BlockHeaderInfo;
@@ -44,6 +46,8 @@ use near_primitives::challenge::{
 use near_primitives::checked_feature;
 #[cfg(feature = "new_epoch_sync")]
 use near_primitives::epoch_manager::{block_info::BlockInfo, epoch_sync::EpochSyncInfo};
+#[cfg(feature = "new_epoch_sync")]
+use near_primitives::errors::epoch_sync::EpochSyncHashType;
 use near_primitives::errors::EpochError;
 use near_primitives::hash::{hash, CryptoHash};
 use near_primitives::merkle::{
@@ -71,6 +75,8 @@ use near_primitives::types::{
     NumShards, ShardId, StateRoot,
 };
 use near_primitives::unwrap_or_return;
+#[cfg(feature = "new_epoch_sync")]
+use near_primitives::utils::index_to_bytes;
 use near_primitives::utils::MaybeValidated;
 use near_primitives::version::{ProtocolFeature, PROTOCOL_VERSION};
 use near_primitives::views::{
@@ -4780,6 +4786,105 @@ impl Chain {
     }
 }
 
+/// Epoch sync specific functions.
+#[cfg(feature = "new_epoch_sync")]
+impl Chain {
+    /// TODO(posvyatokum): validate `epoch_sync_info` before `store_update` commit.
+    pub fn validate_and_record_epoch_sync_info(
+        &mut self,
+        epoch_sync_info: &EpochSyncInfo,
+    ) -> Result<(), EpochSyncInfoError> {
+        let store = self.store().store().clone();
+        let mut chain_update = self.chain_update();
+        let mut store_update = chain_update.chain_store_update.store().store_update();
+
+        let epoch_id = epoch_sync_info.get_epoch_id()?;
+        // save EpochSyncInfo
+
+        store_update.set_ser(DBCol::EpochSyncInfo, epoch_id.as_ref(), epoch_sync_info)?;
+
+        // save EpochInfo's
+
+        store_update.set_ser(DBCol::EpochInfo, epoch_id.as_ref(), &epoch_sync_info.epoch_info)?;
+        store_update.set_ser(
+            DBCol::EpochInfo,
+            epoch_sync_info.get_next_epoch_id()?.as_ref(),
+            &epoch_sync_info.next_epoch_info,
+        )?;
+        store_update.set_ser(
+            DBCol::EpochInfo,
+            epoch_sync_info.get_next_next_epoch_id()?.as_ref(),
+            &epoch_sync_info.next_next_epoch_info,
+        )?;
+
+        // construct and save all new BlockMerkleTree's
+
+        let mut cur_block_merkle_tree = (*chain_update
+            .chain_store_update
+            .get_block_merkle_tree(epoch_sync_info.get_epoch_first_header()?.prev_hash())?)
+        .clone();
+        let mut prev_hash = epoch_sync_info.get_epoch_first_header()?.prev_hash();
+        for hash in &epoch_sync_info.all_block_hashes {
+            cur_block_merkle_tree.insert(*prev_hash);
+            chain_update
+                .chain_store_update
+                .save_block_merkle_tree(*hash, cur_block_merkle_tree.clone());
+            prev_hash = hash;
+        }
+
+        // save all block data in headers_to_save
+
+        for hash in &epoch_sync_info.headers_to_save {
+            let header = epoch_sync_info.get_header(*hash, EpochSyncHashType::BlockToSave)?;
+            // check that block is not known already
+            if store.exists(DBCol::BlockHeader, hash.as_ref())? {
+                continue;
+            }
+
+            store_update.insert_ser(DBCol::BlockHeader, header.hash().as_ref(), header)?;
+            store_update.set_ser(
+                DBCol::NextBlockHashes,
+                header.prev_hash().as_ref(),
+                header.hash(),
+            )?;
+            store_update.set_ser(
+                DBCol::BlockHeight,
+                &index_to_bytes(header.height()),
+                header.hash(),
+            )?;
+            store_update.set_ser(
+                DBCol::BlockOrdinal,
+                &index_to_bytes(header.block_ordinal()),
+                &header.hash(),
+            )?;
+
+            store_update.insert_ser(
+                DBCol::BlockInfo,
+                hash.as_ref(),
+                &epoch_sync_info.get_block_info(hash)?,
+            )?;
+        }
+
+        // save header head, final head, update epoch_manager aggregator
+
+        chain_update
+            .chain_store_update
+            .force_save_header_head(&Tip::from_header(epoch_sync_info.get_epoch_last_header()?))?;
+        chain_update.chain_store_update.save_final_head(&Tip::from_header(
+            epoch_sync_info.get_epoch_last_finalised_header()?,
+        ))?;
+        chain_update
+            .epoch_manager
+            .force_update_aggregator(epoch_id, epoch_sync_info.get_epoch_last_finalised_hash()?);
+
+        // TODO(posvyatokum): add EpochSyncInfo validation.
+
+        chain_update.chain_store_update.merge(store_update);
+        chain_update.commit()?;
+        Ok(())
+    }
+}
+
 /// Chain update helper, contains information that is needed to process block
 /// and decide to accept it or reject it.
 /// If rejected nothing will be updated in underlying storage.
@@ -5590,10 +5695,13 @@ impl<'a> ChainUpdate<'a> {
         self.chain_store_update.save_chunk_extra(block_header.hash(), &shard_uid, new_chunk_extra);
         Ok(true)
     }
+}
 
+/// Epoch sync specific functions.
+#[cfg(feature = "new_epoch_sync")]
+impl<'a> ChainUpdate<'a> {
     /// This function assumes `BlockInfo` is already retrievable from `epoch_manager`.
     /// This can be achieved by calling `add_validator_proposals`.
-    #[cfg(feature = "new_epoch_sync")]
     fn save_epoch_sync_info_if_finalised(&mut self, header: &BlockHeader) -> Result<(), Error> {
         let block_info = self.epoch_manager.get_block_info(header.hash())?;
         let epoch_first_block_hash = block_info.epoch_first_block();
@@ -5631,12 +5739,20 @@ impl<'a> ChainUpdate<'a> {
             // We didn't finalise header with `epoch_sync_data_hash` for the previous epoch yet.
             return Ok(());
         }
+        if self
+            .chain_store_update
+            .store()
+            .exists(DBCol::EpochSyncInfo, prev_epoch_last_block_info.epoch_id().as_ref())?
+        {
+            // We already wrote `EpochSyncInfo` for this epoch.
+            // Probably during epoch sync.
+            return Ok(());
+        }
         self.save_epoch_sync_info_impl(&prev_epoch_last_block_info, epoch_first_block_hash)
     }
 
     /// If the block is the last one in the epoch
     /// construct and record `EpochSyncInfo` to `self.chain_store_update`.
-    #[cfg(feature = "new_epoch_sync")]
     fn save_epoch_sync_info_impl(
         &mut self,
         last_block_info: &BlockInfo,
@@ -5657,7 +5773,6 @@ impl<'a> ChainUpdate<'a> {
     /// Create a pair of `BlockHeader`s necessary to create `BlockInfo` for `block_hash`:
     /// - header for `block_hash`
     /// - header for `last_final_block` of `block_hash` header
-    #[cfg(feature = "new_epoch_sync")]
     fn get_header_pair(
         &self,
         block_hash: &CryptoHash,
@@ -5690,7 +5805,6 @@ impl<'a> ChainUpdate<'a> {
     ///
     /// Headers not marked with (*) need to be saved on the syncing node.
     /// Headers marked with (*) only needed for `EpochSyncInfo` validation.
-    #[cfg(feature = "new_epoch_sync")]
     fn get_epoch_sync_info_headers(
         &self,
         last_block_info: &BlockInfo,
@@ -5744,7 +5858,6 @@ impl<'a> ChainUpdate<'a> {
     }
 
     /// Data that is necessary to prove Epoch in new Epoch Sync.
-    #[cfg(feature = "new_epoch_sync")]
     pub fn create_epoch_sync_info(
         &self,
         last_block_info: &BlockInfo,
diff --git a/chain/chain/src/test_utils/kv_runtime.rs b/chain/chain/src/test_utils/kv_runtime.rs
index de3a5b7a47c..3a930d370a7 100644
--- a/chain/chain/src/test_utils/kv_runtime.rs
+++ b/chain/chain/src/test_utils/kv_runtime.rs
@@ -956,6 +956,9 @@ impl EpochManagerAdapter for MockEpochManager {
     ) -> Result<Vec<CryptoHash>, EpochError> {
         Ok(vec![])
     }
+
+    #[cfg(feature = "new_epoch_sync")]
+    fn force_update_aggregator(&self, _epoch_id: &EpochId, _hash: &CryptoHash) {}
 }
 
 impl RuntimeAdapter for KeyValueRuntime {
diff --git a/chain/epoch-manager/src/adapter.rs b/chain/epoch-manager/src/adapter.rs
index 51e0849ab08..d95be2e0685 100644
--- a/chain/epoch-manager/src/adapter.rs
+++ b/chain/epoch-manager/src/adapter.rs
@@ -1,4 +1,6 @@
 use crate::types::BlockHeaderInfo;
+#[cfg(feature = "new_epoch_sync")]
+use crate::EpochInfoAggregator;
 use crate::EpochManagerHandle;
 use near_chain_primitives::Error;
 use near_crypto::Signature;
@@ -394,6 +396,9 @@ pub trait EpochManagerAdapter: Send + Sync {
         last_block_info: &BlockInfo,
         hash_to_prev_hash: Option<&HashMap<CryptoHash, CryptoHash>>,
     ) -> Result<Vec<CryptoHash>, EpochError>;
+
+    #[cfg(feature = "new_epoch_sync")]
+    fn force_update_aggregator(&self, epoch_id: &EpochId, hash: &CryptoHash);
 }
 
 impl EpochManagerAdapter for EpochManagerHandle {
@@ -975,4 +980,10 @@ impl EpochManagerAdapter for EpochManagerHandle {
             }
         }
     }
+
+    #[cfg(feature = "new_epoch_sync")]
+    fn force_update_aggregator(&self, epoch_id: &EpochId, hash: &CryptoHash) {
+        let mut epoch_manager = self.write();
+        epoch_manager.epoch_info_aggregator = EpochInfoAggregator::new(epoch_id.clone(), *hash);
+    }
 }
diff --git a/core/primitives/src/epoch_manager.rs b/core/primitives/src/epoch_manager.rs
index 1dfee697dca..c20e495a329 100644
--- a/core/primitives/src/epoch_manager.rs
+++ b/core/primitives/src/epoch_manager.rs
@@ -1234,6 +1234,7 @@ pub mod epoch_sync {
     use crate::epoch_manager::block_info::BlockInfo;
     use crate::epoch_manager::epoch_info::EpochInfo;
     use crate::errors::epoch_sync::{EpochSyncHashType, EpochSyncInfoError};
+    use crate::types::EpochId;
     use borsh::{BorshDeserialize, BorshSerialize};
     use near_o11y::log_assert;
     use near_primitives_core::hash::CryptoHash;
@@ -1262,17 +1263,54 @@ pub mod epoch_sync {
     }
 
     impl EpochSyncInfo {
-        /// Reconstruct BlockInfo for `hash` from information in EpochSyncInfo.
-        pub fn get_block_info(&self, hash: &CryptoHash) -> Result<BlockInfo, EpochSyncInfoError> {
+        pub fn get_epoch_id(&self) -> Result<&EpochId, EpochSyncInfoError> {
+            Ok(self.get_epoch_first_header()?.epoch_id())
+        }
+
+        pub fn get_next_epoch_id(&self) -> Result<&EpochId, EpochSyncInfoError> {
+            Ok(self
+                .get_header(self.next_epoch_first_hash, EpochSyncHashType::NextEpochFirstBlock)?
+                .epoch_id())
+        }
+
+        pub fn get_next_next_epoch_id(&self) -> Result<EpochId, EpochSyncInfoError> {
+            Ok(EpochId(*self.get_epoch_last_hash()?))
+        }
+
+        pub fn get_epoch_last_hash(&self) -> Result<&CryptoHash, EpochSyncInfoError> {
+            let epoch_height = self.epoch_info.epoch_height();
+
+            self.all_block_hashes.last().ok_or(EpochSyncInfoError::ShortEpoch { epoch_height })
+        }
+
+        pub fn get_epoch_last_header(&self) -> Result<&BlockHeader, EpochSyncInfoError> {
+            self.get_header(*self.get_epoch_last_hash()?, EpochSyncHashType::LastEpochBlock)
+        }
+
+        pub fn get_epoch_last_finalised_hash(&self) -> Result<&CryptoHash, EpochSyncInfoError> {
+            Ok(self.get_epoch_last_header()?.last_final_block())
+        }
+
+        pub fn get_epoch_last_finalised_header(&self) -> Result<&BlockHeader, EpochSyncInfoError> {
+            self.get_header(
+                *self.get_epoch_last_finalised_hash()?,
+                EpochSyncHashType::LastFinalBlock,
+            )
+        }
+
+        pub fn get_epoch_first_hash(&self) -> Result<&CryptoHash, EpochSyncInfoError> {
             let epoch_height = self.epoch_info.epoch_height();
-            let epoch_first_hash = self
-                .all_block_hashes
-                .first()
-                .ok_or(EpochSyncInfoError::ShortEpoch { epoch_height })?;
-            let epoch_first_header =
-                self.get_header(*epoch_first_hash, EpochSyncHashType::FirstEpochBlock)?;
+
+            self.all_block_hashes.first().ok_or(EpochSyncInfoError::ShortEpoch { epoch_height })
+        }
+
+        pub fn get_epoch_first_header(&self) -> Result<&BlockHeader, EpochSyncInfoError> {
+            self.get_header(*self.get_epoch_first_hash()?, EpochSyncHashType::FirstEpochBlock)
+        }
+
+        /// Reconstruct BlockInfo for `hash` from information in EpochSyncInfo.
+        pub fn get_block_info(&self, hash: &CryptoHash) -> Result<BlockInfo, EpochSyncInfoError> {
+            let epoch_first_header = self.get_epoch_first_header()?;
 
             let header = self.get_header(*hash, EpochSyncHashType::Other)?;
             log_assert!(
@@ -1338,7 +1376,7 @@ pub mod epoch_sync {
             Ok(next_epoch_first_header.epoch_sync_data_hash())
         }
 
-        fn get_header(
+        pub fn get_header(
             &self,
             hash: CryptoHash,
             hash_type: EpochSyncHashType,
diff --git a/core/primitives/src/errors.rs b/core/primitives/src/errors.rs
index 1aced9a538a..ba42d3c05cb 100644
--- a/core/primitives/src/errors.rs
+++ b/core/primitives/src/errors.rs
@@ -1214,10 +1214,12 @@ pub mod epoch_sync {
 
     #[derive(Eq, PartialEq, Clone, strum::Display, Debug)]
     pub enum EpochSyncHashType {
+        LastEpochBlock,
         LastFinalBlock,
         FirstEpochBlock,
         NextEpochFirstBlock,
         Other,
+        BlockToSave,
     }
 
     #[derive(Eq, PartialEq, Clone, thiserror::Error, Debug)]
diff --git a/integration-tests/src/tests/client/epoch_sync.rs b/integration-tests/src/tests/client/epoch_sync.rs
index 78dfa468434..260541ef7b7 100644
--- a/integration-tests/src/tests/client/epoch_sync.rs
+++ b/integration-tests/src/tests/client/epoch_sync.rs
@@ -4,7 +4,7 @@ use actix::Actor;
 use actix_rt::System;
 use futures::{future, FutureExt};
 use near_actix_test_utils::run_actix;
-use near_chain::ChainStoreAccess;
+use near_chain::{BlockProcessingArtifact, ChainStoreAccess};
 use near_chain::{ChainGenesis, Provenance};
 use near_chain_configs::Genesis;
 use near_client::test_utils::TestEnv;
@@ -16,11 +16,14 @@ use near_o11y::testonly::{init_integration_logger, init_test_logger};
 use near_o11y::WithSpanContextExt;
 use near_primitives::epoch_manager::block_info::BlockInfo;
 use near_primitives::epoch_manager::epoch_sync::EpochSyncInfo;
+use near_primitives::state_part::PartId;
+use near_primitives::state_sync::get_num_state_parts;
 use near_primitives::test_utils::create_test_signer;
 use near_primitives::transaction::{
     Action, DeployContractAction, FunctionCallAction, SignedTransaction,
 };
 use near_primitives::types::EpochId;
+use near_primitives::utils::MaybeValidated;
 use near_primitives_core::hash::CryptoHash;
 use near_primitives_core::types::BlockHeight;
 use near_store::Mode::ReadOnly;
@@ -306,3 +309,190 @@ fn test_epoch_sync_data_hash_from_epoch_sync_info() {
         last_epoch_id = last_final_header.epoch_id().clone();
     }
 }
+
+/// This is an unreliable test that mocks/reimplements sync logic.
+/// After epoch sync is integrated into the sync process we can write a better test.
+///
+/// The test simulates two clients, one of which is
+/// - stopped after one epoch
+/// - synced through epoch sync, header sync, state sync, and body sync
+/// - in sync with the other client for two more epochs
+#[test]
+#[ignore]
+fn test_node_after_simulated_sync() {
+    init_test_logger();
+    let num_clients = 2;
+    let epoch_length = 20;
+    let num_epochs = 5;
+    // Max height for clients[0] before sync.
+    let max_height_0 = epoch_length * num_epochs - 1;
+    // Max height for clients[1] before sync.
+    let max_height_1 = epoch_length;
+
+    // TestEnv setup
+    let mut genesis = Genesis::test(vec!["test0".parse().unwrap(), "test1".parse().unwrap()], 1);
+    genesis.config.epoch_length = epoch_length;
+
+    let mut env = TestEnv::builder(ChainGenesis::test())
+        .clients_count(num_clients)
+        .real_stores()
+        .use_state_snapshots()
+        .real_epoch_managers(&genesis.config)
+        .nightshade_runtimes(&genesis)
+        .build();
+
+    // Produce blocks
+    let mut last_hash = *env.clients[0].chain.genesis().hash();
+    let mut blocks = vec![];
+
+    for h in 1..max_height_0 {
+        for tx in generate_transactions(&last_hash, h) {
+            assert_eq!(env.clients[0].process_tx(tx, false, false), ProcessTxResponse::ValidTx);
+        }
+
+        let block = env.clients[0].produce_block(h).unwrap().unwrap();
+        env.process_block(0, block.clone(), Provenance::PRODUCED);
+        last_hash = *block.hash();
+        blocks.push(block.clone());
+
+        if h < max_height_1 {
+            env.process_block(1, block.clone(), Provenance::NONE);
+        }
+    }
+
+    // Do "epoch sync" up to last epoch
+
+    // Current epoch for clients[0].
+    let epoch_id0 = env.clients[0].chain.header_head().unwrap().epoch_id;
+    // Next epoch for clients[1].
+    let mut epoch_id1 = env.clients[1].chain.header_head().unwrap().epoch_id;
+
+    // We rely on the fact that epoch_id0 is not finished for clients[0].
+    // So we need to "sync" all epochs in [epoch_id1, epoch_id0).
+    while epoch_id1 != epoch_id0 {
+        tracing::debug!("Syncing epoch {:?}", epoch_id1);
+
+        let epoch_sync_data = env.clients[0].chain.store().get_epoch_sync_info(&epoch_id1).unwrap();
+        env.clients[1].chain.validate_and_record_epoch_sync_info(&epoch_sync_data).unwrap();
+
+        epoch_id1 = env.clients[1]
+            .epoch_manager
+            .get_next_epoch_id(&env.clients[1].chain.header_head().unwrap().last_block_hash)
+            .unwrap();
+    }
+
+    // Do "header sync" for the current epoch for clients[0].
+ tracing::debug!("Client 0 Header Head: {:?}", env.clients[0].chain.header_head()); + tracing::debug!("Client 1 Header Head Before: {:?}", env.clients[1].chain.header_head()); + + let mut last_epoch_headers = vec![]; + for block in &blocks { + if *block.header().epoch_id() == epoch_id0 { + last_epoch_headers.push(block.header().clone()); + } + } + env.clients[1].chain.sync_block_headers(last_epoch_headers, &mut vec![]).unwrap(); + + tracing::debug!("Client 0 Header Head: {:?}", env.clients[0].chain.header_head()); + tracing::debug!("Client 1 Header Head After: {:?}", env.clients[1].chain.header_head()); + + // Do "state sync" for the last epoch + // write last block of prev epoch + { + let mut store_update = env.clients[1].chain.store().store().store_update(); + + let mut last_block = &blocks[0]; + for block in &blocks { + if *block.header().epoch_id() == epoch_id0 { + break; + } + last_block = block; + } + + tracing::debug!("Write block {:?}", last_block.header()); + + store_update.insert_ser(DBCol::Block, last_block.hash().as_ref(), last_block).unwrap(); + store_update.commit().unwrap(); + } + + let sync_hash = *env.clients[0] + .epoch_manager + .get_block_info(&env.clients[0].chain.header_head().unwrap().last_block_hash) + .unwrap() + .epoch_first_block(); + tracing::debug!("SYNC HASH: {:?}", sync_hash); + for shard_id in 0..env.clients[0].epoch_manager.num_shards(&epoch_id0).unwrap() { + tracing::debug!("Start syncing shard {:?}", shard_id); + let sync_block_header = env.clients[0].chain.get_block_header(&sync_hash).unwrap(); + let sync_prev_header = + env.clients[0].chain.get_previous_header(&sync_block_header).unwrap(); + let sync_prev_prev_hash = sync_prev_header.prev_hash(); + + let state_header = + env.clients[0].chain.compute_state_response_header(shard_id, sync_hash).unwrap(); + let state_root = state_header.chunk_prev_state_root(); + let num_parts = get_num_state_parts(state_header.state_root_node().memory_usage); + + for part_id in 0..num_parts { + tracing::debug!("Syncing part {:?} of {:?}", part_id, num_parts); + let state_part = env.clients[0] + .chain + .runtime_adapter + .obtain_state_part( + shard_id, + sync_prev_prev_hash, + &state_root, + PartId::new(part_id, num_parts), + ) + .unwrap(); + + env.clients[1] + .runtime_adapter + .apply_state_part( + shard_id, + &state_root, + PartId::new(part_id, num_parts), + state_part.as_ref(), + &epoch_id0, + ) + .unwrap(); + } + } + + env.clients[1] + .chain + .reset_heads_post_state_sync( + &None, + sync_hash, + &mut BlockProcessingArtifact::default(), + Arc::new(|_| {}), + ) + .unwrap(); + + tracing::debug!("Client 0 Head: {:?}", env.clients[0].chain.head()); + tracing::debug!("Client 1 Head: {:?}", env.clients[1].chain.head()); + + // Do "body sync" for the last epoch + + for block in &blocks { + if *block.header().epoch_id() == epoch_id0 { + tracing::debug!("Receive block {:?}", block.header()); + env.clients[1] + .process_block_test(MaybeValidated::from(block.clone()), Provenance::NONE) + .unwrap(); + } + } + + // Produce blocks on clients[0] and process them on clients[1] + for h in max_height_0..(max_height_0 + 2 * epoch_length) { + tracing::debug!("Produce and process block {}", h); + for tx in generate_transactions(&last_hash, h) { + assert_eq!(env.clients[0].process_tx(tx, false, false), ProcessTxResponse::ValidTx); + } + + let block = env.clients[0].produce_block(h).unwrap().unwrap(); + env.process_block(0, block.clone(), Provenance::PRODUCED); + env.process_block(1, block.clone(), Provenance::NONE); + last_hash = 
+    }
+}
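
Aside (not part of the patch): a minimal sketch of how a syncing node could drive the new
`validate_and_record_epoch_sync_info` API, mirroring the "epoch sync" loop of
`test_node_after_simulated_sync` above. The helper name, the borrowed `Client` parameters,
and the use of `anyhow` for error plumbing are assumptions for illustration; in a real node
the `EpochSyncInfo` would arrive in a network message rather than be read from the peer's
store directly.

    // Hypothetical helper, not part of the patch: sync `target` up to the epoch
    // that `source` is still building, one `EpochSyncInfo` record at a time.
    #[cfg(feature = "new_epoch_sync")]
    fn sync_epochs(
        source: &near_client::Client,
        target: &mut near_client::Client,
    ) -> anyhow::Result<()> {
        // The source's current epoch is not finished, so every epoch strictly
        // before it has an EpochSyncInfo record in the source's store.
        let source_epoch_id = source.chain.header_head()?.epoch_id;
        let mut epoch_id = target.chain.header_head()?.epoch_id;

        while epoch_id != source_epoch_id {
            // In a real sync this record would be received from a peer.
            let epoch_sync_info = source.chain.store().get_epoch_sync_info(&epoch_id)?;
            target.chain.validate_and_record_epoch_sync_info(&epoch_sync_info)?;

            // Recording the epoch moved the target's header head to the last
            // header of `epoch_id`, so the next epoch id is now derivable.
            epoch_id = target
                .epoch_manager
                .get_next_epoch_id(&target.chain.header_head()?.last_block_hash)?;
        }
        Ok(())
    }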