diff --git a/Cargo.lock b/Cargo.lock index c2a49977688..bea1fa8fdbe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1293,6 +1293,7 @@ dependencies = [ "futures", "log", "near-async", + "near-chain", "near-chain-configs", "near-config-utils", "near-crypto", @@ -4752,6 +4753,7 @@ dependencies = [ "itertools 0.12.1", "lru 0.12.3", "near-async", + "near-chain", "near-chain-configs", "near-crypto", "near-fmt", diff --git a/chain/client/src/metrics.rs b/chain/client/src/metrics.rs index 971225cc790..5149e2671ac 100644 --- a/chain/client/src/metrics.rs +++ b/chain/client/src/metrics.rs @@ -638,15 +638,16 @@ pub(crate) static BLOCK_PRODUCER_MISSING_ENDORSEMENT_COUNT: LazyLock = LazyLock::new(|| { - try_create_histogram_vec( - "near_partial_witness_encode_time", - "Partial state witness generation from encoded state witness time in seconds", - &["shard_id"], - Some(linear_buckets(0.0, 0.005, 20).unwrap()), - ) - .unwrap() -}); +pub(crate) static PARTIAL_WITNESS_ENCODE_AND_SEND_TIME: LazyLock = + LazyLock::new(|| { + try_create_histogram_vec( + "near_partial_witness_encode_and_send_time", + "Partial state witness generation from encoded state witness time in seconds", + &["shard_id"], + Some(linear_buckets(0.0, 0.005, 20).unwrap()), + ) + .unwrap() + }); pub(crate) static PARTIAL_WITNESS_TIME_TO_LAST_PART: LazyLock = LazyLock::new(|| { try_create_histogram_vec( diff --git a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs index f1e52f2f3fe..e117c597495 100644 --- a/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs +++ b/chain/client/src/stateless_validation/partial_witness/partial_witness_actor.rs @@ -249,14 +249,14 @@ impl PartialWitnessActor { } // Function to generate the parts of the state witness and return them as a tuple of chunk_validator and part. - fn generate_state_witness_parts( + fn generate_and_send_state_witness_parts( &mut self, epoch_id: EpochId, chunk_header: &ShardChunkHeader, witness_bytes: EncodedChunkStateWitness, chunk_validators: &[AccountId], signer: &ValidatorSigner, - ) -> Vec<(AccountId, PartialEncodedStateWitness)> { + ) { tracing::debug!( target: "client", chunk_hash=?chunk_header.chunk_hash(), @@ -268,12 +268,8 @@ impl PartialWitnessActor { let encoder = self.witness_encoders.entry(chunk_validators.len()); let (parts, encoded_length) = encoder.encode(&witness_bytes); - let mut generated_parts = vec![]; - chunk_validators - .par_iter() - .zip_eq(parts.into_par_iter()) - .enumerate() - .map(|(part_ord, (chunk_validator, part))| { + chunk_validators.par_iter().zip_eq(parts.into_par_iter()).enumerate().for_each( + |(part_ord, (chunk_validator, part))| { // It's fine to unwrap part here as we just constructed the parts above and we expect // all of them to be present. let partial_witness = PartialEncodedStateWitness::new( @@ -284,11 +280,16 @@ impl PartialWitnessActor { encoded_length, signer, ); - (chunk_validator.clone(), partial_witness) - }) - .collect_into_vec(&mut generated_parts); - generated_parts + // Send the parts to the corresponding chunk validator owners. + self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( + NetworkRequests::PartialEncodedStateWitness( + chunk_validator.clone(), + partial_witness, + ), + )); + }, + ); } fn generate_contract_deploys_parts( @@ -343,10 +344,10 @@ impl PartialWitnessActor { // Record time taken to encode the state witness parts. let shard_id_label = chunk_header.shard_id().to_string(); - let encode_timer = metrics::PARTIAL_WITNESS_ENCODE_TIME + let encode_timer = metrics::PARTIAL_WITNESS_ENCODE_AND_SEND_TIME .with_label_values(&[shard_id_label.as_str()]) .start_timer(); - let validator_witness_tuple = self.generate_state_witness_parts( + self.generate_and_send_state_witness_parts( epoch_id, chunk_header, witness_bytes, @@ -360,13 +361,8 @@ impl PartialWitnessActor { self.state_witness_tracker.record_witness_sent( chunk_hash, witness_size_in_bytes, - validator_witness_tuple.len(), + chunk_validators.len(), ); - - // Send the parts to the corresponding chunk validator owners. - self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests( - NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple), - )); } /// Sends the witness part to the chunk validators, except the chunk producer that generated the witness part. diff --git a/chain/client/src/test_utils/setup.rs b/chain/client/src/test_utils/setup.rs index 22ca09bde99..4525e33f737 100644 --- a/chain/client/src/test_utils/setup.rs +++ b/chain/client/src/test_utils/setup.rs @@ -756,14 +756,12 @@ fn process_peer_manager_message_default( } } } - NetworkRequests::PartialEncodedStateWitness(partial_witnesses) => { - for (account, partial_witness) in partial_witnesses { - for (i, name) in validators.iter().enumerate() { - if name == account { - connectors[i] - .partial_witness_sender - .send(PartialEncodedStateWitnessMessage(partial_witness.clone())); - } + NetworkRequests::PartialEncodedStateWitness(account, partial_witness) => { + for (i, name) in validators.iter().enumerate() { + if name == account { + connectors[i] + .partial_witness_sender + .send(PartialEncodedStateWitnessMessage(partial_witness.clone())); } } } diff --git a/chain/network/Cargo.toml b/chain/network/Cargo.toml index ce5fa66a9b8..7c7a4884f03 100644 --- a/chain/network/Cargo.toml +++ b/chain/network/Cargo.toml @@ -63,6 +63,7 @@ near-store.workspace = true near-schema-checker-lib.workspace = true [dev-dependencies] +near-chain.workspace = true assert_matches.workspace = true bolero.workspace = true criterion.workspace = true @@ -76,29 +77,27 @@ serde_json.workspace = true [features] nightly_protocol = [ - "near-async/nightly_protocol", - "near-chain-configs/nightly_protocol", - "near-fmt/nightly_protocol", - "near-o11y/nightly_protocol", - "near-primitives/nightly_protocol", - "near-store/nightly_protocol", + "near-async/nightly_protocol", + "near-chain-configs/nightly_protocol", + "near-fmt/nightly_protocol", + "near-o11y/nightly_protocol", + "near-primitives/nightly_protocol", + "near-store/nightly_protocol", ] nightly = [ - "near-async/nightly", - "near-chain-configs/nightly", - "near-fmt/nightly", - "near-o11y/nightly", - "near-primitives/nightly", - "near-store/nightly", - "nightly_protocol", -] -performance_stats = [ - "near-performance-metrics/performance_stats", + "near-async/nightly", + "near-chain-configs/nightly", + "near-fmt/nightly", + "near-o11y/nightly", + "near-primitives/nightly", + "near-store/nightly", + "nightly_protocol", ] +performance_stats = ["near-performance-metrics/performance_stats"] test_features = [] protocol_schema = [ - "near-schema-checker-lib/protocol_schema", - "near-crypto/protocol_schema", - "near-primitives/protocol_schema", - "near-store/protocol_schema", + "near-schema-checker-lib/protocol_schema", + "near-crypto/protocol_schema", + "near-primitives/protocol_schema", + "near-store/protocol_schema", ] diff --git a/chain/network/src/peer_manager/peer_manager_actor.rs b/chain/network/src/peer_manager/peer_manager_actor.rs index 09ad567abae..c30133d475f 100644 --- a/chain/network/src/peer_manager/peer_manager_actor.rs +++ b/chain/network/src/peer_manager/peer_manager_actor.rs @@ -26,6 +26,7 @@ use ::time::ext::InstantExt as _; use actix::fut::future::wrap_future; use actix::{Actor as _, AsyncContext as _}; use anyhow::Context as _; +use near_async::futures::{AsyncComputationSpawner, AsyncComputationSpawnerExt}; use near_async::messaging::{SendAsync, Sender}; use near_async::time; use near_o11y::{handler_debug_span, handler_trace_span, WithSpanContext}; @@ -102,6 +103,8 @@ pub struct PeerManagerActor { /// State that is shared between multiple threads (including PeerActors). pub(crate) state: Arc, + + rayon_spawner: Arc, } /// TEST-ONLY @@ -219,6 +222,7 @@ impl PeerManagerActor { shards_manager_adapter: Sender, partial_witness_adapter: PartialWitnessSenderForNetwork, genesis_id: GenesisId, + rayon_spawner: Arc, ) -> anyhow::Result> { let config = config.verify().context("config")?; let store = store::Store::from(store); @@ -353,6 +357,7 @@ impl PeerManagerActor { started_connect_attempts: false, state, clock, + rayon_spawner, })) } @@ -1070,18 +1075,18 @@ impl PeerManagerActor { ); NetworkResponses::NoResponse } - NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple) => { + NetworkRequests::PartialEncodedStateWitness(chunk_validator, partial_witness) => { let arbiter = actix::Arbiter::current(); - validator_witness_tuple.into_par_iter().for_each( - |(chunk_validator, partial_witness)| { - self.state.send_message_to_account_with_arbiter( - &self.clock, - &chunk_validator, - RoutedMessageBody::PartialEncodedStateWitness(partial_witness), - arbiter.clone(), - ); - }, - ); + let state = self.state.clone(); + let clock = self.clock.clone(); + self.rayon_spawner.spawn("Peer_Manager_PartialEncodedStateWitness", move || { + state.send_message_to_account_with_arbiter( + &clock, + &chunk_validator, + RoutedMessageBody::PartialEncodedStateWitness(partial_witness), + arbiter, + ); + }); NetworkResponses::NoResponse } NetworkRequests::PartialEncodedStateWitnessForward( @@ -1089,16 +1094,23 @@ impl PeerManagerActor { partial_witness, ) => { let arbiter = actix::Arbiter::current(); - chunk_validators.into_par_iter().for_each(|chunk_validator| { - self.state.send_message_to_account_with_arbiter( - &self.clock, - &chunk_validator, - RoutedMessageBody::PartialEncodedStateWitnessForward( - partial_witness.clone(), - ), - arbiter.clone(), - ); - }); + let state = self.state.clone(); + let clock = self.clock.clone(); + self.rayon_spawner.spawn( + "Peer_Manager_PartialEncodedStateWitnessForward", + move || { + chunk_validators.into_par_iter().for_each(|chunk_validator| { + state.send_message_to_account_with_arbiter( + &clock, + &chunk_validator, + RoutedMessageBody::PartialEncodedStateWitnessForward( + partial_witness.clone(), + ), + arbiter.clone(), + ); + }); + }, + ); NetworkResponses::NoResponse } NetworkRequests::EpochSyncRequest { peer_id } => { diff --git a/chain/network/src/peer_manager/testonly.rs b/chain/network/src/peer_manager/testonly.rs index 44989ca0b67..64018d11464 100644 --- a/chain/network/src/peer_manager/testonly.rs +++ b/chain/network/src/peer_manager/testonly.rs @@ -30,6 +30,7 @@ use crate::types::{ }; use crate::PeerManagerActor; use futures::FutureExt; +use near_async::futures::StdThreadAsyncComputationSpawnerForTest; use near_async::messaging::IntoMultiSender; use near_async::messaging::Sender; use near_async::time; @@ -656,6 +657,7 @@ pub(crate) async fn start( shards_manager_sender, state_witness_sender.break_apart().into_multi_sender(), genesis_id, + Arc::new(StdThreadAsyncComputationSpawnerForTest), ) .unwrap() } diff --git a/chain/network/src/test_loop.rs b/chain/network/src/test_loop.rs index 08e01385487..8cd5fb948c9 100644 --- a/chain/network/src/test_loop.rs +++ b/chain/network/src/test_loop.rs @@ -348,13 +348,11 @@ fn network_message_to_partial_witness_handler( None } - NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple) => { - for (target, partial_witness) in validator_witness_tuple.into_iter() { - shared_state - .senders_for_account(&target) - .partial_witness_sender - .send(PartialEncodedStateWitnessMessage(partial_witness)); - } + NetworkRequests::PartialEncodedStateWitness(target, partial_witness) => { + shared_state + .senders_for_account(&target) + .partial_witness_sender + .send(PartialEncodedStateWitnessMessage(partial_witness)); None } NetworkRequests::PartialEncodedStateWitnessForward(chunk_validators, partial_witness) => { diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index 3b257f22c0b..9d1a5004c72 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -288,7 +288,7 @@ pub enum NetworkRequests { /// Message for a chunk endorsement, sent by a chunk validator to the block producer. ChunkEndorsement(AccountId, ChunkEndorsement), /// Message from chunk producer to set of chunk validators to send state witness part. - PartialEncodedStateWitness(Vec<(AccountId, PartialEncodedStateWitness)>), + PartialEncodedStateWitness(AccountId, PartialEncodedStateWitness), /// Message from chunk validator to all other chunk validators to forward state witness part. PartialEncodedStateWitnessForward(Vec, PartialEncodedStateWitness), /// Requests an epoch sync diff --git a/integration-tests/src/tests/network/peer_handshake.rs b/integration-tests/src/tests/network/peer_handshake.rs index 6b5a637260c..c5d67d15a19 100644 --- a/integration-tests/src/tests/network/peer_handshake.rs +++ b/integration-tests/src/tests/network/peer_handshake.rs @@ -24,6 +24,7 @@ fn make_peer_manager( peer_max_count: u32, ) -> actix::Addr { use near_async::messaging::{noop, IntoMultiSender, IntoSender}; + use near_chain::rayon_spawner::RayonAsyncComputationSpawner; let mut config = config::NetworkConfig::from_seed(seed, node_addr); config.peer_store.boot_nodes = convert_boot_nodes(boot_nodes); @@ -40,6 +41,7 @@ fn make_peer_manager( noop().into_sender(), noop().into_multi_sender(), GenesisId::default(), + Arc::new(RayonAsyncComputationSpawner), ) .unwrap() } diff --git a/integration-tests/src/tests/network/runner.rs b/integration-tests/src/tests/network/runner.rs index 8eecd5a410a..56ea484a4c1 100644 --- a/integration-tests/src/tests/network/runner.rs +++ b/integration-tests/src/tests/network/runner.rs @@ -159,6 +159,7 @@ fn setup_network_node( shards_manager_adapter.as_sender(), partial_witness_actor.with_auto_span_context().into_multi_sender(), genesis_id, + Arc::new(RayonAsyncComputationSpawner), ) .unwrap(); network_adapter.bind(peer_manager.clone().with_auto_span_context()); diff --git a/integration-tests/src/tests/network/stress_network.rs b/integration-tests/src/tests/network/stress_network.rs index b38ed815775..77155ca02c1 100644 --- a/integration-tests/src/tests/network/stress_network.rs +++ b/integration-tests/src/tests/network/stress_network.rs @@ -5,6 +5,7 @@ use std::time::Duration; use actix::{Actor, AsyncContext, System}; use futures::FutureExt; use near_async::messaging::{noop, IntoMultiSender, IntoSender}; +use near_chain::rayon_spawner::RayonAsyncComputationSpawner; use tracing::info; use near_actix_test_utils::run_actix; @@ -34,6 +35,7 @@ fn make_peer_manager( noop().into_sender(), noop().into_multi_sender(), GenesisId::default(), + Arc::new(RayonAsyncComputationSpawner), ) .unwrap() } diff --git a/nearcore/src/lib.rs b/nearcore/src/lib.rs index 0b255586fa1..c4868f914fc 100644 --- a/nearcore/src/lib.rs +++ b/nearcore/src/lib.rs @@ -451,6 +451,7 @@ pub fn start_with_config_and_synchronization( shards_manager_adapter.as_sender(), partial_witness_actor.with_auto_span_context().into_multi_sender(), genesis_id, + Arc::new(RayonAsyncComputationSpawner), ) .context("PeerManager::spawn()")?; network_adapter.bind(network_actor.clone().with_auto_span_context()); diff --git a/tools/chainsync-loadtest/Cargo.toml b/tools/chainsync-loadtest/Cargo.toml index 53732875197..3b1617549de 100644 --- a/tools/chainsync-loadtest/Cargo.toml +++ b/tools/chainsync-loadtest/Cargo.toml @@ -29,6 +29,7 @@ rand.workspace = true time.workspace = true tokio.workspace = true +near-chain.workspace = true near-async.workspace = true near-chain-configs.workspace = true near-config-utils.workspace = true diff --git a/tools/chainsync-loadtest/src/main.rs b/tools/chainsync-loadtest/src/main.rs index db9de419661..2d13f495aab 100644 --- a/tools/chainsync-loadtest/src/main.rs +++ b/tools/chainsync-loadtest/src/main.rs @@ -9,6 +9,7 @@ use near_async::messaging::IntoMultiSender; use near_async::messaging::IntoSender; use near_async::messaging::LateBoundSender; use near_async::time; +use near_chain::rayon_spawner::RayonAsyncComputationSpawner; use near_chain_configs::Genesis; use near_chain_configs::MutableConfigValue; use near_network::concurrency::ctx; @@ -51,6 +52,7 @@ pub fn start_with_config(config: NearConfig, qps_limit: u32) -> anyhow::Result