diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d431c1d..7f1a7fc9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,11 @@ All notable changes to this project will be documented in this file. +## Version 0.59.12 + +- Enhanced bundler engine for creating short and quick bundles + using state proofs instead of full state for debugging and testing purposes + ## Version 0.59.11 - Fix for broken shard merge diff --git a/Cargo.toml b/Cargo.toml index f8c0b2cd..aa82377c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ build = 'common/build/build.rs' edition = '2021' name = 'ever-node' -version = '0.59.11' +version = '0.59.12' [workspace] members = [ 'storage' ] @@ -107,7 +107,7 @@ tokio = { features = [ 'macros' ], version = '1.5' } [features] default = [ 'telemetry', 'ever_block/export_key', 'validator_session/export_key' ] -export_key = [ 'catchain/export_key', 'ever_block/export_key' ] +export_key = [ 'catchain/export_key', 'ever_block/export_key', 'validator_session/export_key' ] external_db = [ 'rdkafka' ] fast_finality_extra = [ ] gosh = [ 'ever_block/gosh', 'ever_vm/gosh' ] diff --git a/bin/console.rs b/bin/console.rs index c6d1c1d0..32860da5 100644 --- a/bin/console.rs +++ b/bin/console.rs @@ -1138,7 +1138,6 @@ async fn main() { #[cfg(test)] mod test { - use super::*; use rand::{Rng, SeedableRng}; use std::{ @@ -1155,13 +1154,17 @@ mod test { ShardIdent, ShardStateUnsplit, ValidatorDescr, ValidatorSet }; use ever_node::{ - block::BlockKind, collator_test_bundle::{create_engine_telemetry, create_engine_allocated}, + block::BlockKind, + collator_test_bundle::create_engine_allocated, config::TonNodeConfig, engine_traits::{EngineAlloc, EngineOperations}, internal_db::{InternalDbConfig, InternalDb, state_gc_resolver::AllowStateGcSmartResolver}, network::{control::{ControlServer, DataSource}, node_network::NodeNetwork}, shard_state::ShardStateStuff, shard_states_keeper::PinnedShardStateGuard, validator::validator_manager::ValidationStatus }; + + #[cfg(feature = "telemetry")] + use ever_node::collator_test_bundle::create_engine_telemetry; #[cfg(feature = "telemetry")] use ever_node::engine_traits::EngineTelemetry; diff --git a/src/block.rs b/src/block.rs index 19866177..b66f4fff 100644 --- a/src/block.rs +++ b/src/block.rs @@ -349,7 +349,7 @@ impl BlockStuff { BlockKind::QueueUpdate{queue_update_for: *queue_update_for, empty: *empty }, BlockOrigin::MeshUpdate{network_id, ..} => BlockKind::MeshUpdate{network_id: *network_id}, BlockOrigin::MeshKit{network_id, ..} => BlockKind::MeshKit{network_id: *network_id}, - } + } } pub fn block(&self) -> Result<&Block> { @@ -373,7 +373,7 @@ impl BlockStuff { if let Some(wc) = self.is_queue_update_for() { if wc != workchain_id { fail!("{} is not queue update for wc {}", self.id(), workchain_id) - } + } } self .virt_block()? diff --git a/src/collator_test_bundle.rs b/src/collator_test_bundle.rs index ad5d4d21..2c7e51cc 100644 --- a/src/collator_test_bundle.rs +++ b/src/collator_test_bundle.rs @@ -10,14 +10,23 @@ * See the License for the specific EVERX DEV software governing permissions and * limitations under the License. */ +#![allow(unused_imports, unused_assignments, unused_variables, dead_code)] use crate::{ - block::BlockStuff, engine_traits::{EngineAlloc, EngineOperations}, - shard_state::ShardStateStuff, types::top_block_descr::TopBlockDescrStuff, + block::BlockStuff, + config::CollatorConfig, + engine_traits::{EngineAlloc, EngineOperations}, + shard_state::ShardStateStuff, + types::top_block_descr::TopBlockDescrStuff, validator::{ - accept_block::create_top_shard_block_description, BlockCandidate, - out_msg_queue::{OutMsgQueueInfoStuff, CachedStates}, - }, config::CollatorConfig + accept_block::create_top_shard_block_description, + collator::{CollateResult, Collator}, + out_msg_queue::{CachedStates, OutMsgQueueInfoStuff}, + validate_query::ValidateQuery, + validator_utils::{compute_validator_set_cc, PrevBlockHistory}, + BlockCandidate, + CollatorSettings, + }, }; #[cfg(feature = "telemetry")] use crate::engine_traits::EngineTelemetry; @@ -29,7 +38,7 @@ use std::{ ops::Deref, sync::{Arc, atomic::AtomicU64} }; use storage::{ - StorageAlloc, TimeChecker, + StorageAlloc, block_handle_db::{BlockHandle, BlockHandleDb, BlockHandleStorage}, block_handle_db::NodeStateDb, types::BlockMeta, @@ -37,12 +46,12 @@ use storage::{ #[cfg(feature = "telemetry")] use storage::StorageTelemetry; use ever_block::{ - BlockIdExt, Message, ShardIdent, Serializable, MerkleUpdate, Deserializable, - ValidatorBaseInfo, BlockSignaturesPure, BlockSignatures, HashmapAugType, - TopBlockDescrSet, OutMsgQueue, + error, fail, read_boc, read_single_root_boc, Error, + BlockIdExt, BlockSignatures, BlockSignaturesPure, Cell, CellType, Deserializable, + GlobalCapabilities, HashmapAugType, HashmapType, MerkleProof, Message, OutMsgQueue, + Result, Serializable, ShardIdent, ShardStateUnsplit, TopBlockDescr, TopBlockDescrSet, + UInt256, UsageTree, ValidatorBaseInfo, ValidatorSet, FundamentalSmcAddresses, }; -use ever_block::{ShardStateUnsplit, TopBlockDescr}; -use ever_block::{UInt256, fail, error, Error, Result, CellType, read_boc, read_single_root_boc}; use crate::engine_traits::RempDuplicateStatus; #[derive(serde::Deserialize, serde::Serialize)] @@ -57,10 +66,10 @@ struct CollatorTestBundleIndexJson { prev_blocks: Vec, created_by: String, rand_seed: String, - #[serde(skip_serializing)] + #[serde(default, skip_serializing)] now: u32, + #[serde(default)] now_ms: u64, - fake: bool, contains_ethalon: bool, #[serde(default)] contains_candidate: bool, @@ -103,7 +112,6 @@ impl TryFrom for CollatorTestBundleIndex { created_by: value.created_by.parse()?, rand_seed: Some(value.rand_seed.parse()?), now_ms: if value.now_ms == 0 { (value.now as u64) * 1000 } else { value.now_ms }, - fake: value.fake, contains_ethalon: value.contains_ethalon, contains_candidate: value.contains_candidate, notes: value.notes, @@ -129,7 +137,6 @@ impl From<&CollatorTestBundleIndex> for CollatorTestBundleIndexJson { }, now: (value.now_ms / 1000) as u32, now_ms: value.now_ms, - fake: value.fake, contains_ethalon: value.contains_ethalon, contains_candidate: value.contains_candidate, notes: String::new(), @@ -149,24 +156,11 @@ struct CollatorTestBundleIndex { created_by: UInt256, rand_seed: Option, now_ms: u64, - fake: bool, contains_ethalon: bool, contains_candidate: bool, notes: String, } -impl CollatorTestBundleIndex { - pub fn oldest_mc_state(&self) -> BlockIdExt { - let mut oldest_mc_state = self.last_mc_state.clone(); - for id in self.mc_states.iter() { - if id.seq_no < oldest_mc_state.seq_no { - oldest_mc_state = id.clone(); - } - } - oldest_mc_state - } -} - fn construct_from_file(path: &str) -> Result<(T, UInt256, UInt256)> { let bytes = std::fs::read(path)?; let fh = UInt256::calc_file_hash(&bytes); @@ -225,9 +219,9 @@ pub struct CollatorTestBundle { index: CollatorTestBundleIndex, top_shard_blocks: Vec>, external_messages: Vec<(Arc, UInt256)>, - states: HashMap>, - mc_merkle_updates: HashMap, - blocks: HashMap, + states: HashMap>, // used for loading purposes + state_proofs: HashMap, // merkle proofs for states to lower their size + ethalon_block: Option, candidate: Option, block_handle_storage: BlockHandleStorage, #[cfg(feature = "telemetry")] @@ -290,7 +284,6 @@ impl CollatorTestBundle { created_by: UInt256::default(), rand_seed: None, now_ms, - fake: true, contains_ethalon: false, contains_candidate: false, notes: String::new(), @@ -301,8 +294,9 @@ impl CollatorTestBundle { top_shard_blocks: Default::default(), external_messages: Default::default(), states, - mc_merkle_updates: Default::default(), - blocks: Default::default(), + state_proofs: Default::default(), + ethalon_block: None, + // blocks: Default::default(), block_handle_storage: create_block_handle_storage(), candidate: None, #[cfg(feature = "telemetry")] @@ -313,27 +307,68 @@ impl CollatorTestBundle { }) } - pub fn load(path: &str) -> Result { + fn deserialize_state( + path: &str, + ss_id: &BlockIdExt, + #[cfg(feature = "telemetry")] + telemetry: &EngineTelemetry, + allocated: &EngineAlloc, + ) -> Result> { + let filename = format!("{}/states/{:x}", path, ss_id.root_hash()); + log::info!("Loading state {} from {}", ss_id, filename); + let data = read(&filename).map_err(|_| error!("cannot read file {}", filename))?; + if ss_id.seq_no() == 0 { + ShardStateStuff::deserialize_zerostate( + ss_id.clone(), + &data, + #[cfg(feature = "telemetry")] + &telemetry, + &allocated + ) + } else if let Ok(proof) = MerkleProof::construct_from_bytes(&data) { + ShardStateStuff::from_state_root_cell( + ss_id.clone(), + proof.proof.virtualize(1), + #[cfg(feature = "telemetry")] + &telemetry, + &allocated + ) + } else { + ShardStateStuff::deserialize_state_inmem( + ss_id.clone(), + Arc::new(data), + #[cfg(feature = "telemetry")] + &telemetry, + &allocated, + &|| false + ) + } + } + + pub fn load(path: impl AsRef) -> Result { - if !std::path::Path::new(path).is_dir() { - fail!("Directory not found: {}", path); + let path = path.as_ref(); + if !path.is_dir() { + fail!("Directory not found: {:?}", path); } + let path = path.to_str().unwrap(); #[cfg(feature = "telemetry")] let telemetry = create_engine_telemetry(); let allocated = create_engine_allocated(); // 🗂 index + // let file = std::fs::File::open(path.join("index.json"))?; let file = std::fs::File::open(format!("{}/index.json", path))?; let index: CollatorTestBundleIndexJson = serde_json::from_reader(file)?; - let mut index: CollatorTestBundleIndex = index.try_into()?; + let index: CollatorTestBundleIndex = index.try_into()?; // ├─📂 top_shard_blocks let mut top_shard_blocks = vec!(); for id in index.top_shard_blocks.iter() { let filename = format!("{}/top_shard_blocks/{:x}", path, id.root_hash()); let tbd = TopBlockDescr::construct_from_file(filename)?; - top_shard_blocks.push(Arc::new(TopBlockDescrStuff::new(tbd, id, index.fake, false)?)); + top_shard_blocks.push(Arc::new(TopBlockDescrStuff::new(tbd, id, true, false)?)); } // to add simple external message: @@ -362,9 +397,13 @@ impl CollatorTestBundle { // ├─📂 states let mut states = HashMap::new(); - // all shardes states - for ss_id in index.neighbors.iter().chain(index.prev_blocks.iter()) { + // all shards and mc states + let iter = index.neighbors.iter() + .chain(index.prev_blocks.iter()) + .chain(index.mc_states.iter()); + for ss_id in iter { let filename = format!("{}/states/{:x}", path, ss_id.root_hash()); + log::info!("Loading state {} from {}", ss_id, filename); let data = read(&filename).map_err(|_| error!("cannot read file {}", filename))?; let ss = if ss_id.seq_no() == 0 { ShardStateStuff::deserialize_zerostate( @@ -374,112 +413,35 @@ impl CollatorTestBundle { &telemetry, &allocated )? - } else { - ShardStateStuff::deserialize_state_inmem( - ss_id.clone(), - Arc::new(data), + } else if let Ok(proof) = MerkleProof::construct_from_bytes(&data) { + ShardStateStuff::from_state_root_cell( + ss_id.clone(), + proof.proof.virtualize(1), #[cfg(feature = "telemetry")] &telemetry, - &allocated, - &|| false + &allocated )? - }; - states.insert(ss_id.clone(), ss); - - } - if index.contains_ethalon && !index.id.shard().is_masterchain() { - let filename = format!("{}/states/{:x}", path, index.id.root_hash()); - let data = read(&filename).map_err(|_| error!("cannot read file {}", filename))?; - states.insert( - index.id.clone(), + } else { ShardStateStuff::deserialize_state_inmem( - index.id.clone(), + ss_id.clone(), Arc::new(data), #[cfg(feature = "telemetry")] &telemetry, &allocated, &|| false )? - ); - } - - // oldest mc state is saved full - let oldest_mc_state_id = index.oldest_mc_state(); - let filename = format!("{}/states/{:x}", path, oldest_mc_state_id.root_hash()); - let data = read(&filename).map_err(|_| error!("cannot read file {}", filename))?; - let oldest_mc_state = if oldest_mc_state_id.seq_no() == 0 { - ShardStateStuff::deserialize_zerostate( - oldest_mc_state_id.clone(), - &data, - #[cfg(feature = "telemetry")] - &telemetry, - &allocated - )? - } else { - ShardStateStuff::deserialize_state_inmem( - oldest_mc_state_id.clone(), - Arc::new(data), - #[cfg(feature = "telemetry")] - &telemetry, - &allocated, - &|| false - )? - }; - let mut prev_state_root = oldest_mc_state.root_cell().clone(); - states.insert(oldest_mc_state_id.clone(), oldest_mc_state); - - // other states are culculated by merkle updates - let mut mc_merkle_updates = HashMap::new(); - for id in index.mc_states.iter() { - if id != &oldest_mc_state_id { - let filename = format!("{}/states/mc_merkle_updates/{:x}", path, id.root_hash()); - mc_merkle_updates.insert( - id.clone(), - MerkleUpdate::construct_from_file(filename)?, - ); - } - } - index.mc_states.sort_by_key(|id| id.seq_no); - for id in index.mc_states.iter() { - if id != &oldest_mc_state_id { - let mu = mc_merkle_updates.get(id).ok_or_else( - || error!("Can't get merkle update {}", id) - )?; - let new_root = mu.apply_for(&prev_state_root)?; - states.insert( - id.clone(), - ShardStateStuff::from_state_root_cell( - id.clone(), - new_root.clone(), - #[cfg(feature = "telemetry")] - &telemetry, - &allocated - )? - ); - prev_state_root = new_root; - } + }; + states.insert(ss_id.clone(), ss); } // ├─📂 blocks - let mut blocks = HashMap::new(); - if index.contains_ethalon { + let ethalon_block = if !index.contains_ethalon { + None + } else { let filename = format!("{}/blocks/{:x}", path, index.id.root_hash()); let data = read(&filename).map_err(|_| error!("cannot read file {}", filename))?; - blocks.insert( - index.id.clone(), - BlockStuff::deserialize_block(index.id.clone(), data)? - ); - } - for id in index.prev_blocks.iter() { - if id.seq_no() != 0 { - let filename = format!("{}/blocks/{:x}", path, id.root_hash()); - let data = read(&filename).map_err(|_| error!("cannot read file {}", filename))?; - blocks.insert( - id.clone(), - BlockStuff::deserialize_block(id.clone(), data)? - ); - } - } + Some(BlockStuff::deserialize_block(index.id.clone(), data)?) + }; let candidate = if !index.contains_candidate { None @@ -500,8 +462,8 @@ impl CollatorTestBundle { top_shard_blocks, external_messages, states, - mc_merkle_updates, - blocks, + state_proofs: Default::default(), + ethalon_block, block_handle_storage: create_block_handle_storage(), candidate, #[cfg(feature = "telemetry")] @@ -520,11 +482,10 @@ impl CollatorTestBundle { }) } + // returns ethalon block or desrialize it from candidate if present pub fn ethalon_block(&self) -> Result> { if self.index.contains_ethalon { - Ok(Some( - self.blocks.get(&self.index.id).ok_or_else(|| error!("Index declares contains_ethalon=true but the block is not found"))?.clone() - )) + Ok(self.ethalon_block.clone()) } else if let Some(candidate) = self.candidate() { Ok(Some(BlockStuff::deserialize_block_checked(self.index.id.clone(), candidate.data.clone())?)) } else { @@ -532,33 +493,6 @@ impl CollatorTestBundle { } } -/* UNUSED - pub fn ethalon_state(&self) -> Result> { - if self.index.contains_ethalon { - Ok(self.states.get(&self.index.id).cloned()) - } else if let Some(block) = self.ethalon_block()? { - let prev_ss_root = match block.construct_prev_id()? { - (prev1, Some(prev2)) => { - let ss1 = self.states.get(&prev1).ok_or_else(|| error!("Prev state is not found"))?.root_cell().clone(); - let ss2 = self.states.get(&prev2).ok_or_else(|| error!("Prev state is not found"))?.root_cell().clone(); - ShardStateStuff::construct_split_root(ss1, ss2)? - }, - (prev, None) => { - self.states.get(&prev).ok_or_else(|| error!("Prev state is not found"))?.root_cell().clone() - } - }; - let merkle_update = block - .block()? - .read_state_update()?; - let block_id = block.id().clone(); - let ss_root = merkle_update.apply_for(&prev_ss_root)?; - Ok(Some(ShardStateStuff::new(block_id.clone(), ss_root)?)) - } else { - Ok(None) - } - } -*/ - pub fn block_id(&self) -> &BlockIdExt { &self.index.id } pub fn prev_blocks_ids(&self) -> &Vec { &self.index.prev_blocks } pub fn min_ref_mc_seqno(&self) -> u32 { self.index.min_ref_mc_seqno } @@ -570,32 +504,149 @@ impl CollatorTestBundle { } impl CollatorTestBundle { + fn load_state_internal(&self, block_id: &BlockIdExt) -> Result> { + if let Some(state) = self.states.get(block_id) { + Ok(state.clone()) + } else if let Some(proof) = self.state_proofs.get(block_id) { + ShardStateStuff::from_state_root_cell( + block_id.clone(), + proof.proof.clone().virtualize(1), + #[cfg(feature = "telemetry")] + &self.telemetry, + &self.allocated, + ) + } else { + fail!("bundle doesn't contain state for block {}", block_id) + } + } + + async fn load_and_simplify_state( + engine: &Arc, + state_proofs: &mut HashMap, + id: &BlockIdExt, + block_opt: Option<&BlockStuff>, + ) -> Result<()> { + Self::add_simplified_state( + engine.load_state(id).await?.root_cell(), + state_proofs, + id, + block_opt, + None, + None + ) + } + fn add_simplified_state( + state_root: &Cell, + state_proofs: &mut HashMap, + id: &BlockIdExt, + block_opt: Option<&BlockStuff>, + usage_tree_opt: Option<&UsageTree>, + min_ref_mc_seqno: Option, + ) -> Result<()> { + if state_proofs.get(id).is_some() { + assert!(min_ref_mc_seqno.is_none()); + assert!(block_opt.is_none()); + assert!(usage_tree_opt.is_none()); + log::debug!("state proof already exists {}", id); + return Ok(()); + } + log::debug!("prepare simplified state for {}", id); + // let root_hash = root.repr_hash(); + let usage_tree_local = UsageTree::default(); + let usage_tree = usage_tree_opt.unwrap_or(&usage_tree_local); + let state_root = usage_tree.use_cell(state_root.clone(), false); + let state = ShardStateUnsplit::construct_from_cell(state_root.clone())?; + let mut sub_trees = HashSet::new(); + let accounts = state.read_accounts()?; + let mut smc_addresses = FundamentalSmcAddresses::default(); + if let Some(mut custom) = state.read_custom()? { + if let Some(min_ref_mc_seqno) = min_ref_mc_seqno { + for mc_seqno in min_ref_mc_seqno..id.seq_no { + custom.prev_blocks.get_raw(&mc_seqno)?.unwrap(); + } + // add fake for new block to avoid pruned access + custom.prev_blocks.set( + &id.seq_no, + &Default::default(), + &Default::default() + )?; + + // get all system contracts + smc_addresses = custom.config().fundamental_smc_addr()?; + smc_addresses.add_key(&custom.config().minter_address()?)?; + smc_addresses.add_key(&custom.config().config_address()?)?; + smc_addresses.add_key(&custom.config().elector_address()?)?; + } + // here clear all unnecessary data + custom.prev_blocks = Default::default(); + // serialize struct and store all sub-trees + let cell = custom.serialize()?; + for i in 0..cell.references_count() { + let child = cell.reference(i)?; + for j in 0..child.references_count() { + sub_trees.insert(child.reference(j)?.repr_hash()); + } + } + } + // read all accounts affected in block + if let Some(block) = block_opt { + let extra = block.block()?.read_extra()?; + extra.read_account_blocks()?.iterate_slices(|account_id, _| { + smc_addresses.add_key_serialized(account_id)?; + Ok(true) + })?; + // load all work cells + // log::trace!("traverse accounts"); + // accounts.len()?; + } + smc_addresses.iterate_slices_with_keys(|account_id, _| { + if let (Some(leaf), _) = accounts.clone().set_builder_serialized(account_id, &Default::default(), &Default::default())? { + // if let Some(leaf) = accounts.get_serialized_raw(account_id)? { + sub_trees.insert(leaf.cell().repr_hash()); + } + Ok(true) + })?; + + // don't prune out_msg_queue_info - it could be very big + let hash = state.out_msg_queue_info_cell().repr_hash(); + sub_trees.insert(hash); + let proof = MerkleProof::create_with_subtrees( + &state_root, + |hash| usage_tree.contains(hash), + |hash| sub_trees.contains(hash) + )?; + state_proofs.insert(id.clone(), proof); + Ok(()) + } + // build bundle for a collating (just now) block. // Uses real engine for top shard blocks and external messages. - // Blocks data loading is optional because we sometimes create bundles using a cut database (without blocks). - // Such a bundle will work, but creating merkle updates could be long + // If usage_tree is not present, try to collate block pub async fn build_for_collating_block( - prev_blocks_ids: Vec, engine: &Arc, + prev_blocks_ids: Vec, + usage_tree_opt: Option, ) -> Result { - log::info!("Building for furure block, prev[0]: {}", prev_blocks_ids[0]); // TODO: fill caches states let mut cached_states = CachedStates::new(engine); - // TODO: use cached states instead - let mut states = HashMap::new(); - let shard = if prev_blocks_ids.len() > 1 { prev_blocks_ids[0].shard().merge()? } else { prev_blocks_ids[0].shard().clone() }; - let is_master = shard.is_masterchain(); + let mut state_proofs = HashMap::new(); + let is_master = prev_blocks_ids[0].shard().is_masterchain(); + let shard = if let Some(merge_block_id) = prev_blocks_ids.get(1) { + merge_block_id.shard().merge()? + } else if engine.load_state(&prev_blocks_ids[0]).await?.state()?.before_split() { + prev_blocks_ids[0].shard().split()?.0 + } else { + prev_blocks_ids[0].shard().clone() + }; // // last mc state // let mc_state = engine.load_last_applied_mc_state().await?; let last_mc_id = mc_state.block_id().clone(); - let mut oldest_mc_seq_no = last_mc_id.seq_no(); - let mut newest_mc_seq_no = last_mc_id.seq_no(); // // top shard blocks @@ -614,52 +665,84 @@ impl CollatorTestBundle { 0 ).collect::>(); - // + // + // prev states + // + let (usage_tree, candidate) = if let Some(usage_tree) = usage_tree_opt { + (usage_tree, None) + } else { + // try to collate block + let collate_result = try_collate( + engine, + shard.clone(), + prev_blocks_ids.clone(), + None, + None, + true, + false + ).await?; + (collate_result.usage_tree, collate_result.candidate) + }; + let (id, now_ms, block_opt); + if let Some(candidate) = &candidate { + let block = BlockStuff::deserialize_block(candidate.block_id.clone(), candidate.data.clone())?; + now_ms = block.block()?.read_info()?.gen_utime_ms(); + id = candidate.block_id.clone(); + block_opt = Some(block); + } else { + now_ms = engine.now_ms(); + // now_ms = engine.load_state(&prev_blocks_ids[0]).await?.state_or_queue()?.gen_time_ms() + 1; // TODO: merge? + id = BlockIdExt { + shard_id: shard.clone(), + seq_no: prev_blocks_ids.iter().map(|id| id.seq_no()).max().unwrap() + 1, + root_hash: UInt256::default(), + file_hash: UInt256::default(), + }; + block_opt = None; + } + if let Some(merge_block_id) = prev_blocks_ids.get(1) { + let proof = MerkleProof::create( + engine.load_state(merge_block_id).await?.root_cell(), + |h| usage_tree.contains(h) + )?; + state_proofs.insert(merge_block_id.clone(), proof); + } + if !is_master { + let proof = MerkleProof::create( + engine.load_state(&prev_blocks_ids[0]).await?.root_cell(), + |h| usage_tree.contains(h) + )?; + state_proofs.insert(prev_blocks_ids[0].clone(), proof); + } + + // // neighbors // let mut neighbors = vec!(); let shards = mc_state.shard_hashes()?; + // TODO: this can be improved later by collated block let neighbor_list = shards.neighbours_for(&shard)?; for shard in neighbor_list.iter() { - states.insert(shard.block_id().clone(), engine.load_state(shard.block_id()).await?); + Self::load_and_simplify_state(engine, &mut state_proofs, shard.block_id(), None).await?; neighbors.push(shard.block_id().clone()); } - if shards.is_empty() || mc_state.block_id().seq_no() != 0 { - states.insert(last_mc_id.clone(), mc_state); - } - // master blocks's collator uses new neighbours, based on new shaedes config. + // master blocks's collator uses new neighbours, based on new shards config. // It is difficult to calculate new config there. So add states for all new shard blocks. for tsb in top_shard_blocks.iter() { let id = tsb.proof_for(); - if !states.contains_key(id) { - states.insert(id.clone(), engine.load_state(id).await?); + if !state_proofs.contains_key(id) { + Self::load_and_simplify_state(engine, &mut state_proofs, id, None).await?; neighbors.push(id.clone()); } } - // - // prev_blocks & states - // - let mut blocks = HashMap::new(); - let handle = engine.load_block_handle(&prev_blocks_ids[0])? - .ok_or_else(|| error!("Cannot load handle for prev1 block {}", prev_blocks_ids[0]))?; - if let Ok(prev1) = engine.load_block(&handle).await { - blocks.insert(prev_blocks_ids[0].clone(), prev1); - } - states.insert(prev_blocks_ids[0].clone(), engine.load_state(&prev_blocks_ids[0]).await?); - if prev_blocks_ids.len() > 1 { - let handle = engine.load_block_handle(&prev_blocks_ids[1])? - .ok_or_else(|| error!("Cannot load handle for prev2 block {}", prev_blocks_ids[1]))?; - if let Ok(prev2) = engine.load_block(&handle).await { - blocks.insert(prev_blocks_ids[1].clone(), prev2); - } - states.insert(prev_blocks_ids[1].clone(), engine.load_state(&prev_blocks_ids[1]).await?); - } - // collect needed mc states - for (_, state) in states.iter() { - let nb = OutMsgQueueInfoStuff::from_shard_state(state, &mut cached_states).await?; + let mut oldest_mc_seq_no = last_mc_id.seq_no(); + let mut newest_mc_seq_no = last_mc_id.seq_no(); + for (block_id, _state_root) in state_proofs.iter() { + let state = engine.load_state(block_id).await?; + let nb = OutMsgQueueInfoStuff::from_shard_state(&state, &mut cached_states).await?; for entry in nb.entries() { if entry.mc_seqno() < oldest_mc_seq_no { oldest_mc_seq_no = entry.mc_seqno(); @@ -669,38 +752,24 @@ impl CollatorTestBundle { } } - // mc states and merkle updates - let oldest_mc_state = engine.load_state( - engine.find_mc_block_by_seq_no(oldest_mc_seq_no).await?.id() - ).await?; - let mut prev_mc_state = oldest_mc_state.clone(); - let mut mc_states = vec!(oldest_mc_state.block_id().clone()); - states.insert(oldest_mc_state.block_id().clone(), oldest_mc_state); - let mut mc_merkle_updates = HashMap::new(); - - for mc_seq_no in oldest_mc_seq_no + 1..=newest_mc_seq_no { + // + // mc states + // + Self::add_simplified_state( + mc_state.root_cell(), + &mut state_proofs, + mc_state.block_id(), + if is_master { block_opt.as_ref() } else { None }, + if is_master { Some(&usage_tree) } else { None }, + Some(oldest_mc_seq_no), + )?; + let mut mc_states = vec!(mc_state.block_id().clone()); + for mc_seq_no in oldest_mc_seq_no..newest_mc_seq_no { let handle = engine.find_mc_block_by_seq_no(mc_seq_no).await?; - let mc_state = engine.load_state(handle.id()).await?; - let merkle_update = if let Ok(block) = engine.load_block(&handle).await { - block.block()?.read_state_update()? - } else { - let _tc = TimeChecker::new(format!("create merkle update for {}", handle.id()), 30); - // MerkleUpdate::default() - MerkleUpdate::create(prev_mc_state.root_cell(), mc_state.root_cell())? - }; - mc_merkle_updates.insert(handle.id().clone(), merkle_update); - prev_mc_state = mc_state.clone(); - states.insert(handle.id().clone(), mc_state); + Self::load_and_simplify_state(engine, &mut state_proofs, handle.id(), None).await?; mc_states.push(handle.id().clone()); } - let id = BlockIdExt { - shard_id: shard, - seq_no: prev_blocks_ids.iter().max_by_key(|id| id.seq_no()).unwrap().seq_no() + 1, - root_hash: UInt256::default(), - file_hash: UInt256::default(), - }; - let index = CollatorTestBundleIndex { id, top_shard_blocks: top_shard_blocks.iter().map(|tsb| tsb.proof_for().clone()).collect(), @@ -712,10 +781,9 @@ impl CollatorTestBundle { prev_blocks: prev_blocks_ids, created_by: UInt256::default(), rand_seed: None, - now_ms: engine.now_ms(), - fake: true, + now_ms, contains_ethalon: false, - contains_candidate: false, + contains_candidate: candidate.is_some(), notes: String::new(), }; @@ -723,11 +791,11 @@ impl CollatorTestBundle { index, top_shard_blocks, external_messages, - states, - mc_merkle_updates, - blocks, + states: Default::default(), + state_proofs, + ethalon_block: None, block_handle_storage: create_block_handle_storage(), - candidate: None, + candidate, #[cfg(feature = "telemetry")] telemetry: create_engine_telemetry(), allocated: create_engine_allocated(), @@ -740,30 +808,28 @@ impl CollatorTestBundle { // Uses real engine for top shard blocks and external messages. // Blocks data loading is optional because we sometimes create bundles using a cut database (without blocks). // Such a bundle will work, but creating merkle updates could be long + pub async fn build_for_validating_block( - shard: ShardIdent, - _min_masterchain_block_id: BlockIdExt, - prev_blocks_ids: Vec, - candidate: BlockCandidate, engine: &Arc, + prev: &PrevBlockHistory, + candidate: BlockCandidate, ) -> Result { - log::info!("Building for validating block, candidate: {}", candidate.block_id); // TODO: fill caches states let mut cached_states = CachedStates::new(engine); - // TODO: use cached states instead - let mut states = HashMap::new(); - let is_master = shard.is_masterchain(); + let mut state_proofs = HashMap::new(); + let is_master = candidate.block_id.shard().is_masterchain(); + + let block = BlockStuff::deserialize_block_checked(candidate.block_id.clone(), candidate.data.clone())?; + let now_ms = block.block()?.read_info()?.gen_utime_ms(); // // last mc state // let mc_state = engine.load_last_applied_mc_state().await?; let last_mc_id = mc_state.block_id().clone(); - let mut oldest_mc_seq_no = last_mc_id.seq_no(); - let mut newest_mc_seq_no = last_mc_id.seq_no(); // // top shard blocks @@ -777,53 +843,60 @@ impl CollatorTestBundle { // // external messages // - let external_messages = engine.get_external_messages_iterator( - shard.clone(), - 0 - ).collect::>(); + let external_messages = engine.get_external_messages_iterator(candidate.block_id.shard().clone(), 0).collect::>(); + + // + // prev states + // + if let Some(merge_block_id) = prev.get_prev(1) { + let key = candidate.block_id.shard().shard_key(false); + let usage_tree = UsageTree::default(); + let state = engine.load_state(merge_block_id).await?; + let state_root = usage_tree.use_cell(state.root_cell().clone(), false); + let mut accounts = ShardStateUnsplit::construct_from_cell(state_root)?.read_accounts()?; + + let other = engine.load_state(&prev.get_prevs()[0]).await?; + let state_root = usage_tree.use_cell(other.root_cell().clone(), false); + let other_accounts = ShardStateUnsplit::construct_from_cell(state_root)?.read_accounts()?; + accounts.merge(&other_accounts, &key)?; + + Self::add_simplified_state(state.root_cell(), &mut state_proofs, merge_block_id, Some(&block), Some(&usage_tree), None)?; + Self::add_simplified_state(other.root_cell(), &mut state_proofs, &prev.get_prevs()[0], Some(&block), Some(&usage_tree), None)?; + } else if !is_master { + Self::load_and_simplify_state(engine, &mut state_proofs, &prev.get_prevs()[0], Some(&block)).await?; + } // // neighbors // let mut neighbors = vec!(); - let shards = if shard.is_masterchain() { - let block = BlockStuff::deserialize_block_checked(candidate.block_id.clone(), candidate.data.clone())?; + let shards = if is_master { block.shard_hashes()? } else { mc_state.shard_hashes()? }; - let neighbor_list = shards.neighbours_for(&shard)?; + let neighbor_list = shards.neighbours_for(&candidate.block_id.shard())?; for shard in neighbor_list.iter() { - states.insert(shard.block_id().clone(), engine.load_state(shard.block_id()).await?); + Self::load_and_simplify_state(engine, &mut state_proofs, shard.block_id(), None).await?; neighbors.push(shard.block_id().clone()); } - if shards.is_empty() || mc_state.block_id().seq_no() != 0 { - states.insert(last_mc_id.clone(), mc_state); - } - - // - // prev_blocks & states - // - let mut blocks = HashMap::new(); - let handle = engine.load_block_handle(&prev_blocks_ids[0])? - .ok_or_else(|| error!("Cannot load handle for prev1 block {}", prev_blocks_ids[0]))?; - if let Ok(prev1) = engine.load_block(&handle).await { - blocks.insert(prev_blocks_ids[0].clone(), prev1); - } - states.insert(prev_blocks_ids[0].clone(), engine.load_state(&prev_blocks_ids[0]).await?); - if prev_blocks_ids.len() > 1 { - let handle = engine.load_block_handle(&prev_blocks_ids[1])? - .ok_or_else(|| error!("Cannot load handle for prev2 block {}", prev_blocks_ids[1]))?; - if let Ok(prev2) = engine.load_block(&handle).await { - blocks.insert(prev_blocks_ids[1].clone(), prev2); + // master blocks's collator uses new neighbours, based on new shards config. + // It is difficult to calculate new config there. So add states for all new shard blocks. + for tsb in top_shard_blocks.iter() { + let id = tsb.proof_for(); + if !state_proofs.contains_key(id) { + Self::load_and_simplify_state(engine, &mut state_proofs, id, None).await?; + neighbors.push(id.clone()); } - states.insert(prev_blocks_ids[1].clone(), engine.load_state(&prev_blocks_ids[1]).await?); } // collect needed mc states - for (_, state) in states.iter() { - let nb = OutMsgQueueInfoStuff::from_shard_state(state, &mut cached_states).await?; + let mut oldest_mc_seq_no = last_mc_id.seq_no(); + let mut newest_mc_seq_no = last_mc_id.seq_no(); + for (block_id, _state_root) in state_proofs.iter() { + let state = engine.load_state(block_id).await?; + let nb = OutMsgQueueInfoStuff::from_shard_state(&state, &mut cached_states).await?; for entry in nb.entries() { if entry.mc_seqno() < oldest_mc_seq_no { oldest_mc_seq_no = entry.mc_seqno(); @@ -833,32 +906,26 @@ impl CollatorTestBundle { } } - // mc states and merkle updates - let oldest_mc_state = engine.load_state( - engine.find_mc_block_by_seq_no(oldest_mc_seq_no).await?.id() - ).await?; - let mut prev_mc_state = oldest_mc_state.clone(); - let mut mc_states = vec!(oldest_mc_state.block_id().clone()); - states.insert(oldest_mc_state.block_id().clone(), oldest_mc_state); - let mut mc_merkle_updates = HashMap::new(); - - for mc_seq_no in oldest_mc_seq_no + 1..=newest_mc_seq_no { + // + // mc states + // + Self::add_simplified_state( + mc_state.root_cell(), + &mut state_proofs, + mc_state.block_id(), + if is_master { Some(&block) } else { None }, + None, + Some(oldest_mc_seq_no), + )?; + let mut mc_states = vec!(mc_state.block_id().clone()); + for mc_seq_no in oldest_mc_seq_no..newest_mc_seq_no { let handle = engine.find_mc_block_by_seq_no(mc_seq_no).await?; - let mc_state = engine.load_state(handle.id()).await?; - let merkle_update = if let Ok(block) = engine.load_block(&handle).await { - block.block()?.read_state_update()? - } else { - // be careful - creating of new merkle update is very slow - // if some shards were frozen for a long time - MerkleUpdate::create(prev_mc_state.root_cell(), mc_state.root_cell())? - }; - prev_mc_state = mc_state.clone(); - mc_merkle_updates.insert(handle.id().clone(), merkle_update); - states.insert(handle.id().clone(), mc_state); + Self::load_and_simplify_state(engine, &mut state_proofs, handle.id(), None).await?; mc_states.push(handle.id().clone()); } - let b = BlockStuff::deserialize_block_checked(candidate.block_id.clone(), candidate.data.clone())?; + // let mut blocks = HashMap::new(); + // blocks.insert(candidate.block_id.clone(), block); let index = CollatorTestBundleIndex { id: candidate.block_id.clone(), @@ -868,11 +935,10 @@ impl CollatorTestBundle { min_ref_mc_seqno: oldest_mc_seq_no, mc_states, neighbors, - prev_blocks: prev_blocks_ids, + prev_blocks: prev.get_prevs().to_vec(), created_by: candidate.created_by.clone(), rand_seed: None, - now_ms: b.block()?.read_info()?.gen_utime_ms(), - fake: true, + now_ms, contains_ethalon: false, contains_candidate: true, notes: String::new(), @@ -882,9 +948,9 @@ impl CollatorTestBundle { index, top_shard_blocks, external_messages, - states, - mc_merkle_updates, - blocks, + states: Default::default(), + state_proofs, + ethalon_block: None, block_handle_storage: create_block_handle_storage(), candidate: Some(candidate), #[cfg(feature = "telemetry")] @@ -899,77 +965,84 @@ impl CollatorTestBundle { // without signatures. Ethalon block is included, external messages are taken // from ethalon block pub async fn build_with_ethalon( - block_id: &BlockIdExt, engine: &Arc, + block: BlockStuff, ) -> Result { + log::info!("Building with ethalon {}", block.id()); - log::info!("Building with ethalon {}", block_id); - - let handle = engine.load_block_handle(block_id)?.ok_or_else( - || error!("Cannot load handle for block {}", block_id) - )?; - let block = engine.load_block(&handle).await?; let info = block.block()?.read_info()?; let extra = block.block()?.read_extra()?; // TODO: fill caches states let mut cached_states = CachedStates::new(engine); - // TODO: use cached states instead - let mut states = HashMap::new(); + let mut state_proofs = HashMap::new(); + let is_master = block.id().shard().is_masterchain(); // // last mc state // + let (prev, merge_block_id) = block.construct_prev_id()?; let last_mc_id = if let Some(master_ref) = info.read_master_ref()? { BlockIdExt::from_ext_blk(master_ref.master) } else { - block.construct_prev_id()?.0 + prev.clone() }; - let mut oldest_mc_seq_no = last_mc_id.seq_no(); - let mut newest_mc_seq_no = last_mc_id.seq_no(); + let mc_state = engine.load_state(&last_mc_id).await?; // - // top shard blocks (fake) + // prev states // - let mut shard_blocks_ids = vec![]; - if let Ok(shards) = block.shards() { - shards.iterate_shards(|shard_id, descr| { - shard_blocks_ids.push(BlockIdExt { - shard_id, - seq_no: descr.seq_no, - root_hash: descr.root_hash, - file_hash: descr.file_hash, - }); - Ok(true) - })?; + let mut prev_blocks_ids = vec!(prev); + if let Some(merge_block_id) = merge_block_id { + let key = block.id().shard().shard_key(false); + let usage_tree = UsageTree::default(); + let state = engine.load_state(&merge_block_id).await?; + let state_root = usage_tree.use_cell(state.root_cell().clone(), false); + let mut accounts = ShardStateUnsplit::construct_from_cell(state_root)?.read_accounts()?; + + let other = engine.load_state(&prev_blocks_ids[0]).await?; + let state_root = usage_tree.use_cell(other.root_cell().clone(), false); + let other_accounts = ShardStateUnsplit::construct_from_cell(state_root)?.read_accounts()?; + accounts.merge(&other_accounts, &key)?; + + Self::add_simplified_state(state.root_cell(), &mut state_proofs, &merge_block_id, Some(&block), Some(&usage_tree), None)?; + Self::add_simplified_state(other.root_cell(), &mut state_proofs, &prev_blocks_ids[0], Some(&block), Some(&usage_tree), None)?; + prev_blocks_ids.push(merge_block_id); + } else if !is_master { + Self::load_and_simplify_state(engine, &mut state_proofs, &prev_blocks_ids[0], Some(&block)).await?; } + + // + // top shard blocks (fake) + // + let shard_blocks_ids = block.top_blocks_all().unwrap_or_default(); let mut top_shard_blocks = vec![]; let mut top_shard_blocks_ids = vec![]; - let mc_state = engine.load_state(&last_mc_id).await?; for shard_block_id in shard_blocks_ids.iter().filter(|id| id.seq_no() != 0) { let handle = engine.load_block_handle(shard_block_id)? .ok_or_else(|| error!("Cannot load handle for shard block {}", shard_block_id))?; - let block = engine.load_block(&handle).await?; - let info = block.block()?.read_info()?; - let prev_blocks_ids = info.read_prev_ids()?; - let base_info = ValidatorBaseInfo::with_params( - info.gen_validator_list_hash_short(), - info.gen_catchain_seqno() - ); - let signatures = BlockSignaturesPure::default(); + if let Ok(block) = engine.load_block(&handle).await { + let info = block.block()?.read_info()?; + let prev_blocks_ids = info.read_prev_ids()?; + let base_info = ValidatorBaseInfo::with_params( + info.gen_validator_list_hash_short(), + info.gen_catchain_seqno() + ); + let signatures = BlockSignaturesPure::default(); - // sometimes some shards don't have new blocks to create TSBD - if let Some(tbd) = create_top_shard_block_description( + // sometimes some shards don't have new blocks to create TSBD + if let Some(tbd) = create_top_shard_block_description( &block, BlockSignatures::with_params(base_info, signatures), &mc_state, // TODO &prev_blocks_ids, engine.deref(), ).await? { - let tbd = TopBlockDescrStuff::new(tbd, block_id, true, false).unwrap(); - top_shard_blocks_ids.push(tbd.proof_for().clone()); - top_shard_blocks.push(Arc::new(tbd)); + let tbd = TopBlockDescrStuff::new(tbd, block.id(), true, false).unwrap(); + top_shard_blocks_ids.push(tbd.proof_for().clone()); + top_shard_blocks.push(Arc::new(tbd)); + } } } @@ -992,47 +1065,19 @@ impl CollatorTestBundle { // neighbors // let mut neighbors = vec!(); - let shards = match block.shard_hashes() { - Ok(shards) => shards, - Err(_) => mc_state.shard_hashes()? - }; - - let neighbor_list = shards.neighbours_for(block_id.shard())?; + let shards = block.shard_hashes().or_else(|_| mc_state.shard_hashes())?; + let neighbor_list = shards.neighbours_for(block.id().shard())?; for shard in neighbor_list.iter() { - states.insert(shard.block_id().clone(), engine.load_state(shard.block_id()).await?); + Self::load_and_simplify_state(engine, &mut state_proofs, shard.block_id(), None).await?; neighbors.push(shard.block_id().clone()); } - if shards.is_empty() || mc_state.block_id().seq_no() != 0 { - states.insert(mc_state.block_id().clone(), mc_state); - } - - // - // prev_blocks & states - // - let mut blocks = HashMap::new(); - let mut prev_blocks_ids = vec!(); - let prev = block.construct_prev_id()?; - let prev1 = engine.load_block_handle(&prev.0)?.ok_or_else( - || error!("Cannot load handle for prev1 block {}", prev.0) - )?; - prev_blocks_ids.push(prev1.id().clone()); - states.insert(prev1.id().clone(), engine.load_state(prev1.id()).await?); - if let Ok(block) = engine.load_block(&prev1).await { - blocks.insert(prev1.id().clone(), block); - } - if let Some(prev2) = prev.1 { - let handle = engine.load_block_handle(&prev2)? - .ok_or_else(|| error!("Cannot load handle for prev2 block {}", prev2 ))?; - let prev2 = engine.load_block(&handle).await?; - prev_blocks_ids.push(prev2.id().clone()); - states.insert(prev2.id().clone(), engine.load_state(prev2.id()).await?); - blocks.insert(prev2.id().clone(), prev2); - } - // collect needed mc states - for (_, state) in states.iter() { - let nb = OutMsgQueueInfoStuff::from_shard_state(state, &mut cached_states).await?; + let mut oldest_mc_seq_no = last_mc_id.seq_no(); + let mut newest_mc_seq_no = last_mc_id.seq_no(); + for (block_id, _state) in state_proofs.iter() { + let state = engine.load_state(block_id).await?; + let nb = OutMsgQueueInfoStuff::from_shard_state(&state, &mut cached_states).await?; for entry in nb.entries() { if entry.mc_seqno() < oldest_mc_seq_no { oldest_mc_seq_no = entry.mc_seqno(); @@ -1042,35 +1087,26 @@ impl CollatorTestBundle { } } - // ethalon block and state - blocks.insert(block_id.clone(), block); - if block_id.shard().is_masterchain() { - if block_id.seq_no() < oldest_mc_seq_no { - oldest_mc_seq_no = block_id.seq_no(); - } else if block_id.seq_no() > newest_mc_seq_no { - newest_mc_seq_no = block_id.seq_no(); - } - } else { - states.insert(block_id.clone(), engine.load_state(block_id).await?); - } - // mc states and merkle updates - let oldest_mc_state = engine.load_state( - engine.find_mc_block_by_seq_no(oldest_mc_seq_no).await?.id() - ).await?; - let mut mc_states = vec!(oldest_mc_state.block_id().clone()); - states.insert(oldest_mc_state.block_id().clone(), oldest_mc_state); - let mut mc_merkle_updates = HashMap::new(); - - for mc_seq_no in oldest_mc_seq_no + 1..=newest_mc_seq_no { + // + // mc states + // + Self::add_simplified_state( + mc_state.root_cell(), + &mut state_proofs, + mc_state.block_id(), + None, + None, + Some(oldest_mc_seq_no) + )?; + let mut mc_states = vec!(mc_state.block_id().clone()); + for mc_seq_no in oldest_mc_seq_no..newest_mc_seq_no { let handle = engine.find_mc_block_by_seq_no(mc_seq_no).await?; - let block = engine.load_block(&handle).await?; - mc_merkle_updates.insert(block.id().clone(), block.block()?.read_state_update()?); - states.insert(block.id().clone(), engine.load_state(block.id()).await?); - mc_states.push(block.id().clone()); + Self::load_and_simplify_state(engine, &mut state_proofs, handle.id(), None).await?; + mc_states.push(handle.id().clone()); } let index = CollatorTestBundleIndex { - id: block_id.clone(), + id: block.id().clone(), top_shard_blocks: top_shard_blocks_ids, external_messages: external_messages_ids, last_mc_state: last_mc_id, @@ -1081,7 +1117,6 @@ impl CollatorTestBundle { created_by: extra.created_by().clone(), rand_seed: Some(extra.rand_seed().clone()), now_ms: info.gen_utime_ms(), - fake: true, contains_ethalon: true, contains_candidate: false, notes: String::new(), @@ -1091,9 +1126,9 @@ impl CollatorTestBundle { index, top_shard_blocks, external_messages, - states, - mc_merkle_updates, - blocks, + states: Default::default(), + state_proofs, + ethalon_block: Some(block), block_handle_storage: create_block_handle_storage(), candidate: None, #[cfg(feature = "telemetry")] @@ -1115,6 +1150,7 @@ impl CollatorTestBundle { let path = format!("{}/top_shard_blocks/", path); std::fs::create_dir_all(&path)?; let filename = format!("{}/{:x}", path, tbd.proof_for().root_hash()); + log::info!("Saving top_shard_blocks {}", filename); tbd.top_block_descr().write_to_file(filename)?; } @@ -1123,47 +1159,34 @@ impl CollatorTestBundle { let path = format!("{}/external_messages/", path); std::fs::create_dir_all(&path)?; let filename = format!("{}/{:x}", path, id); + log::info!("Saving external message {}", filename); m.write_to_file(filename)?; } // ├─📂 states - // all shardes states + // all states ptoofs let path1 = format!("{}/states/", path); std::fs::create_dir_all(&path1)?; - for ss_id in self.index.neighbors.iter().chain(self.index.prev_blocks.iter()) { + let iter = self.index.neighbors.iter() + .chain(self.index.prev_blocks.iter()) + .chain(self.index.mc_states.iter()); + for ss_id in iter { let filename = format!("{}/{:x}", path1, ss_id.root_hash()); - self.states.get(ss_id) + log::debug!("Saving {} state to {}", ss_id, filename); + let now = std::time::Instant::now(); + self.state_proofs.get(ss_id) .ok_or_else(|| error!("Bundle's internal error (state {})", ss_id))? - .write_to(&mut File::create(filename)?)?; - } - // ethalon state - if self.index.contains_ethalon && !self.index.id.shard().is_masterchain() { - let filename = format!("{}/{:x}", path1, self.index.id.root_hash()); - self.states.get(&self.index.id) - .ok_or_else(|| error!("Bundle's internal error (state {})", self.index.id))? - .write_to(&mut File::create(filename)?)?; - } - // oldest mc state is saved full - let oldest_mc_state = self.index.oldest_mc_state(); - let filename = format!("{}/{:x}", path1, oldest_mc_state.root_hash()); - self.states.get(&oldest_mc_state) - .ok_or_else(|| error!("Bundle's internal error (state {})", oldest_mc_state))? - .write_to(&mut File::create(filename)?)?; - - // merkle updates for all other mc states - let path1 = format!("{}/states/mc_merkle_updates/", path); - std::fs::create_dir_all(&path1)?; - for (id, mu) in self.mc_merkle_updates.iter() { - let filename = format!("{}/{:x}", path1, id.root_hash()); - mu.write_to_file(filename)?; + .write_to_file(&filename)?; + log::debug!("Saved {} state to {} in {} ms", ss_id, filename, now.elapsed().as_millis()); } // ├─📂 blocks - for (id, b) in self.blocks.iter() { + if let Some(block) = &self.ethalon_block { let path = format!("{}/blocks/", path); std::fs::create_dir_all(&path)?; - let filename = format!("{}/{:x}", path, id.root_hash()); - b.write_to(&mut File::create(filename)?)?; + let filename = format!("{}/{:x}", path, block.id().root_hash()); + log::info!("Saving ethalon block {}", filename); + block.write_to(&mut File::create(filename)?)?; } // candidate @@ -1229,8 +1252,7 @@ impl EngineOperations for CollatorTestBundle { None )?; if let Some(handle) = handle { - if self.blocks.contains_key(id) && (id != &self.index.id) { - handle.set_data(); + if self.states.contains_key(id) { handle.set_state(); handle.set_block_applied(); } @@ -1242,16 +1264,14 @@ impl EngineOperations for CollatorTestBundle { async fn load_state(&self, block_id: &BlockIdExt) -> Result> { if *block_id != self.index.id { - if let Some(s) = self.states.get(block_id) { - return Ok(s.clone()); - } + return self.load_state_internal(&block_id) } fail!("bundle doesn't contain state for block {}", block_id) } async fn load_block(&self, handle: &BlockHandle) -> Result { if *handle.id() != self.index.id { - if let Some(s) = self.blocks.get(handle.id()) { + if let Some(s) = &self.ethalon_block { return Ok(s.clone()); } } @@ -1259,11 +1279,7 @@ impl EngineOperations for CollatorTestBundle { } async fn load_last_applied_mc_state(&self) -> Result> { - if let Some(s) = self.states.get(&self.index.last_mc_state) { - Ok(s.clone()) - } else { - fail!("bundle doesn't contain state for block {}", &self.index.last_mc_state) - } + self.load_state_internal(&self.index.last_mc_state) } async fn wait_state( @@ -1276,7 +1292,7 @@ impl EngineOperations for CollatorTestBundle { } async fn find_mc_block_by_seq_no(&self, seq_no: u32) -> Result> { - for (id, _block) in self.blocks.iter() { + for (id, _block) in self.states.iter() { if (id.seq_no() != seq_no) || !id.shard().is_masterchain() { continue } @@ -1299,21 +1315,24 @@ impl EngineOperations for CollatorTestBundle { _: &Arc, _: Option<&mut u32>, ) -> Result>> { - if self.top_shard_blocks.len() > 0 { + if !self.top_shard_blocks.is_empty() { return Ok(self.top_shard_blocks.clone()); } else if let Some(candidate) = self.candidate() { - let collated_roots = read_boc(&candidate.collated_data)?.roots; - for i in 0..collated_roots.len() { - let croot = collated_roots[i].clone(); - if croot.cell_type() == CellType::Ordinary { - let mut res = vec!(); - let top_shard_descr_dict = TopBlockDescrSet::construct_from_cell(croot)?; - top_shard_descr_dict.collection().iterate(|tbd| { - let id = tbd.0.proof_for().clone(); - res.push(Arc::new(TopBlockDescrStuff::new(tbd.0, &id, true, false)?)); - Ok(true) - })?; - return Ok(res); + log::info!("candidate.collated_data.len(): {}", candidate.collated_data.len()); + if !candidate.collated_data.is_empty() { + let collated_roots = read_boc(&candidate.collated_data)?.roots; + for i in 0..collated_roots.len() { + let croot = collated_roots[i].clone(); + if croot.cell_type() == CellType::Ordinary { + let mut res = vec!(); + let top_shard_descr_dict = TopBlockDescrSet::construct_from_cell(croot)?; + top_shard_descr_dict.collection().iterate(|tbd| { + let id = tbd.0.proof_for().clone(); + res.push(Arc::new(TopBlockDescrStuff::new(tbd.0, &id, true, false)?)); + Ok(true) + })?; + return Ok(res); + } } } } @@ -1370,3 +1389,84 @@ impl EngineOperations for CollatorTestBundle { None } } + +pub async fn try_collate( + engine: &Arc, + shard: ShardIdent, + prev_blocks_ids: Vec, + created_by_opt: Option, + rand_seed_opt: Option, + is_bundle: bool, + check_validation: bool, +) -> Result { + let mc_state = engine.load_last_applied_mc_state().await?; + let mc_state_extra = mc_state.shard_state_extra()?; + let prev_blocks_history = PrevBlockHistory::with_prevs(&shard, prev_blocks_ids); + let mut cc_seqno_with_delta = 0; + let cc_seqno_from_state = if shard.is_masterchain() { + mc_state_extra.validator_info.catchain_seqno + } else { + mc_state_extra.shards.calc_shard_cc_seqno(&shard)? + }; + let nodes = compute_validator_set_cc( + &mc_state, + &shard, + prev_blocks_history.get_next_seqno().unwrap_or_default(), + cc_seqno_from_state, + &mut cc_seqno_with_delta + )?; + let validator_set = ValidatorSet::with_cc_seqno(0, 0, 0, cc_seqno_with_delta, nodes)?; + + // log::debug!("{}", block_stuff.id()); + + log::info!("TRY COLLATE block {}", shard); + + let min_mc_seqno = if prev_blocks_history.get_prevs()[0].seq_no() == 0 { + 0 + } else { + let state = engine.load_state(&prev_blocks_history.get_prevs()[0]).await?; + state.state()?.min_ref_mc_seqno() + }; + + let collator_settings = CollatorSettings { + #[cfg(test)] + is_bundle, + ..Default::default() + }; + let collator = Collator::new( + shard.clone(), + min_mc_seqno, + &prev_blocks_history, + validator_set.clone(), + created_by_opt.unwrap_or_default(), + engine.clone(), + rand_seed_opt, + None, + )?; + let collate_result = collator.collate(collator_settings).await?; + + if let Some(candidate) = &collate_result.candidate { + if check_validation { + // let new_block = Block::construct_from_bytes(&candidate.data).unwrap(); + + // std::fs::write(&format!("{}/state_candidate.json", RES_PATH), ever_block_json::debug_state(new_state.clone())?)?; + // std::fs::write(&format!("{}/block_candidate.json", RES_PATH), ever_block_json::debug_block_full(new_block)?)?; + + let validator_query = ValidateQuery::new( + shard.clone(), + min_mc_seqno, + prev_blocks_history.get_prevs().to_vec(), + candidate.clone(), + validator_set.clone(), + engine.clone(), + true, + true, + None, + ); + validator_query.try_validate().await?; + } + Ok(collate_result) + } else { + Err(collate_result.error.unwrap()) + } +} diff --git a/src/config.rs b/src/config.rs index 65e98fd2..edc32e36 100644 --- a/src/config.rs +++ b/src/config.rs @@ -784,7 +784,7 @@ impl TonNodeConfig { let (private, public) = Ed25519KeyOption::generate_with_json()?; let key_id = public.id().data(); log::info!("generate_and_save_keys: generate new key (id: {:?})", key_id); - let key_ring = self.validator_key_ring.get_or_insert_with(|| HashMap::new()); + let key_ring = self.validator_key_ring.get_or_insert_with(HashMap::new); key_ring.insert(base64_encode(key_id), private); Ok((key_id.clone(), public)) } diff --git a/src/engine_operations.rs b/src/engine_operations.rs index 886a52c0..bf5f3340 100644 --- a/src/engine_operations.rs +++ b/src/engine_operations.rs @@ -25,7 +25,6 @@ use crate::{ BlockResult, INITIAL_MC_BLOCK, LAST_APPLIED_MC_BLOCK, LAST_MESH_HARDFORK_BLOCK, LAST_MESH_KEYBLOCK, LAST_MESH_MC_BLOCK, LAST_ROTATION_MC_BLOCK, SHARD_CLIENT_MC_BLOCK }, - jaeger, shard_state::ShardStateStuff, shard_states_keeper::PinnedShardStateGuard, types::top_block_descr::{TopBlockDescrId, TopBlockDescrStuff}, @@ -1415,7 +1414,6 @@ async fn redirect_external_message( ).await; #[cfg(feature = "telemetry")] engine.full_node_telemetry().sent_ext_msg_broadcast(); - jaeger::broadcast_sended(id.to_hex_string()); res } else { fail!("External message is not properly formatted: {}", message) diff --git a/src/internal_db/mod.rs b/src/internal_db/mod.rs index bf948649..20323544 100644 --- a/src/internal_db/mod.rs +++ b/src/internal_db/mod.rs @@ -15,14 +15,14 @@ use crate::{ block::{BlockStuff, BlockKind}, block_proof::BlockProofStuff, engine_traits::EngineAlloc, error::NodeError, shard_state::ShardStateStuff, types::top_block_descr::{TopBlockDescrId, TopBlockDescrStuff}, internal_db::restore::check_db, - }; #[cfg(feature = "telemetry")] use crate::engine_traits::EngineTelemetry; use std::{ - cmp::min, collections::{HashMap, HashSet}, io::Cursor, mem::size_of, path::{Path, PathBuf}, - sync::{Arc, atomic::{AtomicBool, AtomicU32, Ordering}}, time::{UNIX_EPOCH, Duration}, ops::Deref + collections::{HashMap, HashSet}, io::Cursor, mem::size_of, ops::Deref, + path::{Path, PathBuf}, sync::{atomic::{AtomicBool, AtomicU32, Ordering}, Arc}, + time::{Duration, UNIX_EPOCH} }; use storage::{ StorageAlloc, TimeChecker, @@ -35,9 +35,9 @@ use storage::{ use storage::shardstate_db_async::{self, AllowStateGcResolver, ShardStateDb}; #[cfg(feature = "telemetry")] use storage::StorageTelemetry; -use ever_block::{Block, BlockIdExt, INVALID_WORKCHAIN_ID, CellsFactory}; use ever_block::{ - error, fail, Result, UInt256, Cell, BocWriterStack, MAX_SAFE_DEPTH, DoneCellsStorage, + error, fail, Block, BlockIdExt, BocWriterStack, Cell, CellsFactory, DoneCellsStorage, + Result, UInt256, INVALID_WORKCHAIN_ID, MAX_SAFE_DEPTH }; /// Full node state keys @@ -206,6 +206,23 @@ pub struct InternalDb { #[allow(dead_code)] impl InternalDb { + #[cfg(test)] + pub async fn only_cells_db( + config: InternalDbConfig, + #[cfg(feature = "telemetry")] + telemetry: Arc, + allocated: Arc, + ) -> Result { + Self::construct( + config, + false, + true, + #[cfg(feature = "telemetry")] + telemetry, + allocated + ).await + } + pub async fn with_update( config: InternalDbConfig, restore_db_enabled: bool, @@ -217,43 +234,43 @@ impl InternalDb { telemetry: Arc, allocated: Arc, ) -> Result { - let mut db = Self::construct( + let db = Self::construct( config, allow_update, + false, #[cfg(feature = "telemetry")] telemetry, allocated, ).await?; let version = db.resolve_db_version()?; - if version != CURRENT_DB_VERSION { - if allow_update { - db = update::update(db, version, check_stop, is_broken, force_check_db, - restore_db_enabled).await? - } else { - fail!( - "DB version {} does not correspond to current supported one {}.", - version, - CURRENT_DB_VERSION - ) - } - } else { + let db = if version == CURRENT_DB_VERSION { log::info!("DB VERSION {}", version); // TODO correct workchain id needed here, but it will be known later - db = check_db(db, 0, restore_db_enabled, force_check_db, check_stop, is_broken).await?; - } + check_db(db, 0, restore_db_enabled, force_check_db, check_stop, is_broken).await? + } else if allow_update { + update::update(db, version, check_stop, is_broken, force_check_db, + restore_db_enabled).await? + } else { + fail!( + "DB version {} does not correspond to current supported one {}.", + version, + CURRENT_DB_VERSION + ) + }; Ok(db) } async fn construct( config: InternalDbConfig, allow_update: bool, + read_only: bool, #[cfg(feature = "telemetry")] telemetry: Arc, allocated: Arc, ) -> Result { let mut hi_perf_cfs = HashSet::new(); hi_perf_cfs.insert(CELLS_CF_NAME.to_string()); - let db = RocksDb::with_options(config.db_directory.as_str(), "db", hi_perf_cfs, false)?; + let db = RocksDb::with_options(config.db_directory.as_str(), "db", hi_perf_cfs, read_only)?; let db_catchain = RocksDb::with_path(config.db_directory.as_str(), "catchains")?; let block_handle_db = Arc::new( BlockHandleDb::with_db(db.clone(), "block_handle_db", true)? @@ -320,7 +337,7 @@ impl InternalDb { ).await? ); - let db = Self { + Ok(Self { db: db.clone(), block_handle_storage, prev1_block_db: BlockInfoDb::with_db(db.clone(), "prev1_block_db", true)?, @@ -341,9 +358,7 @@ impl InternalDb { #[cfg(feature = "telemetry")] telemetry, allocated - }; - - Ok(db) + }) } fn resolve_db_version(&self) -> Result { @@ -624,6 +639,30 @@ impl InternalDb { } } + #[cfg(test)] + /// seacrhes for a block in the previous blocks db + /// be careful it can be slow + pub fn find_block_by_seq_no(&self, shard: &ever_block::ShardIdent, seqno: u32) -> Result> { + let _tc = TimeChecker::new(format!("find_block_by_seq_no {}", seqno), 300); + let mut found = None; + self.prev1_block_db.for_each(&mut |_key, val| { + let id = BlockIdExt::deserialize(&mut Cursor::new(&val))?; + if id.shard() == shard && id.seq_no() == seqno { + found = Some(id); + Ok(false) + } else { + Ok(true) + } + })?; + if let Some(id) = found { + self.load_block_handle(&id)?.ok_or_else( + || error!("Cannot load handle for master block {}", id) + ) + } else { + fail!("Can't find block with seqno {} in shard {}", seqno, shard) + } + } + pub async fn store_block_proof( &self, id: &BlockIdExt, @@ -953,7 +992,7 @@ impl InternalDb { if offset == full_lenth { Ok(vec![]) } else { - let length = min(length, full_lenth - offset); + let length = length.min(full_lenth - offset); let data = self.shard_state_persistent_db.read_file_part(id, offset, length).await?; Ok(data) } diff --git a/src/jaeger.rs b/src/jaeger.rs deleted file mode 100644 index cf421539..00000000 --- a/src/jaeger.rs +++ /dev/null @@ -1,172 +0,0 @@ -/* -* Copyright (C) 2019-2024 EverX. All Rights Reserved. -* -* Licensed under the SOFTWARE EVALUATION License (the "License"); you may not use -* this file except in compliance with the License. -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific EVERX DEV software governing permissions and -* limitations under the License. -*/ - -use rustracing::sampler::AllSampler; -use rustracing_jaeger::{ - reporter::JaegerCompactReporter, span::SpanContext, span::SpanReceiver, Tracer, -}; -use std::net::ToSocketAddrs; -use std::{collections::HashMap, env, sync::Mutex}; - -use ever_block::fail; -use ever_block::types::*; - -#[allow(dead_code)] -enum LogKind { - Normal, - Error, -} - -struct JaegerHelper { - tracer: Tracer, - reporter: JaegerCompactReporter, - span_rx: SpanReceiver, -} - -lazy_static::lazy_static! { - static ref JAEGER: Option> = JaegerHelper::new("r-node"); -} - -pub fn init_jaeger() { - lazy_static::initialize(&JAEGER); - log::trace!("Jaeger lazy init"); -} - -#[cfg(feature = "external_db")] -pub fn message_from_kafka_received(kf_key: &[u8]) { - if let Some(jaeger) = JAEGER.as_ref() { - let msg_id_bytes = kf_key[0..32].to_vec(); - tokio::task::spawn_blocking(move || match jaeger.lock() { - Ok(mut helper) => { - if msg_id_bytes.len() == 32 { - let msg_id = hex::encode(&msg_id_bytes); - helper.send_span(msg_id, "kafka msg received".to_string()); - } else { - log::error!(target: "jaeger", "Corrupted key field in message from q-server"); - } - } - Err(e) => { - log::error!(target: "jaeger", "Mutex locking error: {}", e); - } - }); - } -} - -pub fn broadcast_sended(msg_id: String) { - if let Some(jaeger) = JAEGER.as_ref() { - tokio::task::spawn_blocking(move || match jaeger.lock() { - Ok(mut helper) => { - helper.send_span(msg_id, "broadcast sended".to_string()) - }, - Err(e) => { - log::error!(target: "jaeger", "Mutex locking error: {}", e); - } - }); - } -} - -impl JaegerHelper { - - pub fn new(service_name: &str) -> Option> { - let (span_tx, span_rx) = crossbeam_channel::bounded(1000); - let tracer = Tracer::with_sender(AllSampler, span_tx); - let mut reporter = match JaegerCompactReporter::new(service_name) { - Ok(reporter) => reporter, - Err(e) => { - log::error!(target: "jaeger", "Can't create jaeger reporter: {}", e); - panic!("Can't create jaeger reporter: {}", e) - } - }; - let agent_host = match env::var("JAEGER_AGENT_HOST") { - Ok(val) => val, - Err(_) => { - log::info!(target: "jaeger", "JAEGER_AGENT_HOST not set. Use default value"); - "localhost".to_string() - } - }; - let agent_port = match env::var("JAEGER_AGENT_PORT") { - Ok(val) => val, - Err(_) => { - log::info!(target: "jaeger", "JAEGER_AGENT_PORT not set. Use default value"); - "6831".to_string() - } - }; - let agent_url = match format!("{}:{}", agent_host, agent_port) - .to_socket_addrs() - .map(|mut iter| iter.next()) - { - Ok(Some(url)) => url, - _ => { - log::error!( - target: "jaeger", - "Invalid JAEGER_* env. Can't parse string to valid address" - ); - return None - } - }; - match reporter.set_agent_addr(agent_url) { - Ok(_) => log::info!( - target: "jaeger", - "Init done with addr {}:{}", agent_host, agent_port - ), - Err(e) => log::error!( - target: "jaeger", - "Can't set agent address to jaeger library. Internal rust_jaegertracing error: {}", - e - ) - } - let ret = JaegerHelper { - tracer, - reporter, - span_rx, - }; - Some(Mutex::new(ret)) - } - - pub fn send_span(&mut self, msg_id: String, span_name: String) { - match self.create_root_span(msg_id) { - Ok(span_root) => { - self.start_span(span_root, span_name); - self.report_span(); - } - Err(e) => { - log::error!(target: "jaeger", "Error: {}", e); - } - } - } - - fn create_root_span(&mut self, msg_id: String) -> Result { - let mut carrier = HashMap::new(); - let span_ctx = format!("{}:{}:0:1", &msg_id[0..16], &msg_id[16..32]); - carrier.insert("uber-trace-id".to_string(), span_ctx); - if let Ok(Some(ctx)) = SpanContext::extract_from_text_map(&carrier) { - Ok(ctx) - } else { - fail!("Can't extract root span context from textmap") - } - } - - fn start_span(&mut self, ctx: SpanContext, name: String) { - let _span = self.tracer.span(name).child_of(&ctx).start(); - log::trace!(target: "jaeger", "Span started"); - } - - fn report_span(&mut self) { - if let Err(e) = self - .reporter - .report(&(self.span_rx).try_iter().collect::>()) - { - log::error!(target: "jaeger", "Can't report span. Internal rustracing_jaeger crate error in reporter: {}", e); - } - } -} diff --git a/src/lib.rs b/src/lib.rs index 36973801..adbf5c8f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,17 +39,6 @@ mod shard_blocks; include!("../common/src/info.rs"); -#[cfg(feature = "tracing")] -pub mod jaeger; - -#[cfg(not(feature = "tracing"))] -pub mod jaeger { - pub fn init_jaeger(){} - #[cfg(feature = "external_db")] - pub fn message_from_kafka_received(_kf_key: &[u8]) {} - pub fn broadcast_sended(_msg_id: String) {} -} - #[cfg(feature = "external_db")] mod external_db; diff --git a/src/main.rs b/src/main.rs index 436807cf..658c5a4b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -39,20 +39,9 @@ mod ext_messages; mod shard_blocks; -#[cfg(feature = "tracing")] -mod jaeger; - -#[cfg(not(feature = "tracing"))] -mod jaeger { - pub fn init_jaeger(){} - #[cfg(feature = "external_db")] - pub fn message_from_kafka_received(_kf_key: &[u8]) {} - pub fn broadcast_sended(_msg_id: String) {} -} - use crate::{ config::TonNodeConfig, engine::{Engine, Stopper, EngineFlags}, - jaeger::init_jaeger, internal_db::restore::set_graceful_termination, + internal_db::restore::set_graceful_termination, validating_utils::supported_version }; #[cfg(feature = "external_db")] @@ -456,8 +445,6 @@ fn main() { .build() .expect("Can't create Validator tokio runtime"); - init_jaeger(); - #[cfg(feature = "trace_alloc_detail")] thread::spawn( || { diff --git a/src/network/control.rs b/src/network/control.rs index 23bdf761..33b4672f 100644 --- a/src/network/control.rs +++ b/src/network/control.rs @@ -541,7 +541,9 @@ impl ControlQuerySubscriber { async fn prepare_bundle(&self, block_id: BlockIdExt) -> Result { if let DataSource::Engine(ref engine) = self.data_source { - let bundle = CollatorTestBundle::build_with_ethalon(&block_id, engine).await?; + let handle = engine.load_block_handle(&block_id)?.ok_or_else(|| error!("Block handle for {} not found", block_id))?; + let block = engine.load_block(&handle).await?; + let bundle = CollatorTestBundle::build_with_ethalon(engine, block).await?; tokio::task::spawn_blocking(move || { bundle.save("target/bundles").ok(); }); @@ -549,10 +551,10 @@ impl ControlQuerySubscriber { Ok(Success::Engine_Validator_Success) } - async fn prepare_future_bundle(&self, prev_block_ids: Vec) -> Result { + async fn prepare_future_bundle(&self, prev_blocks_ids: Vec) -> Result { if let DataSource::Engine(ref engine) = self.data_source { let bundle = CollatorTestBundle::build_for_collating_block( - prev_block_ids, engine + engine, prev_blocks_ids, None ).await?; tokio::task::spawn_blocking(move || { bundle.save("target/bundles").ok(); @@ -703,6 +705,7 @@ impl ControlQuerySubscriber { self.add_validator_bls_key( query.permanent_key_hash.as_slice(), query.key_hash.as_slice(), query.ttl ).await?, + #[cfg(feature = "telemetry")] None ), Err(query) => query diff --git a/src/network/neighbours.rs b/src/network/neighbours.rs index 22f96092..a107ae4e 100644 --- a/src/network/neighbours.rs +++ b/src/network/neighbours.rs @@ -474,6 +474,7 @@ impl Neighbours { }); } + #[cfg(feature = "telemetry")] pub fn log_neighbors_stat(&self) { log::debug!( target: "telemetry", diff --git a/src/network/node_network.rs b/src/network/node_network.rs index aec10768..6807da50 100644 --- a/src/network/node_network.rs +++ b/src/network/node_network.rs @@ -271,6 +271,7 @@ impl NodeNetwork { } } + #[cfg(feature = "telemetry")] pub fn log_neighbors_stat(&self) { for guard in self.overlays.iter() { guard.val().peers().log_neighbors_stat(); diff --git a/src/network/remp.rs b/src/network/remp.rs index e2c6fa94..7393a1a6 100644 --- a/src/network/remp.rs +++ b/src/network/remp.rs @@ -182,6 +182,11 @@ impl RempNode { self.telemetry.set(telemetry).map_err(|_| error!("Can't set telemetry"))?; Ok(()) } + + #[cfg(all(test,feature = "telemetry"))] + pub fn telemetry(&self) -> Arc { + self.telemetry.get().unwrap().clone() + } } #[async_trait::async_trait] diff --git a/src/network/tests/test_remp.rs b/src/network/tests/test_remp.rs index 175b0c45..0d1d20b3 100644 --- a/src/network/tests/test_remp.rs +++ b/src/network/tests/test_remp.rs @@ -1,7 +1,9 @@ use crate::{ network::remp::{RempNode, RempMessagesSubscriber, RempReceiptsSubscriber, ReceiptStuff}, - test_helper::{get_adnl_config, init_test_log}, validator::telemetry::RempCoreTelemetry + test_helper::{get_adnl_config, init_test_log}, }; +#[cfg(feature = "telemetry")] +use crate::validator::telemetry::RempCoreTelemetry; use adnl::node::AdnlNode; use std::{ @@ -16,7 +18,7 @@ use ever_block::{fail, KeyId, Result, UInt256}; const KEY_TAG: usize = 0; -async fn init_remp_node(ip: &str) -> Result<(Arc, Arc, Arc)> { +async fn init_remp_node(ip: &str) -> Result<(Arc, Arc)> { let config = get_adnl_config("target/remp", ip, vec![KEY_TAG], true).await.unwrap(); let node = AdnlNode::with_config(config).await.unwrap(); let remp = Arc::new(RempNode::new(node.clone(), KEY_TAG)?); @@ -26,9 +28,10 @@ async fn init_remp_node(ip: &str) -> Result<(Arc, Arc, Arc Result<()> { init_test_log(); - let (node1, remp1, telemetry1) = init_remp_node("127.0.0.1:4191").await?; - let (node2, remp2, telemetry2) = init_remp_node("127.0.0.1:4192").await?; + let (node1, remp1) = init_remp_node("127.0.0.1:4191").await?; + let (node2, remp2) = init_remp_node("127.0.0.1:4192").await?; let peer1 = node2.add_peer( node2.key_by_tag(KEY_TAG).unwrap().id(), node1.ip_address(), @@ -114,8 +117,10 @@ async fn test_remp_client_compact_protocol() -> Result<()> { assert!(s1.got_receipts.load(Ordering::Relaxed) > 0); assert!(s2.got_receipts.load(Ordering::Relaxed) > 0); - log::info!("\n1{}", telemetry1.report()); - log::info!("\n\n2{}", telemetry2.report()); + #[cfg(feature = "telemetry")] + log::info!("\n1{}", remp1.telemetry().report()); + #[cfg(feature = "telemetry")] + log::info!("\n\n2{}", remp2.telemetry().report()); node1.stop().await; node2.stop().await; @@ -162,6 +167,7 @@ async fn test_remp_receipts_send_worker() -> Result<()> { sender.clone(), receiver, receipts_in_channel.clone(), + #[cfg(feature = "telemetry")] telemetry.clone() ); @@ -203,6 +209,7 @@ async fn test_remp_receipts_send_worker() -> Result<()> { log::info!("sent packages {}", sender.sent_receipts.lock().unwrap().len()); log::info!("sent receipts {}", sender.sent_receipts.lock().unwrap().iter().map(|p| p.receipts().len()).sum::()); + #[cfg(feature = "telemetry")] log::info!("{}", telemetry.report()); Ok(()) diff --git a/src/tests/test_control.rs b/src/tests/test_control.rs index cbc24a9c..0e79a483 100644 --- a/src/tests/test_control.rs +++ b/src/tests/test_control.rs @@ -37,7 +37,7 @@ use std::{ }; use storage::block_handle_db::BlockHandle; use ton_api::{ - serialize_boxed, tag_from_boxed_type, AnyBoxedSerialize, + serialize_boxed, AnyBoxedSerialize, ton::{ self, TLObject, accountaddress::AccountAddress, engine::validator::{ControlQueryError, KeyHash, Stats}, @@ -51,6 +51,8 @@ use ton_api::{ }; use ton_api::ton::raw::ShardAccountMeta; use ton_api::ton::rpc::raw::{GetAccountMetaByBlock, GetShardAccountMeta}; +#[cfg(feature = "telemetry")] +use ton_api::tag_from_boxed_type; use ever_block::{ Account, BlockIdExt, ConfigParamEnum, ConfigParams, Deserializable, generate_test_account_by_init_code_hash, Message, Serializable, ShardIdent diff --git a/src/tests/test_helper.rs b/src/tests/test_helper.rs index 904973a9..f25f33b5 100644 --- a/src/tests/test_helper.rs +++ b/src/tests/test_helper.rs @@ -14,8 +14,8 @@ #![allow(dead_code)] use crate::{ block::{BlockStuff, BlockKind}, block_proof::BlockProofStuff, - config::{CollatorConfig, TonNodeConfig}, collator_test_bundle::create_engine_allocated, + config::{CollatorConfig, TonNodeConfig}, full_node::apply_block::apply_block, internal_db::{ LAST_APPLIED_MC_BLOCK, SHARD_CLIENT_MC_BLOCK, @@ -555,6 +555,7 @@ impl TestEngine { } pub async fn change_mc_state(&self, mc_state_id: &BlockIdExt) -> Result<()> { + log::debug!("Changing last masterchain state to {}", mc_state_id); let mc_state = self.db.load_shard_state_dynamic(&mc_state_id)?; self.save_last_applied_mc_block_id(&mc_state_id)?; self.shard_blocks.update_shard_blocks(&mc_state).await?; @@ -565,11 +566,19 @@ impl TestEngine { let mc_state_id = self.db.load_full_node_state(LAST_APPLIED_MC_BLOCK)?.unwrap(); let mc_state = self.load_state(&mc_state_id).await?; let (_, mc_state_id, _) = mc_state.shard_state_extra()?.prev_blocks.get(&mc_seq_no)?.unwrap().master_block_id(); - log::debug!("Changing last masterchain state to {}", mc_state_id); self.change_mc_state(&mc_state_id).await?; Ok(mc_state_id) } + pub async fn change_mc_state_by_prev_blocks_ids(&self, prev_blocks_ids: &[BlockIdExt]) -> Result { + let mut mc_seq_no = u32::MAX; + for block_id in prev_blocks_ids { + let handle = self.load_block_handle(block_id)?.unwrap(); + mc_seq_no = mc_seq_no.min(handle.masterchain_ref_seq_no()); + } + self.change_mc_state_by_seqno(mc_seq_no).await + } + pub async fn load_block_by_id(&self, id: &BlockIdExt) -> Result { let handle = self.load_block_handle(id)?.ok_or_else( || error!("Cannot load handle for block {}", id) @@ -718,10 +727,10 @@ impl TestEngine { self.clone(), Some(extra.rand_seed().clone()), None, - CollatorSettings::default(), )?; - let (block_candidate, new_state) = collator.collate().await?; + let collate_result = collator.collate(CollatorSettings::default()).await?; + let block_candidate = collate_result.candidate.unwrap(); if let Some(res_path) = &self.res_path { @@ -772,7 +781,7 @@ impl TestEngine { )?; std::fs::write( &format!("{}/state_candidate.txt", res_path), - debug_state(new_state.clone())? + debug_state(collate_result.new_state.clone().unwrap())? )?; // std::fs::write( @@ -1111,6 +1120,18 @@ impl EngineOperations for TestEngine { ) -> Result<()> { self.ext_messages.complete_messages(to_delay, to_delete, self.now()) } + fn get_remp_messages(&self, _shard: &ShardIdent) -> Result, UInt256)>> { + Ok(Vec::new()) + } + fn finalize_remp_messages( + &self, + _block: BlockIdExt, + _accepted: Vec, + _rejected: Vec<(UInt256, String)>, + _ignored: Vec, + ) -> Result<()> { + Ok(()) + } async fn get_shard_blocks( &self, last_mc_state: &Arc, diff --git a/src/tests/test_remp_client.rs b/src/tests/test_remp_client.rs index ccb21d0b..44f2159a 100644 --- a/src/tests/test_remp_client.rs +++ b/src/tests/test_remp_client.rs @@ -1,6 +1,6 @@ use crate::{ block::BlockStuff, engine_traits::EngineOperations, - full_node::remp_client::{RempClient}, shard_state::ShardStateStuff, + full_node::remp_client::RempClient, shard_state::ShardStateStuff, validator::validator_utils::get_adnl_id, }; #[cfg(feature = "telemetry")] @@ -777,6 +777,7 @@ async fn test_remp_client() -> Result<()> { block_handle_storage: crate::collator_test_bundle::create_block_handle_storage(), sent_remp_messages: AtomicU32::new(0), signed_remp_messages: AtomicU32::new(0), + #[cfg(feature = "telemetry")] telemetry: RempClientTelemetry::default(), }); diff --git a/src/tests/test_shard_blocks.rs b/src/tests/test_shard_blocks.rs index 6460c990..6d96fbce 100644 --- a/src/tests/test_shard_blocks.rs +++ b/src/tests/test_shard_blocks.rs @@ -14,7 +14,7 @@ use super::*; use crate::test_helper::gen_master_state; use crate::collator_test_bundle::{create_block_handle_storage, create_engine_allocated}; -#[cfg(all(feature = "telemetry", not(feature = "fast_finality")))] +#[cfg(all(feature = "telemetry"))] use crate::collator_test_bundle::create_engine_telemetry; use std::{sync::{atomic::{AtomicU32, Ordering}, Arc}, collections::HashSet}; use storage::{block_handle_db::{BlockHandle, BlockHandleStorage}, types::BlockMeta}; diff --git a/src/types/accounts.rs b/src/types/accounts.rs index 95f16e42..9985ebfe 100644 --- a/src/types/accounts.rs +++ b/src/types/accounts.rs @@ -63,6 +63,7 @@ impl ShardAccountStuff { } else { let shard_acc = ShardAccount::with_account_root(self.account_root(), self.last_trans_hash.clone(), self.last_trans_lt); let value = shard_acc.write_to_new_cell()?; + log::trace!("Updating account {:x} in shard state", self.account_addr()); new_accounts.set_builder_serialized(self.account_addr().clone(), &value, &account.aug()?)?; } AccountBlock::with_params(&self.account_addr, &self.transactions, &self.state_update) @@ -114,7 +115,7 @@ impl ShardAccountStuff { let account = self.read_account()?; let new_libs = account.libraries(); if new_libs.root() != self.orig_libs.root() { - new_libs.scan_diff(&self.orig_libs, |key: UInt256, old, new| { + self.orig_libs.scan_diff(&new_libs, |key: UInt256, old, new| { let old = old.unwrap_or_default(); let new = new.unwrap_or_default(); if old.is_public_library() && !new.is_public_library() { diff --git a/src/validator/accept_block.rs b/src/validator/accept_block.rs index 1a5400c3..cdf64031 100644 --- a/src/validator/accept_block.rs +++ b/src/validator/accept_block.rs @@ -611,24 +611,23 @@ pub async fn create_top_shard_block_description( fn find_known_ancestors( block: &BlockStuff, - mc_state: &ShardStateStuff) - -> Result)>> { + mc_state: &ShardStateStuff +) -> Result)>> { let block_descr = fmt_block_id_short(block.id()); let master_ref = block.block()?.read_info()?.read_master_ref()? .ok_or_else(|| error!("Block {} doesn't have `master_ref`", block.id()))?.master; let shard = block.id().shard(); - let mc_state_extra = mc_state.state()?.read_custom()? - .ok_or_else(|| error!("State for {} doesn't have McStateExtra", mc_state.block_id()))?; + let shards = mc_state.shards()?; let mut ancestors = vec!(); let oldest_ancestor_seqno; - match mc_state_extra.shards().find_shard(shard) { + match shards.find_shard(shard) { Ok(None) => { let (a1, a2) = shard.split()?; - let ancestor1 = mc_state_extra.shards().get_shard(&a1)?; - let ancestor2 = mc_state_extra.shards().find_shard(&a2)?; + let ancestor1 = shards.get_shard(&a1)?; + let ancestor2 = shards.find_shard(&a2)?; if let (Some(ancestor1), Some(ancestor2)) = (ancestor1, ancestor2) { log::trace!(target: "validator", "({}): found two ancestors: {} and {}", block_descr, ancestor1.shard(), ancestor2.shard()); diff --git a/src/validator/collator.rs b/src/validator/collator.rs index 070a9974..8b78f38c 100644 --- a/src/validator/collator.rs +++ b/src/validator/collator.rs @@ -31,7 +31,7 @@ use crate::{ }, validator::{ BlockCandidate, CollatorSettings, McData, - out_msg_queue::{MsgQueueManager, OutMsgQueueInfoStuff}, + out_msg_queue::{MsgQueueManager, OutMsgQueueInfoStuff}, validator_utils::calc_subset_for_masterchain }, CHECK, @@ -44,7 +44,6 @@ use ton_api::ton::ton_node::{ RempMessageLevel, RempMessageStatus }; use std::{ - cmp::{max, min}, collections::{BinaryHeap, HashMap, HashSet}, ops::Deref, sync::{ @@ -54,24 +53,26 @@ use std::{ time::{Duration, Instant} }; use ever_block::{ - AddSub, BlkPrevInfo, Block, BlockCreateStats, BlockExtra, BlockIdExt, BlockInfo, CommonMsgInfo, - ConfigParams, CopyleftRewards, CreatorStats, CurrencyCollection, Deserializable, ExtBlkRef, - FutureSplitMerge, GlobalCapabilities, GlobalVersion, Grams, HashmapAugType, InMsg, InMsgDescr, - InternalMessageHeader, KeyExtBlkRef, KeyMaxLt, Libraries, McBlockExtra, McShardRecord, - McStateExtra, MerkleUpdate, MeshHashes, MeshHashesExt, MeshMsgQueuesInfo, Message, - MsgAddressInt, OutMsg, OutMsgDescr, OutMsgQueueInfo, OutMsgQueueKey, OutQueueUpdates, - ParamLimitIndex, Serializable, ShardAccount, ShardAccountBlocks, ShardAccounts, ShardDescr, - ShardFees, ShardHashes, ShardIdent, ShardStateSplit, ShardStateUnsplit, TopBlockDescrSet, - Transaction, TransactionTickTock, UnixTime32, ValidatorSet, ValueFlow, VarUInteger32, - WorkchainDescr, Workchains, MASTERCHAIN_ID, SERDE_OPTS_COMMON_MESSAGE, SERDE_OPTS_EMPTY, - CommonMessage, AccountIdPrefixFull, ChildCell, ConnectedNwOutDescr, HashUpdate, InRefValue, - ConnectedNwDescrExt, ConnectedNwDescr, Account, GetRepresentationHash, + Account, AccountId, AccountIdPrefixFull, AddSub, BlkPrevInfo, Block, BlockCreateStats, + BlockExtra, BlockIdExt, BlockInfo, Cell, ChildCell, CommonMessage, CommonMsgInfo, + ConfigParams, ConnectedNwDescr, ConnectedNwDescrExt, ConnectedNwOutDescr, + CopyleftRewards, CreatorStats, CurrencyCollection, Deserializable, + ExtBlkRef, Failure, FutureSplitMerge, GetRepresentationHash, GlobalCapabilities, + GlobalVersion, Grams, HashUpdate, HashmapAugType, HashmapType, InMsg, InMsgDescr, + InRefValue, InternalMessageHeader, KeyExtBlkRef, KeyMaxLt, Libraries, MASTERCHAIN_ID, + McBlockExtra, McShardRecord, McStateExtra, MerkleUpdate, MeshHashes, MeshHashesExt, + MeshMsgQueuesInfo, Message, MsgAddressInt, OutMsg, OutMsgDescr, OutMsgQueueInfo, + OutMsgQueueKey, OutQueueUpdates, ParamLimitIndex, ProcessedInfoKey, ProcessedUpto, + Result, SERDE_OPTS_COMMON_MESSAGE, SERDE_OPTS_EMPTY, Serializable, ShardAccount, + ShardAccountBlocks, ShardAccounts, ShardDescr, ShardFees, ShardHashes, ShardIdent, + ShardStateSplit, ShardStateUnsplit, SliceData, TopBlockDescrSet, Transaction, + TransactionTickTock, UInt256, UnixTime32, UsageTree, ValidatorSet, ValueFlow, + VarUInteger32, WorkchainDescr, Workchains, error, fail, write_boc }; use ever_executor::{ BlockchainConfig, ExecuteParams, OrdinaryTransactionExecutor, TickTockTransactionExecutor, TransactionExecutor, }; -use ever_block::{error, fail, AccountId, Cell, HashmapType, Result, UInt256, UsageTree, SliceData}; use crate::engine_traits::RempQueueCollatorInterface; use crate::validator::validator_utils::{is_remp_enabled, PrevBlockHistory}; @@ -133,8 +134,8 @@ impl PrevData { let mut overload_history = 0; let mut underload_history = 0; if let Some(state) = states.get(1) { - gen_utime = std::cmp::max(gen_utime, state.state()?.gen_time()); - gen_lt = std::cmp::max(gen_lt, state.state()?.gen_lt()); + gen_utime = gen_utime.max(state.state()?.gen_time()); + gen_lt = gen_lt.max(state.state()?.gen_lt()); let key = state.shard().merge()?.shard_key(false); accounts.merge(&state.state()?.read_accounts()?, &key)?; total_validator_fees.add(state.state()?.total_validator_fees())?; @@ -220,6 +221,9 @@ impl PartialOrd for NewMessage { } struct CollatorData { + collator_settings: Arc, + collated_block_descr: Arc, + // lists, empty by default in_msgs: InMsgDescr, out_msgs: OutMsgDescr, @@ -291,11 +295,15 @@ impl CollatorData { usage_tree: UsageTree, prev_data: &PrevData, is_masterchain: bool, + collator_settings: Arc, + collated_block_descr: Arc, ) -> Result { let limits = Arc::new(config.raw_config().block_limits(is_masterchain)?); let opts = serde_opts_from_caps(&config); let split_queues = !config.has_capability(GlobalCapabilities::CapNoSplitOutQueue); let ret = Self { + collator_settings, + collated_block_descr, in_msgs: InMsgDescr::with_serde_opts(opts), out_msgs: OutMsgDescr::with_serde_opts(opts), accounts: ShardAccountBlocks::default(), @@ -384,7 +392,7 @@ impl CollatorData { /// add in and out messages from to block, and to new message queue fn new_transaction(&mut self, transaction: &Transaction, tr_cell: ChildCell, in_msg_opt: Option<&InMsg>) -> Result<()> { // log::trace!( - // "new transaction, message {:x}\n{}", + // "{} new transaction, message {:x}\n{}", self.collated_block_descr, // in_msg_opt.map(|m| m.message_cell().unwrap().repr_hash()).unwrap_or_default(), // ever_block_json::debug_transaction(transaction.clone()).unwrap_or_default(), // ); @@ -449,9 +457,9 @@ impl CollatorData { // let mut data = self.out_msg_queue_info.del_message(key)?; // let created_lt = u64::construct_from(&mut data)?; // let enq = MsgEnqueueStuff::construct_from(&mut data, created_lt)?; - // let data = ever_block::write_boc(&enq.message_cell())?; - // log::debug!("del_out_msg_from_state {:x} size {}", key, data.len()); - log::debug!("del_out_msg_from_state {:x}", key); + // let data = write_boc(&enq.message_cell())?; + // log::debug!("{} del_out_msg_from_state {:x} size {}", self.collated_block_descr, key, data.len()); + log::debug!("{} del_out_msg_from_state {:x}", self.collated_block_descr, key); self.dequeue_count += 1; self.out_msg_queue_info.del_message(key)?; self.block_limit_status.register_out_msg_queue_op( @@ -592,7 +600,7 @@ impl CollatorData { } fn update_min_mc_seqno(&mut self, mc_seqno: u32) -> u32 { - let min_ref_mc_seqno = min(self.min_ref_mc_seqno.unwrap_or(std::u32::MAX), mc_seqno); + let min_ref_mc_seqno = self.min_ref_mc_seqno.unwrap_or(std::u32::MAX).min(mc_seqno); self.min_ref_mc_seqno = Some(min_ref_mc_seqno); min_ref_mc_seqno } @@ -774,25 +782,38 @@ impl ExecutionManager { Ok(()) } + /// starts a new transaction execution task + /// if pruned access error occurs, returns false pub async fn execute( &mut self, account_id: AccountId, msg: AsyncMessage, prev_data: &PrevData, collator_data: &mut CollatorData, - ) -> Result<()> { + ) -> Result { log::trace!("{}: execute (adding into queue): {:x}", self.collated_block_descr, account_id); if let Some((sender, _handle)) = self.changed_accounts.get(&account_id) { self.wait_tr.request(); sender.send(Arc::new(msg))?; } else { - let shard_acc = if let Some(shard_acc) = prev_data.accounts().account(&account_id)? { - shard_acc - } else if let AsyncMessage::Ext(_, msg_id) = msg { - collator_data.rejected_ext_messages.push((msg_id, format!("account {:x} not found", account_id))); - return Ok(()); // skip external messages for unexisting accounts - } else { - ShardAccount::default() + let shard_acc = match prev_data.accounts().account(&account_id) { + Ok(Some(shard_acc)) => shard_acc, + Ok(None) => { + if let AsyncMessage::Ext(_, msg_id) = msg { + collator_data.rejected_ext_messages.push((msg_id, format!("account {:x} not found", account_id))); + return Ok(true); // skip external messages for unexisting accounts + } else { + ShardAccount::default() + } + } + Err(err) => { + // this code is for collator bundles not to produce error accessing pruned messages + #[cfg(test)] + if collator_data.collator_settings.is_bundle && err.downcast_ref() == Some(&ever_block::ExceptionCode::PrunedCellAccess) { + return Ok(false); + } + return Err(err); + } }; let (sender, handle) = self.start_account_job( account_id.clone(), @@ -805,7 +826,7 @@ impl ExecutionManager { self.check_parallel_transactions(collator_data).await?; - Ok(()) + Ok(true) } fn start_account_job( @@ -879,7 +900,7 @@ impl ExecutionManager { if let Err(err) = res { log::error!("FAILED to add transaction to shard account staff: {}", &err); fail!(err); - } + } } total_trans_duration.fetch_add(duration, Ordering::Relaxed); log::trace!("{}: account {:x} TIME execute {}μ;", @@ -1045,6 +1066,13 @@ impl ExecutionManager { } } +pub struct CollateResult { + pub candidate: Option, + pub new_state: Option, + pub usage_tree: UsageTree, + pub error: Failure, +} + pub struct Collator { engine: Arc, remp_collator_interface: Option>, @@ -1062,7 +1090,6 @@ pub struct Collator { debug: bool, rand_seed: UInt256, - collator_settings: CollatorSettings, started: Instant, stop_flag: Arc, @@ -1078,7 +1105,6 @@ impl Collator { engine: Arc, rand_seed: Option, remp_collator_interface: Option>, - collator_settings: CollatorSettings ) -> Result { let prev_blocks_ids = prev_blocks_history.get_prevs(); @@ -1087,7 +1113,7 @@ impl Collator { let new_block_seqno = match prev_blocks_ids.len() { 1 => prev_blocks_ids[0].seq_no() + 1, - 2 => max(prev_blocks_ids[0].seq_no(), prev_blocks_ids[1].seq_no()) + 1, + 2 => prev_blocks_ids[0].seq_no().max(prev_blocks_ids[1].seq_no()) + 1, _ => fail!("`prev_blocks_ids` has invalid length"), }; @@ -1157,7 +1183,7 @@ impl Collator { remp_collator_interface, shard, min_mc_seqno, - prev_blocks_ids: prev_blocks_ids.clone(), + prev_blocks_ids: prev_blocks_ids.to_vec(), created_by, after_merge, after_split, @@ -1165,13 +1191,12 @@ impl Collator { collated_block_descr, debug: true, rand_seed, - collator_settings, started: Instant::now(), stop_flag: Arc::new(AtomicBool::new(false)), }) } - pub async fn collate(mut self) -> Result<(BlockCandidate, ShardStateUnsplit)> { + pub async fn collate(mut self, collator_settings: CollatorSettings) -> Result { log::info!( "{}: COLLATE min_mc_seqno = {}, prev_blocks_ids: {} {}", self.collated_block_descr, @@ -1181,17 +1206,18 @@ impl Collator { ); self.init_timeout(); + let collator_settings = Arc::new(collator_settings); let mut collator_data; let mut attempt = 0; let mut duration; // inside the loop try to collate new block - let (candidate, state, exec_manager) = loop { + let (collate_result, exec_manager) = loop { let attempt_started = Instant::now(); // load required data including masterchain and shards states - let imported_data = self.import_data() - .await.map_err(|e| { + let imported_data = self.import_data().await + .map_err(|e| { log::warn!("{}: COLLATION FAILED: TIME: {}ms import_data: {:?}", self.collated_block_descr, self.started.elapsed().as_millis(), e); e @@ -1200,23 +1226,30 @@ impl Collator { let mc_data; let prev_data; // unpack state, perform some checkes, import masterchain and shards blocks - (mc_data, prev_data, collator_data) = self.prepare_data(imported_data) - .await.map_err(|e| { + (mc_data, prev_data, collator_data) = self.prepare_data(imported_data, collator_settings.clone()).await + .map_err(|e| { log::warn!("{}: COLLATION FAILED: TIME: {}ms prepare_data: {:?}", self.collated_block_descr, self.started.elapsed().as_millis(), e); e })?; // load messages and process them to produce block candidate - let result = self.do_collate(&mc_data, &prev_data, &mut collator_data).await - .map_err(|e| { + let result = self.do_collate(&mc_data, &prev_data, &mut collator_data).await; + duration = attempt_started.elapsed().as_millis() as u32; + match result { + Err(e) => { log::warn!("{}: COLLATION FAILED: TIME: {}ms do_collate: {:?}", self.collated_block_descr, self.started.elapsed().as_millis(), e); - e - }); - duration = attempt_started.elapsed().as_millis() as u32; - if let Some(result) = result? { - break result; + let collate_result = CollateResult { + candidate: None, + new_state: None, + usage_tree: collator_data.usage_tree, + error: Some(e), + }; + return Ok(collate_result); + } + Ok(Some(result)) => break result, + Ok(None) => () } // sleep after empty collation to respect the collation time iterval @@ -1238,20 +1271,21 @@ impl Collator { let pruned_count = collator_data.estimate_pruned_count(); let estimate_size = collator_data.block_limit_status.estimate_block_size(None, pruned_count) as usize; - log::info!( - "{}: ASYNC COLLATED SIZE: {} ESTIMATEED SIZE: {} GAS: {} TIME: {}ms GAS_RATE: {} TRANS: {}ms ID: {}", - self.collated_block_descr, - candidate.data.len(), - estimate_size, - collator_data.block_limit_status.gas_used(), - duration, - ratio, - exec_manager.total_trans_duration.load(Ordering::Relaxed) / 1000, - candidate.block_id, - ); - - if estimate_size > 400_000 && 100 * estimate_size.abs_diff(candidate.data.len()) / estimate_size > 5 { - log::warn!("{}: diff is too much", self.collated_block_descr) + if let Some(candidate) = &collate_result.candidate { + log::info!( + "{}: ASYNC COLLATED SIZE: {} ESTIMATEED SIZE: {} GAS: {} TIME: {}ms GAS_RATE: {} TRANS: {}ms ID: {}", + self.collated_block_descr, + candidate.data.len(), + estimate_size, + collator_data.block_limit_status.gas_used(), + duration, + ratio, + exec_manager.total_trans_duration.load(Ordering::Relaxed) / 1000, + candidate.block_id, + ); + if estimate_size > 400_000 && 100 * estimate_size.abs_diff(candidate.data.len()) / estimate_size > 5 { + log::warn!("{}: diff is too much", self.collated_block_descr) + } } #[cfg(feature = "log_metrics")] @@ -1265,7 +1299,7 @@ impl Collator { collator_data.execute_count, collator_data.block_limit_status.gas_used(), ratio, - candidate.data.len(), + collate_result.candidate.data.len(), duration, ); @@ -1278,7 +1312,7 @@ impl Collator { collator_data.block_limit_status.gas_used() ); - Ok((candidate, state)) + Ok(collate_result) } async fn import_data(&self) -> Result { @@ -1304,7 +1338,7 @@ impl Collator { let top_shard_blocks_descr = Vec::new(); - break Ok(ImportedData { + return Ok(ImportedData { mc_state, prev_states, prev_ext_blocks_refs, @@ -1314,7 +1348,7 @@ impl Collator { } } - async fn prepare_data(&self, mut imported_data: ImportedData) + async fn prepare_data(&self, mut imported_data: ImportedData, collator_settings: Arc) -> Result<(McData, PrevData, CollatorData)> { log::trace!("{}: prepare_data", self.collated_block_descr); @@ -1347,6 +1381,8 @@ impl Collator { usage_tree, &prev_data, is_masterchain, + collator_settings, + self.collated_block_descr.clone(), )?; if !self.shard.is_masterchain() { let (now_upper_limit, before_split, _accept_msgs) = check_this_shard_mc_info( @@ -1373,7 +1409,7 @@ impl Collator { mc_data.mc_state_extra(), mc_data.mc_state_extra().shards(), &mc_data.state(), - self.collator_settings.is_fake, + collator_data.collator_settings.is_fake, )?; self.check_utime(&mc_data, &prev_data, &mut collator_data)?; @@ -1402,7 +1438,7 @@ impl Collator { mc_data: &McData, prev_data: &PrevData, collator_data: &mut CollatorData, - ) -> Result> { + ) -> Result> { log::debug!("{}: do_collate", self.collated_block_descr); self.check_stop_flag()?; @@ -1474,8 +1510,9 @@ impl Collator { if !self.after_split || !collator_data.split_queues { // import inbound internal messages, process or transit let now = std::time::Instant::now(); - self.process_inbound_internal_messages(prev_data, collator_data, &output_queue_manager, - &mut exec_manager).await?; + self.process_inbound_internal_messages( + prev_data, collator_data, &output_queue_manager, &mut exec_manager + ).await?; log::debug!("{}: TIME: process_inbound_internal_messages {}ms;", self.collated_block_descr, now.elapsed().as_millis()); @@ -1587,10 +1624,11 @@ impl Collator { //collator_data.block_limit_status.dump_block_size(); // serialize everything - let result = self.finalize_block( - mc_data, prev_data, collator_data, exec_manager, new_state_copyleft_rewards).await?; + let (collate_result, exec_manager) = self.finalize_block( + mc_data, prev_data, collator_data, exec_manager, new_state_copyleft_rewards + ).await?; - Ok(Some(result)) + Ok(Some((collate_result, exec_manager))) } async fn clean_out_msg_queue( @@ -1667,8 +1705,8 @@ impl Collator { ); } prev_state.proc_info()?.iterate_slices_with_keys(|ref mut key, ref mut value| { - let key = ever_block::ProcessedInfoKey::construct_from(key)?; - let value = ever_block::ProcessedUpto::construct_from(value)?; + let key = ProcessedInfoKey::construct_from(key)?; + let value = ProcessedUpto::construct_from(value)?; log::trace!( "{}: prev processed upto {} {:x} - {} {:x}", self.collated_block_descr, @@ -1723,7 +1761,7 @@ impl Collator { Ok(mc_data) } - fn unpack_last_state(&self, mc_data: &McData, prev_states: &Vec>) -> Result { + fn unpack_last_state(&self, mc_data: &McData, prev_states: &[Arc]) -> Result { log::trace!("{}: unpack_last_state", self.collated_block_descr); for state in prev_states.iter() { self.check_one_state(mc_data, state)?; @@ -1795,9 +1833,9 @@ impl Collator { // consider unixtime and lt from previous block(s) of the same shardchain let prev_now = prev_data.prev_state_utime(); - let prev = max(mc_data.state().state()?.gen_time(), prev_now); + let prev = mc_data.state().state()?.gen_time().max(prev_now); log::trace!("{}: init_utime prev_time: {}", self.collated_block_descr, prev); - let time = max(prev + 1, self.engine.now()); + let time = self.engine.now().max(prev + 1); Ok(time) } @@ -1868,9 +1906,9 @@ impl Collator { log::trace!("{}: init_lt", self.collated_block_descr); let mut start_lt = if !self.shard.is_masterchain() { - max(mc_data.state().state()?.gen_lt(), prev_data.prev_state_lt()) + mc_data.state().state()?.gen_lt().max(prev_data.prev_state_lt()) } else { - max(mc_data.state().state()?.gen_lt(), collator_data.shards_max_end_lt()) + mc_data.state().state()?.gen_lt().max(collator_data.shards_max_end_lt()) }; let align = mc_data.get_lt_align(); @@ -1919,8 +1957,8 @@ impl Collator { let mut shards = mc_data.state().shards()?.clone(); let wc_set = mc_data.config().workchains()?; wc_set.iterate_with_keys(|wc_id: i32, wc_info| { - log::trace!(" - {}: adjust_shard_config workchain {wc_id}, active {}, enabled_since {} (now {})", + log::trace!( + "{}: adjust_shard_config workchain {wc_id}, active {}, enabled_since {} (now {})", self.collated_block_descr, wc_info.active(), wc_info.enabled_since, @@ -2064,7 +2102,7 @@ impl Collator { prev_descr.descr.reg_mc_seqno = self.new_block_id_part.seq_no; descr.descr.reg_mc_seqno = self.new_block_id_part.seq_no; - let end_lt = max(prev_descr.descr.end_lt, descr.descr.end_lt); + let end_lt = prev_descr.descr.end_lt.max(descr.descr.end_lt); if let Err(e) = self.update_shard_block_info2( collator_data.shards_mut()?, prev_descr.clone(), descr.clone(), @@ -2451,7 +2489,17 @@ impl Collator { log::debug!("{}: process_inbound_internal_messages", self.collated_block_descr); let mut iter = output_queue_manager.merge_out_queue_iter(&self.shard)?; while let Some(k_v) = iter.next() { - let (key, enq, created_lt, block_id) = k_v?; + let (key, enq, created_lt, block_id) = match k_v { + Ok(k_v) => k_v, + Err(err) => { + // this code is for collator bundles not to produce error accessing pruned messages + #[cfg(test)] + if collator_data.collator_settings.is_bundle && err.downcast_ref() == Some(&ever_block::ExceptionCode::PrunedCellAccess) { + break + } + return Err(err) + } + }; if !collator_data.split_queues && !block_id.shard().contains_full_prefix(&enq.cur_prefix()) { // this message was left from split result continue; @@ -2476,7 +2524,9 @@ impl Collator { let account_id = enq.dst_account_id()?; log::debug!("{}: message {:x} sent to execution to account {:x}", self.collated_block_descr, key.hash, account_id); let msg = AsyncMessage::Int(enq, our); - exec_manager.execute(account_id, msg, prev_data, collator_data).await?; + if !exec_manager.execute(account_id, msg, prev_data, collator_data).await? { + break; + } } else { // println!("{:x} {:#}", key, enq); // println!("cur: {}, dst: {}", enq.cur_prefix(), enq.dst_prefix()); @@ -2574,7 +2624,9 @@ impl Collator { let (_, account_id) = header.dst.extract_std_address(true)?; log::debug!("{}: message {:x} sent to execution", self.collated_block_descr, msg_id); let msg = AsyncMessage::Ext(CommonMessage::Std(msg.deref().clone()), msg_id); - exec_manager.execute(account_id, msg, prev_data, collator_data).await?; + if !exec_manager.execute(account_id, msg, prev_data, collator_data).await? { + break; + } } else { // usually node collates more than one shard, the message can belong another one, // so we can't postpone it @@ -2639,7 +2691,9 @@ impl Collator { let (_, account_id) = header.dst.extract_std_address(true)?; let msg_std = CommonMessage::Std(msg.deref().clone()); let msg = AsyncMessage::Ext(msg_std, id); - exec_manager.execute(account_id, msg, prev_data, collator_data).await?; + if !exec_manager.execute(account_id, msg, prev_data, collator_data).await? { + break; + } } } else { remp_collator_interface.update_message_collation_result( @@ -2710,7 +2764,9 @@ impl Collator { collator_data.update_last_proc_int_msg((created_lt, hash))?; let msg = AsyncMessage::New(env, tr_cell); log::debug!("{}: message {:x} sent to execution", self.collated_block_descr, key.hash); - exec_manager.execute(account_id, msg, prev_data, collator_data).await?; + if !exec_manager.execute(account_id, msg, prev_data, collator_data).await? { + enqueue_only = true; + } }; self.check_stop_flag()?; } @@ -2775,11 +2831,11 @@ impl Collator { log::info!("{}: Block is loaded normally", self.collated_block_descr); } - if let Some(true) = self.collator_settings.want_split { + if let Some(true) = collator_data.collator_settings.want_split { log::info!("{}: want_split manually set", self.collated_block_descr); collator_data.want_split = true; return - } else if let Some(true) = self.collator_settings.want_merge { + } else if let Some(true) = collator_data.collator_settings.want_merge { log::info!("{}: want_merge manually set", self.collated_block_descr); collator_data.want_merge = true; return @@ -2828,7 +2884,9 @@ impl Collator { hdr.created_lt = collator_data.start_lt()?; hdr.created_at = UnixTime32::new(collator_data.gen_utime); let msg = CommonMessage::Std(Message::with_int_header(hdr)); - exec_manager.execute(account_id, AsyncMessage::Copyleft(msg), prev_data, collator_data).await?; + if !exec_manager.execute(account_id, AsyncMessage::Copyleft(msg), prev_data, collator_data).await? { + break; + } self.check_stop_flag()?; } @@ -2850,7 +2908,7 @@ impl Collator { collator_data: &mut CollatorData, mut exec_manager: ExecutionManager, new_state_copyleft_rewards: CopyleftRewards, - ) -> Result<(BlockCandidate, ShardStateUnsplit, ExecutionManager)> { + ) -> Result<(CollateResult, ExecutionManager)> { log::trace!("{}: finalize_block", self.collated_block_descr); let opts = collator_data.serde_opts; let (want_split, overload_history) = collator_data.want_split(); @@ -2970,7 +3028,8 @@ impl Collator { log::trace!("{}: finalize_block: calc new state", self.collated_block_descr); // Calc new state, then state update - log::trace!("copyleft rewards count from workchains: {}", collator_data.get_workchains_copyleft_rewards().len()?); + log::trace!("{}: copyleft rewards count from workchains: {}", + self.collated_block_descr, collator_data.get_workchains_copyleft_rewards().len()?); if self.shard.is_masterchain() && !value_flow.copyleft_rewards.is_empty() { log::warn!("copyleft rewards in masterchain must be empty") } @@ -3018,8 +3077,8 @@ impl Collator { .read_out_msg_queue_info()? .proc_info() .iterate_slices_with_keys(|ref mut key, ref mut value| { - let key = ever_block::ProcessedInfoKey::construct_from(key)?; - let value = ever_block::ProcessedUpto::construct_from(value)?; + let key = ProcessedInfoKey::construct_from(key)?; + let value = ProcessedUpto::construct_from(value)?; log::trace!( "{}: new processed upto {} {:x} - {} {:x}", self.collated_block_descr, @@ -3098,11 +3157,11 @@ impl Collator { } else { Block::with_out_queue_updates( global_id, - info, - value_flow, - state_update, - queue_updates, - extra, + info, + value_flow, + state_update, + queue_updates, + extra, )? }; let mut block_id = self.new_block_id_part.clone(); @@ -3111,7 +3170,7 @@ impl Collator { log::trace!("{}: finalize_block: fill block candidate", self.collated_block_descr); let cell = new_block.serialize_with_opts(opts)?; block_id.root_hash = cell.repr_hash(); - let data = ever_block::write_boc(&cell)?; + let data = write_boc(&cell)?; block_id.file_hash = UInt256::calc_file_hash(&data); // !!!! DEBUG !!!! @@ -3162,7 +3221,13 @@ impl Collator { collator_data.in_msg_count, collator_data.out_msg_count, collator_data.execute_count, collator_data.transit_count, collator_data.remove_count, candidate.data.len() ); - Ok((candidate, new_state, exec_manager)) + let collate_result = CollateResult { + candidate: Some(candidate), + new_state: Some(new_state), + usage_tree: std::mem::take(&mut collator_data.usage_tree), + error: None, + }; + Ok((collate_result, exec_manager)) } fn _check_visited_integrity(cell: &Cell, visited: &HashSet, visited_from_root: &mut HashSet) { @@ -3251,13 +3316,22 @@ impl Collator { // Self::_check_visited_integrity(&prev_data.state_root, &visited, &mut visited_from_root); // assert_eq!(visited.len(), visited_from_root.len()); - let now = std::time::Instant::now(); - let state_update = MerkleUpdate::create_fast( - &prev_data.state_root, - new_ss_root, - |h| collator_data.usage_tree.contains(h) || collator_data.imported_visited.contains(h) - )?; - log::trace!("{}: TIME: merkle update creating {}ms;", self.collated_block_descr, now.elapsed().as_millis()); + #[cfg(test)] + let need_full_state_update = collator_data.collator_settings.is_bundle; + #[cfg(not(test))] + let need_full_state_update = true; + let state_update; + if need_full_state_update { + let now = std::time::Instant::now(); + state_update = MerkleUpdate::create_fast( + &prev_data.state_root, + new_ss_root, + |h| collator_data.usage_tree.contains(h) || collator_data.imported_visited.contains(h) + )?; + log::trace!("{}: TIME: merkle update creating {}ms;", self.collated_block_descr, now.elapsed().as_millis()); + } else { + state_update = MerkleUpdate::default(); + } // let new_root2 = state_update.apply_for(&prev_data.state_root)?; // assert_eq!(new_root2.repr_hash(), new_ss_root.repr_hash()); @@ -3486,12 +3560,12 @@ impl Collator { // temp code, delete after iterate_shards_with_siblings_mut let mut changed_shards = HashMap::new(); collator_data.shards()?.iterate_shards_with_siblings(|shard, mut descr, mut sibling| { - min_ref_mc_seqno = min(min_ref_mc_seqno, descr.min_ref_mc_seqno); + min_ref_mc_seqno = min_ref_mc_seqno.min(descr.min_ref_mc_seqno); let unchanged_sibling = sibling.clone(); let updated_sibling = if let Some(sibling) = sibling.as_mut() { - min_ref_mc_seqno = min(min_ref_mc_seqno, sibling.min_ref_mc_seqno); + min_ref_mc_seqno = min_ref_mc_seqno.min(sibling.min_ref_mc_seqno); self.update_one_shard( &shard.sibling(), sibling, diff --git a/src/validator/fabric.rs b/src/validator/fabric.rs index 9e50aa41..e4dcf311 100644 --- a/src/validator/fabric.rs +++ b/src/validator/fabric.rs @@ -40,10 +40,10 @@ pub async fn run_validate_query_any_candidate( let real_block = Block::construct_from_bytes(&block.data)?; let shard = block.block_id.shard().clone(); let info = real_block.read_info()?; - let prev = PrevBlockHistory::new_prevs(&shard, &info.read_prev_ids()?); + let prev = PrevBlockHistory::with_prevs(&shard, info.read_prev_ids()?); let (_, master_ref) = info.read_master_id()?.master_block_id(); let mc_state = engine.load_state(&master_ref).await?; - let min_masterchain_block_id = mc_state.find_block_id(info.min_ref_mc_seqno())?; + let min_mc_seq_no = info.min_ref_mc_seqno(); let mut cc_seqno_with_delta = 0; let mc_state_extra = mc_state.shard_state_extra()?; let cc_seqno_from_state = if shard.is_masterchain() { @@ -67,8 +67,8 @@ pub async fn run_validate_query_any_candidate( run_validate_query( shard, SystemTime::now(), - min_masterchain_block_id, - &prev, + min_mc_seq_no, + prev, block, validator_set, engine, @@ -80,8 +80,8 @@ pub async fn run_validate_query_any_candidate( pub async fn run_validate_query( shard: ShardIdent, _min_ts: SystemTime, - min_masterchain_block_id: BlockIdExt, - prev: &PrevBlockHistory, + min_mc_seq_no: u32, + prev: PrevBlockHistory, block: super::BlockCandidate, set: ValidatorSet, engine: Arc, @@ -96,7 +96,7 @@ pub async fn run_validate_query( "({}): before validator query shard: {}, min: {}", next_block_descr, shard, - min_masterchain_block_id, + min_mc_seq_no, ); let labels = [("shard", shard.to_string())]; @@ -107,7 +107,7 @@ pub async fn run_validate_query( let validator_result = if !test_bundles_config.is_enable() { ValidateQuery::new( shard.clone(), - min_masterchain_block_id.seq_no(), + min_mc_seq_no, prev.get_prevs().clone(), block, set, @@ -119,7 +119,7 @@ pub async fn run_validate_query( } else { let query = ValidateQuery::new( shard.clone(), - min_masterchain_block_id.seq_no(), + min_mc_seq_no, prev.get_prevs().clone(), block.clone(), set, @@ -136,12 +136,10 @@ pub async fn run_validate_query( if !CollatorTestBundle::exists(test_bundles_config.path(), &id) { let path = test_bundles_config.path().to_string(); let engine = engine.clone(); - let shard = shard.clone(); - let prev_vec = prev.get_prevs().clone(); tokio::spawn( async move { match CollatorTestBundle::build_for_validating_block( - shard, min_masterchain_block_id, prev_vec, block, &engine + &engine, &prev, block ).await { Err(e) => log::error!( "({}): Error while test bundle for {} building: {}", next_block_descr, id, e @@ -211,7 +209,7 @@ pub async fn run_collate_query ( shard: ShardIdent, _min_ts: SystemTime, min_mc_seqno: u32, - prev: &PrevBlockHistory, + prev: PrevBlockHistory, remp_collator_interface: Option>, collator_id: PublicKey, set: ValidatorSet, @@ -228,66 +226,73 @@ pub async fn run_collate_query ( let collator = collator::Collator::new( shard.clone(), min_mc_seqno, - prev, + &prev, set, UInt256::from(collator_id.pub_key()?), engine.clone(), None, remp_collator_interface, - CollatorSettings::default() )?; - let collator_result = collator.collate().await; + let collate_result = collator.collate(CollatorSettings::default()).await; let labels = [("shard", shard.to_string())]; #[cfg(not(feature = "statsd"))] metrics::decrement_gauge!("run_collators", 1.0, &labels); + let mut usage_tree_opt = None; - match collator_result { - Ok((candidate, _)) => { - metrics::increment_counter!("successful_collations", &labels); - - return Ok(validator_query_candidate_to_validator_block_candidate(collator_id, candidate)) - } - Err(err) => { - let labels = [("shard", shard.to_string())]; - metrics::increment_counter!("failed_collations", &labels); - let test_bundles_config = &engine.test_bundles_config().collator; - - let err_str = if test_bundles_config.is_enable() { - err.to_string() + let err = match collate_result { + Ok(collate_result) => { + if let Some(candidate) = collate_result.candidate { + metrics::increment_counter!("successful_collations", &labels); + + return Ok(validator_query_candidate_to_validator_block_candidate(collator_id, candidate)) } else { - String::default() - }; + usage_tree_opt = Some(collate_result.usage_tree); + collate_result.error.unwrap() + } + } + Err(err) => err + }; + let labels = [("shard", shard.to_string())]; + metrics::increment_counter!("failed_collations", &labels); + let test_bundles_config = &engine.test_bundles_config().collator; - #[cfg(feature = "telemetry")] - engine.collator_telemetry().failed_attempt(&shard, &err_str); + let err_str = if test_bundles_config.is_enable() { + err.to_string() + } else { + String::default() + }; - if test_bundles_config.is_enable() { - if test_bundles_config.need_to_build_for(&err_str) { - let id = prev.get_next_block_id(&UInt256::default(), &UInt256::default()); - let prev_vec = prev.get_prevs().clone(); + #[cfg(feature = "telemetry")] + engine.collator_telemetry().failed_attempt(&shard, &err_str); - if !CollatorTestBundle::exists(test_bundles_config.path(), &id) { - let path = test_bundles_config.path().to_string(); - let engine = engine.clone(); - tokio::spawn(async move { - match CollatorTestBundle::build_for_collating_block(prev_vec, &engine).await { - Err(e) => log::error!("({}): Error while test bundle for {} building: {}", next_block_descr, id, e), - Ok(mut b) => { - b.set_notes(err_str.to_string()); - if let Err(e) = b.save(&path) { - log::error!("({}): Error while test bundle for {} saving: {}", next_block_descr, id, e); - } else { - log::info!("({}): Built test bundle for {}", next_block_descr, id); - } - } + if test_bundles_config.is_enable() { + if test_bundles_config.need_to_build_for(&err_str) { + let id = BlockIdExt { + shard_id: shard, + seq_no: prev.get_next_seqno().unwrap_or_default(), + root_hash: UInt256::default(), + file_hash: UInt256::default(), + }; + if !CollatorTestBundle::exists(test_bundles_config.path(), &id) { + let path = test_bundles_config.path().to_string(); + let engine = engine.clone(); + tokio::spawn(async move { + match CollatorTestBundle::build_for_collating_block(&engine, prev.get_prevs().to_vec(), usage_tree_opt).await { + Err(e) => log::error!("({}): Error while test bundle for {} building: {}", next_block_descr, id, e), + Ok(mut b) => { + b.set_notes(err_str.to_string()); + if let Err(e) = b.save(&path) { + log::error!("({}): Error while test bundle for {} saving: {}", next_block_descr, id, e); + } else { + log::info!("({}): Built test bundle for {}", next_block_descr, id); } - }); + } } - } + }); } - return Err(err); } } + Err(err) } diff --git a/src/validator/mod.rs b/src/validator/mod.rs index 1b4ed909..6feff637 100644 --- a/src/validator/mod.rs +++ b/src/validator/mod.rs @@ -39,8 +39,8 @@ pub mod slashing; mod verification; use std::sync::Arc; -use ever_block::{Result, UInt256, error}; use ever_block::{ + error, Result, UInt256, BlkMasterInfo, BlockIdExt, ConfigParams, CurrencyCollection, ExtBlkRef, McStateExtra, Libraries, }; @@ -60,6 +60,10 @@ pub struct CollatorSettings { pub want_split: Option, pub want_merge: Option, pub is_fake: bool, + // for collator test bundles we don't need to calculate state update + // because of state is merkle proofed + #[cfg(test)] + pub is_bundle: bool, } impl CollatorSettings { diff --git a/src/validator/out_msg_queue.rs b/src/validator/out_msg_queue.rs index 5992902a..a7e17927 100644 --- a/src/validator/out_msg_queue.rs +++ b/src/validator/out_msg_queue.rs @@ -257,8 +257,7 @@ impl OutMsgQueueInfoStuff { 0x5777784F96FB1CFFu64, "05aa297e3a2e003e1449e1297742d64f188985dc029c620edc84264f9786c0c3".parse().unwrap() ); - let key = SliceData::load_bitstring(key.write_to_new_cell()?)?; - out_queue.remove(key)?; + out_queue.remove(key.write_to_bitstring()?)?; } let ihr_pending = out_queue_info.ihr_pending().clone(); @@ -1502,18 +1501,10 @@ impl MsgQueueMergerIterator { let shard_prefix = shard.shard_key(true); let mut roots = vec![]; for nb in manager.neighbors.iter().filter(|nb| !nb.is_disabled()) { - let out_queue_short = if let Ok(full_queue) = nb.out_queue() { - let mut q = full_queue.clone(); - q.into_subtree_with_prefix(&shard_prefix, &mut 0)?; - q - } else { - let mut q = nb.out_queue_part()?.clone(); - q.into_subtree_with_prefix(&shard_prefix, &mut 0)?; - q - }; + let out_queue_short = nb.out_queue().or_else(|_| nb.out_queue_part())? + .subtree_with_prefix(&shard_prefix, &mut 0)?; if let Some(cell) = out_queue_short.data() { roots.push(RootRecord::from_cell(cell, out_queue_short.bit_len(), nb.block_id().clone())?); - // roots.push(RootRecord::new(lt, cursor, bit_len, key, nb.block_id().clone())); } } if !roots.is_empty() { diff --git a/src/validator/out_msg_queue_cleaner.rs b/src/validator/out_msg_queue_cleaner.rs index c607b938..379e9a1e 100644 --- a/src/validator/out_msg_queue_cleaner.rs +++ b/src/validator/out_msg_queue_cleaner.rs @@ -1123,8 +1123,11 @@ impl HashmapOrderedFilterCursor { let mut label = child_node_key.clone(); label.move_by(branch_key_len)?; - let mut builder = OutMsgQueue::make_cell_with_label(label, top_bit_len)?; - builder.checked_append_references_and_data(child_node_reminder)?; + let builder = OutMsgQueue::make_cell_with_remainder( + label, + top_bit_len, + child_node_reminder + )?; let new_node_cell = builder.into_cell()?; diff --git a/src/validator/tests/test_collator.rs b/src/validator/tests/test_collator.rs index 761fab18..546bd21b 100644 --- a/src/validator/tests/test_collator.rs +++ b/src/validator/tests/test_collator.rs @@ -13,13 +13,10 @@ use super::*; use crate::{ - collator_test_bundle::CollatorTestBundle, engine_traits::EngineOperations, + collator_test_bundle::{try_collate, CollatorTestBundle}, + engine_traits::EngineOperations, + test_helper::compare_blocks, test_helper::test_async, types::messages::{count_matching_bits, MsgEnvelopeStuff}, - validator::{ - CollatorSettings, collator, - validate_query::ValidateQuery, - validator_utils::compute_validator_set_cc, - }, }; use ever_block::{Result, AccountIdPrefixFull}; use pretty_assertions::assert_eq; @@ -27,8 +24,8 @@ use std::{fs::{create_dir_all, remove_dir_all}, sync::Arc}; const RES_PATH: &'static str = "target/cmp"; -async fn try_collate_by_bundle(bundle: Arc) -> Result<(Block, ShardStateUnsplit)> { - try_collate_by_engine( +async fn try_collate_by_bundle(bundle: Arc) -> Result { + let collate_result = try_collate_by_engine( bundle.clone(), bundle.block_id().shard().clone(), bundle.prev_blocks_ids().clone(), @@ -36,8 +33,21 @@ async fn try_collate_by_bundle(bundle: Arc) -> Result<(Block match bundle.ethalon_block()? { Some(block) => Some(block.block()?.read_extra().unwrap().rand_seed().clone()), None => bundle.rand_seed().cloned() + }, + true, + ).await?; + if let Some(ethalon_block) = bundle.ethalon_block()? { + let mut block = match &collate_result.candidate { + Some(candidate) => { + Block::construct_from_bytes(&candidate.data)? + } + None => return Err(collate_result.error.unwrap()) + }; + if let Err(result) = compare_blocks(ethalon_block.block()?, &mut block) { + panic!("Blocks are not equal: {}", result); } - ).await + } + Ok(collate_result) } async fn try_collate_by_engine( @@ -46,68 +56,10 @@ async fn try_collate_by_engine( prev_blocks_ids: Vec, created_by_opt: Option, rand_seed_opt: Option, -) -> Result<(Block, ShardStateUnsplit)> { + skip_state_update: bool, +) -> Result { std::fs::create_dir_all(RES_PATH).ok(); - let prev_blocks_history = PrevBlockHistory::new_prevs(&shard, &prev_blocks_ids); - let mc_state = engine.load_last_applied_mc_state().await?; - let mc_state_extra = mc_state.shard_state_extra()?; - let mut cc_seqno_with_delta = 0; - let cc_seqno_from_state = if shard.is_masterchain() { - mc_state_extra.validator_info.catchain_seqno - } else { - mc_state_extra.shards.calc_shard_cc_seqno(&shard)? - }; - let nodes = compute_validator_set_cc( - &mc_state, - &shard, - prev_blocks_history.get_next_seqno().ok_or_else(|| error!("Empty prev blocks"))?, - cc_seqno_from_state, - &mut cc_seqno_with_delta - )?; - let validator_set = ValidatorSet::with_cc_seqno(0, 0, 0, cc_seqno_with_delta, nodes)?; - - // log::debug!("{}", block_stuff.id()); - - log::info!("TRY COLLATE block {}", shard); - - let min_mc_seq_no = if prev_blocks_ids[0].seq_no() == 0 { - 0 - } else { - let state = engine.load_state(&prev_blocks_ids[0]).await?; - state.state()?.min_ref_mc_seqno() - }; - - let collator = collator::Collator::new( - shard.clone(), - min_mc_seq_no, - &prev_blocks_history, - validator_set.clone(), - created_by_opt.unwrap_or_default(), - engine.clone(), - rand_seed_opt, - None, - CollatorSettings::default(), - )?; - let (block_candidate, new_state) = collator.collate().await?; - - let new_block = Block::construct_from_bytes(&block_candidate.data)?; - - // std::fs::write(&format!("{}/state_candidate.json", RES_PATH), ever_block_json::debug_state(new_state.clone())?)?; - // std::fs::write(&format!("{}/block_candidate.json", RES_PATH), ever_block_json::debug_block_full(&new_block)?)?; - - let validator_query = ValidateQuery::new( - shard.clone(), - min_mc_seq_no, - prev_blocks_ids.clone(), - block_candidate.clone(), - validator_set.clone(), - engine.clone(), - true, - true, - None, - ); - validator_query.try_validate().await?; - Ok((new_block, new_state)) + try_collate(&engine, shard, prev_blocks_ids, created_by_opt, rand_seed_opt, skip_state_update, false).await } #[tokio::test(flavor = "multi_thread")] @@ -116,8 +68,8 @@ async fn test_collate_first_block() { create_dir_all(RES_PATH).ok(); //init_test_log(); match CollatorTestBundle::build_with_zero_state( - "src/tests/static/zerostate.boc", - &["src/tests/static/basestate0.boc", "src/tests/static/basestate0.boc"] + "src/tests/static/zerostate.boc", + &["src/tests/static/basestate0.boc", "src/tests/static/basestate0.boc"] ).await { Ok(bundle) => try_collate_by_bundle(Arc::new(bundle)).await.map(|_| ()), Err(e) => Err(e) diff --git a/src/validator/tests/test_message_cache.rs b/src/validator/tests/test_message_cache.rs index 51b15325..dac6cd22 100644 --- a/src/validator/tests/test_message_cache.rs +++ b/src/validator/tests/test_message_cache.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use std::time::Duration; use openssl::rand::rand_bytes; use rand::{Rng, thread_rng}; +#[cfg(feature = "telemetry")] use adnl::telemetry::Metric; use ton_api::ton::ton_node::{RempMessageLevel, RempMessageStatus, rempmessagestatus::RempAccepted}; use ever_block::{BlockIdExt, ShardIdent}; diff --git a/src/validator/tests/test_out_msg_queue.rs b/src/validator/tests/test_out_msg_queue.rs index 5b5c86fc..2a93cac7 100644 --- a/src/validator/tests/test_out_msg_queue.rs +++ b/src/validator/tests/test_out_msg_queue.rs @@ -411,7 +411,7 @@ fn test_clean_queue() { continue; } - queue.remove(SliceData::load_builder(key.write_to_new_cell()?)?)?; + queue.remove(key.write_to_bitstring()?)?; } queue.after_remove()?; diff --git a/src/validator/tests/test_validator_utils.rs b/src/validator/tests/test_validator_utils.rs index 3c36e8ee..71c342d3 100644 --- a/src/validator/tests/test_validator_utils.rs +++ b/src/validator/tests/test_validator_utils.rs @@ -16,7 +16,6 @@ use crate::{ block::BlockStuff, error::NodeError, shard_state::ShardHashesStuff, validator::accept_block::{create_new_proof, create_new_proof_link} }; - use ever_block::{ error, Block, BlockIdExt, ConfigParamEnum, CryptoSignature, CryptoSignaturePair, Deserializable, HashmapType, MASTERCHAIN_ID, SigPubKey, diff --git a/src/validator/validate_query.rs b/src/validator/validate_query.rs index 5250e10b..2e3e6971 100644 --- a/src/validator/validate_query.rs +++ b/src/validator/validate_query.rs @@ -197,7 +197,7 @@ impl ValidateBase { pub struct ValidateQuery { // current state of blockchain shard: ShardIdent, - min_mc_seq_no: u32, + min_mc_seqno: u32, // block_id: BlockIdExt, block_candidate: BlockCandidate, // other @@ -232,7 +232,7 @@ impl ValidateQuery { } pub fn new( shard: ShardIdent, - min_mc_seq_no: u32, + min_mc_seqno: u32, prev_blocks_ids: Vec, block_candidate: BlockCandidate, validator_set: ValidatorSet, @@ -245,7 +245,7 @@ impl ValidateQuery { Self { engine, shard, - min_mc_seq_no, + min_mc_seqno, block_candidate, validator_set, is_fake, @@ -287,7 +287,7 @@ impl ValidateQuery { soft_reject_query!("can validate block candidates only for masterchain (-1) and base workchain (0) and standard workchain (1-255)") } if block_id.shard().is_masterchain() && base.prev_blocks_ids.is_empty() { - self.min_mc_seq_no = 0 + self.min_mc_seqno = 0 } match base.prev_blocks_ids.len() { 2 => { @@ -316,10 +316,10 @@ impl ValidateQuery { soft_reject_query!("cannot split shards in masterchain") } } - if block_id.shard().is_masterchain() && self.min_mc_seq_no > base.prev_blocks_ids[0].seq_no { + if block_id.shard().is_masterchain() && self.min_mc_seqno > base.prev_blocks_ids[0].seq_no { soft_reject_query!("cannot refer to specified masterchain block {} \ because it is later than {} the immediately preceding masterchain block", - self.min_mc_seq_no, base.prev_blocks_ids[0].seq_no) + self.min_mc_seqno, base.prev_blocks_ids[0].seq_no) } } 0 => soft_reject_query!("must have one or two previous blocks to generate a next block"), @@ -524,9 +524,9 @@ impl ValidateQuery { let (_, mc_id) = base.info.read_master_id()?.master_block_id(); let mc_state = self.engine.clone().wait_state(&mc_id, Some(1_000), true).await?; log::debug!(target: "validate_query", "({}): in ValidateQuery::get_ref_mc_state() {}", self.next_block_descr, mc_state.block_id()); - if mc_state.state()?.seq_no() < self.min_mc_seq_no { + if mc_state.state()?.seq_no() < self.min_mc_seqno { reject_query!("requested to validate a block referring to an unknown future masterchain block {} < {}", - mc_state.state()?.seq_no(), self.min_mc_seq_no) + mc_state.state()?.seq_no(), self.min_mc_seqno) } self.try_unpack_mc_state(&base, mc_state) } @@ -2766,12 +2766,12 @@ impl ValidateQuery { enq: MsgEnqueueStuff, created_lt: u64, key: &OutMsgQueueKey, - nb_block_id: &BlockIdExt, + block_id: &BlockIdExt, ) -> Result { if created_lt != enq.created_lt() { reject_query!("EnqueuedMsg with key {:x} in outbound queue of our neighbor {} \ pretends to have been created at lt {} but its actual creation lt is {}", - key, nb_block_id, created_lt, enq.created_lt()) + key, block_id, created_lt, enq.created_lt()) } CHECK!(base.shard().contains_full_prefix(&enq.next_prefix())); @@ -2787,7 +2787,7 @@ impl ValidateQuery { // just check that we have not imported it once again if in_msg.is_some() { reject_query!("have an InMsg entry for processing again already processed EnqueuedMsg with key {:x} \ - of neighbor {}", key, nb_block_id) + of neighbor {}", key, block_id) } if base.shard().contains_full_prefix(&enq.cur_prefix()) { // if this message comes from our own outbound queue, we must have dequeued it @@ -2815,7 +2815,7 @@ impl ValidateQuery { // log::error!(target: "validate_query", "internal inconsistency: new ProcessedInfo claims \ // to have processed all messages up to ({},{}) but we had somehow already processed a message ({},{}) \ // from OutMsgQueue of neighbor {} key {}", self.claimed_proc_lt, self.claimed_proc_hash.to_hex_string(), - // created_lt, key.hash.to_hex_string(), nb_block_id, key.to_hex_string()); + // created_lt, key.hash.to_hex_string(), block_id, key.to_hex_string()); // return Ok(false) // } // Ok(true) @@ -2830,7 +2830,7 @@ impl ValidateQuery { to have processed all messages up to ({},{:x}), but we had somehow processed in this block \ a message ({},{:x}) from OutMsgQueue of neighbor {} key {:x}", claimed_proc_lt, claimed_proc_hash, - created_lt, key, nb_block_id, key) + created_lt, key, block_id, key) } } // must have a msg_import_fin or msg_import_tr InMsg record @@ -2839,15 +2839,15 @@ impl ValidateQuery { Some(InMsg::Transit(info)) => info.in_envelope_message_hash(), None => reject_query!("there is no InMsg entry for processing EnqueuedMsg with key {:x} \ of neighbor {} which is claimed to be processed by new ProcessedInfo of this block", - key, nb_block_id), + key, block_id), _ => reject_query!("expected either a msg_import_fin or a msg_import_tr InMsg record \ for processing EnqueuedMsg with key {:x} of neighbor {} which is claimed to be processed \ - by new ProcessedInfo of this block", key, nb_block_id) + by new ProcessedInfo of this block", key, block_id) }; if hash != enq.envelope_hash() { reject_query!("InMsg record for processing EnqueuedMsg with key {:x} of neighbor {} \ which is claimed to be processed by new ProcessedInfo of this block contains a reference \ - to a different MsgEnvelope", key, nb_block_id); + to a different MsgEnvelope", key, block_id); } // all other checks have been done while checking InMsgDescr Ok(true) @@ -2864,7 +2864,7 @@ impl ValidateQuery { base.next_block_descr, claimed_proc_lt, claimed_proc_hash, created_lt, key.hash, - nb_block_id, key); + block_id, key); } } Ok(false) @@ -2876,18 +2876,18 @@ impl ValidateQuery { log::debug!(target: "validate_query", "({}): check_in_queue len: {}", base.next_block_descr, manager.neighbors().len()); let mut iter = manager.merge_out_queue_iter(base.shard())?; while let Some(k_v) = iter.next() { - let (msg_key, enq, lt, nb_block_id) = k_v?; - if !base.split_queues && !nb_block_id.shard().contains_full_prefix(enq.cur_prefix()) { + let (msg_key, enq, lt, block_id) = k_v?; + if !base.split_queues && !block_id.shard().contains_full_prefix(enq.cur_prefix()) { // this case from shard split result without splitting queues continue; } log::debug!(target: "validate_query", "({}): processing inbound message with \ - (lt,hash)=({},{:x}) from neighbor - {}", base.next_block_descr, lt, msg_key.hash, nb_block_id); + (lt,hash)=({},{:x}) from neighbor - {}", base.next_block_descr, lt, msg_key.hash, block_id); // if (verbosity > 3) { // std::cerr << "inbound message: lt=" << kv->lt from=" << kv->source key=" << kv->key.to_hex_string() msg="; // block::gen::t_EnqueuedMsg.print(std::cerr, *(kv->msg)); // } - match Self::check_neighbor_outbound_message_processed(base, manager, enq, lt, &msg_key, &nb_block_id) { + match Self::check_neighbor_outbound_message_processed(base, manager, enq, lt, &msg_key, &block_id) { Err(err) => { // if (verbosity > 1) { // std::cerr << "invalid neighbor outbound message: lt=" << kv->lt from=" << kv->source @@ -2895,7 +2895,7 @@ impl ValidateQuery { // block::gen::t_EnqueuedMsg.print(std::cerr, *(kv->msg)); // } reject_query!("error processing outbound internal message {:x} of neighbor {} : {}", - msg_key.hash, nb_block_id, err) + msg_key.hash, block_id, err) } Ok(false) => return Ok(false), _ => () diff --git a/src/validator/validator_group.rs b/src/validator/validator_group.rs index 4f003787..a379f8e3 100644 --- a/src/validator/validator_group.rs +++ b/src/validator/validator_group.rs @@ -145,7 +145,7 @@ impl ValidatorGroupImpl { log::info!(target: "validator", "Starting session {} (start remp: {})", self.info(), start_remp_session); - self.prev_block_ids.update_prev(&prev); + self.prev_block_ids.update_prev(prev); self.min_masterchain_block_id = Some(min_masterchain_block_id.clone()); self.min_ts = min_ts; @@ -269,7 +269,7 @@ impl ValidatorGroupImpl { ) -> ValidatorGroupImpl { log::info!(target: "validator", "Initializing session {:x}, shard {}", session_id, shard); - let prev_block_ids = PrevBlockHistory::new(&shard); + let prev_block_ids = PrevBlockHistory::with_shard(&shard); ValidatorGroupImpl { min_masterchain_block_id: None, min_ts: SystemTime::now(), @@ -623,10 +623,10 @@ impl ValidatorGroup { self.info_round(round).await ); - let (_lk_round, prev_block_ids, mm_block_id, min_ts) = + let (_lk_round, prev, mm_block_id, min_ts) = self.group_impl.execute_sync(|group_impl| group_impl.update_round (round)).await; - let remp_queue_collator_interface_impl = match self.check_in_sync(&prev_block_ids).await { + let remp_queue_collator_interface_impl = match self.check_in_sync(&prev).await { Err(e) => { log::warn!(target: "validator", "({}): Error checking sync for {}: `{}`", next_block_descr, self.info_round(round).await, e @@ -644,7 +644,7 @@ impl ValidatorGroup { self.shard().clone(), min_ts, mc.seq_no, - &prev_block_ids, + prev, remp_queue_collator_interface_impl.clone().map(|x| x.into_interface()), self.local_key.clone(), self.validator_set.clone(), @@ -798,8 +798,8 @@ impl ValidatorGroup { let validation_completion_time = run_validate_query( self.shard().clone(), min_ts, - mc_block_id, - &prev_block_ids, + mc_block_id.seq_no(), + prev_block_ids, candidate.clone(), self.validator_set.clone(), self.engine.clone(), @@ -975,7 +975,7 @@ impl ValidatorGroup { // TODO: retry block commit }; - group_impl.prev_block_ids.update_prev(&vec!(next_block_id)); + group_impl.prev_block_ids.update_prev(vec!(next_block_id)); (full_result, group_impl.prev_block_ids.display_prevs()) }).await; diff --git a/src/validator/validator_manager.rs b/src/validator/validator_manager.rs index d9efe34b..8c4eb88e 100644 --- a/src/validator/validator_manager.rs +++ b/src/validator/validator_manager.rs @@ -823,7 +823,7 @@ impl ValidatorManagerImpl { } async fn start_sessions(&mut self, - new_shards: &HashMap>, + new_shards: HashMap>, our_current_shards: &mut HashMap, keyblock_seqno: u32, session_options: validator_session::SessionOptions, @@ -863,7 +863,7 @@ impl ValidatorManagerImpl { let remp_enabled = !do_unsafe_catchain_rotate && is_remp_enabled(self.engine.clone(), mc_state_extra.config()); - for (ident, prev_blocks) in new_shards.iter() { + for (ident, prev_blocks) in new_shards.into_iter() { let cc_seqno_from_state = if ident.is_masterchain() { *master_cc_range.end() } else { @@ -876,12 +876,13 @@ impl ValidatorManagerImpl { ident, cc_seqno_from_state ); + let prev = PrevBlockHistory::with_prevs(&ident, prev_blocks); let subset = match try_calc_subset_for_workchain( &full_validator_set, &mc_state, - ident, + &ident, cc_seqno, - PrevBlockHistory::new_prevs(ident, prev_blocks).get_next_seqno().unwrap_or_default() + prev.get_next_seqno().unwrap_or_default() )? { Some(x) => x, None => { @@ -905,7 +906,7 @@ impl ValidatorManagerImpl { max_vertical_seqno: 0 }); - let prev_block_seqno_opt = prev_blocks.get(0).map(|x| x.seq_no); + let prev_block_seqno_opt = prev.get_prev(0).map(|x| x.seq_no); let session_id = get_session_unsafe_id( general_session_info.clone(), &vsubset.list().to_vec(), @@ -923,7 +924,7 @@ impl ValidatorManagerImpl { let prev_sessions = self.compute_prev_sessions_list( &full_validator_set, &mc_state, - prev_blocks, + prev.get_prevs(), general_session_info.clone(), &session_id ).await?; @@ -1010,7 +1011,7 @@ impl ValidatorManagerImpl { &prev_sessions.get_validator_list()?, &vsubset, group_start_status, - prev_blocks.clone(), + prev.get_prevs().clone(), last_masterchain_block.clone(), SystemTime::UNIX_EPOCH + Duration::from_secs(mc_now.as_u32() as u64), master_cc_range, @@ -1268,7 +1269,7 @@ impl ValidatorManagerImpl { let validation_status = self.engine.validation_status(); if validation_status.allows_validate() { self.start_sessions( - &new_shards, + new_shards, &mut our_current_shards, keyblock_seqno, session_options, diff --git a/src/validator/validator_utils.rs b/src/validator/validator_utils.rs index 1923e491..a899eeeb 100644 --- a/src/validator/validator_utils.rs +++ b/src/validator/validator_utils.rs @@ -498,7 +498,7 @@ pub fn fmt_next_block_descr_from_next_seqno( } impl PrevBlockHistory { - pub fn new(shard: &ShardIdent) -> Self { + pub fn with_shard(shard: &ShardIdent) -> Self { Self { shard: shard.clone(), prev: vec!(), @@ -506,16 +506,17 @@ impl PrevBlockHistory { } } - pub fn new_prevs(shard: &ShardIdent, prevs: &Vec) -> Self { + pub fn with_prevs(shard: &ShardIdent, prev: Vec) -> Self { + let next_seqno = get_first_block_seqno_after_prevs(&prev); Self { shard: shard.clone(), - prev: prevs.clone(), - next_seqno: get_first_block_seqno_after_prevs(prevs) + prev, + next_seqno } } - pub fn update_prev(&mut self, prev: &Vec) { - self.prev = prev.clone(); + pub fn update_prev(&mut self, prev: Vec) { + self.prev = prev; self.next_seqno = get_first_block_seqno_after_prevs(&self.prev); } @@ -530,6 +531,9 @@ impl PrevBlockHistory { pub fn get_prevs(&self) -> &Vec { &self.prev } + pub fn get_prev(&self, index: usize) -> Option<&BlockIdExt> { + self.prev.get(index) + } pub fn same_prevs(&self, other: &PrevBlockHistory) -> bool { self.shard == other.shard && self.prev == other.prev diff --git a/src/validator/verification/mod.rs b/src/validator/verification/mod.rs index 1c2d62b0..a0a45736 100644 --- a/src/validator/verification/mod.rs +++ b/src/validator/verification/mod.rs @@ -11,8 +11,6 @@ * limitations under the License. */ -#![cfg(not(feature = "fast_finality"))] - extern crate catchain; use crate::engine_traits::EngineOperations; diff --git a/storage/src/archives/package.rs b/storage/src/archives/package.rs index 59327f8b..72e765e5 100644 --- a/storage/src/archives/package.rs +++ b/storage/src/archives/package.rs @@ -153,7 +153,7 @@ impl Package { .read(true) .write(!read_only || create) .create(create) - .open(&path).await?) + .open(path).await?) } pub async fn open_file(&self) -> Result { diff --git a/storage/src/dynamic_boc_rc_db.rs b/storage/src/dynamic_boc_rc_db.rs index fb368cd1..4bff6744 100644 --- a/storage/src/dynamic_boc_rc_db.rs +++ b/storage/src/dynamic_boc_rc_db.rs @@ -301,7 +301,7 @@ impl DynamicBocDb { // Is thread-safe pub fn load_boc(self: &Arc, root_cell_id: &UInt256, use_cache: bool) -> Result { - self.load_cell(root_cell_id, use_cache) + self.load_storage_cell(root_cell_id, use_cache) } pub fn check_and_update_cells(&mut self) -> Result<()> { @@ -448,7 +448,7 @@ impl DynamicBocDb { Ok(()) } - pub(crate) fn load_cell( + pub(crate) fn load_storage_cell( self: &Arc, cell_id: &UInt256, use_cache: bool, @@ -825,8 +825,8 @@ impl DoneCellsStorage for DoneCellsStorageAdapter { } fn get(&self, index: u32) -> Result { - let id = UInt256::from_slice(self.index.get(&index.into())?.as_ref()).into(); - Ok(self.boc_db.clone().load_cell(&id, false)?) + let cell_id = UInt256::from_slice(self.index.get(&index.into())?.as_ref()); + Ok(self.boc_db.clone().load_storage_cell(&cell_id, false)?) } fn cleanup(&mut self) -> Result<()> { @@ -856,7 +856,7 @@ impl CellByHashStorageAdapter { impl CellByHashStorage for CellByHashStorageAdapter { fn get_cell_by_hash(&self, hash: &UInt256) -> Result { - self.boc_db.clone().load_cell(&hash, self.use_cache) + self.boc_db.clone().load_storage_cell(&hash, self.use_cache) } } @@ -926,7 +926,7 @@ impl OrderedCellsStorage for OrderedCellsStorageAdapter { fn get_cell_by_index(&self, index: u32) -> Result { let id = UInt256::from_slice(self.index1.get(&index.into())?.as_ref()).into(); - let cell = self.boc_db.clone().load_cell(&id, false)?; + let cell = self.boc_db.clone().load_storage_cell(&id, false)?; let slowdown = self.slowdown(); if index % 1000 == 0 { diff --git a/storage/src/shardstate_db_async.rs b/storage/src/shardstate_db_async.rs index f37dc307..fe1e0547 100644 --- a/storage/src/shardstate_db_async.rs +++ b/storage/src/shardstate_db_async.rs @@ -414,16 +414,6 @@ impl ShardStateDb { Arc::clone(&self.shardstate_db) } - #[cfg(test)] - pub fn enum_shardstate_db(&self) -> Result<()> { - self.shardstate_db.for_each(&mut |_key, val| { - let db_entry = DbEntry::from_slice(val)?; - println!("{}", db_entry.block_id); - Ok(true) - })?; - Ok(()) - } - pub async fn put( &self, id: &BlockIdExt, @@ -517,8 +507,7 @@ impl ShardStateDb { )?) } - pub fn create_hashed_cell_storage( - &self,) -> Result { + pub fn create_hashed_cell_storage(&self) -> Result { Ok(CellByHashStorageAdapter::new( self.dynamic_boc_db.clone(), false diff --git a/storage/src/types/storage_cell.rs b/storage/src/types/storage_cell.rs index 81a24e48..752de42e 100644 --- a/storage/src/types/storage_cell.rs +++ b/storage/src/types/storage_cell.rs @@ -198,17 +198,14 @@ impl StorageCell { }; let boc_db = self.boc_db.upgrade().ok_or_else(|| error!("BocDb is dropped"))?; - let cell = boc_db.load_cell( - &hash, - self.use_cache - )?; + let storage_cell = boc_db.load_storage_cell(&hash, self.use_cache)?; if self.use_cache { self.references.write()[index].cell = - Some(Arc::downgrade(cell.cell_impl()) as Weak); + Some(Arc::downgrade(storage_cell.cell_impl()) as Weak); } - Ok(cell.cell_impl().clone()) + Ok(storage_cell.cell_impl().clone()) } } diff --git a/validator-session/Cargo.toml b/validator-session/Cargo.toml index 0db1b319..0d58a152 100644 --- a/validator-session/Cargo.toml +++ b/validator-session/Cargo.toml @@ -25,6 +25,6 @@ colored = '1.9' env_logger = '0.7' [features] -export_key = [ ] +export_key = [ 'ever_block/export_key' ] slashing = [ ]