feat(epoch-sync): EpochSyncInfo processing (#10226)
#10031 
This PR is a step towards `EpochSyncInfo` validation, although it doesn't
actually include any validation yet.
I want to combine the creation of the necessary `StoreUpdate` with validation:
to validate an `EpochSyncInfo` we need to rebuild the `BlockMerkleTree` for
every block in the epoch, but we must not commit anything if validation fails,
and that rollback behaviour is supported by `ChainStoreUpdate` (a sketch of
this pattern follows below).
Creating the `StoreUpdate` is already a big change, so I want to do a
separate PR for it.
To test that we process `EpochSyncInfo` correctly I created a VERY hacky
test that should never be included in CI. Right now it works as intended,
but if we change the implementation of any sync, we will have to rewrite
this test.
I hope to add epoch sync to the `run_sync_step` flow soon enough that we can
replace this test with something stable before we need to rewrite the
unstable one.
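
To illustrate the stage-then-commit behaviour mentioned above, here is a rough, self-contained sketch. The types (`StagedUpdate`, `process_epoch`) are made-up stand-ins, not the actual nearcore `ChainStoreUpdate`/`StoreUpdate` API; the point is only that all writes for the epoch are staged in memory and persisted atomically after validation succeeds.

```rust
use std::collections::HashMap;

// Stand-in for a StoreUpdate: writes are staged in memory and become
// visible only when `commit` is called.
struct StagedUpdate {
    staged: Vec<(String, Vec<u8>)>,
}

impl StagedUpdate {
    fn new() -> Self {
        Self { staged: Vec::new() }
    }
    fn set(&mut self, key: String, value: Vec<u8>) {
        self.staged.push((key, value));
    }
    fn commit(self, db: &mut HashMap<String, Vec<u8>>) {
        for (key, value) in self.staged {
            db.insert(key, value);
        }
    }
}

// Validate every block of the epoch while staging the rebuilt data;
// if any block fails validation, return early and nothing is persisted.
fn process_epoch(
    db: &mut HashMap<String, Vec<u8>>,
    epoch_blocks: &[(String, bool)], // (block hash, is_valid) -- toy input
) -> Result<(), String> {
    let mut update = StagedUpdate::new();
    for (hash, is_valid) in epoch_blocks {
        if !*is_valid {
            return Err(format!("block {hash} failed validation"));
        }
        // Stage the rebuilt merkle-tree entry for this block.
        update.set(format!("block_merkle_tree:{hash}"), b"rebuilt".to_vec());
    }
    update.commit(db); // all-or-nothing
    Ok(())
}

fn main() {
    let mut db = HashMap::new();
    assert!(process_epoch(&mut db, &[("a".to_string(), true), ("b".to_string(), false)]).is_err());
    assert!(db.is_empty()); // nothing leaked from the failed attempt
}
```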
posvyatokum authored Nov 21, 2023
1 parent 2b24bc8 commit 2318c3a
Showing 9 changed files with 389 additions and 16 deletions.
3 changes: 3 additions & 0 deletions chain/chain-primitives/Cargo.toml
@@ -19,3 +19,6 @@ tracing.workspace = true

near-primitives.workspace = true
near-crypto.workspace = true

[features]
new_epoch_sync = []
13 changes: 13 additions & 0 deletions chain/chain-primitives/src/error.rs
@@ -429,3 +429,16 @@ pub enum BlockKnownError {
    #[error("already known in invalid blocks")]
    KnownAsInvalid,
}

#[cfg(feature = "new_epoch_sync")]
pub mod epoch_sync {
    #[derive(thiserror::Error, std::fmt::Debug)]
    pub enum EpochSyncInfoError {
        #[error(transparent)]
        EpochSyncInfoErr(#[from] near_primitives::errors::epoch_sync::EpochSyncInfoError),
        #[error(transparent)]
        IOErr(#[from] std::io::Error),
        #[error(transparent)]
        ChainErr(#[from] crate::Error),
    }
}
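
As an aside, the `#[error(transparent)]`/`#[from]` pattern used in the enum above lets functions returning this error type apply `?` directly to the wrapped error kinds. A minimal illustration with a simplified enum (`SyncError` and `read_epoch_dump` are invented for this example, not part of the diff; it requires the `thiserror` crate):

```rust
use std::io::Read;

// Simplified version of the pattern: the wrapped error supplies the Display
// message (transparent) and `#[from]` generates the From impl used by `?`.
#[derive(thiserror::Error, Debug)]
enum SyncError {
    #[error(transparent)]
    Io(#[from] std::io::Error),
}

// The io::Error values raised by open/read convert into SyncError::Io via `?`.
fn read_epoch_dump(path: &str) -> Result<Vec<u8>, SyncError> {
    let mut buf = Vec::new();
    std::fs::File::open(path)?.read_to_end(&mut buf)?;
    Ok(buf)
}

fn main() {
    match read_epoch_dump("/nonexistent/epoch_sync_info") {
        Ok(bytes) => println!("read {} bytes", bytes.len()),
        Err(err) => println!("failed: {err}"), // prints the underlying io::Error message
    }
}
```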
2 changes: 1 addition & 1 deletion chain/chain/Cargo.toml
@@ -54,7 +54,7 @@ byzantine_asserts = []
expensive_tests = []
test_features = []
no_cache = ["near-store/no_cache"]
new_epoch_sync = ["near-store/new_epoch_sync", "near-primitives/new_epoch_sync", "near-epoch-manager/new_epoch_sync"]
new_epoch_sync = ["near-store/new_epoch_sync", "near-primitives/new_epoch_sync", "near-epoch-manager/new_epoch_sync", "near-chain-primitives/new_epoch_sync"]

protocol_feature_reject_blocks_with_outdated_protocol_version = [
  "near-primitives/protocol_feature_reject_blocks_with_outdated_protocol_version",
123 changes: 118 additions & 5 deletions chain/chain/src/chain.rs
@@ -31,6 +31,8 @@ use crossbeam_channel::{unbounded, Receiver, Sender};
use itertools::Itertools;
use lru::LruCache;
use near_chain_configs::StateSplitConfig;
#[cfg(feature = "new_epoch_sync")]
use near_chain_primitives::error::epoch_sync::EpochSyncInfoError;
use near_chain_primitives::error::{BlockKnownError, Error, LogTransientStorageError};
use near_epoch_manager::shard_tracker::ShardTracker;
use near_epoch_manager::types::BlockHeaderInfo;
@@ -44,6 +46,8 @@ use near_primitives::challenge::{
use near_primitives::checked_feature;
#[cfg(feature = "new_epoch_sync")]
use near_primitives::epoch_manager::{block_info::BlockInfo, epoch_sync::EpochSyncInfo};
#[cfg(feature = "new_epoch_sync")]
use near_primitives::errors::epoch_sync::EpochSyncHashType;
use near_primitives::errors::EpochError;
use near_primitives::hash::{hash, CryptoHash};
use near_primitives::merkle::{
@@ -71,6 +75,8 @@ use near_primitives::types::{
    NumShards, ShardId, StateRoot,
};
use near_primitives::unwrap_or_return;
#[cfg(feature = "new_epoch_sync")]
use near_primitives::utils::index_to_bytes;
use near_primitives::utils::MaybeValidated;
use near_primitives::version::{ProtocolFeature, PROTOCOL_VERSION};
use near_primitives::views::{
@@ -4780,6 +4786,105 @@ impl Chain {
    }
}

/// Epoch sync specific functions.
#[cfg(feature = "new_epoch_sync")]
impl Chain {
    /// TODO(posvyatokum): validate `epoch_sync_info` before `store_update` commit.
    pub fn validate_and_record_epoch_sync_info(
        &mut self,
        epoch_sync_info: &EpochSyncInfo,
    ) -> Result<(), EpochSyncInfoError> {
        let store = self.store().store().clone();
        let mut chain_update = self.chain_update();
        let mut store_update = chain_update.chain_store_update.store().store_update();

        let epoch_id = epoch_sync_info.get_epoch_id()?;
        // save EpochSyncInfo

        store_update.set_ser(DBCol::EpochSyncInfo, epoch_id.as_ref(), epoch_sync_info)?;

        // save EpochInfo's

        store_update.set_ser(DBCol::EpochInfo, epoch_id.as_ref(), &epoch_sync_info.epoch_info)?;
        store_update.set_ser(
            DBCol::EpochInfo,
            epoch_sync_info.get_next_epoch_id()?.as_ref(),
            &epoch_sync_info.next_epoch_info,
        )?;
        store_update.set_ser(
            DBCol::EpochInfo,
            epoch_sync_info.get_next_next_epoch_id()?.as_ref(),
            &epoch_sync_info.next_next_epoch_info,
        )?;

        // construct and save all new BlockMerkleTree's

        let mut cur_block_merkle_tree = (*chain_update
            .chain_store_update
            .get_block_merkle_tree(epoch_sync_info.get_epoch_first_header()?.prev_hash())?)
        .clone();
        let mut prev_hash = epoch_sync_info.get_epoch_first_header()?.prev_hash();
        for hash in &epoch_sync_info.all_block_hashes {
            cur_block_merkle_tree.insert(*prev_hash);
            chain_update
                .chain_store_update
                .save_block_merkle_tree(*hash, cur_block_merkle_tree.clone());
            prev_hash = hash;
        }

        // save all block data in headers_to_save

        for hash in &epoch_sync_info.headers_to_save {
            let header = epoch_sync_info.get_header(*hash, EpochSyncHashType::BlockToSave)?;
            // check that block is not known already
            if store.exists(DBCol::BlockHeader, hash.as_ref())? {
                continue;
            }

            store_update.insert_ser(DBCol::BlockHeader, header.hash().as_ref(), header)?;
            store_update.set_ser(
                DBCol::NextBlockHashes,
                header.prev_hash().as_ref(),
                header.hash(),
            )?;
            store_update.set_ser(
                DBCol::BlockHeight,
                &index_to_bytes(header.height()),
                header.hash(),
            )?;
            store_update.set_ser(
                DBCol::BlockOrdinal,
                &index_to_bytes(header.block_ordinal()),
                &header.hash(),
            )?;

            store_update.insert_ser(
                DBCol::BlockInfo,
                hash.as_ref(),
                &epoch_sync_info.get_block_info(hash)?,
            )?;
        }

        // save header head, final head, update epoch_manager aggregator

        chain_update
            .chain_store_update
            .force_save_header_head(&Tip::from_header(epoch_sync_info.get_epoch_last_header()?))?;
        chain_update.chain_store_update.save_final_head(&Tip::from_header(
            epoch_sync_info.get_epoch_last_finalised_header()?,
        ))?;
        chain_update
            .epoch_manager
            .force_update_aggregator(epoch_id, epoch_sync_info.get_epoch_last_finalised_hash()?);

        // TODO(posvyatokum): add EpochSyncInfo validation.

        chain_update.chain_store_update.merge(store_update);
        chain_update.commit()?;
        Ok(())
    }
}

/// Chain update helper, contains information that is needed to process block
/// and decide to accept it or reject it.
/// If rejected nothing will be updated in underlying storage.
@@ -5590,10 +5695,13 @@ impl<'a> ChainUpdate<'a> {
        self.chain_store_update.save_chunk_extra(block_header.hash(), &shard_uid, new_chunk_extra);
        Ok(true)
    }
}

/// Epoch sync specific functions.
#[cfg(feature = "new_epoch_sync")]
impl<'a> ChainUpdate<'a> {
    /// This function assumes `BlockInfo` is already retrievable from `epoch_manager`.
    /// This can be achieved by calling `add_validator_proposals`.
    #[cfg(feature = "new_epoch_sync")]
    fn save_epoch_sync_info_if_finalised(&mut self, header: &BlockHeader) -> Result<(), Error> {
        let block_info = self.epoch_manager.get_block_info(header.hash())?;
        let epoch_first_block_hash = block_info.epoch_first_block();
@@ -5631,12 +5739,20 @@ impl<'a> ChainUpdate<'a> {
            // We didn't finalise header with `epoch_sync_data_hash` for the previous epoch yet.
            return Ok(());
        }
        if self
            .chain_store_update
            .store()
            .exists(DBCol::EpochSyncInfo, prev_epoch_last_block_info.epoch_id().as_ref())?
        {
            // We already wrote `EpochSyncInfo` for this epoch.
            // Probably during epoch sync.
            return Ok(());
        }
        self.save_epoch_sync_info_impl(&prev_epoch_last_block_info, epoch_first_block_hash)
    }

    /// If the block is the last one in the epoch
    /// construct and record `EpochSyncInfo` to `self.chain_store_update`.
    #[cfg(feature = "new_epoch_sync")]
    fn save_epoch_sync_info_impl(
        &mut self,
        last_block_info: &BlockInfo,
@@ -5657,7 +5773,6 @@ impl<'a> ChainUpdate<'a> {
    /// Create a pair of `BlockHeader`s necessary to create `BlockInfo` for `block_hash`:
    /// - header for `block_hash`
    /// - header for `last_final_block` of `block_hash` header
    #[cfg(feature = "new_epoch_sync")]
    fn get_header_pair(
        &self,
        block_hash: &CryptoHash,
@@ -5690,7 +5805,6 @@ impl<'a> ChainUpdate<'a> {
    ///
    /// Headers not marked with (*) need to be saved on the syncing node.
    /// Headers marked with (*) only needed for `EpochSyncInfo` validation.
    #[cfg(feature = "new_epoch_sync")]
    fn get_epoch_sync_info_headers(
        &self,
        last_block_info: &BlockInfo,
@@ -5744,7 +5858,6 @@ impl<'a> ChainUpdate<'a> {
    }

    /// Data that is necessary to prove Epoch in new Epoch Sync.
    #[cfg(feature = "new_epoch_sync")]
    pub fn create_epoch_sync_info(
        &self,
        last_block_info: &BlockInfo,
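
For reference, the `BlockMerkleTree` reconstruction loop in `validate_and_record_epoch_sync_info` above follows a shift-by-one pattern: the tree saved for each block contains every hash up to and including its predecessor. A toy sketch of the same iteration order (`rebuild_trees` is hypothetical, and a plain `Vec` of hashes stands in for nearcore's `PartialMerkleTree`):

```rust
use std::collections::HashMap;

// Toy model: the "tree" recorded for a block is just the ordered list of
// hashes of all blocks strictly before it.
fn rebuild_trees(
    prev_epoch_last_hash: &str,      // prev_hash of the epoch's first block
    tree_before_epoch: Vec<String>,  // tree already stored for that prev block
    all_block_hashes: &[&str],       // the epoch's blocks, in order
) -> HashMap<String, Vec<String>> {
    let mut saved = HashMap::new();
    let mut cur_tree = tree_before_epoch;
    let mut prev_hash = prev_epoch_last_hash;
    for &hash in all_block_hashes {
        // Extend the running tree with the predecessor, then record a
        // snapshot of it for the current block.
        cur_tree.push(prev_hash.to_string());
        saved.insert(hash.to_string(), cur_tree.clone());
        prev_hash = hash;
    }
    saved
}

fn main() {
    let trees = rebuild_trees("h0", vec![], &["h1", "h2", "h3"]);
    assert_eq!(trees["h3"], vec!["h0".to_string(), "h1".to_string(), "h2".to_string()]);
}
```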
3 changes: 3 additions & 0 deletions chain/chain/src/test_utils/kv_runtime.rs
@@ -956,6 +956,9 @@ impl EpochManagerAdapter for MockEpochManager {
    ) -> Result<Vec<CryptoHash>, EpochError> {
        Ok(vec![])
    }

    #[cfg(feature = "new_epoch_sync")]
    fn force_update_aggregator(&self, _epoch_id: &EpochId, _hash: &CryptoHash) {}
}

impl RuntimeAdapter for KeyValueRuntime {
11 changes: 11 additions & 0 deletions chain/epoch-manager/src/adapter.rs
@@ -1,4 +1,6 @@
use crate::types::BlockHeaderInfo;
#[cfg(feature = "new_epoch_sync")]
use crate::EpochInfoAggregator;
use crate::EpochManagerHandle;
use near_chain_primitives::Error;
use near_crypto::Signature;
@@ -394,6 +396,9 @@ pub trait EpochManagerAdapter: Send + Sync {
        last_block_info: &BlockInfo,
        hash_to_prev_hash: Option<&HashMap<CryptoHash, CryptoHash>>,
    ) -> Result<Vec<CryptoHash>, EpochError>;

    #[cfg(feature = "new_epoch_sync")]
    fn force_update_aggregator(&self, epoch_id: &EpochId, hash: &CryptoHash);
}

impl EpochManagerAdapter for EpochManagerHandle {
@@ -975,4 +980,10 @@ impl EpochManagerAdapter for EpochManagerHandle {
            }
        }
    }

    #[cfg(feature = "new_epoch_sync")]
    fn force_update_aggregator(&self, epoch_id: &EpochId, hash: &CryptoHash) {
        let mut epoch_manager = self.write();
        epoch_manager.epoch_info_aggregator = EpochInfoAggregator::new(epoch_id.clone(), *hash);
    }
}
56 changes: 47 additions & 9 deletions core/primitives/src/epoch_manager.rs
@@ -1234,6 +1234,7 @@ pub mod epoch_sync {
    use crate::epoch_manager::block_info::BlockInfo;
    use crate::epoch_manager::epoch_info::EpochInfo;
    use crate::errors::epoch_sync::{EpochSyncHashType, EpochSyncInfoError};
    use crate::types::EpochId;
    use borsh::{BorshDeserialize, BorshSerialize};
    use near_o11y::log_assert;
    use near_primitives_core::hash::CryptoHash;
@@ -1262,17 +1263,54 @@ pub mod epoch_sync {
    }

    impl EpochSyncInfo {
        /// Reconstruct BlockInfo for `hash` from information in EpochSyncInfo.
        pub fn get_block_info(&self, hash: &CryptoHash) -> Result<BlockInfo, EpochSyncInfoError> {
        pub fn get_epoch_id(&self) -> Result<&EpochId, EpochSyncInfoError> {
            Ok(self.get_epoch_first_header()?.epoch_id())
        }

        pub fn get_next_epoch_id(&self) -> Result<&EpochId, EpochSyncInfoError> {
            Ok(self
                .get_header(self.next_epoch_first_hash, EpochSyncHashType::NextEpochFirstBlock)?
                .epoch_id())
        }

        pub fn get_next_next_epoch_id(&self) -> Result<EpochId, EpochSyncInfoError> {
            Ok(EpochId(*self.get_epoch_last_hash()?))
        }

        pub fn get_epoch_last_hash(&self) -> Result<&CryptoHash, EpochSyncInfoError> {
            let epoch_height = self.epoch_info.epoch_height();

            self.all_block_hashes.last().ok_or(EpochSyncInfoError::ShortEpoch { epoch_height })
        }

        pub fn get_epoch_last_header(&self) -> Result<&BlockHeader, EpochSyncInfoError> {
            self.get_header(*self.get_epoch_last_hash()?, EpochSyncHashType::LastEpochBlock)
        }

        pub fn get_epoch_last_finalised_hash(&self) -> Result<&CryptoHash, EpochSyncInfoError> {
            Ok(self.get_epoch_last_header()?.last_final_block())
        }

        pub fn get_epoch_last_finalised_header(&self) -> Result<&BlockHeader, EpochSyncInfoError> {
            self.get_header(
                *self.get_epoch_last_finalised_hash()?,
                EpochSyncHashType::LastFinalBlock,
            )
        }

        pub fn get_epoch_first_hash(&self) -> Result<&CryptoHash, EpochSyncInfoError> {
            let epoch_height = self.epoch_info.epoch_height();

            let epoch_first_hash = self
                .all_block_hashes
                .first()
                .ok_or(EpochSyncInfoError::ShortEpoch { epoch_height })?;
            let epoch_first_header =
                self.get_header(*epoch_first_hash, EpochSyncHashType::FirstEpochBlock)?;
            self.all_block_hashes.first().ok_or(EpochSyncInfoError::ShortEpoch { epoch_height })
        }

        pub fn get_epoch_first_header(&self) -> Result<&BlockHeader, EpochSyncInfoError> {
            self.get_header(*self.get_epoch_first_hash()?, EpochSyncHashType::FirstEpochBlock)
        }

        /// Reconstruct BlockInfo for `hash` from information in EpochSyncInfo.
        pub fn get_block_info(&self, hash: &CryptoHash) -> Result<BlockInfo, EpochSyncInfoError> {
            let epoch_first_header = self.get_epoch_first_header()?;
            let header = self.get_header(*hash, EpochSyncHashType::Other)?;

            log_assert!(
@@ -1338,7 +1376,7 @@ pub mod epoch_sync {
            Ok(next_epoch_first_header.epoch_sync_data_hash())
        }

        fn get_header(
        pub fn get_header(
            &self,
            hash: CryptoHash,
            hash_type: EpochSyncHashType,
2 changes: 2 additions & 0 deletions core/primitives/src/errors.rs
@@ -1214,10 +1214,12 @@ pub mod epoch_sync {

    #[derive(Eq, PartialEq, Clone, strum::Display, Debug)]
    pub enum EpochSyncHashType {
        LastEpochBlock,
        LastFinalBlock,
        FirstEpochBlock,
        NextEpochFirstBlock,
        Other,
        BlockToSave,
    }

    #[derive(Eq, PartialEq, Clone, thiserror::Error, Debug)]
Expand Down