From 21b5109af24aae90ef429bbcff0c4f5082fa777b Mon Sep 17 00:00:00 2001
From: Andrea
Date: Fri, 3 Jan 2025 10:37:36 +0100
Subject: [PATCH 1/4] fix(resharding): do not fail flat storage resharding if
 account id does not belong to parent (#12675)

If `account_id` of a key does not map to any child (and thus does not
belong to the parent in the first place), just log a warning and avoid
failing the entire operation.
---
 chain/chain/src/flat_storage_resharder.rs | 68 +++++++++++++++++++++--
 1 file changed, 64 insertions(+), 4 deletions(-)

diff --git a/chain/chain/src/flat_storage_resharder.rs b/chain/chain/src/flat_storage_resharder.rs
index 47e78ca8ebc..a362fbb5a3c 100644
--- a/chain/chain/src/flat_storage_resharder.rs
+++ b/chain/chain/src/flat_storage_resharder.rs
@@ -7,7 +7,7 @@ use std::sync::{Arc, Mutex};
 
 use near_chain_configs::{MutableConfigValue, ReshardingConfig, ReshardingHandle};
 use near_chain_primitives::Error;
-use tracing::{debug, error, info};
+use tracing::{debug, error, info, warn};
 
 use crate::resharding::event_type::{ReshardingEventType, ReshardingSplitShardParams};
 use crate::resharding::types::{
@@ -1025,9 +1025,10 @@ fn copy_kv_to_child(
     // Sanity check we are truly writing to one of the expected children shards.
     if new_shard_uid != *left_child_shard && new_shard_uid != *right_child_shard {
-        let err_msg = "account id doesn't map to any child shard!";
-        error!(target: "resharding", ?new_shard_uid, ?left_child_shard, ?right_child_shard, ?shard_layout, ?account_id, err_msg);
-        return Err(Error::ReshardingError(err_msg.to_string()));
+        let err_msg = "account id doesn't map to any child shard, skipping it";
+        warn!(target: "resharding", ?new_shard_uid, ?left_child_shard, ?right_child_shard, ?shard_layout, ?account_id, err_msg);
+        // Do not fail resharding. Just skip this entry.
+        return Ok(());
     }
     // Add the new flat store entry.
     store_update.set(new_shard_uid, key, value);
@@ -2561,4 +2562,63 @@ mod tests {
             Ok(FlatStorageStatus::Ready(FlatStorageReadyStatus { flat_head }))
         );
     }
+
+    /// This test asserts that resharding doesn't fail if flat storage iteration goes over an account
+    /// which is not part of any child shard after the shard layout changes.
+    #[test]
+    fn unrelated_account_do_not_fail_splitting() {
+        init_test_logger();
+        let (mut chain, resharder, sender) =
+            create_chain_resharder_sender::(simple_shard_layout());
+        let new_shard_layout = shard_layout_after_split();
+        let resharding_event_type = event_type_from_chain_and_layout(&chain, &new_shard_layout);
+        let ReshardingSplitShardParams {
+            parent_shard, left_child_shard, right_child_shard, ..
+        } = match resharding_event_type.clone() {
+            ReshardingEventType::SplitShard(params) => params,
+        };
+        let flat_store = resharder.runtime.store().flat_store();
+
+        // Add two blocks on top of genesis. This will make the resharding block (height 0) final.
+        add_blocks_to_chain(
+            &mut chain,
+            2,
+            PreviousBlockHeight::ChainHead,
+            NextBlockHeight::ChainHeadPlusOne,
+        );
+
+        // Inject an account which doesn't belong to the parent shard into its flat storage.
+        let mut store_update = flat_store.store_update();
+        let test_value = Some(FlatStateValue::Inlined(vec![0]));
+        let key = TrieKey::Account { account_id: account!("ab") };
+        store_update.set(parent_shard, key.to_vec(), test_value);
+        store_update.commit().unwrap();
+
+        // Perform resharding.
+        assert!(resharder.start_resharding(resharding_event_type, &new_shard_layout).is_ok());
+        sender.call_split_shard_task();
+
+        // Check final status of parent flat storage.
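+        // After the split task completes, the parent's flat storage should be fully
+        // removed: its status is reset to empty, it holds no entries, and no flat
+        // storage instance remains registered for it.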
+        let parent = ShardUId { version: 3, shard_id: 1 };
+        assert_eq!(flat_store.get_flat_storage_status(parent), Ok(FlatStorageStatus::Empty));
+        assert_eq!(flat_store.iter(parent).count(), 0);
+        assert!(resharder
+            .runtime
+            .get_flat_storage_manager()
+            .get_flat_storage_for_shard(parent)
+            .is_none());
+
+        // Check intermediate status of the children's flat storages.
+        // If children reached the catching up state, it means that the split task succeeded.
+        for child in [left_child_shard, right_child_shard] {
+            assert_eq!(
+                flat_store.get_flat_storage_status(child),
+                Ok(FlatStorageStatus::Resharding(FlatStorageReshardingStatus::CatchingUp(
+                    chain.final_head().unwrap().into()
+                )))
+            );
+            // However, the unrelated account should not end up in any child.
+            assert!(flat_store.get(child, &key.to_vec()).is_ok_and(|val| val.is_none()));
+        }
+    }
 }

From 035eff8232ac7a2628d24a8b821115efd5ccf8de Mon Sep 17 00:00:00 2001
From: Stefan Neamtu
Date: Sat, 4 Jan 2025 12:48:24 +0200
Subject: [PATCH 2/4] feat(network): parallelize sending of partial encoded
 state witness messages

---
 .../src/peer_manager/network_state/mod.rs     | 92 +++++++++++++++++++
 .../src/peer_manager/peer_manager_actor.rs    | 27 ++++--
 2 files changed, 109 insertions(+), 10 deletions(-)

diff --git a/chain/network/src/peer_manager/network_state/mod.rs b/chain/network/src/peer_manager/network_state/mod.rs
index 90370ff2816..cda45d4d4b9 100644
--- a/chain/network/src/peer_manager/network_state/mod.rs
+++ b/chain/network/src/peer_manager/network_state/mod.rs
@@ -35,6 +35,7 @@ use crate::types::{
     ChainInfo, PeerManagerSenderForNetwork, PeerType, ReasonForBan, StatePartRequestBody,
     Tier3Request, Tier3RequestBody,
 };
+use actix::ArbiterHandle;
 use anyhow::Context;
 use arc_swap::ArcSwap;
 use near_async::messaging::{CanSend, SendAsync, Sender};
@@ -698,6 +699,97 @@ impl NetworkState {
         success
     }
 
+    /// Send message to specific account.
+    /// Return whether the message is sent or not.
+    /// The message might be sent over TIER1 or TIER2 connection depending on the message type.
+    pub fn send_message_to_account_with_arbiter(
+        self: &Arc<Self>,
+        clock: &time::Clock,
+        account_id: &AccountId,
+        msg: RoutedMessageBody,
+        arbiter: ArbiterHandle,
+    ) -> bool {
+        // If the message is allowed to be sent to self, we handle it directly.
+        if self.config.validator.account_id().is_some_and(|id| &id == account_id) {
+            // For now, we don't allow some types of messages to be sent to self.
+            debug_assert!(msg.allow_sending_to_self());
+            let this = self.clone();
+            let clock = clock.clone();
+            let peer_id = self.config.node_id();
+            let msg = self.sign_message(
+                &clock,
+                RawRoutedMessage { target: PeerIdOrHash::PeerId(peer_id.clone()), body: msg },
+            );
+            arbiter.spawn(async move {
+                this.receive_routed_message(&clock, peer_id, msg.hash(), msg.msg.body).await;
+            });
+            return true;
+        }
+
+        let accounts_data = self.accounts_data.load();
+        if tcp::Tier::T1.is_allowed_routed(&msg) {
+            for key in accounts_data.keys_by_id.get(account_id).iter().flat_map(|keys| keys.iter())
+            {
+                let data = match accounts_data.data.get(key) {
+                    Some(data) => data,
+                    None => continue,
+                };
+                let conn = match self.get_tier1_proxy(data) {
+                    Some(conn) => conn,
+                    None => continue,
+                };
+                // TODO(gprusak): in case of PartialEncodedChunk, consider stripping everything
+                // but the header. This will bound the message size.
+                conn.send_message(Arc::new(PeerMessage::Routed(self.sign_message(
+                    clock,
+                    RawRoutedMessage {
+                        target: PeerIdOrHash::PeerId(data.peer_id.clone()),
+                        body: msg,
+                    },
+                ))));
+                return true;
+            }
+        }
+
+        let peer_id_from_account_data = accounts_data
+            .keys_by_id
+            .get(account_id)
+            .iter()
+            .flat_map(|keys| keys.iter())
+            .flat_map(|key| accounts_data.data.get(key))
+            .next()
+            .map(|data| data.peer_id.clone());
+        // Find the target peer_id:
+        // - first look it up in self.accounts_data
+        // - if missing, fall back to lookup in self.graph.routing_table
+        // We want to deprecate self.graph.routing_table.account_owner in the next release.
+        let target = if let Some(peer_id) = peer_id_from_account_data {
+            metrics::ACCOUNT_TO_PEER_LOOKUPS.with_label_values(&["AccountData"]).inc();
+            peer_id
+        } else if let Some(peer_id) = self.account_announcements.get_account_owner(account_id) {
+            metrics::ACCOUNT_TO_PEER_LOOKUPS.with_label_values(&["AnnounceAccount"]).inc();
+            peer_id
+        } else {
+            // TODO(MarX, #1369): Message is dropped here. Define policy for this case.
+            metrics::MessageDropped::UnknownAccount.inc(&msg);
+            tracing::debug!(target: "network",
+                   account_id = ?self.config.validator.account_id(),
+                   to = ?account_id,
+                   ?msg, "Drop message: unknown account",
+            );
+            tracing::trace!(target: "network", known_peers = ?self.account_announcements.get_accounts_keys(), "Known peers");
+            return false;
+        };
+
+        let mut success = false;
+        let msg = RawRoutedMessage { target: PeerIdOrHash::PeerId(target), body: msg };
+        let msg = self.sign_message(clock, msg);
+        for _ in 0..msg.body.message_resend_count() {
+            success |= self.send_message_to_peer(clock, tcp::Tier::T2, msg.clone());
+        }
+        success
+    }
+
     pub async fn receive_routed_message(
         self: &Arc<Self>,
         clock: &time::Clock,

diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs
index 1f4a00cf8d8..09ad567abae 100644
--- a/chain/network/src/peer_manager/peer_manager_actor.rs
+++ b/chain/network/src/peer_manager/peer_manager_actor.rs
@@ -40,6 +40,7 @@ use network_protocol::MAX_SHARDS_PER_SNAPSHOT_HOST_INFO;
 use rand::seq::{IteratorRandom, SliceRandom};
 use rand::thread_rng;
 use rand::Rng;
+use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use std::cmp::min;
 use std::collections::HashSet;
 use std::sync::atomic::Ordering;
@@ -1070,28 +1071,34 @@ impl PeerManagerActor {
                 NetworkResponses::NoResponse
             }
             NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple) => {
-                for (chunk_validator, partial_witness) in validator_witness_tuple {
-                    self.state.send_message_to_account(
-                        &self.clock,
-                        &chunk_validator,
-                        RoutedMessageBody::PartialEncodedStateWitness(partial_witness),
-                    );
-                }
+                let arbiter = actix::Arbiter::current();
+                validator_witness_tuple.into_par_iter().for_each(
+                    |(chunk_validator, partial_witness)| {
+                        self.state.send_message_to_account_with_arbiter(
+                            &self.clock,
+                            &chunk_validator,
+                            RoutedMessageBody::PartialEncodedStateWitness(partial_witness),
+                            arbiter.clone(),
+                        );
+                    },
+                );
                 NetworkResponses::NoResponse
             }
             NetworkRequests::PartialEncodedStateWitnessForward(
                 chunk_validators,
                 partial_witness,
             ) => {
-                for chunk_validator in chunk_validators {
-                    self.state.send_message_to_account(
+                let arbiter = actix::Arbiter::current();
+                chunk_validators.into_par_iter().for_each(|chunk_validator| {
+                    self.state.send_message_to_account_with_arbiter(
                         &self.clock,
                         &chunk_validator,
                         RoutedMessageBody::PartialEncodedStateWitnessForward(
                            partial_witness.clone(),
                        ),
+                        arbiter.clone(),
                     );
-                }
+                });
                 NetworkResponses::NoResponse
             }
             NetworkRequests::EpochSyncRequest { peer_id } => {

From 9444962ee8a540f02602247a4a3db354a9e77fb3 Mon Sep 17 00:00:00 2001
From: Stefan Neamtu
Date: Sat, 4 Jan 2025 12:56:17 +0200
Subject: [PATCH 3/4] perf(client): parallelize creation of partial witness
 parts

---
 .../partial_witness/partial_witness_actor.rs | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs
index 191607df2fc..f1e52f2f3fe 100644
--- a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs
+++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs
@@ -39,6 +39,7 @@ use near_store::adapter::trie_store::TrieStoreAdapter;
 use near_store::{DBCol, StorageError, TrieDBStorage, TrieStorage};
 use near_vm_runner::{get_contract_cache_key, ContractCode, ContractRuntimeCache};
 use rand::Rng;
+use rayon::{iter::ParallelIterator, prelude::*};
 
 use crate::client_actor::ClientSenderForPartialWitness;
 use crate::metrics;
@@ -267,9 +268,10 @@ impl PartialWitnessActor {
         let encoder = self.witness_encoders.entry(chunk_validators.len());
         let (parts, encoded_length) = encoder.encode(&witness_bytes);
 
+        let mut generated_parts = vec![];
         chunk_validators
-            .iter()
-            .zip_eq(parts)
+            .par_iter()
+            .zip_eq(parts.into_par_iter())
             .enumerate()
             .map(|(part_ord, (chunk_validator, part))| {
                 // It's fine to unwrap part here as we just constructed the parts above and we expect
                 // all of them to be present.
                 let partial_witness = PartialEncodedStateWitness::new(
                     epoch_id,
                     chunk_header.clone(),
                     part_ord,
-                    part.unwrap().to_vec(),
+                    part.unwrap().into_vec(),
                     encoded_length,
                     signer,
                 );
                 (chunk_validator.clone(), partial_witness)
             })
-            .collect_vec()
+            .collect_into_vec(&mut generated_parts);
+
+        generated_parts
     }
 
     fn generate_contract_deploys_parts(
@@ -596,7 +600,7 @@ impl PartialWitnessActor {
 
     /// Sends the contract accesses to the same chunk validators
     /// (except for the chunk producers that track the same shard),
-    /// which will receive the state witness for the new chunk.
+    /// which will receive the state witness for the new chunk. 
     fn send_contract_accesses_to_chunk_validators(
         &self,
         key: ChunkProductionKey,

From c085f2ab745a9e25db62b5a9fcbbbdf74724bd02 Mon Sep 17 00:00:00 2001
From: Stefan Neamtu
Date: Mon, 6 Jan 2025 14:46:38 +0200
Subject: [PATCH 4/4] refactor(network): inject a computation spawner into
 PeerManagerActor

---
 Cargo.lock                                    |  2 +
 chain/client/src/metrics.rs                   | 19 +++----
 .../partial_witness/partial_witness_actor.rs  | 36 ++++++------
 chain/client/src/test_utils/setup.rs          | 14 +++--
 chain/network/Cargo.toml                      | 39 +++++++------
 .../src/peer_manager/peer_manager_actor.rs    | 54 +++++++++++--------
 chain/network/src/peer_manager/testonly.rs    |  2 +
 chain/network/src/test_loop.rs                | 12 ++---
 chain/network/src/types.rs                    |  2 +-
 .../src/tests/network/peer_handshake.rs       |  2 +
 integration-tests/src/tests/network/runner.rs |  1 +
 .../src/tests/network/stress_network.rs       |  2 +
 nearcore/src/lib.rs                           |  1 +
 tools/chainsync-loadtest/Cargo.toml           |  1 +
 tools/chainsync-loadtest/src/main.rs          |  2 +
 15 files changed, 103 insertions(+), 86 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index c2a49977688..bea1fa8fdbe 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1293,6 +1293,7 @@ dependencies = [
  "futures",
  "log",
  "near-async",
+ "near-chain",
  "near-chain-configs",
  "near-config-utils",
  "near-crypto",
@@ -4752,6 +4753,7 @@ dependencies = [
 "itertools 0.12.1",
 "lru 0.12.3",
 "near-async",
+ "near-chain",
 "near-chain-configs",
 "near-crypto",
 "near-fmt",

diff --git a/chain/client/src/metrics.rs b/chain/client/src/metrics.rs
index 971225cc790..5149e2671ac 100644
--- a/chain/client/src/metrics.rs
+++ b/chain/client/src/metrics.rs
@@ -638,15 +638,16 @@ pub(crate) static BLOCK_PRODUCER_MISSING_ENDORSEMENT_COUNT: LazyLock =
-pub(crate) static PARTIAL_WITNESS_ENCODE_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
-    try_create_histogram_vec(
-        "near_partial_witness_encode_time",
-        "Partial state witness generation from encoded state witness time in seconds",
-        &["shard_id"],
-        Some(linear_buckets(0.0, 0.005, 20).unwrap()),
-    )
-    .unwrap()
-});
+pub(crate) static PARTIAL_WITNESS_ENCODE_AND_SEND_TIME: LazyLock<HistogramVec> =
+    LazyLock::new(|| {
+        try_create_histogram_vec(
+            "near_partial_witness_encode_and_send_time",
+            "Time taken to encode and send the partial state witness parts, in seconds",
+            &["shard_id"],
+            Some(linear_buckets(0.0, 0.005, 20).unwrap()),
+        )
+        .unwrap()
+    });
 
 pub(crate) static PARTIAL_WITNESS_TIME_TO_LAST_PART: LazyLock<HistogramVec> = LazyLock::new(|| {
     try_create_histogram_vec(

diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs
index f1e52f2f3fe..e117c597495 100644
--- a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs
+++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs
@@ -249,14 +249,14 @@ impl PartialWitnessActor {
     }
 
-    // Function to generate the parts of the state witness and return them as a tuple of chunk_validator and part.
+    // Function to generate the parts of the state witness and send them to the chunk validators.
- fn generate_state_witness_parts( + fn generate_and_send_state_witness_parts( &mut self, epoch_id: EpochId, chunk_header: &ShardChunkHeader, witness_bytes: EncodedChunkStateWitness, chunk_validators: &[AccountId], signer: &ValidatorSigner, - ) -> Vec<(AccountId, PartialEncodedStateWitness)> { + ) { tracing::debug!( target: "client", chunk_hash=?chunk_header.chunk_hash(), @@ -268,12 +268,8 @@ impl PartialWitnessActor { let encoder = self.witness_encoders.entry(chunk_validators.len()); let (parts, encoded_length) = encoder.encode(&witness_bytes); - let mut generated_parts = vec![]; - chunk_validators - .par_iter() - .zip_eq(parts.into_par_iter()) - .enumerate() - .map(|(part_ord, (chunk_validator, part))| { + chunk_validators.par_iter().zip_eq(parts.into_par_iter()).enumerate().for_each( + |(part_ord, (chunk_validator, part))| { // It's fine to unwrap part here as we just constructed the parts above and we expect // all of them to be present. let partial_witness = PartialEncodedStateWitness::new( @@ -284,11 +280,16 @@ impl PartialWitnessActor { encoded_length, signer, ); - (chunk_validator.clone(), partial_witness) - }) - .collect_into_vec(&mut generated_parts); - generated_parts + // Send the parts to the corresponding chunk validator owners. + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::PartialEncodedStateWitness( + chunk_validator.clone(), + partial_witness, + ), + )); + }, + ); } fn generate_contract_deploys_parts( @@ -343,10 +344,10 @@ impl PartialWitnessActor { // Record time taken to encode the state witness parts. let shard_id_label = chunk_header.shard_id().to_string(); - let encode_timer = metrics::PARTIAL_WITNESS_ENCODE_TIME + let encode_timer = metrics::PARTIAL_WITNESS_ENCODE_AND_SEND_TIME .with_label_values(&[shard_id_label.as_str()]) .start_timer(); - let validator_witness_tuple = self.generate_state_witness_parts( + self.generate_and_send_state_witness_parts( epoch_id, chunk_header, witness_bytes, @@ -360,13 +361,8 @@ impl PartialWitnessActor { self.state_witness_tracker.record_witness_sent( chunk_hash, witness_size_in_bytes, - validator_witness_tuple.len(), + chunk_validators.len(), ); - - // Send the parts to the corresponding chunk validator owners. - self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple), - )); } /// Sends the witness part to the chunk validators, except the chunk producer that generated the witness part. 
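Taken together, patches 2-4 move witness distribution from "collect a Vec of (validator, part) tuples, then loop and send" to "build and hand off each part inside one rayon parallel iterator". A minimal sketch of that encode-and-send shape, where Part, Adapter, encode_and_send, and the "alice.near"/"bob.near" accounts are hypothetical stand-ins for the Reed-Solomon part type, the network adapter, and generate_and_send_state_witness_parts:

use rayon::prelude::*;

struct Part(Vec<u8>);

struct Adapter;
impl Adapter {
    // Stand-in for sending NetworkRequests::PartialEncodedStateWitness to one validator.
    fn send(&self, validator: &str, part_ord: usize, part: &Part) {
        println!("part {part_ord} -> {validator} ({} bytes)", part.0.len());
    }
}

// Fan the per-part work out over the rayon thread pool and hand each part to the
// adapter as soon as it is built; no intermediate vector is materialized.
fn encode_and_send(validators: &[&str], parts: Vec<Part>, adapter: &Adapter) {
    validators
        .par_iter()
        .copied()
        .zip_eq(parts.into_par_iter())
        .enumerate()
        .for_each(|(part_ord, (validator, part))| {
            adapter.send(validator, part_ord, &part);
        });
}

fn main() {
    let adapter = Adapter;
    let parts = vec![Part(vec![0; 8]), Part(vec![1; 8])];
    encode_and_send(&["alice.near", "bob.near"], parts, &adapter);
}

The design choice this illustrates: parts are created and queued concurrently rather than being collected first, at the cost of a nondeterministic send order across validators.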
diff --git a/chain/client/src/test_utils/setup.rs b/chain/client/src/test_utils/setup.rs index 22ca09bde99..4525e33f737 100644 --- a/chain/client/src/test_utils/setup.rs +++ b/chain/client/src/test_utils/setup.rs @@ -756,14 +756,12 @@ fn process_peer_manager_message_default( } } } - NetworkRequests::PartialEncodedStateWitness(partial_witnesses) => { - for (account, partial_witness) in partial_witnesses { - for (i, name) in validators.iter().enumerate() { - if name == account { - connectors[i] - .partial_witness_sender - .send(PartialEncodedStateWitnessMessage(partial_witness.clone())); - } + NetworkRequests::PartialEncodedStateWitness(account, partial_witness) => { + for (i, name) in validators.iter().enumerate() { + if name == account { + connectors[i] + .partial_witness_sender + .send(PartialEncodedStateWitnessMessage(partial_witness.clone())); } } } diff --git a/chain/network/Cargo.toml b/chain/network/Cargo.toml index ce5fa66a9b8..7c7a4884f03 100644 --- a/chain/network/Cargo.toml +++ b/chain/network/Cargo.toml @@ -63,6 +63,7 @@ near-store.workspace = true near-schema-checker-lib.workspace = true [dev-dependencies] +near-chain.workspace = true assert_matches.workspace = true bolero.workspace = true criterion.workspace = true @@ -76,29 +77,27 @@ serde_json.workspace = true [features] nightly_protocol = [ - "near-async/nightly_protocol", - "near-chain-configs/nightly_protocol", - "near-fmt/nightly_protocol", - "near-o11y/nightly_protocol", - "near-primitives/nightly_protocol", - "near-store/nightly_protocol", + "near-async/nightly_protocol", + "near-chain-configs/nightly_protocol", + "near-fmt/nightly_protocol", + "near-o11y/nightly_protocol", + "near-primitives/nightly_protocol", + "near-store/nightly_protocol", ] nightly = [ - "near-async/nightly", - "near-chain-configs/nightly", - "near-fmt/nightly", - "near-o11y/nightly", - "near-primitives/nightly", - "near-store/nightly", - "nightly_protocol", -] -performance_stats = [ - "near-performance-metrics/performance_stats", + "near-async/nightly", + "near-chain-configs/nightly", + "near-fmt/nightly", + "near-o11y/nightly", + "near-primitives/nightly", + "near-store/nightly", + "nightly_protocol", ] +performance_stats = ["near-performance-metrics/performance_stats"] test_features = [] protocol_schema = [ - "near-schema-checker-lib/protocol_schema", - "near-crypto/protocol_schema", - "near-primitives/protocol_schema", - "near-store/protocol_schema", + "near-schema-checker-lib/protocol_schema", + "near-crypto/protocol_schema", + "near-primitives/protocol_schema", + "near-store/protocol_schema", ] diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index 09ad567abae..c30133d475f 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -26,6 +26,7 @@ use ::time::ext::InstantExt as _; use actix::fut::future::wrap_future; use actix::{Actor as _, AsyncContext as _}; use anyhow::Context as _; +use near_async::futures::{AsyncComputationSpawner, AsyncComputationSpawnerExt}; use near_async::messaging::{SendAsync, Sender}; use near_async::time; use near_o11y::{handler_debug_span, handler_trace_span, WithSpanContext}; @@ -102,6 +103,8 @@ pub struct PeerManagerActor { /// State that is shared between multiple threads (including PeerActors). 
     pub(crate) state: Arc<NetworkState>,
+
+    rayon_spawner: Arc<dyn AsyncComputationSpawner>,
 }
 
 /// TEST-ONLY
@@ -219,6 +222,7 @@ impl PeerManagerActor {
         shards_manager_adapter: Sender<ShardsManagerRequestFromNetwork>,
         partial_witness_adapter: PartialWitnessSenderForNetwork,
         genesis_id: GenesisId,
+        rayon_spawner: Arc<dyn AsyncComputationSpawner>,
     ) -> anyhow::Result<actix::Addr<Self>> {
         let config = config.verify().context("config")?;
         let store = store::Store::from(store);
@@ -353,6 +357,7 @@
             started_connect_attempts: false,
             state,
             clock,
+            rayon_spawner,
         }))
     }
 
@@ -1070,18 +1075,18 @@
             );
                 NetworkResponses::NoResponse
             }
-            NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple) => {
+            NetworkRequests::PartialEncodedStateWitness(chunk_validator, partial_witness) => {
                 let arbiter = actix::Arbiter::current();
-                validator_witness_tuple.into_par_iter().for_each(
-                    |(chunk_validator, partial_witness)| {
-                        self.state.send_message_to_account_with_arbiter(
-                            &self.clock,
-                            &chunk_validator,
-                            RoutedMessageBody::PartialEncodedStateWitness(partial_witness),
-                            arbiter.clone(),
-                        );
-                    },
-                );
+                let state = self.state.clone();
+                let clock = self.clock.clone();
+                self.rayon_spawner.spawn("Peer_Manager_PartialEncodedStateWitness", move || {
+                    state.send_message_to_account_with_arbiter(
+                        &clock,
+                        &chunk_validator,
+                        RoutedMessageBody::PartialEncodedStateWitness(partial_witness),
+                        arbiter,
+                    );
+                });
                 NetworkResponses::NoResponse
             }
             NetworkRequests::PartialEncodedStateWitnessForward(
                 chunk_validators,
                 partial_witness,
             ) => {
                 let arbiter = actix::Arbiter::current();
-                chunk_validators.into_par_iter().for_each(|chunk_validator| {
-                    self.state.send_message_to_account_with_arbiter(
-                        &self.clock,
-                        &chunk_validator,
-                        RoutedMessageBody::PartialEncodedStateWitnessForward(
-                            partial_witness.clone(),
-                        ),
-                        arbiter.clone(),
-                    );
-                });
+                let state = self.state.clone();
+                let clock = self.clock.clone();
+                self.rayon_spawner.spawn(
+                    "Peer_Manager_PartialEncodedStateWitnessForward",
+                    move || {
+                        chunk_validators.into_par_iter().for_each(|chunk_validator| {
+                            state.send_message_to_account_with_arbiter(
+                                &clock,
+                                &chunk_validator,
+                                RoutedMessageBody::PartialEncodedStateWitnessForward(
+                                    partial_witness.clone(),
+                                ),
+                                arbiter.clone(),
+                            );
+                        });
+                    },
+                );
                 NetworkResponses::NoResponse
             }
             NetworkRequests::EpochSyncRequest { peer_id } => {

diff --git a/chain/network/src/peer_manager/testonly.rs b/chain/network/src/peer_manager/testonly.rs
index 44989ca0b67..64018d11464 100644
--- a/chain/network/src/peer_manager/testonly.rs
+++ b/chain/network/src/peer_manager/testonly.rs
@@ -30,6 +30,7 @@ use crate::types::{
 };
 use crate::PeerManagerActor;
 use futures::FutureExt;
+use near_async::futures::StdThreadAsyncComputationSpawnerForTest;
 use near_async::messaging::IntoMultiSender;
 use near_async::messaging::Sender;
 use near_async::time;
@@ -656,6 +657,7 @@
             shards_manager_sender,
             state_witness_sender.break_apart().into_multi_sender(),
             genesis_id,
+            Arc::new(StdThreadAsyncComputationSpawnerForTest),
         )
         .unwrap()
 }

diff --git a/chain/network/src/test_loop.rs b/chain/network/src/test_loop.rs
index 08e01385487..8cd5fb948c9 100644
--- a/chain/network/src/test_loop.rs
+++ b/chain/network/src/test_loop.rs
@@ -348,13 +348,11 @@ fn network_message_to_partial_witness_handler(
             None
         }
-        NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple) => {
-            for (target, partial_witness) in validator_witness_tuple.into_iter() {
-                shared_state
-                    .senders_for_account(&target)
-                    .partial_witness_sender
-                    .send(PartialEncodedStateWitnessMessage(partial_witness));
-            }
+        NetworkRequests::PartialEncodedStateWitness(target, partial_witness) => {
+            shared_state
+                .senders_for_account(&target)
+                .partial_witness_sender
+                .send(PartialEncodedStateWitnessMessage(partial_witness));
             None
         }
         NetworkRequests::PartialEncodedStateWitnessForward(chunk_validators, partial_witness) => {

diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs
index 3b257f22c0b..9d1a5004c72 100644
--- a/chain/network/src/types.rs
+++ b/chain/network/src/types.rs
@@ -288,7 +288,7 @@ pub enum NetworkRequests {
     /// Message for a chunk endorsement, sent by a chunk validator to the block producer.
     ChunkEndorsement(AccountId, ChunkEndorsement),
     /// Message from chunk producer to set of chunk validators to send state witness part.
-    PartialEncodedStateWitness(Vec<(AccountId, PartialEncodedStateWitness)>),
+    PartialEncodedStateWitness(AccountId, PartialEncodedStateWitness),
     /// Message from chunk validator to all other chunk validators to forward state witness part.
     PartialEncodedStateWitnessForward(Vec<AccountId>, PartialEncodedStateWitness),
     /// Requests an epoch sync

diff --git a/integration-tests/src/tests/network/peer_handshake.rs b/integration-tests/src/tests/network/peer_handshake.rs
index 6b5a637260c..c5d67d15a19 100644
--- a/integration-tests/src/tests/network/peer_handshake.rs
+++ b/integration-tests/src/tests/network/peer_handshake.rs
@@ -24,6 +24,7 @@ fn make_peer_manager(
     peer_max_count: u32,
 ) -> actix::Addr<PeerManagerActor> {
     use near_async::messaging::{noop, IntoMultiSender, IntoSender};
+    use near_chain::rayon_spawner::RayonAsyncComputationSpawner;
 
     let mut config = config::NetworkConfig::from_seed(seed, node_addr);
     config.peer_store.boot_nodes = convert_boot_nodes(boot_nodes);
@@ -40,6 +41,7 @@ fn make_peer_manager(
         noop().into_sender(),
         noop().into_multi_sender(),
         GenesisId::default(),
+        Arc::new(RayonAsyncComputationSpawner),
     )
     .unwrap()
 }

diff --git a/integration-tests/src/tests/network/runner.rs b/integration-tests/src/tests/network/runner.rs
index 8eecd5a410a..56ea484a4c1 100644
--- a/integration-tests/src/tests/network/runner.rs
+++ b/integration-tests/src/tests/network/runner.rs
@@ -159,6 +159,7 @@ fn setup_network_node(
         shards_manager_adapter.as_sender(),
         partial_witness_actor.with_auto_span_context().into_multi_sender(),
         genesis_id,
+        Arc::new(RayonAsyncComputationSpawner),
     )
     .unwrap();
     network_adapter.bind(peer_manager.clone().with_auto_span_context());

diff --git a/integration-tests/src/tests/network/stress_network.rs b/integration-tests/src/tests/network/stress_network.rs
index b38ed815775..77155ca02c1 100644
--- a/integration-tests/src/tests/network/stress_network.rs
+++ b/integration-tests/src/tests/network/stress_network.rs
@@ -5,6 +5,7 @@ use std::time::Duration;
 use actix::{Actor, AsyncContext, System};
 use futures::FutureExt;
 use near_async::messaging::{noop, IntoMultiSender, IntoSender};
+use near_chain::rayon_spawner::RayonAsyncComputationSpawner;
 use tracing::info;
 
 use near_actix_test_utils::run_actix;
@@ -34,6 +35,7 @@ fn make_peer_manager(
         noop().into_sender(),
         noop().into_multi_sender(),
         GenesisId::default(),
+        Arc::new(RayonAsyncComputationSpawner),
     )
     .unwrap()
 }

diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs
index 0b255586fa1..c4868f914fc 100644
--- a/nearcore/src/lib.rs
+++ b/nearcore/src/lib.rs
@@ -451,6 +451,7 @@ pub fn start_with_config_and_synchronization(
         shards_manager_adapter.as_sender(),
         partial_witness_actor.with_auto_span_context().into_multi_sender(),
         genesis_id,
+
Arc::new(RayonAsyncComputationSpawner), ) .context("PeerManager::spawn()")?; network_adapter.bind(network_actor.clone().with_auto_span_context()); diff --git a/tools/chainsync-loadtest/Cargo.toml b/tools/chainsync-loadtest/Cargo.toml index 53732875197..3b1617549de 100644 --- a/tools/chainsync-loadtest/Cargo.toml +++ b/tools/chainsync-loadtest/Cargo.toml @@ -29,6 +29,7 @@ rand.workspace = true time.workspace = true tokio.workspace = true +near-chain.workspace = true near-async.workspace = true near-chain-configs.workspace = true near-config-utils.workspace = true diff --git a/tools/chainsync-loadtest/src/main.rs b/tools/chainsync-loadtest/src/main.rs index db9de419661..2d13f495aab 100644 --- a/tools/chainsync-loadtest/src/main.rs +++ b/tools/chainsync-loadtest/src/main.rs @@ -9,6 +9,7 @@ use near_async::messaging::IntoMultiSender; use near_async::messaging::IntoSender; use near_async::messaging::LateBoundSender; use near_async::time; +use near_chain::rayon_spawner::RayonAsyncComputationSpawner; use near_chain_configs::Genesis; use near_chain_configs::MutableConfigValue; use near_network::concurrency::ctx; @@ -51,6 +52,7 @@ pub fn start_with_config(config: NearConfig, qps_limit: u32) -> anyhow::Result
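Closing note on patch 4's recurring call-site change: the peer manager no longer hard-codes rayon; every caller injects a spawner (RayonAsyncComputationSpawner in production, StdThreadAsyncComputationSpawnerForTest in tests). A minimal sketch of that injection pattern under assumed shapes — the real trait is near_async::futures::AsyncComputationSpawner and may differ in detail, and PeerManager/handle_partial_witness below are illustrative only:

use std::sync::Arc;

// Assumed shape of the spawner abstraction.
trait AsyncComputationSpawner: Send + Sync {
    fn spawn_boxed(&self, name: &str, f: Box<dyn FnOnce() + Send>);
}

struct RayonSpawner;
impl AsyncComputationSpawner for RayonSpawner {
    fn spawn_boxed(&self, _name: &str, f: Box<dyn FnOnce() + Send>) {
        rayon::spawn(f); // offload to the global rayon pool
    }
}

struct StdThreadSpawnerForTest;
impl AsyncComputationSpawner for StdThreadSpawnerForTest {
    fn spawn_boxed(&self, name: &str, f: Box<dyn FnOnce() + Send>) {
        // A plain thread keeps tests independent of the rayon pool.
        std::thread::Builder::new().name(name.into()).spawn(f).unwrap();
    }
}

struct PeerManager {
    spawner: Arc<dyn AsyncComputationSpawner>,
}

impl PeerManager {
    fn handle_partial_witness(&self, payload: Vec<u8>) {
        // Heavy per-message work leaves the actor thread via the injected spawner.
        self.spawner.spawn_boxed(
            "PartialEncodedStateWitness",
            Box::new(move || {
                let checksum: u64 = payload.iter().map(|b| *b as u64).sum();
                println!("processed witness part, checksum {checksum}");
            }),
        );
    }
}

fn main() {
    // Production wiring would pass RayonSpawner; a test passes the thread-based one.
    let pm = PeerManager { spawner: Arc::new(StdThreadSpawnerForTest) };
    pm.handle_partial_witness(vec![1, 2, 3]);
    std::thread::sleep(std::time::Duration::from_millis(50)); // crude wait, sketch only
}

The benefit of this inversion is testability and scheduling control: the actor decides what to offload, while the embedder decides where that work runs.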