diff --git a/Cargo.toml b/Cargo.toml
old mode 100644
new mode 100755
index e31a2e00..acc3a6e7
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -47,20 +47,20 @@ stream-cancel = '0.8.0'
 string-builder = '^0.2.0'
 tokio = { features = [ 'rt-multi-thread' ], version = '1.5' }
 tokio-util = '0.7'
-adnl = { features = [ 'client', 'node', 'server' ], git = 'https://github.com/tonlabs/ever-adnl.git', tag = '0.9.15' }
+adnl = { features = [ 'client', 'node', 'server' ], git = 'https://github.com/tonlabs/ever-adnl.git', tag = '0.9.18' }
 catchain = { path = 'catchain' }
-dht = { git = 'https://github.com/tonlabs/ever-dht.git', tag = '0.6.79' }
+dht = { git = 'https://github.com/tonlabs/ever-dht.git', tag = '0.6.82' }
 lockfree = { git = 'https://github.com/tonlabs/lockfree.git' }
-overlay = { git = 'https://github.com/tonlabs/ever-overlay.git', tag = '0.7.11' }
-rldp = { git = 'https://github.com/tonlabs/ever-rldp.git', tag = '0.8.8' }
+overlay = { git = 'https://github.com/tonlabs/ever-overlay.git', tag = '0.7.14' }
+rldp = { git = 'https://github.com/tonlabs/ever-rldp.git', tag = '0.8.11' }
 storage = { path = 'storage' }
-ton_abi = { git = 'https://github.com/tonlabs/ever-abi.git', optional = true, tag = '2.4.8' }
-ton_api = { git = 'https://github.com/tonlabs/ever-tl.git', package = 'ton_api', tag = '0.3.53' }
-ton_block = { git = 'https://github.com/tonlabs/ever-block.git', tag = '1.9.117' }
-ton_block_json = { git = 'https://github.com/tonlabs/ever-block-json.git', tag = '0.7.205' }
-ton_executor = { git = 'https://github.com/tonlabs/ever-executor.git', tag = '1.16.97' }
+ton_abi = { git = 'https://github.com/tonlabs/ever-abi.git', optional = true, tag = '2.4.11' }
+ton_api = { git = 'https://github.com/tonlabs/ever-tl.git', package = 'ton_api', tag = '0.3.55' }
+ton_block = { git = 'https://github.com/tonlabs/ever-block.git', tag = '1.9.119' }
+ton_block_json = { git = 'https://github.com/tonlabs/ever-block-json.git', tag = '0.7.207' }
+ton_executor = { git = 'https://github.com/tonlabs/ever-executor.git', tag = '1.16.99' }
 ton_types = { git = 'https://github.com/tonlabs/ever-types.git', tag = '2.0.31' }
-ton_vm = { git = 'https://github.com/tonlabs/ever-vm.git', tag = '1.8.225' }
+ton_vm = { git = 'https://github.com/tonlabs/ever-vm.git', tag = '1.8.227' }
 validator_session = { path = 'validator-session' }

 [features]
diff --git a/README.md b/README.md
index ff722c7f..6c17e7d4 100644
--- a/README.md
+++ b/README.md
@@ -13,12 +13,13 @@ Everscale/Venom node and validator
 - [About](#about)
 - [Getting Started](#getting-started)
 - [Usage](#usage)
+  - [Metrics](#metrics)
 - [Contributing](#contributing)
 - [License](#license)

 ## About

-Implementation of Everscal/Venom node and validator in safe Rust.
+Implementation of Everscale/Venom node and validator in safe Rust.

 ## Getting Started

@@ -37,10 +38,110 @@ cargo build --release

 ## Usage

 To get help about command line arguments, run
+
 ```
 ton_node --help
 ```

+### Metrics
+
+#### Collator metrics
+
+The main collator metrics show the performance aspects of the collation process. All metrics are broken down by host and shard with the labels "host" and "shard".
+
+##### Collation times cumulative flow
+
+**collator_elapsed_on_empty_collations** - shows how much time from the start of producing a new block was spent on empty collations. An empty collation is a collation iteration during which there were no messages to process. After an empty collation, the collator sleeps for up to 200 ms and retries to collate a new block.
+If there is any message to process, it will produce a new block; otherwise it will wait another 200 ms. After 800 ms of retries, it will produce an empty block and move on to collating the next one.
+So the collator may have less than 1000 ms to collate a block if there were some empty collation tries before.
+
+It is recommended to render _elapsed_on_empty_collations_ on a separate graph.
+
+The following metrics are shown as a cumulative flow graph, which makes it possible to show them all clearly on one graph. All metrics relate to one collation process iteration and show the balance between the different collation operations.
+
+- **collator_elapsed_on_prepare_data** - time spent from the beginning of a non-empty collation until the masterchain and shardchain states were loaded
+- **collator_elapsed_on_initial_clean** - time spent until the initial out queue clean finished
+- **collator_elapsed_on_internals_processed** - time spent until the inbound internals processing finished
+- **collator_elapsed_on_remp_processed** - time spent until the inbound external messages processing via remp finished
+- **collator_elapsed_on_externals_processed** - time spent until the inbound externals processing finished
+- **collator_elapsed_on_new_processed** - time spent until the new internal messages processing finished
+- **collator_elapsed_on_secondary_clean** - time spent until the secondary out queue cleaning finished
+- **collator_elapsed_on_finalize** - time spent until the collated block was finalized
+
+##### Out messages queue metrics
+
+The out messages queue is cleaned in two steps:
+
+- initial clean with the ordered algorithm, which normally should delete all processed out internal messages from the queue
+- secondary clean with the random access algorithm, which executes when not all messages in the queue were processed and no out message was deleted
+
+The collator reports these metrics for each step:
+
+- **collator_clean_out_queue_partial** - whether not all messages that could be deleted were processed. The ordered algorithm may stop when it reaches the message with the max possible processed LT; in this case the result is not partial. When it stops on timeout, the result is partial. When the random access algorithm doesn't process all messages for any reason, the result is partial.
+- **collator_clean_out_queue_elapsed** - time spent on the cleaning
+- **collator_clean_out_queue_processed** - how many out internal messages were processed during the clean
+- **collator_clean_out_queue_deleted** - how many messages were deleted from the out queue
+
+For each step the metrics have an additional "step" label, which takes the value "initial" or "secondary".
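+
+Below is a minimal sketch of how these per-step gauges are reported with the "shard" and "step" labels. The `CleanOutQueueMetrics` struct and the metric names come from this patch (`src/validator/collator.rs`); the standalone helper function and its name are only illustrative.
+
+```
+// Illustrative sketch: reporting the clean-out-queue gauges for one step.
+// Mirrors the `metrics::gauge!` calls in `src/validator/collator.rs`.
+struct CleanOutQueueMetrics {
+    partial: bool,   // true if the clean did not process everything it could
+    elapsed: u128,   // time spent on the cleaning, ms
+    processed: i32,  // out internal messages processed during the clean
+    deleted: i32,    // messages deleted from the out queue
+}
+
+fn report_clean_out_queue(m: &CleanOutQueueMetrics, shard: String, step: String) {
+    // `step` is "initial" or "secondary"
+    let labels = [("shard", shard), ("step", step)];
+    metrics::gauge!("collator_clean_out_queue_partial", if m.partial { 1.0 } else { 0.0 }, &labels);
+    metrics::gauge!("collator_clean_out_queue_elapsed", m.elapsed as f64, &labels);
+    metrics::gauge!("collator_clean_out_queue_processed", m.processed as f64, &labels);
+    metrics::gauge!("collator_clean_out_queue_deleted", m.deleted as f64, &labels);
+}
+```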
+
+##### Messages processing stats
+
+- **collator_processed_in_int_msgs_count** - the number of inbound internal messages processed
+- **collator_dequeued_our_out_int_msgs_count** - the number of processed internal messages that were previously created in the current shard, so they were deleted from the shard's out queue after processing
+- **collator_processed_remp_msgs_count** - the number of inbound external messages processed via remp
+- **collator_processed_in_ext_msgs_count** - the number of processed inbound external messages (not via remp)
+- **collator_created_new_msgs_count** - the number of new internal messages created during collation (both with destination in the current shard and with destination in other shards)
+- **collator_processed_new_msgs_count** - the number of newly created messages that were processed immediately in the current collation
+- **collator_reverted_transactions_count** - the number of transactions that were reverted when parallel collation was stopped due to a time limit
+
+##### Collation stop info
+
+The collation may be gracefully stopped before all messages are processed. There are two main reasons:
+
+- by timeout
+- by reaching the block limits
+
+**collator_stopped_on_timeout** - an integer value that shows on which operation the collation timeout was reached:
+
+- 0 = no timeout
+- 1 = on new messages processing
+- 2 = on externals processing
+- 3 = on externals processing via remp
+- 4 = on internals processing
+
+**collator_stopped_on_block_limit** - earlier operations can be stopped when the Soft block limit is reached, while later ones can be stopped when the Medium block limit is reached. E.g. internals processing may first be stopped by the Soft limit, then the collator processes externals and may be stopped again by the Medium limit. Every stop-by-limit case has an integer representation, and the collator reports the sum of these values:
+
+```
+collator_stopped_on_block_limit = stopped_on_soft_limit + stopped_on_remp_limit + stopped_on_medium_limit
+```
+
+Any possible combination of `stopped_on_soft_limit`, `stopped_on_remp_limit`, and `stopped_on_medium_limit` values results in a unique sum, so the stop case can be recognized from this value.
+
+`stopped_on_soft_limit` possible values:
+
+- 0 = NotStopped
+- 1 = Internals
+- 2 = InitialClean
+
+`stopped_on_remp_limit` possible values:
+
+- 0 = NotStopped
+- 3 = Remp
+
+`stopped_on_medium_limit` possible values:
+
+- 0 = NotStopped
+- 4 = NewMessages
+- 5 = Externals
+
+**collator_not_all_msgs_processed** - a single metric that shows which kinds of messages were not fully processed during the collation because of a graceful stop. Every case has a unique value, the metric value is their sum, and this sum is unique for any combination.
+
+- +1 = new messages processed partially
+- +2 = externals processed partially
+- +4 = remp processed partially
+- +8 = internals processed partially
+
+E.g. if new messages and externals were processed partially, the metric value will be `3 = 1 + 2`.
+
 ## Contributing

 Contribution to the project is expected to be done via pull requests submission.
@@ -51,4 +152,4 @@ See the [LICENSE](LICENSE) file for details.
## Tags -`blockchain` `everscale` `rust` `venom-blockchain` `venom-developer-program` `venom-node` `venom-validator` +`blockchain` `everscale` `rust` `venom-blockchain` `venom-developer-program` `venom-node` `venom-validator` diff --git a/catchain/Cargo.toml b/catchain/Cargo.toml index 947aacd2..9d7bb901 100644 --- a/catchain/Cargo.toml +++ b/catchain/Cargo.toml @@ -19,11 +19,11 @@ quanta = '0.11.1' rand = '0.8' regex = '1.3.1' tokio = { features = [ 'rt-multi-thread' ], version = '1.5' } -adnl = { features = [ 'node' ], git = 'https://github.com/tonlabs/ever-adnl.git', tag = '0.9.15' } -overlay = { git = 'https://github.com/tonlabs/ever-overlay.git', tag = '0.7.11' } -rldp = { git = 'https://github.com/tonlabs/ever-rldp.git', tag = '0.8.8' } +adnl = { features = [ 'node' ], git = 'https://github.com/tonlabs/ever-adnl.git', tag = '0.9.18' } +overlay = { git = 'https://github.com/tonlabs/ever-overlay.git', tag = '0.7.14' } +rldp = { git = 'https://github.com/tonlabs/ever-rldp.git', tag = '0.8.11' } storage = { path = '../storage' } -ton_api = { git = 'https://github.com/tonlabs/ever-tl.git', package = 'ton_api', tag = '0.3.53' } +ton_api = { git = 'https://github.com/tonlabs/ever-tl.git', package = 'ton_api', tag = '0.3.55' } ton_types = { git = 'https://github.com/tonlabs/ever-types.git', tag = '2.0.31' } [features] diff --git a/src/config.rs b/src/config.rs index 059bd60a..89200080 100644 --- a/src/config.rs +++ b/src/config.rs @@ -80,23 +80,32 @@ impl Default for CellsGcConfig { pub struct CollatorConfig { pub cutoff_timeout_ms: u32, pub stop_timeout_ms: u32, + pub finalize_parallel_percentage_points: u32, pub clean_timeout_percentage_points: u32, pub optimistic_clean_percentage_points: u32, pub max_secondary_clean_timeout_percentage_points: u32, pub max_collate_threads: u32, + pub max_collate_msgs_queue_on_account: u32, pub retry_if_empty: bool, pub finalize_empty_after_ms: u32, pub empty_collation_sleep_ms: u32 } +impl CollatorConfig { + pub fn get_finalize_parallel_timeout_ms(&self) -> u32 { + self.stop_timeout_ms * self.finalize_parallel_percentage_points / 1000 + } +} impl Default for CollatorConfig { fn default() -> Self { Self { cutoff_timeout_ms: 1000, stop_timeout_ms: 1500, + finalize_parallel_percentage_points: 800, // 0.8 = 80% * stop_timeout_ms = 1200 clean_timeout_percentage_points: 150, // 0.150 = 15% = 150ms optimistic_clean_percentage_points: 1000, // 1.000 = 100% = 150ms max_secondary_clean_timeout_percentage_points: 350, // 0.350 = 35% = 350ms max_collate_threads: 10, + max_collate_msgs_queue_on_account: 3, retry_if_empty: false, finalize_empty_after_ms: 800, empty_collation_sleep_ms: 100 diff --git a/src/types/accounts.rs b/src/types/accounts.rs index 8df24471..652dc768 100644 --- a/src/types/accounts.rs +++ b/src/types/accounts.rs @@ -11,31 +11,47 @@ * limitations under the License. 
*/ -use std::sync::{atomic::AtomicU64, Arc}; use ton_block::{ Account, AccountBlock, Augmentation, CopyleftRewards, Deserializable, HashUpdate, HashmapAugType, LibDescr, Libraries, Serializable, ShardAccount, ShardAccounts, StateInitLib, Transaction, Transactions, }; -use ton_types::{fail, AccountId, Cell, HashmapRemover, Result, UInt256}; +use ton_types::{error, fail, AccountId, Cell, HashmapRemover, Result, UInt256, SliceData}; pub struct ShardAccountStuff { account_addr: AccountId, account_root: Cell, last_trans_hash: UInt256, last_trans_lt: u64, - lt: Arc, - transactions: Transactions, + lt: u64, + transactions: Option, state_update: HashUpdate, - orig_libs: StateInitLib, - copyleft_rewards: CopyleftRewards, + orig_libs: Option, + copyleft_rewards: Option, + + /// * Sync key of message, which updated account state + /// * It is an incremental counter set by executor + update_msg_sync_key: Option, + + // /// * Executor sets transaction that updated account to current state + // /// * Initial account state contains None + // last_transaction: Option<(Cell, CurrencyCollection)>, + + /// LT of transaction, which updated account state + update_trans_lt: Option, + + /// The copyleft_reward of transaction, which updated account state (if exists) + update_copyleft_reward_address: Option, + + /// Executor stores prevoius account state + prev_account_stuff: Option>, } impl ShardAccountStuff { pub fn new( account_addr: AccountId, shard_acc: ShardAccount, - lt: Arc, + lt: u64, ) -> Result { let account_hash = shard_acc.account_cell().repr_hash(); let account_root = shard_acc.account_cell(); @@ -43,16 +59,64 @@ impl ShardAccountStuff { let last_trans_lt = shard_acc.last_trans_lt(); Ok(Self{ account_addr, - orig_libs: shard_acc.read_account()?.libraries(), + orig_libs: Some(shard_acc.read_account()?.libraries()), account_root, last_trans_hash, last_trans_lt, lt, - transactions: Transactions::default(), + transactions: Some(Transactions::default()), state_update: HashUpdate::with_hashes(account_hash.clone(), account_hash), - copyleft_rewards: CopyleftRewards::default(), + copyleft_rewards: Some(CopyleftRewards::default()), + + update_msg_sync_key: None, + //last_transaction: None, + update_trans_lt: None, + update_copyleft_reward_address: None, + prev_account_stuff: None, }) } + /// Returns: + /// * None - if no updates or no matching records in history + /// * Some(particular) - record from history that matches update_msg_sync_key == on_msg_sync_key + pub fn commit(mut self, on_msg_sync_key: usize) -> Result> { + while let Some(current_update_msg_sync_key) = self.update_msg_sync_key { + if current_update_msg_sync_key == on_msg_sync_key { + log::debug!("account {:x} state committed by processed message {} in the queue", self.account_addr(), on_msg_sync_key); + return Ok(Some(self)); + } else { + if !self.revert()? 
{ + log::debug!("unable to revert account {:x} state, current state is a first in history", self.account_addr()); + return Ok(None); + } else { + log::debug!("account {:x} state reverted one step back to message {:?} in the queue", self.account_addr(), self.update_msg_sync_key); + } + } + } + Ok(None) + } + fn revert(&mut self) -> Result { + let mut taked_prev = match self.prev_account_stuff.take() { + Some(prev) => prev, + None => return Ok(false), + }; + let prev = taked_prev.as_mut(); + + prev.orig_libs = self.orig_libs.take(); + + prev.transactions = self.transactions.take(); + if let Some(update_trans_lt) = self.update_trans_lt { + prev.remove_trans(update_trans_lt)?; + } + + prev.copyleft_rewards = self.copyleft_rewards.take(); + if let Some(update_copyleft_reward_address) = self.update_copyleft_reward_address.as_ref() { + prev.remove_copyleft_reward(update_copyleft_reward_address)?; + } + + std::mem::swap(self, prev); + + Ok(true) + } pub fn update_shard_state(&mut self, new_accounts: &mut ShardAccounts) -> Result { let account = self.read_account()?; if account.is_none() { @@ -62,10 +126,10 @@ impl ShardAccountStuff { let value = shard_acc.write_to_new_cell()?; new_accounts.set_builder_serialized(self.account_addr().clone(), &value, &account.aug()?)?; } - AccountBlock::with_params(&self.account_addr, &self.transactions, &self.state_update) + AccountBlock::with_params(&self.account_addr, self.transactions()?, &self.state_update) } - pub fn lt(&self) -> Arc { - self.lt.clone() + pub fn lt(&self) -> u64 { + self.lt } pub fn read_account(&self) -> Result { Account::construct_from_cell(self.account_root()) @@ -79,30 +143,106 @@ impl ShardAccountStuff { pub fn account_addr(&self) -> &AccountId { &self.account_addr } - pub fn copyleft_rewards(&self) -> &CopyleftRewards { - &self.copyleft_rewards + pub fn copyleft_rewards(&self) -> Result<&CopyleftRewards> { + self.copyleft_rewards.as_ref() + .ok_or_else(|| error!( + "`copyleft_rewards` field is None, possibly you try access a not root record in the history, run commit() before" + )) + } + fn copyleft_rewards_mut(&mut self) -> Result<&mut CopyleftRewards> { + self.copyleft_rewards.as_mut() + .ok_or_else(|| error!( + "`copyleft_rewards` field is None, possibly you try access a not root record in the history, run commit() before" + )) + } + fn remove_copyleft_reward(&mut self, address: &AccountId) -> Result { + self.copyleft_rewards_mut()?.remove(address) + } + + fn transactions(&self) -> Result<&Transactions> { + self.transactions.as_ref() + .ok_or_else(|| error!( + "`transactions` field is None, possibly you try access a not root record in the history, run commit() before" + )) + } + fn transactions_mut(&mut self) -> Result<&mut Transactions> { + self.transactions.as_mut() + .ok_or_else(|| error!( + "`transactions` field is None, possibly you try access a not root record in the history, run commit() before" + )) + } + fn remove_trans(&mut self, trans_lt: u64) -> Result<()> { + let key = SliceData::load_builder(trans_lt.write_to_new_cell()?)?; + self.transactions_mut()?.remove(key)?; + Ok(()) + } + + fn orig_libs(&self) -> Result<&StateInitLib> { + self.orig_libs.as_ref() + .ok_or_else(|| error!( + "`orig_libs` field is None, possibly you try access a not root record in the history, run commit() before" + )) + } + + pub fn apply_transaction_res( + &mut self, + update_msg_sync_key: usize, + tx_last_lt: u64, + transaction_res: &mut Result, + account_root: Cell, + ) -> Result<()> { + let mut res = ShardAccountStuff { + account_addr: 
self.account_addr.clone(), + account_root: self.account_root.clone(), + last_trans_hash: self.last_trans_hash.clone(), + last_trans_lt: self.last_trans_lt, + lt: tx_last_lt, // 1014 or 1104 or 1024 + transactions: self.transactions.take(), + state_update: self.state_update.clone(), + orig_libs: self.orig_libs.take(), + copyleft_rewards: Some(CopyleftRewards::default()), + update_msg_sync_key: Some(update_msg_sync_key), + update_trans_lt: None, + update_copyleft_reward_address: None, + prev_account_stuff: None, + }; + + if let Ok(transaction) = transaction_res { + res.add_transaction(transaction, account_root)?; + } + + std::mem::swap(self, &mut res); + + self.prev_account_stuff = Some(Box::new(res)); + + Ok(()) } pub fn add_transaction(&mut self, transaction: &mut Transaction, account_root: Cell) -> Result<()> { transaction.set_prev_trans_hash(self.last_trans_hash.clone()); - transaction.set_prev_trans_lt(self.last_trans_lt); + transaction.set_prev_trans_lt(self.last_trans_lt); // 1010 // log::trace!("{} {}", self.collated_block_descr, debug_transaction(transaction.clone())?); self.account_root = account_root; self.state_update.new_hash = self.account_root.repr_hash(); let tr_root = transaction.serialize()?; + let tr_lt = transaction.logical_time(); // 1011 + self.last_trans_hash = tr_root.repr_hash(); - self.last_trans_lt = transaction.logical_time(); + self.last_trans_lt = tr_lt; + + self.update_trans_lt = Some(tr_lt); - self.transactions.setref( - &transaction.logical_time(), + self.transactions_mut()?.setref( + &tr_lt, &tr_root, transaction.total_fees() )?; if let Some(copyleft_reward) = transaction.copyleft_reward() { log::trace!("Copyleft reward {} {} from transaction {}", copyleft_reward.address, copyleft_reward.reward, self.last_trans_hash); - self.copyleft_rewards.add_copyleft_reward(©left_reward.address, ©left_reward.reward)?; + self.copyleft_rewards_mut()?.add_copyleft_reward(©left_reward.address, ©left_reward.reward)?; + self.update_copyleft_reward_address = Some(copyleft_reward.address.clone()); } Ok(()) @@ -110,8 +250,9 @@ impl ShardAccountStuff { pub fn update_public_libraries(&self, libraries: &mut Libraries) -> Result<()> { let account = self.read_account()?; let new_libs = account.libraries(); - if new_libs.root() != self.orig_libs.root() { - new_libs.scan_diff(&self.orig_libs, |key: UInt256, old, new| { + let orig_libs = self.orig_libs()?; + if new_libs.root() != orig_libs.root() { + new_libs.scan_diff(orig_libs, |key: UInt256, old, new| { let old = old.unwrap_or_default(); let new = new.unwrap_or_default(); if old.is_public_library() && !new.is_public_library() { diff --git a/src/types/limits.rs b/src/types/limits.rs index cb3e00d5..1ecfa171 100644 --- a/src/types/limits.rs +++ b/src/types/limits.rs @@ -107,8 +107,8 @@ impl BlockLimitStatus { } /// Update logical time - pub fn update_lt(&mut self, lt: u64) { - self.lt_current = max(self.lt_current, lt); + pub fn update_lt(&mut self, lt: u64, force: bool) { + self.lt_current = if force { lt } else { max(self.lt_current, lt) }; if self.lt_start > self.lt_current { self.lt_start = lt; } diff --git a/src/types/messages.rs b/src/types/messages.rs index d9f90ab8..14770038 100644 --- a/src/types/messages.rs +++ b/src/types/messages.rs @@ -77,6 +77,11 @@ impl MsgEnvelopeStuff { pub fn message(&self) -> &Message { &self.msg } pub fn message_hash(&self) -> UInt256 { self.env.message_hash() } pub fn message_cell(&self) -> Cell { self.env.message_cell() } + pub fn out_msg_key(&self) -> OutMsgQueueKey { + 
OutMsgQueueKey::with_account_prefix(&self.next_prefix(), self.message_hash()) + } + #[cfg(test)] + pub fn src_prefix(&self) -> &AccountIdPrefixFull { &self.src_prefix } pub fn dst_prefix(&self) -> &AccountIdPrefixFull { &self.dst_prefix } pub fn cur_prefix(&self) -> &AccountIdPrefixFull { &self.cur_prefix } pub fn next_prefix(&self) -> &AccountIdPrefixFull { &self.next_prefix } diff --git a/src/validator/collator.rs b/src/validator/collator.rs index 33edec43..0c799a19 100644 --- a/src/validator/collator.rs +++ b/src/validator/collator.rs @@ -39,6 +39,7 @@ use crate::{ use adnl::common::Wait; use futures::try_join; use rand::Rng; +use tokio::sync::Mutex; use std::{ cmp::{max, min}, collections::{BinaryHeap, HashMap, HashSet}, @@ -49,6 +50,7 @@ use std::{ }, time::{Duration, Instant}, }; +use std::collections::BTreeMap; use ton_block::{ AddSub, BlkPrevInfo, Block, BlockCreateStats, BlockExtra, BlockIdExt, BlockInfo, CommonMsgInfo, ConfigParams, CopyleftRewards, CreatorStats, CurrencyCollection, Deserializable, ExtBlkRef, @@ -58,13 +60,15 @@ use ton_block::{ ParamLimitIndex, Serializable, ShardAccount, ShardAccountBlocks, ShardAccounts, ShardDescr, ShardFees, ShardHashes, ShardIdent, ShardStateSplit, ShardStateUnsplit, TopBlockDescrSet, Transaction, TransactionTickTock, UnixTime32, ValidatorSet, ValueFlow, WorkchainDescr, - Workchains, Account, AccountIdPrefixFull, OutQueueUpdates, OutMsgQueueInfo, MASTERCHAIN_ID + Workchains, Account, AccountIdPrefixFull, OutQueueUpdates, OutMsgQueueInfo, MASTERCHAIN_ID, + EnqueuedMsg, GetRepresentationHash }; use ton_executor::{ BlockchainConfig, ExecuteParams, OrdinaryTransactionExecutor, TickTockTransactionExecutor, TransactionExecutor, }; use ton_types::{error, fail, AccountId, Cell, HashmapType, Result, UInt256, UsageTree, SliceData}; +use ton_types::HashmapRemover; use crate::validator::validator_utils::is_remp_enabled; @@ -168,14 +172,26 @@ enum AsyncMessage { Copyleft(Message), Ext(Message), Int(MsgEnqueueStuff, bool), - New(MsgEnvelopeStuff, Cell), // prev_trans_cell + New(MsgEnvelopeStuff, Cell, u64), // prev_trans_cell TickTock(TransactionTickTock), } impl AsyncMessage { fn is_external(&self) -> bool { matches!(self, Self::Ext(_)) } + fn compute_message_hash(&self) -> Result> { + let hash_opt = match self { + Self::Recover(msg) | Self::Mint(msg) | Self::Copyleft(msg) | Self::Ext(msg) => Some(msg.hash()?), + Self::Int(enq, _) => Some(enq.message_hash()), + Self::New(env, _, _) => Some(env.message_hash()), + Self::TickTock(_) => None, + }; + Ok(hash_opt) + } } +#[derive(Debug)] +struct AsyncMessageSync(usize, AsyncMessage); + #[derive(Clone, Eq, PartialEq)] struct NewMessage { lt_hash: (u64, UInt256), @@ -210,20 +226,44 @@ impl PartialOrd for NewMessage { struct CollatorData { // lists, empty by default in_msgs: InMsgDescr, + /// * key - msg_sync_key + /// * value - in msg descr hash + in_msgs_descr_history: HashMap, out_msgs: OutMsgDescr, + /// * key - msg_sync_key + /// * value - list of out msgs descrs hashes + out_msgs_descr_history: HashMap)>>, accounts: ShardAccountBlocks, out_msg_queue_info: OutMsgQueueInfoStuff, + /// * key - msg_sync_key + /// * value - removed out msg key, EnqueuedMsg and is_new flag + del_out_queue_msg_history: HashMap, + /// * key - msg_sync_key + /// * value - msg key in out queue + add_out_queue_msg_history: HashMap>, shard_fees: ShardFees, shard_top_block_descriptors: Vec>, block_create_count: HashMap, new_messages: BinaryHeap, // using for priority queue + /// * key - msg_sync_key + /// * value - list of new msgs + 
new_messages_buffer: BTreeMap>, accepted_ext_messages: Vec, + /// * key - msg_sync_key + /// * value - ext msg id + accepted_ext_messages_buffer: HashMap, + /// * key - msg_sync_key + /// * value - ext msg id and error info rejected_ext_messages: Vec<(UInt256, String)>, + rejected_ext_messages_buffer: HashMap, accepted_remp_messages: Vec, rejected_remp_messages: Vec<(UInt256, String)>, ignored_remp_messages: Vec, usage_tree: UsageTree, imported_visited: HashSet, + /// * key - msg_sync_key + /// * value - last account lt after msg processing + tx_last_lt_buffer: HashMap, // determined fields gen_utime: u32, @@ -236,17 +276,31 @@ struct CollatorData { prev_stuff: Option, shards: Option, mint_msg: Option, + /// * key - msg_sync_key + /// * value - mint msg descr + mint_msg_buffer: BTreeMap>, recover_create_msg: Option, + /// * key - msg_sync_key + /// * value - recover create msg descr + recover_create_msg_buffer: BTreeMap>, copyleft_msgs: Vec, + /// * key - msg_sync_key + /// * value - list of copyleft msgs + copyleft_msgs_buffer: BTreeMap, // fields with default values skip_topmsgdescr: bool, skip_extmsg: bool, shard_conf_adjusted: bool, + // Will not support history. When parallel collation cancelled + // no new msgs can be processed so we do not need to check limits anymore block_limit_status: BlockLimitStatus, block_create_total: u64, inbound_queues_empty: bool, + /// * key - msg_sync_key + /// * value - incoming internal msg LT HASH last_proc_int_msg: (u64, UInt256), + last_proc_int_msg_buffer: BTreeMap, shards_max_end_lt: u64, before_split: bool, now_upper_limit: u32, @@ -265,6 +319,11 @@ struct CollatorData { execute_count: usize, out_msg_count: usize, in_msg_count: usize, + + // string with format like `-1:8000000000000000, 100500`, is used for logging. 
+ collated_block_descr: Arc, + + metrics: CollatorMetrics, } impl CollatorData { @@ -275,24 +334,34 @@ impl CollatorData { usage_tree: UsageTree, prev_data: &PrevData, is_masterchain: bool, + collated_block_descr: Arc, + shard: ShardIdent, ) -> Result { let limits = Arc::new(config.raw_config().block_limits(is_masterchain)?); let ret = Self { in_msgs: InMsgDescr::default(), + in_msgs_descr_history: Default::default(), out_msgs: OutMsgDescr::default(), + out_msgs_descr_history: Default::default(), accounts: ShardAccountBlocks::default(), out_msg_queue_info: OutMsgQueueInfoStuff::default(), + del_out_queue_msg_history: Default::default(), + add_out_queue_msg_history: Default::default(), shard_fees: ShardFees::default(), shard_top_block_descriptors: Vec::new(), block_create_count: HashMap::new(), new_messages: Default::default(), + new_messages_buffer: Default::default(), accepted_ext_messages: Default::default(), + accepted_ext_messages_buffer: Default::default(), rejected_ext_messages: Default::default(), + rejected_ext_messages_buffer: Default::default(), accepted_remp_messages: Default::default(), rejected_remp_messages: Default::default(), ignored_remp_messages: Default::default(), usage_tree, imported_visited: HashSet::new(), + tx_last_lt_buffer: Default::default(), gen_utime, config, start_lt: None, @@ -303,8 +372,11 @@ impl CollatorData { prev_stuff: None, shards: None, mint_msg: None, + mint_msg_buffer: BTreeMap::new(), recover_create_msg: None, + recover_create_msg_buffer: BTreeMap::new(), copyleft_msgs: Default::default(), + copyleft_msgs_buffer: BTreeMap::new(), skip_topmsgdescr: false, skip_extmsg: false, shard_conf_adjusted: false, @@ -312,6 +384,7 @@ impl CollatorData { block_create_total: 0, inbound_queues_empty: false, last_proc_int_msg: (0, UInt256::default()), + last_proc_int_msg_buffer: Default::default(), want_merge: false, underload_history: prev_data.underload_history() << 1, want_split: false, @@ -324,6 +397,8 @@ impl CollatorData { out_msg_count: 0, in_msg_count: 0, before_split: false, + collated_block_descr, + metrics: CollatorMetrics::new(shard), }; Ok(ret) } @@ -342,13 +417,41 @@ impl CollatorData { self.out_msgs.data().cloned().ok_or_else(|| error!("out msg descr is empty")) } + /// Stores processed internal message LT HASH to buffer + fn add_last_proc_int_msg_to_buffer(&mut self, src_msg_sync_key: usize, lt_hash: (u64, UInt256)) { + log::trace!( + "{}: added last_proc_int_msg {}: ({}, {:x}) to buffer", + self.collated_block_descr, + src_msg_sync_key, lt_hash.0, lt_hash.1, + ); + self.last_proc_int_msg_buffer.insert(src_msg_sync_key, lt_hash); + } + /// Clean out processed internal message LT HASH from buffer by src msg + fn revert_last_proc_int_msg_by_src_msg(&mut self, src_msg_sync_key: &usize) { + let lt_hash = self.last_proc_int_msg_buffer.remove(src_msg_sync_key); + log::trace!( + "{}: removed last_proc_int_msg {}: ({:?}) to buffer", + self.collated_block_descr, + src_msg_sync_key, lt_hash, + ); + } + /// Updates last processed internal message LT HASH from not reverted in buffer + fn commit_last_proc_int_msg(&mut self) -> Result<()> { + log::trace!("{}: last_proc_int_msg_buffer: {:?}", self.collated_block_descr, self.last_proc_int_msg_buffer); + while let Some((_, lt_hash)) = self.last_proc_int_msg_buffer.pop_first() { + self.update_last_proc_int_msg(lt_hash)?; + } + Ok(()) + } + fn update_last_proc_int_msg(&mut self, new_lt_hash: (u64, UInt256)) -> Result<()> { if self.last_proc_int_msg < new_lt_hash { CHECK!(new_lt_hash.0 > 0); - 
log::trace!("last_proc_int_msg updated to ({},{:x})", new_lt_hash.0, new_lt_hash.1); + log::trace!("{}: last_proc_int_msg updated to ({},{:x})", self.collated_block_descr, new_lt_hash.0, new_lt_hash.1); self.last_proc_int_msg = new_lt_hash; } else { - log::error!("processed message ({},{:x}) AFTER message ({},{:x})", new_lt_hash.0, new_lt_hash.1, + log::error!("{}: processed message ({},{:x}) AFTER message ({},{:x})", + self.collated_block_descr, new_lt_hash.0, new_lt_hash.1, self.last_proc_int_msg.0, self.last_proc_int_msg.1); self.last_proc_int_msg.0 = std::u64::MAX; fail!("internal message processing order violated!") @@ -357,12 +460,37 @@ impl CollatorData { } fn update_lt(&mut self, lt: u64) { - self.block_limit_status.update_lt(lt); + self.block_limit_status.update_lt(lt, false); + } + + /// Stores transaction last LT to buffer by src msg, updates block_limit_status + fn add_tx_last_lt_to_buffer(&mut self, src_msg_sync_key: usize, tx_last_lt: u64) { + self.tx_last_lt_buffer.insert(src_msg_sync_key, tx_last_lt); + self.block_limit_status.update_lt(tx_last_lt, false); + } + /// Clean transaction last LT from buffer by src msg + fn revert_tx_last_lt_by_src_msg(&mut self, src_msg_sync_key: &usize) { + self.tx_last_lt_buffer.remove(src_msg_sync_key); + } + /// Saves max transaction last LT to block_limit_status and returns value + fn commit_tx_last_lt(&mut self) -> Option { + if let Some(max_lt) = self.tx_last_lt_buffer.values().reduce(|curr, next| curr.max(next)) { + self.block_limit_status.update_lt(*max_lt, true); + Some(*max_lt) + } else { + None + } } /// add in and out messages from to block, and to new message queue - fn new_transaction(&mut self, transaction: &Transaction, tr_cell: Cell, in_msg_opt: Option<&InMsg>) -> Result<()> { + fn new_transaction( + &mut self, + transaction: &Transaction, + tr_cell: Cell, + in_msg_opt: Option<&InMsg>, + src_msg_sync_key: usize, + ) -> Result<()> { // log::trace!( // "new transaction, message {:x}\n{}", // in_msg_opt.map(|m| m.message_cell().unwrap().repr_hash()).unwrap_or_default(), @@ -374,6 +502,7 @@ impl CollatorData { self.block_limit_status.add_transaction(transaction.logical_time() == self.start_lt()? 
+ 1); if let Some(in_msg) = in_msg_opt { self.add_in_msg_to_block(in_msg)?; + self.add_in_msg_descr_to_history(src_msg_sync_key, in_msg)?; } let shard = self.out_msg_queue_info.shard().clone(); transaction.out_msgs.iterate_slices(|slice| { @@ -386,17 +515,25 @@ impl CollatorData { let use_hypercube = !self.config.has_capability(GlobalCapabilities::CapOffHypercube); let fwd_fee = *info.fwd_fee(); let enq = MsgEnqueueStuff::new(msg.clone(), &shard, fwd_fee, use_hypercube)?; - self.enqueue_count += 1; - self.out_msg_queue_info.add_message(&enq)?; - // Add to message block here for counting time later it may be replaced + let out_msg = OutMsg::new(enq.envelope_cell(), tr_cell.clone()); - self.add_out_msg_to_block(msg_hash.clone(), &out_msg)?; - self.new_messages.push(NewMessage::new((info.created_lt, msg_hash), msg, tr_cell.clone(), enq.next_prefix().clone())); + let new_msg = NewMessage::new((info.created_lt, msg_hash.clone()), msg, tr_cell.clone(), enq.next_prefix().clone()); + self.add_out_queue_msg_with_history(src_msg_sync_key, enq)?; + + // Add to message block here for counting time later it may be replaced + let prev_out_msg_slice_opt = self.add_out_msg_to_block(msg_hash.clone(), &out_msg)?; + self.add_out_msg_descr_to_history(src_msg_sync_key, msg_hash, prev_out_msg_slice_opt); + + self.add_new_message_to_buffer(src_msg_sync_key, new_msg); + + self.metrics.created_new_msgs_count += 1; } CommonMsgInfo::ExtOutMsgInfo(_) => { let out_msg = OutMsg::external(msg_cell, tr_cell.clone()); - self.add_out_msg_to_block(out_msg.read_message_hash()?, &out_msg)?; + let msg_hash = out_msg.read_message_hash()?; + let prev_out_msg_slice_opt = self.add_out_msg_to_block(msg_hash.clone(), &out_msg)?; + self.add_out_msg_descr_to_history(src_msg_sync_key, msg_hash, prev_out_msg_slice_opt); } CommonMsgInfo::ExtInMsgInfo(_) => fail!("External inbound message cannot be output") }; @@ -408,30 +545,165 @@ impl CollatorData { /// put InMsg to block fn add_in_msg_to_block(&mut self, in_msg: &InMsg) -> Result<()> { self.in_msg_count += 1; - let msg_cell = in_msg.serialize()?; self.in_msgs.insert(in_msg)?; + + let msg_cell = in_msg.serialize()?; self.block_limit_status.register_in_msg_op(&msg_cell, &self.in_msgs_root()?) } + /// Stores in_msg descr hash by src msg + fn add_in_msg_descr_to_history(&mut self, src_msg_sync_key: usize, in_msg: &InMsg) -> Result<()> { + let msg_hash = in_msg.message_cell()?.repr_hash(); + self.in_msgs_descr_history.insert(src_msg_sync_key, msg_hash); + Ok(()) + } + /// Removes in_msg descr created by src msg. Does not update block_limit_status + fn revert_in_msgs_descr_by_src_msg(&mut self, src_msg_sync_key: &usize) -> Result<()> { + if let Some(msg_hash) = self.in_msgs_descr_history.remove(src_msg_sync_key) { + self.in_msg_count -= 1; + let key = SliceData::load_builder(msg_hash.write_to_new_cell()?)?; + self.in_msgs.remove(key)?; + } + Ok(()) + } + /// Clean out in_msg descr history + fn commit_in_msgs_descr_by_src_msg(&mut self) { + self.in_msgs_descr_history.clear(); + self.in_msgs_descr_history.shrink_to_fit(); + } + /// put OutMsg to block - fn add_out_msg_to_block(&mut self, key: UInt256, out_msg: &OutMsg) -> Result<()> { + fn add_out_msg_to_block(&mut self, key: UInt256, out_msg: &OutMsg) -> Result> { self.out_msg_count += 1; - self.out_msgs.insert_with_key(key, out_msg)?; + + let prev_value = self.out_msgs.insert_with_key_return_prev(key, out_msg)?; let msg_cell = out_msg.serialize()?; - self.block_limit_status.register_out_msg_op(&msg_cell, &self.out_msgs_root()?) 
+ self.block_limit_status.register_out_msg_op(&msg_cell, &self.out_msgs_root()?)?; + + Ok(prev_value) + } + /// put OutMsg to block, does not update block_limit_status + fn add_out_msg_to_block_without_limits_update(&mut self, key: UInt256, out_msg: &OutMsg) -> Result> { + self.out_msg_count += 1; + + self.out_msgs.insert_with_key_return_prev(key, out_msg) + } + + /// Stores out_msg descr hash by src msg + fn add_out_msg_descr_to_history( + &mut self, + src_msg_sync_key: usize, + out_msg_hash: UInt256, + prev_out_msg_slice_opt: Option, + ) { + if let Some(v) = self.out_msgs_descr_history.get_mut(&src_msg_sync_key) { + v.push((out_msg_hash, prev_out_msg_slice_opt)); + } else { + self.out_msgs_descr_history.insert(src_msg_sync_key, vec![(out_msg_hash, prev_out_msg_slice_opt)]); + } + } + /// Removes all out_msg descrs created by src msg. Does not update block_limit_status + fn revert_out_msgs_descr_by_src_msg(&mut self, src_msg_sync_key: &usize) -> Result<()> { + if let Some(msgs_history) = self.out_msgs_descr_history.remove(src_msg_sync_key) { + for (msg_hash, prev_out_msg_slice_opt) in msgs_history { + self.out_msg_count -= 1; + + // return prev out msg descr to map if exists + if let Some(mut prev_out_msg_slice) = prev_out_msg_slice_opt { + log::debug!("{}: previous out msg descr {:x} reverted to block", self.collated_block_descr, msg_hash); + let prev_out_msg = OutMsg::construct_from(&mut prev_out_msg_slice)?; + self.add_out_msg_to_block_without_limits_update(msg_hash, &prev_out_msg)?; + } else { + let key = SliceData::load_builder(msg_hash.write_to_new_cell()?)?; + self.out_msgs.remove(key)?; + } + } + } + Ok(()) + } + /// Clean out out_msg descrs history + fn commit_out_msgs_descr_by_src_msg(&mut self) { + self.out_msgs_descr_history.clear(); + self.out_msgs_descr_history.shrink_to_fit(); + } + + /// Stores accepted ext message id in buffer of accepted by src msg sync id + fn add_accepted_ext_message_to_buffer(&mut self, src_msg_sync_key: usize, msg_id: UInt256) { + self.accepted_ext_messages_buffer.insert(src_msg_sync_key, msg_id); + } + /// Clean accepted ext message id from buffer + fn revert_accepted_ext_message_by_src_msg(&mut self, src_msg_sync_key: &usize) -> bool { + self.accepted_ext_messages_buffer.remove(src_msg_sync_key).is_some() + } + /// Add accepted ext messages from buffer to collator data + fn commit_accepted_ext_messages(&mut self) { + for (_, msg_id) in self.accepted_ext_messages_buffer.drain() { + self.accepted_ext_messages.push(msg_id); + } + } + + /// Stores rejected ext message info in buffer of accepted by src msg sync id + fn add_rejected_ext_message_to_buffer(&mut self, src_msg_sync_key: usize, rejected_msg: (UInt256, String)) { + self.rejected_ext_messages_buffer.insert(src_msg_sync_key, rejected_msg); + } + /// Clean rejected ext message info from buffer + fn revert_rejected_ext_message_by_src_msg(&mut self, src_msg_sync_key: &usize) -> bool { + self.rejected_ext_messages_buffer.remove(src_msg_sync_key).is_some() + } + /// Add rejected ext messages info from buffer to collator data + fn commit_rejected_ext_messages(&mut self) { + for (_, msg_info) in self.rejected_ext_messages_buffer.drain() { + self.rejected_ext_messages.push(msg_info); + } } /// delete message from state queue - fn del_out_msg_from_state(&mut self, key: &OutMsgQueueKey) -> Result<()> { - log::debug!("del_out_msg_from_state {:x}", key); + fn del_out_msg_from_state(&mut self, key: &OutMsgQueueKey) -> Result { + log::debug!("{}: del_out_msg_from_state {:x}", self.collated_block_descr, key); 
self.dequeue_count += 1; - self.out_msg_queue_info.del_message(key)?; + let enq = self.out_msg_queue_info.del_message(key)?; self.block_limit_status.register_out_msg_queue_op( self.out_msg_queue_info.out_queue()?.data(), &self.usage_tree, false )?; + Ok(enq) + } + + /// Removes msg from out queue, stores msg in the history to be able to revert it futher + fn del_out_queue_msg_with_history(&mut self, src_msg_sync_key: usize, key: OutMsgQueueKey, is_new: bool) -> Result<()> { + log::debug!("{}: del_out_queue_msg_with_history {:x}", self.collated_block_descr, key); + if is_new { self.enqueue_count -= 1; } else { self.dequeue_count += 1; } + let enq = self.out_msg_queue_info.del_message(&key)?; + self.block_limit_status.register_out_msg_queue_op( + self.out_msg_queue_info.out_queue()?.data(), + &self.usage_tree, + false + )?; + self.del_out_queue_msg_history.insert(src_msg_sync_key, (key, enq, is_new)); + Ok(()) + } + /// Reverts previously removed msg from out queue + fn revert_del_out_queue_msg_by_src_msg(&mut self, src_msg_sync_key: &usize) -> Result> { + if let Some((key, enq, is_new)) = self.del_out_queue_msg_history.remove(src_msg_sync_key) { + if is_new { + self.enqueue_count += 1; + } else { + self.dequeue_count -= 1; + } + let enq_stuff = MsgEnqueueStuff::from_enqueue(enq)?; + self.out_msg_queue_info.add_message(&enq_stuff)?; + log::debug!("{}: reverted del_out_queue_msg {:x}", self.collated_block_descr, key); + Ok(Some(is_new)) + } else { + Ok(None) + } + } + /// Cleans out queue msgs removing history + fn commit_del_out_queue_msgs(&mut self) -> Result<()> { + self.del_out_queue_msg_history.clear(); + self.del_out_queue_msg_history.shrink_to_fit(); Ok(()) } @@ -447,6 +719,112 @@ impl CollatorData { Ok(()) } + /// Adds new msg to out queue, stores history to be able to revert futher + fn add_out_queue_msg_with_history(&mut self, src_msg_sync_key: usize, enq_stuff: MsgEnqueueStuff) -> Result<()> { + self.enqueue_count += 1; + self.out_msg_queue_info.add_message(&enq_stuff)?; + let key = enq_stuff.out_msg_key(); + if let Some(v) = self.add_out_queue_msg_history.get_mut(&src_msg_sync_key) { + v.push(key); + } else { + self.add_out_queue_msg_history.insert(src_msg_sync_key, vec![key]); + } + Ok(()) + } + /// Removes previously added new msgs from out queue + fn revert_add_out_queue_msgs_by_src_msg(&mut self, src_msg_sync_key: &usize) -> Result<()> { + if let Some(keys) = self.add_out_queue_msg_history.remove(src_msg_sync_key) { + let remove_count = keys.len(); + for key in keys { + self.enqueue_count -= 1; + self.out_msg_queue_info.del_message(&key)?; + } + log::debug!("{}: {} new created messages removed from out queue", self.collated_block_descr, remove_count); + } + Ok(()) + } + /// Cleans out queue msgs adding history + fn commit_add_out_queue_msgs(&mut self) -> Result<()> { + self.add_out_queue_msg_history.clear(); + self.add_out_queue_msg_history.shrink_to_fit(); + Ok(()) + } + + /// Stores new internal msg, created by src msg, to buffer + fn add_new_message_to_buffer(&mut self, src_msg_sync_key: usize, new_msg: NewMessage) { + if let Some(v) = self.new_messages_buffer.get_mut(&src_msg_sync_key) { + v.push(new_msg); + } else { + self.new_messages_buffer.insert(src_msg_sync_key, vec![new_msg]); + } + } + /// Clean out new internal msgs, created by src msg, from buffer + fn revert_new_messages_by_src_msg(&mut self, src_msg_sync_key: &usize) -> Option { + self.new_messages_buffer.remove(src_msg_sync_key).map(|removed| removed.len()) + } + /// Adds new internal msgs to new_messages 
queue for processing + fn commit_new_messages(&mut self) { + let new_msgs_count = self.new_messages_buffer.len(); + while let Some((_, msgs)) = self.new_messages_buffer.pop_first() { + for new_msg in msgs { + log::trace!( + "{}: committed new created msg {:x} (bounced: {:?}) from {:x} to account {:x} from buffer to new_messages", + self.collated_block_descr, new_msg.lt_hash.1, + new_msg.msg.int_header().map(|h| h.bounced), + new_msg.msg.src().unwrap_or_default().address(), + new_msg.msg.dst().unwrap_or_default().address(), + ); + self.new_messages.push(new_msg); + } + } + log::debug!("{}: {} new created messages committed from buffer to new_messages", self.collated_block_descr, new_msgs_count); + } + + /// Stores mint message in buffer by src msg + fn add_mint_msg_to_buffer(&mut self, src_msg_sync_key: usize, msg: Option) { + self.mint_msg_buffer.insert(src_msg_sync_key, msg); + } + /// Clean mint message from buffer by src msg + fn revert_mint_msg_by_src_msg(&mut self, src_msg_sync_key: &usize) { + self.mint_msg_buffer.remove(src_msg_sync_key); + } + /// Save the last processed and not reverted mint message to collator data + fn commit_mint_msg(&mut self) { + if let Some((k, v)) = self.mint_msg_buffer.pop_last() { + self.mint_msg = v; + } + } + + /// Stores recover create message in buffer by src msg + fn add_recover_create_msg_to_buffer(&mut self, src_msg_sync_key: usize, msg: Option) { + self.recover_create_msg_buffer.insert(src_msg_sync_key, msg); + } + /// Clean recover create message from buffer by src msg + fn revert_recover_create_msg_by_src_msg(&mut self, src_msg_sync_key: &usize) { + self.recover_create_msg_buffer.remove(src_msg_sync_key); + } + /// Save the last processed and not reverted recover create message to collator data + fn commit_recover_create_msg(&mut self) { + if let Some((k, v)) = self.recover_create_msg_buffer.pop_last() { + self.recover_create_msg = v; + } + } + + /// Stores copyleft message in buffer by src msg + fn add_copyleft_msg_to_buffer(&mut self, src_msg_sync_key: usize, msg: InMsg) { + self.copyleft_msgs_buffer.insert(src_msg_sync_key, msg); + } + /// Clean copyleft message from buffer by src msg + fn revert_copyleft_msg_by_src_msg(&mut self, src_msg_sync_key: &usize) { + self.copyleft_msgs_buffer.remove(src_msg_sync_key); + } + /// Save all not reverted copyleft messages to collator data + fn commit_copyleft_msgs(&mut self) { + while let Some((_, msg)) = self.copyleft_msgs_buffer.pop_first() { + self.copyleft_msgs.push(msg); + } + } + fn enqueue_transit_message( &mut self, shard: &ShardIdent, @@ -535,7 +913,7 @@ impl CollatorData { if self.start_lt.is_some() { fail!("`start_lt` is already initialized") } - self.block_limit_status.update_lt(lt); + self.block_limit_status.update_lt(lt, false); self.start_lt = Some(lt); Ok(()) } @@ -603,7 +981,7 @@ impl CollatorData { fn shard_conf_adjusted(&self) -> bool { self.shard_conf_adjusted } fn set_shard_conf_adjusted(&mut self) { self.shard_conf_adjusted = true; } - fn dequeue_message(&mut self, enq: MsgEnqueueStuff, deliver_lt: u64, short: bool) -> Result<()> { + fn dequeue_message(&mut self, enq: MsgEnqueueStuff, deliver_lt: u64, short: bool) -> Result> { self.dequeue_count += 1; let out_msg = match short { true => OutMsg::dequeue_short(enq.envelope_hash(), enq.next_prefix(), deliver_lt), @@ -641,25 +1019,455 @@ impl CollatorData { } } +#[derive(Default)] +pub(super) struct CollatorMetrics { + shard: ShardIdent, + + elapsed_on_empty_collations_ms: u128, + elapsed_on_prepare_data_ms: u128, + 
elapsed_on_initial_clean_ms: u128, + elapsed_on_internals_processed_ms: u128, + elapsed_on_remp_processed_ms: u128, + elapsed_on_externals_processed_ms: u128, + elapsed_on_new_processed_ms: u128, + elapsed_on_secondary_clean_ms: u128, + elapsed_on_finalize_ms: u128, + + stopped_on_timeout: CollationStoppedOnTimeoutStep, + + stopped_on_soft_limit: CollationStoppedOnBlockLimitStep, + stopped_on_remp_limit: CollationStoppedOnBlockLimitStep, + stopped_on_medium_limit: CollationStoppedOnBlockLimitStep, + + processed_in_int_msgs_count: usize, + dequeued_our_out_int_msgs_count: usize, + processed_remp_msgs_count: usize, + processed_in_ext_msgs_count: usize, + created_new_msgs_count: usize, + processed_new_msgs_count: usize, + + reverted_transactions_count: usize, + + not_all_internals_processed: bool, + not_all_remp_processed: bool, + not_all_externals_processed: bool, + not_all_new_messages_processed: bool, + + initial_out_queue_clean: CleanOutQueueMetrics, + secondary_out_queue_clean: CleanOutQueueMetrics, +} + +#[derive(Default)] +struct CleanOutQueueMetrics { + partial: bool, + elapsed: u128, + processed: i32, + deleted: i32, +} + +#[derive(Debug, Clone, Copy, PartialEq)] +enum CollationStoppedOnTimeoutStep { + NoTimeout = 0, + NewMessages, + Externals, + Remp, + Internals, +} +impl Default for CollationStoppedOnTimeoutStep { + fn default() -> Self { + Self::NoTimeout + } +} +#[derive(Debug, Clone, Copy)] +enum CollationStoppedOnBlockLimitStep { + NotStopped = 0, + Internals, + InitialClean, + Remp, + NewMessages, + Externals, +} +impl Default for CollationStoppedOnBlockLimitStep { + fn default() -> Self { + Self::NotStopped + } +} + +impl CollatorMetrics { + fn new(shard: ShardIdent) -> Self { + Self { + shard, + ..Default::default() + } + } + + fn set_elapsed_on_empty_collations(&mut self, elapsed: u128) { + self.elapsed_on_empty_collations_ms = elapsed; + } + fn save_elapsed_on_prepare_data(&mut self, collator: &Collator) { + self.elapsed_on_prepare_data_ms = collator.started.elapsed().as_millis(); + } + fn save_elapsed_on_initial_clean(&mut self, collator: &Collator) { + self.elapsed_on_initial_clean_ms = collator.started.elapsed().as_millis(); + } + fn save_elapsed_on_internals_processed(&mut self, collator: &Collator) { + self.elapsed_on_internals_processed_ms = collator.started.elapsed().as_millis(); + } + fn save_elapsed_on_remp_processed(&mut self, collator: &Collator) { + self.elapsed_on_remp_processed_ms = collator.started.elapsed().as_millis(); + } + fn save_elapsed_on_externals_processed(&mut self, collator: &Collator) { + self.elapsed_on_externals_processed_ms = collator.started.elapsed().as_millis(); + } + fn save_elapsed_on_new_processed(&mut self, collator: &Collator) { + self.elapsed_on_new_processed_ms = collator.started.elapsed().as_millis(); + } + fn save_elapsed_on_secondary_clean(&mut self, collator: &Collator) { + self.elapsed_on_secondary_clean_ms = collator.started.elapsed().as_millis(); + } + fn save_elapsed_on_finalize(&mut self, collator: &Collator) { + self.elapsed_on_finalize_ms = collator.started.elapsed().as_millis(); + } + + fn recalculate_elapsed_time(&mut self) { + if self.elapsed_on_prepare_data_ms < self.elapsed_on_empty_collations_ms { + self.elapsed_on_prepare_data_ms = self.elapsed_on_empty_collations_ms; + } + if self.elapsed_on_initial_clean_ms < self.elapsed_on_prepare_data_ms { + self.elapsed_on_initial_clean_ms = self.elapsed_on_prepare_data_ms; + } + if self.elapsed_on_internals_processed_ms < self.elapsed_on_initial_clean_ms { + 
self.elapsed_on_internals_processed_ms = self.elapsed_on_initial_clean_ms; + } + if self.elapsed_on_remp_processed_ms < self.elapsed_on_internals_processed_ms { + self.elapsed_on_remp_processed_ms = self.elapsed_on_internals_processed_ms; + } + if self.elapsed_on_externals_processed_ms < self.elapsed_on_remp_processed_ms { + self.elapsed_on_externals_processed_ms = self.elapsed_on_remp_processed_ms; + } + if self.elapsed_on_new_processed_ms < self.elapsed_on_externals_processed_ms { + self.elapsed_on_new_processed_ms = self.elapsed_on_externals_processed_ms; + } + if self.elapsed_on_secondary_clean_ms < self.elapsed_on_new_processed_ms { + self.elapsed_on_secondary_clean_ms = self.elapsed_on_new_processed_ms; + } + if self.elapsed_on_finalize_ms < self.elapsed_on_secondary_clean_ms { + self.elapsed_on_finalize_ms = self.elapsed_on_secondary_clean_ms; + } + + // substract empty collations time from collation cumulative flow times for better graph view + self.elapsed_on_finalize_ms -= self.elapsed_on_empty_collations_ms; + self.elapsed_on_secondary_clean_ms -= self.elapsed_on_empty_collations_ms; + self.elapsed_on_new_processed_ms -= self.elapsed_on_empty_collations_ms; + self.elapsed_on_externals_processed_ms -= self.elapsed_on_empty_collations_ms; + self.elapsed_on_remp_processed_ms -= self.elapsed_on_empty_collations_ms; + self.elapsed_on_internals_processed_ms -= self.elapsed_on_empty_collations_ms; + self.elapsed_on_initial_clean_ms -= self.elapsed_on_empty_collations_ms; + self.elapsed_on_prepare_data_ms -= self.elapsed_on_empty_collations_ms; + } + + fn save_stopped_by_timeout_on(&mut self, step: CollationStoppedOnTimeoutStep) { + if self.stopped_on_timeout == CollationStoppedOnTimeoutStep::NoTimeout { + self.stopped_on_timeout = step; + } + } + + fn save_stopped_by_limits_on_initial_clean(&mut self) { + self.stopped_on_soft_limit = CollationStoppedOnBlockLimitStep::InitialClean; + } + fn save_stopped_by_limits_on_internals(&mut self) { + self.stopped_on_soft_limit = CollationStoppedOnBlockLimitStep::Internals; + } + fn save_stopped_by_limits_on_remp(&mut self) { + self.stopped_on_remp_limit = CollationStoppedOnBlockLimitStep::Remp; + } + fn save_stopped_by_limits_on_externals(&mut self) { + self.stopped_on_medium_limit = CollationStoppedOnBlockLimitStep::Externals; + } + fn save_stopped_by_limits_on_new_messages(&mut self) { + self.stopped_on_medium_limit = CollationStoppedOnBlockLimitStep::NewMessages; + } + + fn save_clean_out_queue_metrics(&mut self, is_initial: bool, partial: bool, elapsed: u128, processed: i32, deleted: i32) { + let metrics = if is_initial { &mut self.initial_out_queue_clean } else { &mut self.secondary_out_queue_clean }; + metrics.partial = partial; + metrics.elapsed = elapsed; + metrics.processed = processed; + metrics.deleted = deleted; + } + + fn set_not_all_internals_processed(&mut self) { + self.not_all_internals_processed = true; + } + fn set_not_all_remp_processed(&mut self) { + self.not_all_remp_processed = true; + } + fn set_not_all_externals_processed(&mut self) { + self.not_all_externals_processed = true; + } + fn set_not_all_new_messages_processed(&mut self) { + self.not_all_new_messages_processed = true; + } + + pub(super) fn report_zero_metrics(shard: ShardIdent) { + let mut metrics = Self::new(shard); + metrics.report_metrics(); + } + + fn report_metrics(&mut self) { + self.recalculate_elapsed_time(); + + let shard_label = self.shard.to_string(); + let labels = [("shard", shard_label.clone())]; + + 
metrics::gauge!("collator_elapsed_on_empty_collations", self.elapsed_on_empty_collations_ms as f64, &labels); + metrics::gauge!("collator_elapsed_on_prepare_data", self.elapsed_on_prepare_data_ms as f64, &labels); + metrics::gauge!("collator_elapsed_on_initial_clean", self.elapsed_on_initial_clean_ms as f64, &labels); + metrics::gauge!("collator_elapsed_on_internals_processed", self.elapsed_on_internals_processed_ms as f64, &labels); + metrics::gauge!("collator_elapsed_on_remp_processed", self.elapsed_on_remp_processed_ms as f64, &labels); + metrics::gauge!("collator_elapsed_on_externals_processed", self.elapsed_on_externals_processed_ms as f64, &labels); + metrics::gauge!("collator_elapsed_on_new_processed", self.elapsed_on_new_processed_ms as f64, &labels); + metrics::gauge!("collator_elapsed_on_secondary_clean", self.elapsed_on_secondary_clean_ms as f64, &labels); + metrics::gauge!("collator_elapsed_on_finalize", self.elapsed_on_finalize_ms as f64, &labels); + + metrics::gauge!("collator_stopped_on_timeout", (self.stopped_on_timeout as i32) as f64, &labels); + + self.report_stopped_on_block_limit_metric(shard_label.clone()); + self.report_not_all_msgs_processed_metric(shard_label.clone()); + + metrics::gauge!("collator_processed_in_int_msgs_count", self.processed_in_int_msgs_count as f64, &labels); + metrics::gauge!("collator_dequeued_our_out_int_msgs_count", self.dequeued_our_out_int_msgs_count as f64, &labels); + metrics::gauge!("collator_processed_remp_msgs_count", self.processed_remp_msgs_count as f64, &labels); + metrics::gauge!("collator_processed_in_ext_msgs_count", self.processed_in_ext_msgs_count as f64, &labels); + metrics::gauge!("collator_created_new_msgs_count", self.created_new_msgs_count as f64, &labels); + metrics::gauge!("collator_processed_new_msgs_count", self.processed_new_msgs_count as f64, &labels); + metrics::gauge!("collator_reverted_transactions_count", self.reverted_transactions_count as f64, &labels); + + self.report_clean_out_queue_metrics(&self.initial_out_queue_clean, shard_label.clone(), "initial".into()); + self.report_clean_out_queue_metrics(&self.secondary_out_queue_clean, shard_label.clone(), "secondary".into()); + } + + /// If internals processing was stopped by reaching the Soft block limit and then + /// new messages processing stopped when the Medium block limit was reached, then we will push: + /// ``` + /// 5 = 1 (CollationStoppedOnBlockLimitStep::Internals) + 4 (CollationStoppedOnBlockLimitStep::NewMessages) + /// ``` + /// Any combination of `stopped_on_soft_limit`, `stopped_on_remp_limit`, and `stopped_on_medium_limit` + /// will result into unique integer value so we are able to recognize a case in the metrics view. 
+ /// + /// `stopped_on_soft_limit` possible values: + /// * 0 = NotStopped + /// * 1 = Internals + /// * 2 = InitialClean + /// + /// `stopped_on_remp_limit` possible values: + /// * 0 = NotStopped + /// * 3 = Remp + /// + /// `stopped_on_medium_limit` possible values: + /// * 0 = NotStopped + /// * 4 = NewMessages + /// * 5 = Externals + fn report_stopped_on_block_limit_metric(&self, shard_label: String) { + let labels = [("shard", shard_label)]; + let value = self.stopped_on_soft_limit as i32 + self.stopped_on_remp_limit as i32 + self.stopped_on_medium_limit as i32; + metrics::gauge!("collator_stopped_on_block_limit", value as f64, &labels); + } + + /// * 1 = new messages processed partially + /// * 2 = externals processed partially + /// * 4 = remp processed partially + /// * 8 = internals processed partially + /// + /// If new messages and externals processed partially, the metric value will be: + /// ``` + /// 5 = 1 + 4 + /// ``` + fn report_not_all_msgs_processed_metric(&self, shard_label: String) { + let labels = [("shard", shard_label)]; + let mut value = 0; + if self.not_all_new_messages_processed { value += 1; } + if self.not_all_externals_processed { value += 2; } + if self.not_all_remp_processed { value += 4; } + if self.not_all_internals_processed { value += 8; } + metrics::gauge!("collator_not_all_msgs_processed", value as f64, &labels); + } + + fn report_clean_out_queue_metrics(&self, metrics: &CleanOutQueueMetrics, shard_label: String, step: String) { + let labels = [("shard", shard_label), ("step", step)]; + metrics::gauge!("collator_clean_out_queue_partial", if metrics.partial { 1.0 } else { 0.0 }, &labels); + metrics::gauge!("collator_clean_out_queue_elapsed", metrics.elapsed as f64, &labels); + metrics::gauge!("collator_clean_out_queue_processed", metrics.processed as f64, &labels); + metrics::gauge!("collator_clean_out_queue_deleted", metrics.deleted as f64, &labels); + } + + fn log_metrics(&mut self, collated_block_descr: Arc) { + + log::debug!( + "{}: collator_elapsed_on: empty_collations = {} ms, prepare_data = {} ms, + initial_clean = {} ms, internal_processed = {} ms, remp_processed = {} ms, + external_processed = {} ms, new_processed = {} ms, secondary_clean = {} ms, finalize = {} ms", + collated_block_descr, + self.elapsed_on_empty_collations_ms, self.elapsed_on_prepare_data_ms, + self.elapsed_on_initial_clean_ms, self.elapsed_on_internals_processed_ms, + self.elapsed_on_remp_processed_ms, self.elapsed_on_externals_processed_ms, + self.elapsed_on_new_processed_ms, self.elapsed_on_secondary_clean_ms, + self.elapsed_on_finalize_ms, + ); + + log::debug!("{}: collator_stopped_on_timeout = {:?}", collated_block_descr, self.stopped_on_timeout); + + self.log_stopped_on_block_limit_metric(collated_block_descr.clone()); + self.log_not_all_msgs_processed_metric(collated_block_descr.clone()); + + log::debug!( + "{}: collator_counts: processed_in_int_msgs = {}, dequeued_our_out_int_msgs = {}, processed_remp_msgs = {} + processed_in_ext_msgs = {}, created_new_msgs = {}, reverted_transactions {}", + collated_block_descr, + self.processed_in_int_msgs_count, self.dequeued_our_out_int_msgs_count, + self.processed_remp_msgs_count, self.processed_in_ext_msgs_count, + self.created_new_msgs_count, self.reverted_transactions_count, + ); + + self.log_clean_out_queue_metrics(&self.initial_out_queue_clean, collated_block_descr.clone(), "initial".into()); + self.log_clean_out_queue_metrics(&self.secondary_out_queue_clean, collated_block_descr.clone(), "secondary".into()); + } + + fn 
log_stopped_on_block_limit_metric(&self, collated_block_descr: Arc) { + let value = self.stopped_on_soft_limit as i32 + self.stopped_on_remp_limit as i32 + self.stopped_on_medium_limit as i32; + log::debug!( + "{}: collator_stopped_on_block_limit = {}: stopped_on_soft_limit = {:?}, + stopped_on_remp_limit = {:?}, stopped_on_medium_limit = {:?}", + collated_block_descr, value, + self.stopped_on_soft_limit, self.stopped_on_remp_limit, self.stopped_on_medium_limit, + ); + } + + fn log_not_all_msgs_processed_metric(&self, collated_block_descr: Arc) { + let mut value = 0; + if self.not_all_new_messages_processed { value += 1; } + if self.not_all_externals_processed { value += 2; } + if self.not_all_remp_processed { value += 4; } + if self.not_all_internals_processed { value += 8; } + log::debug!( + "{}: collator_not_all_msgs_processed = {}: not_all_new_messages_processed = {}, + not_all_externals_processed = {}, not_all_remp_processed = {}, not_all_internals_processed = {}", + collated_block_descr, value, + self.not_all_new_messages_processed, self.not_all_externals_processed, + self.not_all_remp_processed, self.not_all_internals_processed, + ); + } + + fn log_clean_out_queue_metrics(&self, metrics: &CleanOutQueueMetrics, collated_block_descr: Arc, step: String) { + log::debug!( + "{}: {} collator_clean_out_queue metrics: partial = {}, elapsed = {} ms, processed = {}, deleted = {}", + collated_block_descr, step, + metrics.partial, metrics.elapsed, + metrics.processed, metrics.deleted, + ); + } +} + +struct ParallelMsgsCounter { + max_parallel_threads: usize, + max_msgs_queue_on_account: usize, + + limits_reached: Arc, + msgs_by_accounts: Arc)>>, +} + +impl ParallelMsgsCounter { + pub fn new(max_parallel_threads: usize, max_msgs_queue_on_account: usize) -> Self { + Self { + max_parallel_threads: max_parallel_threads.max(1), + max_msgs_queue_on_account: max_msgs_queue_on_account.max(1), + + limits_reached: Arc::new(AtomicBool::new(false)), + + msgs_by_accounts: Arc::new(Mutex::new((0, HashMap::new()))), + } + } + + pub fn limits_reached(&self) -> bool { + self.limits_reached.load(Ordering::Relaxed) + } + fn set_limits_reached(&self, val: bool) { + self.limits_reached.store(val, Ordering::Relaxed); + } + + pub async fn add_account_msgs_counter(&self, account_id: AccountId) { + let account_id_str = format!("{:x}", account_id); + let mut guard = self.msgs_by_accounts.clone().lock_owned().await; + let (active_threads, msgs_by_account) = &mut *guard; + let msgs_count = msgs_by_account + .entry(account_id) + .and_modify(|val| { + if *val == 0 { + *active_threads += 1; + } + *val += 1; + }) + .or_insert_with(|| { + *active_threads += 1; + 1 + }); + if *msgs_count >= self.max_msgs_queue_on_account || *active_threads >= self.max_parallel_threads { + self.set_limits_reached(true); + } + + log::trace!("ParallelMsgsCounter: msgs count inreased for {}, counter state is: ({}, {:?})", account_id_str, active_threads, msgs_by_account); + } + + pub async fn sub_account_msgs_counter(&self, account_id: AccountId) { + let account_id_str = format!("{:x}", account_id); + let mut guard = self.msgs_by_accounts.clone().lock_owned().await; + let (active_threads, msgs_by_account) = &mut *guard; + let msgs_count = msgs_by_account + .entry(account_id) + .and_modify(|val| { + *val -= 1; + if *val == 0 { + *active_threads -= 1; + } + }) + .or_insert(0); + if *msgs_count < self.max_msgs_queue_on_account { + if *active_threads < self.max_parallel_threads && msgs_by_account.values().all(|c| *c < self.max_msgs_queue_on_account) { + 
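+                // the "limits reached" flag is cleared only when both conditions hold again:
+                // the number of accounts with queued messages is below max_parallel_threads
+                // and no single account queue has reached max_msgs_queue_on_account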
self.set_limits_reached(false); + } + } + + log::trace!("ParallelMsgsCounter: msgs count decreased for {}, counter state is: ({}, {:?})", account_id_str, active_threads, msgs_by_account); + } +} + struct ExecutionManager { changed_accounts: HashMap< AccountId, ( - tokio::sync::mpsc::UnboundedSender>, + tokio::sync::mpsc::UnboundedSender>, tokio::task::JoinHandle> ) >, - - receive_tr: tokio::sync::mpsc::UnboundedReceiver, Result)>>, - wait_tr: Arc, Result)>>, + + msgs_queue: Vec<(AccountId, bool, Option)>, + accounts_processed_msgs: HashMap>, + + cancellation_token: tokio_util::sync::CancellationToken, + f_check_finalize_parallel_timeout: Box (bool, u32) + Send>, + + receive_tr: tokio::sync::mpsc::UnboundedReceiver, Result, u64)>>, + wait_tr: Arc, Result, u64)>>, max_collate_threads: usize, libraries: Libraries, gen_utime: u32, + parallel_msgs_counter: ParallelMsgsCounter, + // bloc's start logical time start_lt: u64, // actual maximum logical time - max_lt: Arc, + max_lt: u64, // this time is used if account's lt is smaller min_lt: Arc, // block random seed @@ -673,6 +1481,9 @@ struct ExecutionManager { collated_block_descr: Arc, debug: bool, config: BlockchainConfig, + + #[cfg(test)] + test_msg_process_sleep: u64, } impl ExecutionManager { @@ -685,16 +1496,23 @@ impl ExecutionManager { libraries: Libraries, config: BlockchainConfig, max_collate_threads: usize, + max_collate_msgs_queue_on_account: usize, collated_block_descr: Arc, debug: bool, + f_check_finalize_parallel_timeout: Box (bool, u32) + Send>, ) -> Result { log::trace!("{}: ExecutionManager::new", collated_block_descr); let (wait_tr, receive_tr) = Wait::new(); Ok(Self { changed_accounts: HashMap::new(), + msgs_queue: Vec::new(), + accounts_processed_msgs: HashMap::new(), + cancellation_token: tokio_util::sync::CancellationToken::new(), + f_check_finalize_parallel_timeout, receive_tr, wait_tr, max_collate_threads, + parallel_msgs_counter: ParallelMsgsCounter::new(max_collate_threads, max_collate_msgs_queue_on_account), libraries, config, start_lt, @@ -702,28 +1520,54 @@ impl ExecutionManager { seed_block, #[cfg(feature = "signature_with_id")] signature_id, - max_lt: Arc::new(AtomicU64::new(start_lt + 1)), + max_lt: start_lt + 1, min_lt: Arc::new(AtomicU64::new(start_lt + 1)), total_trans_duration: Arc::new(AtomicU64::new(0)), collated_block_descr, debug, + #[cfg(test)] + test_msg_process_sleep: 0, }) } + #[cfg(test)] + pub fn set_test_msg_process_sleep(&mut self, sleep_timeout: u64) { + self.test_msg_process_sleep = sleep_timeout; + } + // waits and finalizes all parallel tasks - pub async fn wait_transactions(&mut self, collator_data: &mut CollatorData) -> Result<()> { + pub async fn wait_transactions( + &mut self, + collator_data: &mut CollatorData, + ) -> Result<()> { log::trace!("{}: wait_transactions", self.collated_block_descr); + if self.is_parallel_processing_cancelled() { + log::debug!("{}: parallel collation was already stopped, do not wait transactions anymore", self.collated_block_descr); + return Ok(()); + } while self.wait_tr.count() > 0 { + log::trace!("{}: wait_tr count = {}", self.collated_block_descr, self.wait_tr.count()); self.wait_transaction(collator_data).await?; + + // stop parallel collation if finalize timeout reached + let check_finalize_parallel = (self.f_check_finalize_parallel_timeout)(); + if check_finalize_parallel.0 { + log::warn!("{}: FINALIZE PARALLEL TIMEOUT ({}ms) is elapsed, stop parallel collation", + self.collated_block_descr, check_finalize_parallel.1, + ); + 
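+                        // cancelling flips the shared cancellation_token: every account job checks it
+                        // before and after executing a message and stops, and all messages executed
+                        // after the first unprocessed one are reverted in commit_processed_msgs_changes()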
self.cancel_parallel_processing(); + break; + } } - self.min_lt.fetch_max(self.max_lt.load(Ordering::Relaxed), Ordering::Relaxed); + self.commit_processed_msgs_changes(collator_data)?; + self.min_lt.fetch_max(self.max_lt, Ordering::Relaxed); Ok(()) } - // checks if a number of parallel transactilns is not too big, waits and finalizes some if needed. + // checks limits of parallel transactions reached, waits and finalizes some if needed. pub async fn check_parallel_transactions(&mut self, collator_data: &mut CollatorData) -> Result<()> { log::trace!("{}: check_parallel_transactions", self.collated_block_descr); - if self.wait_tr.count() >= self.max_collate_threads { + if self.parallel_msgs_counter.limits_reached() { self.wait_transaction(collator_data).await?; } Ok(()) @@ -735,17 +1579,30 @@ impl ExecutionManager { msg: AsyncMessage, prev_data: &PrevData, collator_data: &mut CollatorData, - ) -> Result<()> { + ) -> Result> { log::trace!("{}: execute (adding into queue): {:x}", self.collated_block_descr, account_id); - let msg = Arc::new(msg); + let msg_sync_key = self.get_next_msg_sync_key(); + + // store last processed internal (incl. New) message LT HASH in buffer + if let Some(lt_hash) = match &msg { + AsyncMessage::Int(enq, _) => Some((enq.created_lt(), enq.message_hash())), + AsyncMessage::New(env, _, created_lt) => Some((*created_lt, env.message_hash())), + _ => None, + } { + collator_data.add_last_proc_int_msg_to_buffer(msg_sync_key, lt_hash); + } + + let msg_hash = msg.compute_message_hash()?; + + let msg = Arc::new(AsyncMessageSync(msg_sync_key, msg)); if let Some((sender, _handle)) = self.changed_accounts.get(&account_id) { self.wait_tr.request(); sender.send(msg)?; } else { let shard_acc = if let Some(shard_acc) = prev_data.accounts().account(&account_id)? 
{ shard_acc - } else if msg.is_external() { - return Ok(()); // skip external messages for unexisting accounts + } else if msg.1.is_external() { + return Ok(None); // skip external messages for unexisting accounts } else { ShardAccount::default() }; @@ -755,25 +1612,28 @@ impl ExecutionManager { )?; self.wait_tr.request(); sender.send(msg)?; - self.changed_accounts.insert(account_id, (sender, handle)); - } + self.changed_accounts.insert(account_id.clone(), (sender, handle)); + }; + + self.append_msgs_queue(msg_sync_key, &account_id, msg_hash); + self.parallel_msgs_counter.add_account_msgs_counter(account_id).await; self.check_parallel_transactions(collator_data).await?; - Ok(()) + Ok(Some(msg_sync_key)) } fn start_account_job( &self, account_addr: AccountId, shard_acc: ShardAccount, - ) -> Result<(tokio::sync::mpsc::UnboundedSender>, tokio::task::JoinHandle>)> { + ) -> Result<(tokio::sync::mpsc::UnboundedSender>, tokio::task::JoinHandle>)> { log::trace!("{}: start_account_job: {:x}", self.collated_block_descr, account_addr); let mut shard_acc = ShardAccountStuff::new( account_addr, shard_acc, - Arc::new(AtomicU64::new(self.min_lt.load(Ordering::Relaxed))), + self.min_lt.load(Ordering::Relaxed), )?; let debug = self.debug; @@ -783,34 +1643,49 @@ impl ExecutionManager { #[cfg(feature = "signature_with_id")] let signature_id = self.signature_id; let collated_block_descr = self.collated_block_descr.clone(); - let total_trans_duration = self.total_trans_duration.clone(); - let wait_tr = self.wait_tr.clone(); + let exec_mgr_total_trans_duration_rw = self.total_trans_duration.clone(); + let exec_mgr_wait_tr = self.wait_tr.clone(); let config = self.config.clone(); - let min_lt = self.min_lt.clone(); - let max_lt = self.max_lt.clone(); + let exec_mgr_min_lt_ro = self.min_lt.clone(); let libraries = self.libraries.clone().inner(); - let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::>(); + let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::>(); + let cancellation_token = self.cancellation_token.clone(); + #[cfg(test)] + let test_msg_process_sleep = self.test_msg_process_sleep; let handle = tokio::spawn(async move { while let Some(new_msg) = receiver.recv().await { + if cancellation_token.is_cancelled() { + log::debug!( + "{}: parallel collation was cancelled before message {} processing on {:x}", + collated_block_descr, + new_msg.0, + shard_acc.account_addr(), + ); + exec_mgr_wait_tr.respond(None); + break; + } + + #[cfg(test)] + { + let sleep_ms = test_msg_process_sleep; + log::trace!("{}: (msg_sync_key: {}) sleep {}ms to emulate hard load and slow smart contract...", collated_block_descr, new_msg.0, sleep_ms); + tokio::time::sleep(tokio::time::Duration::from_millis(sleep_ms)).await; + } + log::trace!("{}: new message for {:x}", collated_block_descr, shard_acc.account_addr()); let config = config.clone(); // TODO: use Arc - shard_acc.lt().fetch_max(min_lt.load(Ordering::Relaxed), Ordering::Relaxed); - shard_acc.lt().fetch_max( - shard_acc.last_trans_lt() + 1, - Ordering::Relaxed - ); - shard_acc.lt().fetch_max( - shard_acc.last_trans_lt() + 1, - Ordering::Relaxed - ); + let mut lt = shard_acc.lt().max(exec_mgr_min_lt_ro.load(Ordering::Relaxed)); // 1000 + lt = lt.max(shard_acc.last_trans_lt() + 1); // 1010+1=1011 + + let tx_last_lt = Arc::new(AtomicU64::new(lt)); let mut account_root = shard_acc.account_root(); let params = ExecuteParams { state_libs: libraries.clone(), block_unixtime, block_lt, - last_tr_lt: shard_acc.lt(), + last_tr_lt: tx_last_lt.clone(), // 
1011, passed by reference seed_block: seed_block.clone(), debug, block_version: supported_version(), @@ -822,21 +1697,43 @@ impl ExecutionManager { let (mut transaction_res, account_root, duration) = tokio::task::spawn_blocking(move || { let now = std::time::Instant::now(); ( - Self::execute_new_message(&new_msg1, &mut account_root, config, params), + Self::execute_new_message(&new_msg1.1, &mut account_root, config, params), account_root, now.elapsed().as_micros() as u64 ) }).await?; - if let Ok(transaction) = transaction_res.as_mut() { - shard_acc.add_transaction(transaction, account_root)?; + if cancellation_token.is_cancelled() { + log::debug!( + "{}: parallel collation was cancelled after message {} processing on {:x}", + collated_block_descr, + new_msg.0, + shard_acc.account_addr(), + ); + exec_mgr_wait_tr.respond(None); + break; } - total_trans_duration.fetch_add(duration, Ordering::Relaxed); + + // LT transformations during execution: + // * params.last_tr_lt = max(account.last_tr_time(), params.last_tr_lt, in_msg.lt() + 1) + // * transaction.logical_time() = params.last_tr_lt (copy) + // * params.last_tr_lt = 1 + out_msgs.len() + // * tx_last_lt = params.last_tr_lt (by ref) + // So for 2 out_msgs may be: + // * transaction.logical_time() == 1011 + // * params.last_tr_lt == 1014 (1011+1+2) or 1104 (account.last_tr_time()+1+2) or 1024 (in_msg.lt()+1+1+2) + // * account.last_tr_time() == params.last_tr_lt == 1014 or 1104 or 1024 + // * tx_last_lt == params.last_tr_lt == 1014 or 1104 or 1024 + + let tx_last_lt = tx_last_lt.load(Ordering::Relaxed); + + shard_acc.apply_transaction_res(new_msg.0, tx_last_lt, &mut transaction_res, account_root)?; + + exec_mgr_total_trans_duration_rw.fetch_add(duration, Ordering::Relaxed); log::trace!("{}: account {:x} TIME execute {}μ;", collated_block_descr, shard_acc.account_addr(), duration); - max_lt.fetch_max(shard_acc.lt().load(Ordering::Relaxed), Ordering::Relaxed); - wait_tr.respond(Some((new_msg, transaction_res))); + exec_mgr_wait_tr.respond(Some((new_msg, transaction_res, tx_last_lt))); } Ok(shard_acc) }); @@ -853,7 +1750,7 @@ impl ExecutionManager { AsyncMessage::Int(enq, _our) => { (Box::new(OrdinaryTransactionExecutor::new(config)), Some(enq.message())) } - AsyncMessage::New(env, _prev_tr_cell) => { + AsyncMessage::New(env, _prev_tr_cell, _created_lt) => { (Box::new(OrdinaryTransactionExecutor::new(config)), Some(env.message())) } AsyncMessage::Recover(msg) | AsyncMessage::Mint(msg) | AsyncMessage::Ext(msg) => { @@ -872,19 +1769,28 @@ impl ExecutionManager { async fn wait_transaction(&mut self, collator_data: &mut CollatorData) -> Result<()> { log::trace!("{}: wait_transaction", self.collated_block_descr); let wait_op = self.wait_tr.wait(&mut self.receive_tr, false).await; - if let Some(Some((new_msg, transaction_res))) = wait_op { - self.finalize_transaction(new_msg, transaction_res, collator_data)?; + if let Some(Some((new_msg, transaction_res, tx_last_lt))) = wait_op { + // we can safely decrease parallel_msgs_counter because + // sender sends some until parallel processing not cancelled + let account_id = self.finalize_transaction(&new_msg, transaction_res, tx_last_lt, collator_data)?; + // decrease account msgs counter to control parallel processing limits + self.parallel_msgs_counter.sub_account_msgs_counter(account_id).await; + + // mark message as processed + self.set_msg_processed(new_msg.0); } Ok(()) } fn finalize_transaction( &mut self, - new_msg: Arc, + new_msg_sync: &AsyncMessageSync, transaction_res: Result, + tx_last_lt: u64, 
collator_data: &mut CollatorData - ) -> Result<()> { - if let AsyncMessage::Ext(ref msg) = new_msg.deref() { + ) -> Result { + let AsyncMessageSync(msg_sync_key, new_msg) = new_msg_sync; + if let AsyncMessage::Ext(ref msg) = new_msg { let msg_id = msg.serialize()?.repr_hash(); let account_id = msg.int_dst_account_id().unwrap_or_default(); if let Err(err) = transaction_res { @@ -893,8 +1799,8 @@ impl ExecutionManager { "{}: account {:x} rejected inbound external message {:x}, by reason: {}", self.collated_block_descr, account_id, msg_id, err ); - collator_data.rejected_ext_messages.push((msg_id, err.to_string())); - return Ok(()) + collator_data.add_rejected_ext_message_to_buffer(*msg_sync_key, (msg_id, err.to_string())); + return Ok(account_id) } else { log::debug!( target: EXT_MESSAGES_TRACE_TARGET, @@ -905,24 +1811,33 @@ impl ExecutionManager { } } let tr = transaction_res?; + let account_id = tr.account_id().clone(); let tr_cell = tr.serialize()?; log::trace!("{}: finalize_transaction {} with hash {:x}, {:x}", self.collated_block_descr, tr.logical_time(), tr_cell.repr_hash(), tr.account_id()); - let in_msg_opt = match new_msg.deref() { + let in_msg_opt = match new_msg { AsyncMessage::Int(enq, our) => { let in_msg = InMsg::final_msg(enq.envelope_cell(), tr_cell.clone(), enq.fwd_fee_remaining().clone()); if *our { let out_msg = OutMsg::dequeue_immediate(enq.envelope_cell(), in_msg.serialize()?); - collator_data.add_out_msg_to_block(enq.message_hash(), &out_msg)?; - collator_data.del_out_msg_from_state(&enq.out_msg_key())?; + let msg_hash = enq.message_hash(); + let prev_out_msg_slice_opt = collator_data.add_out_msg_to_block(msg_hash.clone(), &out_msg)?; + collator_data.add_out_msg_descr_to_history(*msg_sync_key, msg_hash, prev_out_msg_slice_opt); + collator_data.del_out_queue_msg_with_history(*msg_sync_key, enq.out_msg_key(), false)?; + collator_data.metrics.dequeued_our_out_int_msgs_count += 1; } + collator_data.metrics.processed_in_int_msgs_count += 1; Some(in_msg) } - AsyncMessage::New(env, prev_tr_cell) => { + AsyncMessage::New(env, prev_tr_cell, _created_lt) => { let env_cell = env.inner().serialize()?; let in_msg = InMsg::immediate(env_cell.clone(), tr_cell.clone(), env.fwd_fee_remaining().clone()); let out_msg = OutMsg::immediate(env_cell, prev_tr_cell.clone(), in_msg.serialize()?); - collator_data.add_out_msg_to_block(env.message_hash(), &out_msg)?; + let msg_hash = env.message_hash(); + let prev_out_msg_slice_opt = collator_data.add_out_msg_to_block(msg_hash.clone(), &out_msg)?; + collator_data.add_out_msg_descr_to_history(*msg_sync_key, msg_hash, prev_out_msg_slice_opt); + collator_data.del_out_queue_msg_with_history(*msg_sync_key, env.out_msg_key(), true)?; + collator_data.metrics.processed_new_msgs_count += 1; Some(in_msg) } AsyncMessage::Mint(msg) | @@ -936,6 +1851,7 @@ impl ExecutionManager { } AsyncMessage::Ext(msg) => { let in_msg = InMsg::external(msg.serialize()?, tr_cell.clone()); + collator_data.metrics.processed_in_ext_msgs_count += 1; Some(in_msg) } AsyncMessage::TickTock(_) => None @@ -947,18 +1863,179 @@ impl ExecutionManager { tr.in_msg_cell().unwrap_or_default().repr_hash() ); } - collator_data.new_transaction(&tr, tr_cell, in_msg_opt.as_ref())?; - collator_data.update_lt(self.max_lt.load(Ordering::Relaxed)); + collator_data.new_transaction(&tr, tr_cell, in_msg_opt.as_ref(), *msg_sync_key)?; + + collator_data.add_tx_last_lt_to_buffer(*msg_sync_key, tx_last_lt); - match new_msg.deref() { - AsyncMessage::Mint(_) => collator_data.mint_msg = in_msg_opt, - 
AsyncMessage::Recover(_) => collator_data.recover_create_msg = in_msg_opt,
-            AsyncMessage::Copyleft(_) => collator_data.copyleft_msgs.push(in_msg_opt.ok_or_else(|| error!("Can't unwrap `in_msg_opt`"))?),
+        match new_msg {
+            AsyncMessage::Mint(_) => collator_data.add_mint_msg_to_buffer(*msg_sync_key, in_msg_opt),
+            AsyncMessage::Recover(_) => collator_data.add_recover_create_msg_to_buffer(*msg_sync_key, in_msg_opt),
+            AsyncMessage::Copyleft(_) => collator_data.add_copyleft_msg_to_buffer(*msg_sync_key, in_msg_opt.ok_or_else(|| error!("Can't unwrap `in_msg_opt`"))?),
             _ => ()
         }
+
+        // block_full is not tracked in the revert history: when parallel collation is cancelled,
+        // no new messages can be processed, so the limits do not need to be checked anymore
         collator_data.block_full |= !collator_data.block_limit_status.fits(ParamLimitIndex::Normal);
+        Ok(account_id)
+    }
+
+    /// Actually the current length of the messages queue
+    fn get_next_msg_sync_key(&self) -> usize {
+        self.msgs_queue.len()
+    }
+    fn append_msgs_queue(&mut self, msg_sync_key: usize, account_addr: &AccountId, msg_hash_opt: Option<UInt256>) {
+        self.msgs_queue.insert(msg_sync_key, (account_addr.clone(), false, msg_hash_opt));
+    }
+    fn set_msg_processed(&mut self, msg_sync_key: usize) {
+        if let Some(entry) = self.msgs_queue.get_mut(msg_sync_key) {
+            entry.1 = true;
+            self.accounts_processed_msgs
+                .entry(entry.0.clone())
+                .and_modify(|list| list.push(msg_sync_key))
+                .or_insert([msg_sync_key].into());
+        }
+    }
+    fn revert_last_account_processed_msg(&mut self, account_addr: &AccountId) {
+        if let Some(list) = self.accounts_processed_msgs.get_mut(account_addr) {
+            list.pop();
+        }
+    }
+
+    fn accounts_processed_msgs(&self) -> &HashMap<AccountId, Vec<usize>> {
+        &self.accounts_processed_msgs
+    }
+    fn get_last_processed_msg_sync_key<'a>(
+        accounts_processed_msgs: &'a HashMap<AccountId, Vec<usize>>,
+        for_account_addr: &'a AccountId,
+    ) -> Option<&'a usize> {
+        if let Some(entry) = accounts_processed_msgs.get(for_account_addr) {
+            entry.last()
+        } else {
+            None
+        }
+    }
+
+    /// Signal the cancellation_token because the finalize timeout was reached
+    pub fn cancel_parallel_processing(&mut self) {
+        self.cancellation_token.cancel();
+    }
+    /// Whether the cancellation_token was cancelled because the finalize timeout was reached
+    pub fn is_parallel_processing_cancelled(&self) -> bool {
+        self.cancellation_token.is_cancelled()
+    }
+
+    fn commit_processed_msgs_changes(&mut self, collator_data: &mut CollatorData) -> Result<()> {
+        // revert processed messages that come after the first unprocessed one
+        let mut msgs_to_revert = vec![];
+        let mut msgs_to_revert_last_proc_int = vec![];
+        let mut found_first_unprocessed = false;
+        for msg_sync_key in 0..self.msgs_queue.len() {
+            if let Some((account_addr, processed, msg_hash)) = self.msgs_queue.get(msg_sync_key) {
+                if *processed {
+                    // collect all processed messages that come after the first unprocessed one
+                    if found_first_unprocessed {
+                        msgs_to_revert.push((account_addr.clone(), msg_sync_key, msg_hash.clone()));
+                        msgs_to_revert_last_proc_int.push(msg_sync_key);
+                    }
+                } else {
+                    if !found_first_unprocessed {
+                        found_first_unprocessed = true;
+                    }
+                    msgs_to_revert_last_proc_int.push(msg_sync_key);
+                }
+            }
+        }
+        collator_data.metrics.reverted_transactions_count += msgs_to_revert.len();
+        for (account_addr, msg_sync_key, msg_hash) in msgs_to_revert.into_iter().rev() {
+            if let Some(msg_info) = self.msgs_queue.get_mut(msg_sync_key) {
+                msg_info.1 = false;
+            }
+            self.revert_msg_changes(collator_data, &msg_sync_key, &account_addr)?;
+            log::debug!(
+                "{}: reverted changes from message {:x} (sync_key: {}) on account {:x}",
self.collated_block_descr, msg_hash.unwrap_or_default(), msg_sync_key, account_addr, + ); + } + for msg_sync_key in msgs_to_revert_last_proc_int { + collator_data.revert_last_proc_int_msg_by_src_msg(&msg_sync_key); + } + + // commit all not reverted changes + self.commit_not_reverted_changes(collator_data)?; + + log::debug!("{}: all not reverted account changes committed", self.collated_block_descr); + + Ok(()) + } + fn revert_msg_changes( + &mut self, + collator_data: &mut CollatorData, + msg_sync_key: &usize, + account_addr: &AccountId, + ) -> Result<()> { + collator_data.execute_count -= 1; + + collator_data.revert_in_msgs_descr_by_src_msg(msg_sync_key)?; + collator_data.revert_out_msgs_descr_by_src_msg(msg_sync_key)?; + + let mut reverted_ext_msg = false; + if collator_data.revert_accepted_ext_message_by_src_msg(msg_sync_key) || + collator_data.revert_rejected_ext_message_by_src_msg(msg_sync_key) { + collator_data.metrics.processed_in_ext_msgs_count -= 1; + reverted_ext_msg = true; + } + + let mut reverted_new_msg = false; + if let Some(is_new) = collator_data.revert_del_out_queue_msg_by_src_msg(msg_sync_key)? { + if is_new { + collator_data.metrics.processed_new_msgs_count -= 1; + reverted_new_msg = true; + } else { + collator_data.metrics.dequeued_our_out_int_msgs_count -= 1; + } + } + collator_data.revert_add_out_queue_msgs_by_src_msg(msg_sync_key)?; + if let Some(reverted_count) = collator_data.revert_new_messages_by_src_msg(msg_sync_key) { + collator_data.metrics.created_new_msgs_count -= reverted_count; + } + + // if current reverted message is not external and not new then it is inbound internal + if !reverted_ext_msg && !reverted_new_msg { + collator_data.metrics.processed_in_int_msgs_count -= 1; + } + + collator_data.revert_mint_msg_by_src_msg(msg_sync_key); + collator_data.revert_recover_create_msg_by_src_msg(msg_sync_key); + collator_data.revert_copyleft_msg_by_src_msg(msg_sync_key); + collator_data.revert_tx_last_lt_by_src_msg(msg_sync_key); + + self.revert_last_account_processed_msg(account_addr); + + Ok(()) + } + fn commit_not_reverted_changes(&mut self, collator_data: &mut CollatorData) -> Result<()> { + collator_data.commit_in_msgs_descr_by_src_msg(); + collator_data.commit_out_msgs_descr_by_src_msg(); + collator_data.commit_accepted_ext_messages(); + collator_data.commit_rejected_ext_messages(); + collator_data.commit_del_out_queue_msgs()?; + collator_data.commit_add_out_queue_msgs()?; + collator_data.commit_new_messages(); + + collator_data.commit_mint_msg(); + collator_data.commit_recover_create_msg(); + collator_data.commit_copyleft_msgs(); + + collator_data.commit_last_proc_int_msg()?; + + // save max lt + if let Some(max_lt) = collator_data.commit_tx_last_lt() { + self.max_lt = max_lt; + } + Ok(()) } } @@ -983,6 +2060,11 @@ pub struct Collator { started: Instant, stop_flag: Arc, + + finalize_parallel_timeout_ms: u32, + + #[cfg(test)] + test_msg_process_sleep: u64, } impl Collator { @@ -994,7 +2076,7 @@ impl Collator { created_by: UInt256, engine: Arc, rand_seed: Option, - collator_settings: CollatorSettings + collator_settings: CollatorSettings, ) -> Result { log::debug!( @@ -1065,6 +2147,7 @@ impl Collator { root_hash: UInt256::default(), file_hash: UInt256::default(), }, + finalize_parallel_timeout_ms: engine.collator_config().get_finalize_parallel_timeout_ms(), engine, shard, min_mc_seqno, @@ -1079,9 +2162,16 @@ impl Collator { collator_settings, started: Instant::now(), stop_flag: Arc::new(AtomicBool::new(false)), + #[cfg(test)] + test_msg_process_sleep: 0, }) 
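+        // note: finalize_parallel_timeout_ms is taken from the collator config and is used in
+        // do_collate() to build the f_check_finalize_parallel_timeout closure for ExecutionManager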
} + #[cfg(test)] + pub fn set_test_msg_process_sleep(&mut self, sleep_timeout: u64) { + self.test_msg_process_sleep = sleep_timeout; + } + pub async fn collate(mut self) -> Result<(BlockCandidate, ShardStateUnsplit)> { log::info!( "{}: COLLATE min_mc_seqno = {}, prev_blocks_ids: {} {}", @@ -1095,6 +2185,7 @@ impl Collator { let mut collator_data; let mut attempt = 0; let mut duration; + let mut elapsed_on_empty_collation = 0; // inside the loop try to collate new block let (candidate, state, exec_manager) = loop { @@ -1118,6 +2209,9 @@ impl Collator { e })?; + collator_data.metrics.set_elapsed_on_empty_collations(elapsed_on_empty_collation); + collator_data.metrics.save_elapsed_on_prepare_data(&self); + // load messages and process them to produce block candidate let result = self.do_collate(&mc_data, &prev_data, &mut collator_data).await .map_err(|e| { @@ -1125,6 +2219,11 @@ impl Collator { self.collated_block_descr, self.started.elapsed().as_millis(), e); e }); + if result.is_err() { + #[cfg(feature = "log_metrics")] + collator_data.metrics.report_metrics(); + collator_data.metrics.log_metrics(self.collated_block_descr.clone()); + } duration = attempt_started.elapsed().as_millis() as u32; if let Some(result) = result? { break result; @@ -1140,6 +2239,8 @@ impl Collator { ); tokio::time::sleep(Duration::from_millis(sleep as u64)).await; + + elapsed_on_empty_collation = self.started.elapsed().as_millis(); }; let ratio = match duration { @@ -1158,6 +2259,10 @@ impl Collator { candidate.block_id, ); + #[cfg(feature = "log_metrics")] + collator_data.metrics.report_metrics(); + collator_data.metrics.log_metrics(self.collated_block_descr.clone()); + #[cfg(feature = "log_metrics")] report_collation_metrics( &self.shard, @@ -1250,6 +2355,8 @@ impl Collator { usage_tree, &prev_data, is_masterchain, + self.collated_block_descr.clone(), + self.shard.clone(), )?; if !self.shard.is_masterchain() { let (now_upper_limit, before_split, _accept_msgs) = check_this_shard_mc_info( @@ -1317,27 +2424,23 @@ impl Collator { // loads out queues from neighbors and out queue of current shard let mut output_queue_manager = self.request_neighbor_msg_queues(mc_data, prev_data, collator_data).await?; - let mut out_queue_cleaned_partial = false; - let mut out_queue_clean_deleted = 0; + // indicates if initial out queue clean was partial + let mut initial_out_queue_clean_partial = false; + // stores the deleted messages count during the inital clean + let mut initial_out_queue_clean_deleted_count = 0; + // delete delivered messages from output queue for a limited time if !self.after_split { - // delete delivered messages from output queue for a limited time - let now = std::time::Instant::now(); - let cc = self.engine.collator_config(); - let clean_timeout_nanos = (cc.cutoff_timeout_ms as i128) * 1_000_000 * (cc.clean_timeout_percentage_points as i128) / 1000; - let processed; - (out_queue_cleaned_partial, processed, out_queue_clean_deleted) = - self.clean_out_msg_queue(mc_data, collator_data, &mut output_queue_manager, clean_timeout_nanos, cc.optimistic_clean_percentage_points).await?; - let elapsed = now.elapsed().as_millis(); + let clean_timeout_nanos = self.get_initial_clean_timeout_nanos(); + let elapsed; + (initial_out_queue_clean_partial, initial_out_queue_clean_deleted_count, elapsed) = + self.clean_out_msg_queue(mc_data, collator_data, &mut output_queue_manager, + clean_timeout_nanos, self.engine.collator_config().optimistic_clean_percentage_points, true, + ).await?; + 
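+            // the initial clean budget is cutoff_timeout_ms * clean_timeout_percentage_points / 1000
+            // (see get_initial_clean_timeout_nanos), e.g. 1000 ms * 150 / 1000 = 150 ms; the call
+            // returns (partial, deleted_count, elapsed_ms)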
collator_data.metrics.save_elapsed_on_initial_clean(&self); log::debug!("{}: TIME: clean_out_msg_queue initial {}ms;", self.collated_block_descr, elapsed); - let labels = [("shard", self.shard.to_string()), ("step", "initial".to_owned())]; - metrics::gauge!("clean_out_msg_queue_partial", if out_queue_cleaned_partial { 1.0 } else { 0.0 }, &labels); - metrics::gauge!("clean_out_msg_queue_elapsed", elapsed as f64, &labels); - metrics::gauge!("clean_out_msg_queue_processed", processed as f64, &labels); - metrics::gauge!("clean_out_msg_queue_deleted", out_queue_clean_deleted as f64, &labels); } else { - log::debug!("{}: TIME: clean_out_msg_queue initial SKIPPED because of after_split block", - self.collated_block_descr); + log::debug!("{}: TIME: clean_out_msg_queue initial SKIPPED because of after_split block", self.collated_block_descr); } // copy out msg queue from next state which is cleared compared to previous @@ -1347,6 +2450,14 @@ impl Collator { // compute created / minted / recovered / from_prev_blk self.update_value_flow(mc_data, &prev_data, collator_data)?; + // closure to check the finalize timeout for parallel transactions + let collation_started = self.started.clone(); + let finalize_parallel_timeout_ms = self.finalize_parallel_timeout_ms; + let check_finilize_parallel_timeout_closure = move || ( + collation_started.elapsed().as_millis() as u32 > finalize_parallel_timeout_ms, + finalize_parallel_timeout_ms, + ); + let mut exec_manager = ExecutionManager::new( collator_data.gen_utime(), collator_data.start_lt()?, @@ -1356,10 +2467,15 @@ impl Collator { mc_data.libraries()?.clone(), collator_data.config.clone(), self.engine.collator_config().max_collate_threads as usize, + self.engine.collator_config().max_collate_msgs_queue_on_account as usize, self.collated_block_descr.clone(), self.debug, + Box::new(check_finilize_parallel_timeout_closure), )?; + #[cfg(test)] + exec_manager.set_test_msg_process_sleep(self.test_msg_process_sleep); + // tick & special transactions if self.shard.is_masterchain() { self.create_ticktock_transactions( @@ -1378,6 +2494,7 @@ impl Collator { let now = std::time::Instant::now(); self.process_inbound_internal_messages(prev_data, collator_data, &output_queue_manager, &mut exec_manager).await?; + collator_data.metrics.save_elapsed_on_internals_processed(&self); log::debug!("{}: TIME: process_inbound_internal_messages {}ms;", self.collated_block_descr, now.elapsed().as_millis()); @@ -1385,8 +2502,10 @@ impl Collator { // import remp messages (if space&gas left) let now = std::time::Instant::now(); let total = remp_messages.len(); - let processed = self.process_remp_messages(prev_data, collator_data, &mut exec_manager, - remp_messages).await?; + let processed = self.process_remp_messages( + prev_data, collator_data, &mut exec_manager, remp_messages + ).await?; + collator_data.metrics.save_elapsed_on_remp_processed(&self); log::debug!("{}: TIME: process_remp_messages {}ms, processed {}, ignored {}", self.collated_block_descr, now.elapsed().as_millis(), processed, total - processed); } @@ -1394,15 +2513,21 @@ impl Collator { // import inbound external messages (if space&gas left) let now = std::time::Instant::now(); self.process_inbound_external_messages(prev_data, collator_data, &mut exec_manager).await?; - log::debug!("{}: TIME: process_inbound_external_messages {}ms;", - self.collated_block_descr, now.elapsed().as_millis()); + collator_data.metrics.save_elapsed_on_externals_processed(&self); + log::debug!( + "{}: TIME: process_inbound_external_messages {}ms;", 
+            self.collated_block_descr,
+            now.elapsed().as_millis(),
+        );
         metrics::histogram!("collator_process_inbound_external_messages_time", now.elapsed());
         // process newly-generated messages (if space&gas left)
-        // (if we were unable to process all inbound messages, all new messages must be queued)
+        // (all new messages were already queued; we remove them from the queue when we process them)
         let now = std::time::Instant::now();
-        self.process_new_messages(!collator_data.inbound_queues_empty, prev_data,
-            collator_data, &mut exec_manager).await?;
+        if collator_data.inbound_queues_empty {
+            self.process_new_messages(prev_data, collator_data, &mut exec_manager).await?;
+            collator_data.metrics.save_elapsed_on_new_processed(&self);
+        }
         log::debug!("{}: TIME: process_new_messages {}ms;", self.collated_block_descr, now.elapsed().as_millis());
         metrics::histogram!("collator_process_new_messages_time", now.elapsed());
@@ -1411,41 +2536,30 @@
                 self.collated_block_descr);
         }
-        let clean_remaining_timeout_nanos = self.get_remaining_clean_time_limit_nanos();
-
-        if !collator_data.block_full && out_queue_cleaned_partial && out_queue_clean_deleted == 0 && clean_remaining_timeout_nanos > 10_000_000 {
+        // perform the secondary out queue clean
+        // if block limits were not reached, the initial clean was partial, and no messages were deleted
+        let secondary_clean_timeout_nanos = self.get_secondary_clean_timeout_nanos();
+        if self.check_should_perform_secondary_clean(
+            collator_data.block_full,
+            initial_out_queue_clean_partial,
+            initial_out_queue_clean_deleted_count,
+            secondary_clean_timeout_nanos,
+        ) {
             if !self.after_split {
-                // we have collation time left and out msg queue was not fully processed
-                // so will try to clean more for a remaining time only by random algorithm
-                let now = std::time::Instant::now();
-
                // set current out msg queue to manager to process new clean
                *output_queue_manager.next_mut() = std::mem::take(&mut collator_data.out_msg_queue_info);
-                let processed;
-                (out_queue_cleaned_partial, processed, out_queue_clean_deleted) =
-                    self.clean_out_msg_queue(mc_data, collator_data, &mut output_queue_manager, clean_remaining_timeout_nanos, 0).await?;
-                let elapsed = now.elapsed().as_millis();
-                log::debug!("{}: TIME: clean_out_msg_queue remaining {}ms;", self.collated_block_descr, elapsed);
-                let labels = [("shard", self.shard.to_string()), ("step", "remaining".to_owned())];
-                metrics::gauge!("clean_out_msg_queue_partial", if out_queue_cleaned_partial { 1.0 } else { 0.0 }, &labels);
-                metrics::gauge!("clean_out_msg_queue_elapsed", elapsed as f64, &labels);
-                metrics::gauge!("clean_out_msg_queue_processed", processed as f64, &labels);
-                metrics::gauge!("clean_out_msg_queue_deleted", out_queue_clean_deleted as f64, &labels);
-
+                let (_, _, elapsed) =
+                    self.clean_out_msg_queue(mc_data, collator_data, &mut output_queue_manager, secondary_clean_timeout_nanos, 0, false).await?;
+                collator_data.metrics.save_elapsed_on_secondary_clean(&self);
+                log::debug!("{}: TIME: clean_out_msg_queue secondary {}ms;", self.collated_block_descr, elapsed);
+
                // copy out msg queue from manager after clean
                collator_data.out_msg_queue_info = output_queue_manager.take_next();
                collator_data.out_msg_queue_info.forced_fix_out_queue()?;
            } else {
-                log::debug!("{}: TIME: clean_out_msg_queue remaining SKIPPED because of after_split block",
-                    self.collated_block_descr);
+                log::debug!("{}: TIME: clean_out_msg_queue secondary SKIPPED because of after_split block", self.collated_block_descr);
            }
-        } else {
-            let labels = [("shard",
self.shard.to_string()), ("step", "remaining".to_owned())]; - metrics::gauge!("clean_out_msg_queue_partial", 0.0, &labels); - metrics::gauge!("clean_out_msg_queue_elapsed", 0.0, &labels); - metrics::gauge!("clean_out_msg_queue_processed", 0.0, &labels); - metrics::gauge!("clean_out_msg_queue_deleted", 0.0, &labels); } // split prepare / split install @@ -1457,10 +2571,6 @@ impl Collator { true, mc_data, prev_data, collator_data, &mut exec_manager).await?; } - // process newly-generated messages (only by including them into output queue) - self.process_new_messages( - true, prev_data, collator_data, &mut exec_manager).await?; - // If block is empty - stop collation to try one more time (may be there are some new messages) let cc = self.engine.collator_config(); if !self.after_split && @@ -1488,6 +2598,8 @@ impl Collator { let result = self.finalize_block( mc_data, prev_data, collator_data, exec_manager, new_state_copyleft_rewards).await?; + collator_data.metrics.save_elapsed_on_finalize(&self); + Ok(Some(result)) } @@ -1498,11 +2610,16 @@ impl Collator { output_queue_manager: &mut MsgQueueManager, clean_timeout_nanos: i128, optimistic_clean_percentage_points: u32, - ) -> Result<(bool, i32, i32)> { - log::debug!("{}: clean_out_msg_queue", self.collated_block_descr); + is_initial: bool, + ) -> Result<(bool, i32, u128)> { + log::debug!("{}: clean_out_msg_queue {}", self.collated_block_descr, if is_initial { "initial" } else { "secondary" }); let short = mc_data.config().has_capability(GlobalCapabilities::CapShortDequeue); - output_queue_manager.clean_out_msg_queue(clean_timeout_nanos, optimistic_clean_percentage_points, |message, root| { + let now = std::time::Instant::now(); + + let ( + partial, processed, deleted, + ) = output_queue_manager.clean_out_msg_queue(clean_timeout_nanos, optimistic_clean_percentage_points, |message, root| { self.check_stop_flag()?; if let Some((enq, deliver_lt)) = message { log::trace!("{}: dequeue message: {:x}", self.collated_block_descr, enq.message_hash()); @@ -1510,12 +2627,21 @@ impl Collator { collator_data.block_limit_status.register_out_msg_queue_op(root, &collator_data.usage_tree, false)?; // normal limit reached, but we can add for soft and hard limit let stop = !collator_data.block_limit_status.fits(ParamLimitIndex::Normal); + if stop && is_initial { + collator_data.metrics.save_stopped_by_limits_on_initial_clean(); + } Ok(stop) } else { collator_data.block_limit_status.register_out_msg_queue_op(root, &collator_data.usage_tree, true)?; Ok(true) } - }).await + }).await?; + + let elapsed = now.elapsed().as_millis(); + + collator_data.metrics.save_clean_out_queue_metrics(is_initial, partial, elapsed, processed, deleted); + + Ok((partial, deleted, elapsed)) } // @@ -2273,7 +3399,9 @@ impl Collator { "{}: message {:x}, lt: {}, enq lt: {}", self.collated_block_descr, key, created_lt, enq.enqueued_lt() ); - collator_data.update_last_proc_int_msg((created_lt, enq.message_hash()))?; + + // Do not need to update last processed int message LT_HASH here + // if it is already processed or not sent to us if collator_data.out_msg_queue_info.already_processed(&enq)? 
{ log::trace!( "{}: message {:x} has been already processed by us before, skipping", @@ -2283,13 +3411,17 @@ impl Collator { self.check_inbound_internal_message(&key, &enq, created_lt, block_id.shard()) .map_err(|err| error!("problem processing internal inbound message \ with hash {:x} : {}", key.hash, err))?; + let src_addr = enq.message().src().unwrap_or_default().address(); let our = self.shard.contains_full_prefix(&enq.cur_prefix()); let to_us = self.shard.contains_full_prefix(&enq.dst_prefix()); if to_us { let account_id = enq.dst_account_id()?; - log::debug!("{}: message {:x} sent to execution to account {:x}", self.collated_block_descr, key.hash, account_id); let msg = AsyncMessage::Int(enq, our); - exec_manager.execute(account_id, msg, prev_data, collator_data).await?; + let msg_sync_key = exec_manager.execute(account_id.clone(), msg, prev_data, collator_data).await?; + log::debug!( + "{}: int message {:x} (sync_key: {:?}) from {:x} sent to execution to account {:x}", + self.collated_block_descr, key.hash, msg_sync_key, src_addr, account_id, + ); } else { // println!("{:x} {:#}", key, enq); // println!("cur: {}, dst: {}", enq.cur_prefix(), enq.dst_prefix()); @@ -2301,10 +3433,12 @@ impl Collator { } } if collator_data.block_full { - log::debug!("{}: BLOCK FULL, stop processing internal messages", self.collated_block_descr); + collator_data.metrics.save_stopped_by_limits_on_internals(); + log::debug!("{}: BLOCK FULL (>= Soft), stop processing internal messages", self.collated_block_descr); break } if self.check_cutoff_timeout() { + collator_data.metrics.save_stopped_by_timeout_on(CollationStoppedOnTimeoutStep::Internals); log::warn!("{}: TIMEOUT ({}ms) is elapsed, stop processing internal messages", self.collated_block_descr, self.engine.collator_config().cutoff_timeout_ms); break @@ -2313,6 +3447,7 @@ impl Collator { } // all internal messages are processed collator_data.inbound_queues_empty = iter.next().is_none(); + collator_data.metrics.not_all_internals_processed = !collator_data.inbound_queues_empty; Ok(()) } @@ -2357,24 +3492,37 @@ impl Collator { return Ok(()) } + if exec_manager.is_parallel_processing_cancelled() { + log::debug!("{}: parallel processing cancelled, skipping processing of inbound external messages", self.collated_block_descr); + return Ok(()) + } + log::debug!("{}: process_inbound_external_messages", self.collated_block_descr); for (msg, id) in self.engine.get_external_messages_iterator(self.shard.clone()) { + if !collator_data.block_limit_status.fits(ParamLimitIndex::Soft) { + collator_data.metrics.save_stopped_by_limits_on_externals(); + collator_data.metrics.not_all_externals_processed = true; + log::debug!("{}: BLOCK FULL (>= Medium), stop processing external messages", self.collated_block_descr); + break; + } + if self.check_cutoff_timeout() { + collator_data.metrics.save_stopped_by_timeout_on(CollationStoppedOnTimeoutStep::Externals); + collator_data.metrics.not_all_externals_processed = true; + log::warn!("{}: TIMEOUT ({}ms) is elapsed, stop processing external messages", + self.collated_block_descr, self.engine.collator_config().cutoff_timeout_ms, + ); + break; + } let header = msg.ext_in_header().ok_or_else(|| error!("message {:x} \ is not external inbound message", id))?; if self.shard.contains_address(&header.dst)? 
{ - if !collator_data.block_limit_status.fits(ParamLimitIndex::Soft) { - log::debug!("{}: BLOCK FULL, stop processing external messages", self.collated_block_descr); - break - } - if self.check_cutoff_timeout() { - log::warn!("{}: TIMEOUT is elapsed, stop processing external messages", - self.collated_block_descr); - break - } let (_, account_id) = header.dst.extract_std_address(true)?; let msg = AsyncMessage::Ext(msg.deref().clone()); - log::debug!("{}: message {:x} sent to execution", self.collated_block_descr, id); - exec_manager.execute(account_id, msg, prev_data, collator_data).await?; + let msg_sync_key = exec_manager.execute(account_id.clone(), msg, prev_data, collator_data).await?; + log::debug!( + "{}: ext message {:x} (sync_key: {:?}) sent to execution to account {:x}", + self.collated_block_descr, id, msg_sync_key, account_id, + ); } else { // usually node collates more than one shard, the message can belong another one, // so we can't postpone it @@ -2417,10 +3565,14 @@ impl Collator { is not external inbound message", id))?; if self.shard.contains_address(&header.dst)? { if !collator_data.block_limit_status.fits_normal(REMP_CUTOFF_LIMIT) { + collator_data.metrics.save_stopped_by_limits_on_remp(); + collator_data.metrics.not_all_remp_processed = true; log::trace!("{}: block is loaded enough, stop processing remp messages", self.collated_block_descr); ignored.push(id); ignore = true; } else if self.check_cutoff_timeout() { + collator_data.metrics.save_stopped_by_timeout_on(CollationStoppedOnTimeoutStep::Remp); + collator_data.metrics.not_all_remp_processed = true; log::warn!("{}: TIMEOUT is elapsed, stop processing remp messages", self.collated_block_descr); ignored.push(id); @@ -2428,8 +3580,11 @@ impl Collator { } else { let (_, account_id) = header.dst.extract_std_address(true)?; let msg = AsyncMessage::Ext(msg.deref().clone()); - log::trace!("{}: remp message {:x} sent to execution", self.collated_block_descr, id); - exec_manager.execute(account_id, msg, prev_data, collator_data).await?; + let msg_sync_key = exec_manager.execute(account_id.clone(), msg, prev_data, collator_data).await?; + log::trace!( + "{}: remp message {:x} (sync_key: {:?}) sent to execution to account {:x}", + self.collated_block_descr, id, msg_sync_key, account_id, + ); } } else { log::warn!( @@ -2443,50 +3598,78 @@ impl Collator { exec_manager.wait_transactions(collator_data).await?; let (accepted, rejected) = collator_data.withdraw_ext_msg_statuses(); let processed = accepted.len() + rejected.len(); + collator_data.metrics.processed_remp_msgs_count += processed; + //let accepted = accepted.into_iter().map(|(id, _)| id).collect(); collator_data.set_remp_msg_statuses(accepted, rejected, ignored); Ok(processed) } async fn process_new_messages( &self, - mut enqueue_only: bool, prev_data: &PrevData, collator_data: &mut CollatorData, exec_manager: &mut ExecutionManager, ) -> Result<()> { + if exec_manager.is_parallel_processing_cancelled() { + log::debug!("{}: parallel processing cancelled, skipping processing of new messages", self.collated_block_descr); + return Ok(()) + } + log::debug!("{}: process_new_messages", self.collated_block_descr); let use_hypercube = !collator_data.config.has_capability(GlobalCapabilities::CapOffHypercube); - while !collator_data.new_messages.is_empty() { + let mut stop_processing = false; + while !stop_processing && !collator_data.new_messages.is_empty() { // In the iteration we execute only existing messages. 
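+            // (each new message was added to the out queue when it was created; executing it here
+            // also deletes it from the queue, see the AsyncMessage::New branch in finalize_transaction)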
// Newly generating messages will be executed next itaration (only after waiting). let mut new_messages = std::mem::take(&mut collator_data.new_messages); + log::debug!("{}: new_messages count: {}", self.collated_block_descr, new_messages.len()); // we can get sorted items somehow later while let Some(NewMessage{ lt_hash: (created_lt, hash), msg, tr_cell, prefix }) = new_messages.pop() { let info = msg.int_header().ok_or_else(|| error!("message is not internal"))?; - let fwd_fee = *info.fwd_fee(); - enqueue_only |= collator_data.block_full | self.check_cutoff_timeout(); - if enqueue_only || !self.shard.contains_address(&info.dst)? { - // everything was made in new_transaction + + if !collator_data.block_limit_status.fits(ParamLimitIndex::Soft) { + collator_data.metrics.save_stopped_by_limits_on_new_messages(); + log::debug!("{}: BLOCK FULL (>= Medium), stop processing new messages", self.collated_block_descr); + stop_processing = true; + break; + } + if self.check_cutoff_timeout() { + collator_data.metrics.save_stopped_by_timeout_on(CollationStoppedOnTimeoutStep::NewMessages); + log::warn!("{}: TIMEOUT ({}ms) is elapsed, stop processing new messages", + self.collated_block_descr, self.engine.collator_config().cutoff_timeout_ms, + ); + stop_processing = true; + break; + } + + if !self.shard.contains_address(&info.dst)? { + // skip msg if it is not to our shard } else { CHECK!(info.created_at.as_u32(), collator_data.gen_utime); - let key = OutMsgQueueKey::with_account_prefix(&prefix, hash.clone()); - collator_data.out_msg_queue_info.del_message(&key)?; - collator_data.enqueue_count -= 1; + let fwd_fee = *info.fwd_fee(); let env = MsgEnvelopeStuff::new(msg, &self.shard, fwd_fee, use_hypercube)?; let account_id = env.message().int_dst_account_id().unwrap_or_default(); - collator_data.update_last_proc_int_msg((created_lt, hash))?; - let msg = AsyncMessage::New(env, tr_cell); - log::debug!("{}: message {:x} sent to execution", self.collated_block_descr, key.hash); - exec_manager.execute(account_id, msg, prev_data, collator_data).await?; + let msg = AsyncMessage::New(env, tr_cell, created_lt); + let msg_sync_key = exec_manager.execute(account_id.clone(), msg, prev_data, collator_data).await?; + log::debug!( + "{}: new int message {:x} (sync_key: {:?}) sent to execution to account {:x}", + self.collated_block_descr, hash, msg_sync_key, account_id, + ); }; self.check_stop_flag()?; } + if !new_messages.is_empty() { + collator_data.metrics.not_all_new_messages_processed = true; + } exec_manager.wait_transactions(collator_data).await?; self.check_stop_flag()?; } + if !collator_data.new_messages.is_empty() { + collator_data.metrics.not_all_new_messages_processed = true; + } Ok(()) } @@ -2635,10 +3818,24 @@ impl Collator { let mut changed_accounts = HashMap::new(); let mut new_config_opt = None; let mut current_workchain_copyleft_rewards = CopyleftRewards::default(); + let accounts_processed_msgs = exec_manager.accounts_processed_msgs().clone(); for (account_id, (sender, handle)) in exec_manager.changed_accounts.drain() { std::mem::drop(sender); let mut shard_acc = handle.await .map_err(|err| error!("account {:x} thread didn't finish: {}", account_id, err))??; + + // commit account state by last processed msg before the canceling of parallel collation + shard_acc = match ExecutionManager::get_last_processed_msg_sync_key( + &accounts_processed_msgs, + &account_id, + ) { + None => continue, + Some(msg_sync_key) => match shard_acc.commit(*msg_sync_key)? 
{ + None => continue, + Some(committed) => committed, + } + }; + let account = shard_acc.read_account()?; if let Some(addr) = &config_addr { if addr == &account_id { @@ -2653,7 +3850,7 @@ impl Collator { if !acc_block.transactions().is_empty() { accounts.insert(&acc_block)?; } - current_workchain_copyleft_rewards.merge_rewards(shard_acc.copyleft_rewards())?; + current_workchain_copyleft_rewards.merge_rewards(shard_acc.copyleft_rewards()?)?; changed_accounts.insert(account_id, shard_acc); } @@ -2883,23 +4080,23 @@ impl Collator { if workchain_id != -1 && (collator_data.dequeue_count > 0 || collator_data.enqueue_count > 0 || collator_data.in_msg_count > 0 || collator_data.out_msg_count > 0 || collator_data.execute_count > 0 - || collator_data.transit_count > 0 + || collator_data.transit_count > 0 || changed_accounts.len() > 0 ) { log::debug!( "{}: finalize_block finished: dequeue_count: {}, enqueue_count: {}, in_msg_count: {}, out_msg_count: {}, \ - execute_count: {}, transit_count: {}", + execute_count: {}, transit_count: {}, changed_accounts: {}", self.collated_block_descr, collator_data.dequeue_count, collator_data.enqueue_count, collator_data.in_msg_count, collator_data.out_msg_count, collator_data.execute_count, - collator_data.transit_count, + collator_data.transit_count, changed_accounts.len(), ); } log::trace!( "{}: finalize_block finished: dequeue_count: {}, enqueue_count: {}, in_msg_count: {}, out_msg_count: {}, \ - execute_count: {}, transit_count: {}, data len: {}", + execute_count: {}, transit_count: {}, changed_accounts: {}, data len: {}", self.collated_block_descr, collator_data.dequeue_count, collator_data.enqueue_count, collator_data.in_msg_count, collator_data.out_msg_count, collator_data.execute_count, - collator_data.transit_count, candidate.data.len(), + collator_data.transit_count, changed_accounts.len(), candidate.data.len(), ); Ok((candidate, new_state, exec_manager)) } @@ -3532,19 +4729,41 @@ impl Collator { self.started.elapsed().as_millis() as u32 > cutoff_timeout } + fn check_finilize_parallel_timeout(&self) -> (bool, u32) { + ( + self.started.elapsed().as_millis() as u32 > self.finalize_parallel_timeout_ms, + self.finalize_parallel_timeout_ms, + ) + } + fn get_remaining_cutoff_time_limit_nanos(&self) -> i128 { let cutoff_timeout_nanos = self.engine.collator_config().cutoff_timeout_ms as i128 * 1_000_000; let elapsed_nanos = self.started.elapsed().as_nanos() as i128; cutoff_timeout_nanos - elapsed_nanos } - fn get_remaining_clean_time_limit_nanos(&self) -> i128 { + fn get_initial_clean_timeout_nanos(&self) -> i128 { + let cc = self.engine.collator_config(); + (cc.cutoff_timeout_ms as i128) * 1_000_000 * (cc.clean_timeout_percentage_points as i128) / 1000 + } + + fn get_secondary_clean_timeout_nanos(&self) -> i128 { let remaining_cutoff_timeout_nanos = self.get_remaining_cutoff_time_limit_nanos(); let cc = self.engine.collator_config(); let max_secondary_clean_timeout_nanos = (cc.cutoff_timeout_ms as i128) * 1_000_000 * (cc.max_secondary_clean_timeout_percentage_points as i128) / 1000; remaining_cutoff_timeout_nanos.min(max_secondary_clean_timeout_nanos) } + fn check_should_perform_secondary_clean( + &self, + block_full: bool, + prev_out_queue_cleaned_partial: bool, + prev_out_queue_clean_deleted_count: i32, + clean_timeout_nanos: i128, + ) -> bool { + !block_full && prev_out_queue_cleaned_partial && prev_out_queue_clean_deleted_count == 0 && clean_timeout_nanos >= 10_000_000 + } + fn check_stop_flag(&self) -> Result<()> { if self.stop_flag.load(Ordering::Relaxed) 
             fail!("Stop flag was set")
@@ -3608,4 +4827,3 @@ pub fn report_collation_metrics(
     metrics::histogram!("gas_rate_collator", gas_rate as f64, &labels);
     metrics::histogram!("block_size", block_size as f64, &labels);
 }
-
diff --git a/src/validator/out_msg_queue.rs b/src/validator/out_msg_queue.rs
index f71c9cda..7a9c4261 100644
--- a/src/validator/out_msg_queue.rs
+++ b/src/validator/out_msg_queue.rs
@@ -27,7 +27,7 @@ use ton_block::{
     OutMsgQueueInfo, OutMsgQueue, OutMsgQueueKey, IhrPendingInfo,
     ProcessedInfo, ProcessedUpto, ProcessedInfoKey, ShardHashes, AccountIdPrefixFull,
-    HashmapAugType, ShardStateUnsplit,
+    HashmapAugType, ShardStateUnsplit, EnqueuedMsg,
 };
 use ton_types::{
     error, fail, BuilderData, Cell, LabelReader, SliceData, IBitstring, Result, UInt256,
@@ -590,13 +590,14 @@ impl OutMsgQueueInfoStuff {
         self.out_queue_mut()?.set(&key, enq.enqueued(), &enq.created_lt())
     }

-    pub fn del_message(&mut self, key: &OutMsgQueueKey) -> Result<()> {
+    pub fn del_message(&mut self, key: &OutMsgQueueKey) -> Result<EnqueuedMsg> {
         let labels = [("shard", self.shard().to_string())];
         metrics::counter!("out_msg_queue_del", 1, &labels);
-        if self.out_queue_mut()?.remove(SliceData::load_bitstring(key.write_to_new_cell()?)?)?.is_none() {
+        if let Some(mut msg_data) = self.out_queue_mut()?.remove(SliceData::load_bitstring(key.write_to_new_cell()?)?)? {
+            EnqueuedMsg::construct_from(&mut msg_data)
+        } else {
             fail!("error deleting from out_msg_queue dictionary: {:x}", key)
         }
-        Ok(())
     }

     // remove all messages which are not from new_shard
@@ -1125,7 +1126,7 @@ impl MsgQueueManager {
     pub async fn clean_out_msg_queue(
         &mut self,
         clean_timeout_nanos: i128,
-        optimistic_clean_percentage_points: u32,
+        ordered_clean_percentage_points: u32,
         mut on_message: impl FnMut(Option<(MsgEnqueueStuff, u64)>, Option<&Cell>) -> Result<bool>
     ) -> Result<(bool, i32, i32)> {
         let timer = std::time::Instant::now();
@@ -1149,7 +1150,7 @@ impl MsgQueueManager {
         let mut deleted = 0;
         let mut skipped = 0;

-        let ordered_cleaning_timeout_nanos = clean_timeout_nanos * (optimistic_clean_percentage_points as i128) / 1000;
+        let ordered_cleaning_timeout_nanos = clean_timeout_nanos * (ordered_clean_percentage_points as i128) / 1000;
         let random_cleaning_timeout_nanos = clean_timeout_nanos - ordered_cleaning_timeout_nanos;

         log::debug!(
@@ -1163,29 +1164,17 @@ impl MsgQueueManager {
         if ordered_cleaning_timeout_nanos > 0 {
             let max_processed_lt = self.get_max_processed_lt_from_queue_info();

-            let mut clean_timeout_check = 50_000_000;
-            let max_clean_timeout_check = 550_000_000;
-
             partial = out_msg_queue_cleaner::hashmap_filter_ordered_by_lt_hash(
                 &mut queue,
                 max_processed_lt,
                 ordered_cleaning_timeout_nanos,
                 |node_obj| {
                     if block_full {
-                        log::debug!("{}: BLOCK FULL when ordered cleaning output queue, cleanup is partial", self.block_descr);
+                        log::debug!("{}: BLOCK FULL (>= Soft) when ordered cleaning output queue, cleanup is partial", self.block_descr);
                         partial = true;
                         return Ok(HashmapFilterResult::Stop);
                     }

-                    let elapsed_nanos = timer.elapsed().as_nanos() as i128;
-                    if clean_timeout_check <= max_clean_timeout_check && elapsed_nanos >= clean_timeout_check {
-                        log::debug!(
-                            "{}: clean_out_msg_queue: ordered cleaning time elapsed {} nanos: processed = {}, deleted = {}, skipped = {}",
-                            self.block_descr, elapsed_nanos, deleted + skipped, deleted, skipped,
-                        );
-                        clean_timeout_check += 50_000_000;
-                    }
-
                     let lt = node_obj.lt();
                     let mut data_and_refs = node_obj.data_and_refs()?;
                     let enq = MsgEnqueueStuff::construct_from(&mut data_and_refs, lt)?;
@@ -1236,28 +1225,15 @@ impl MsgQueueManager {
             let random_clean_timer = std::time::Instant::now();

-            let mut clean_timeout_check = 50_000_000;
-            let max_clean_timeout_check = 550_000_000;
-
             queue.hashmap_filter(|_key, mut slice| {
                 if block_full {
-                    log::debug!("{}: BLOCK FULL when random cleaning output queue, cleanup is partial", self.block_descr);
+                    log::debug!("{}: BLOCK FULL (>= Soft) when random cleaning output queue, cleanup is partial", self.block_descr);
                     partial = true;
                     return Ok(HashmapFilterResult::Stop)
                 }

-                let elapsed_nanos = random_clean_timer.elapsed().as_nanos() as i128;
-
-                if clean_timeout_check <= max_clean_timeout_check && elapsed_nanos >= clean_timeout_check {
-                    log::debug!(
-                        "{}: clean_out_msg_queue: random cleaning time elapsed {} nanos: processed = {}, deleted = {}, skipped = {}",
-                        self.block_descr, elapsed_nanos,
-                        random_deleted + random_skipped, random_deleted, random_skipped,
-                    );
-                    clean_timeout_check += 50_000_000;
-                }
-
                 // stop when reached the time limit
+                let elapsed_nanos = random_clean_timer.elapsed().as_nanos() as i128;
                 if elapsed_nanos >= random_cleaning_timeout_nanos {
                     log::debug!(
                         "{}: clean_out_msg_queue: stopped random cleaning output queue because of time elapsed {} nanos >= {} nanos limit",
diff --git a/src/validator/out_msg_queue_cleaner.rs b/src/validator/out_msg_queue_cleaner.rs
index 5192b0eb..efe67cf8 100644
--- a/src/validator/out_msg_queue_cleaner.rs
+++ b/src/validator/out_msg_queue_cleaner.rs
@@ -762,23 +762,6 @@ impl HashmapOrderedFilterCursor {
             }
         }

-        // stop pocessing when max_lt reached
-        #[cfg(not(feature = "only_sorted_clean"))]
-        if current.node_obj_ref().lt() == self.max_lt {
-            log::debug!(
-                "clean_out_msg_queue: hop {}: stop processing when current node (bottom_bit_len = {}, key = {}) lt {} == max_lt {}, elapsed = {} nanos",
-                self.hops_counter,
-                current.bottom_bit_len(),
-                current.node_obj_ref().key_hex(),
-                current.node_obj_ref().lt(),
-                self.max_lt,
-                self.timer.elapsed().as_nanos(),
-            );
-
-            self.stop_processing = true;
-            self.stopped_by_max_lt = true;
-        }
-
         current.is_processed = true;
         self.processed_count += 1;
@@ -1263,7 +1246,8 @@ where
         cursor_creation_elapsed,
     );

-    let partial = filter_cursor.stop_processing | filter_cursor.cancel_processing;
+    let partial = (filter_cursor.stop_processing | filter_cursor.cancel_processing)
+        && !filter_cursor.stopped_by_max_lt;

     Ok(partial)
 }
diff --git a/src/validator/validator_manager.rs b/src/validator/validator_manager.rs
index 375ba0e5..02d4e794 100644
--- a/src/validator/validator_manager.rs
+++ b/src/validator/validator_manager.rs
@@ -931,6 +931,10 @@ impl ValidatorManagerImpl {
                 }
             } else {
                 log::trace!(target: "validator_manager", "We are not in subset for {}", ident);
+
+                // push zero metrics after restart for shards which are not validated by current node
+                #[cfg(feature = "log_metrics")]
+                super::collator::CollatorMetrics::report_zero_metrics(ident.clone());
             }
             log::trace!(target: "validator_manager", "Session {} started (if necessary)", ident);
         }
diff --git a/src/validator/validator_session_listener.rs b/src/validator/validator_session_listener.rs
index 56d74505..10f55841 100644
--- a/src/validator/validator_session_listener.rs
+++ b/src/validator/validator_session_listener.rs
@@ -356,4 +356,8 @@ pub async fn process_validation_queue(
         }
     }
     log::info!(target: "validator", "({}): Exiting from validation queue processing: {}", g.get_next_block_descr().await, g.info().await);
+
+    //push zero metrics when node finished processing validation queue
+    #[cfg(feature = "log_metrics")]
+    super::collator::CollatorMetrics::report_zero_metrics(g.shard().clone());
 }
diff --git a/storage/Cargo.toml b/storage/Cargo.toml
index ebe24f26..28702564 100644
--- a/storage/Cargo.toml
+++ b/storage/Cargo.toml
@@ -22,10 +22,10 @@ serde_derive = '1.0.114'
 strum = '0.18.0'
 strum_macros = '0.18.0'
 tokio = { features = [ 'fs', 'rt-multi-thread' ], version = '1.5' }
-adnl = { git = 'https://github.com/tonlabs/ever-adnl.git', tag = '0.9.15' }
+adnl = { git = 'https://github.com/tonlabs/ever-adnl.git', tag = '0.9.18' }
 lockfree = { git = 'https://github.com/tonlabs/lockfree.git' }
-ton_api = { git = 'https://github.com/tonlabs/ever-tl.git', package = 'ton_api', tag = '0.3.53' }
-ton_block = { git = 'https://github.com/tonlabs/ever-block.git', tag = '1.9.117' }
+ton_api = { git = 'https://github.com/tonlabs/ever-tl.git', package = 'ton_api', tag = '0.3.55' }
+ton_block = { git = 'https://github.com/tonlabs/ever-block.git', tag = '1.9.119' }
 ton_types = { git = 'https://github.com/tonlabs/ever-types.git', tag = '2.0.31' }

 [build-dependencies]
diff --git a/validator-session/Cargo.toml b/validator-session/Cargo.toml
index ac00a422..fccbf941 100644
--- a/validator-session/Cargo.toml
+++ b/validator-session/Cargo.toml
@@ -16,10 +16,10 @@ metrics = '0.21.0'
 metrics-core = '0.5'
 rand = '0.8'
 catchain = { path = '../catchain' }
-overlay = { git = 'https://github.com/tonlabs/ever-overlay.git', tag = '0.7.11' }
+overlay = { git = 'https://github.com/tonlabs/ever-overlay.git', tag = '0.7.14' }
 storage = { path = '../storage' }
-ton_api = { git = 'https://github.com/tonlabs/ever-tl.git', package = 'ton_api', tag = '0.3.53' }
-ton_block = { git = 'https://github.com/tonlabs/ever-block.git', tag = '1.9.117' }
+ton_api = { git = 'https://github.com/tonlabs/ever-tl.git', package = 'ton_api', tag = '0.3.55' }
+ton_block = { git = 'https://github.com/tonlabs/ever-block.git', tag = '1.9.119' }
 ton_types = { git = 'https://github.com/tonlabs/ever-types.git', tag = '2.0.31' }

 [features]