Skip to content

Commit

Permalink
peer manager computation spawner
Browse files Browse the repository at this point in the history
  • Loading branch information
stedfn committed Jan 6, 2025
1 parent 9444962 commit c085f2a
Show file tree
Hide file tree
Showing 15 changed files with 103 additions and 86 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 10 additions & 9 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,15 +638,16 @@ pub(crate) static BLOCK_PRODUCER_MISSING_ENDORSEMENT_COUNT: LazyLock<HistogramVe
.unwrap()
});

pub(crate) static PARTIAL_WITNESS_ENCODE_TIME: LazyLock<HistogramVec> = LazyLock::new(|| {
try_create_histogram_vec(
"near_partial_witness_encode_time",
"Partial state witness generation from encoded state witness time in seconds",
&["shard_id"],
Some(linear_buckets(0.0, 0.005, 20).unwrap()),
)
.unwrap()
});
pub(crate) static PARTIAL_WITNESS_ENCODE_AND_SEND_TIME: LazyLock<HistogramVec> =
LazyLock::new(|| {
try_create_histogram_vec(
"near_partial_witness_encode_and_send_time",
"Partial state witness generation from encoded state witness time in seconds",
&["shard_id"],
Some(linear_buckets(0.0, 0.005, 20).unwrap()),
)
.unwrap()
});

pub(crate) static PARTIAL_WITNESS_TIME_TO_LAST_PART: LazyLock<HistogramVec> = LazyLock::new(|| {
try_create_histogram_vec(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,14 +249,14 @@ impl PartialWitnessActor {
}

// Function to generate the parts of the state witness and send each part to its corresponding chunk validator.
fn generate_state_witness_parts(
fn generate_and_send_state_witness_parts(
&mut self,
epoch_id: EpochId,
chunk_header: &ShardChunkHeader,
witness_bytes: EncodedChunkStateWitness,
chunk_validators: &[AccountId],
signer: &ValidatorSigner,
) -> Vec<(AccountId, PartialEncodedStateWitness)> {
) {
tracing::debug!(
target: "client",
chunk_hash=?chunk_header.chunk_hash(),
Expand All @@ -268,12 +268,8 @@ impl PartialWitnessActor {
let encoder = self.witness_encoders.entry(chunk_validators.len());
let (parts, encoded_length) = encoder.encode(&witness_bytes);

let mut generated_parts = vec![];
chunk_validators
.par_iter()
.zip_eq(parts.into_par_iter())
.enumerate()
.map(|(part_ord, (chunk_validator, part))| {
chunk_validators.par_iter().zip_eq(parts.into_par_iter()).enumerate().for_each(
|(part_ord, (chunk_validator, part))| {
// It's fine to unwrap part here as we just constructed the parts above and we expect
// all of them to be present.
let partial_witness = PartialEncodedStateWitness::new(
Expand All @@ -284,11 +280,16 @@ impl PartialWitnessActor {
encoded_length,
signer,
);
(chunk_validator.clone(), partial_witness)
})
.collect_into_vec(&mut generated_parts);

generated_parts
// Send the parts to the corresponding chunk validator owners.
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedStateWitness(
chunk_validator.clone(),
partial_witness,
),
));
},
);
}

fn generate_contract_deploys_parts(
Expand Down Expand Up @@ -343,10 +344,10 @@ impl PartialWitnessActor {

// Record time taken to encode and send the state witness parts.
let shard_id_label = chunk_header.shard_id().to_string();
let encode_timer = metrics::PARTIAL_WITNESS_ENCODE_TIME
let encode_timer = metrics::PARTIAL_WITNESS_ENCODE_AND_SEND_TIME
.with_label_values(&[shard_id_label.as_str()])
.start_timer();
let validator_witness_tuple = self.generate_state_witness_parts(
self.generate_and_send_state_witness_parts(
epoch_id,
chunk_header,
witness_bytes,
Expand All @@ -360,13 +361,8 @@ impl PartialWitnessActor {
self.state_witness_tracker.record_witness_sent(
chunk_hash,
witness_size_in_bytes,
validator_witness_tuple.len(),
chunk_validators.len(),
);

// Send the parts to the corresponding chunk validator owners.
self.network_adapter.send(PeerManagerMessageRequest::NetworkRequests(
NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple),
));
}

/// Sends the witness part to the chunk validators, except the chunk producer that generated the witness part.
Expand Down
14 changes: 6 additions & 8 deletions chain/client/src/test_utils/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -756,14 +756,12 @@ fn process_peer_manager_message_default(
}
}
}
NetworkRequests::PartialEncodedStateWitness(partial_witnesses) => {
for (account, partial_witness) in partial_witnesses {
for (i, name) in validators.iter().enumerate() {
if name == account {
connectors[i]
.partial_witness_sender
.send(PartialEncodedStateWitnessMessage(partial_witness.clone()));
}
NetworkRequests::PartialEncodedStateWitness(account, partial_witness) => {
for (i, name) in validators.iter().enumerate() {
if name == account {
connectors[i]
.partial_witness_sender
.send(PartialEncodedStateWitnessMessage(partial_witness.clone()));
}
}
}
Expand Down
39 changes: 19 additions & 20 deletions chain/network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ near-store.workspace = true
near-schema-checker-lib.workspace = true

[dev-dependencies]
near-chain.workspace = true
assert_matches.workspace = true
bolero.workspace = true
criterion.workspace = true
Expand All @@ -76,29 +77,27 @@ serde_json.workspace = true

[features]
nightly_protocol = [
"near-async/nightly_protocol",
"near-chain-configs/nightly_protocol",
"near-fmt/nightly_protocol",
"near-o11y/nightly_protocol",
"near-primitives/nightly_protocol",
"near-store/nightly_protocol",
"near-async/nightly_protocol",
"near-chain-configs/nightly_protocol",
"near-fmt/nightly_protocol",
"near-o11y/nightly_protocol",
"near-primitives/nightly_protocol",
"near-store/nightly_protocol",
]
nightly = [
"near-async/nightly",
"near-chain-configs/nightly",
"near-fmt/nightly",
"near-o11y/nightly",
"near-primitives/nightly",
"near-store/nightly",
"nightly_protocol",
]
performance_stats = [
"near-performance-metrics/performance_stats",
"near-async/nightly",
"near-chain-configs/nightly",
"near-fmt/nightly",
"near-o11y/nightly",
"near-primitives/nightly",
"near-store/nightly",
"nightly_protocol",
]
performance_stats = ["near-performance-metrics/performance_stats"]
test_features = []
protocol_schema = [
"near-schema-checker-lib/protocol_schema",
"near-crypto/protocol_schema",
"near-primitives/protocol_schema",
"near-store/protocol_schema",
"near-schema-checker-lib/protocol_schema",
"near-crypto/protocol_schema",
"near-primitives/protocol_schema",
"near-store/protocol_schema",
]
54 changes: 33 additions & 21 deletions chain/network/src/peer_manager/peer_manager_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use ::time::ext::InstantExt as _;
use actix::fut::future::wrap_future;
use actix::{Actor as _, AsyncContext as _};
use anyhow::Context as _;
use near_async::futures::{AsyncComputationSpawner, AsyncComputationSpawnerExt};
use near_async::messaging::{SendAsync, Sender};
use near_async::time;
use near_o11y::{handler_debug_span, handler_trace_span, WithSpanContext};
Expand Down Expand Up @@ -102,6 +103,8 @@ pub struct PeerManagerActor {

/// State that is shared between multiple threads (including PeerActors).
pub(crate) state: Arc<NetworkState>,

rayon_spawner: Arc<dyn AsyncComputationSpawner>,
}

/// TEST-ONLY
Expand Down Expand Up @@ -219,6 +222,7 @@ impl PeerManagerActor {
shards_manager_adapter: Sender<ShardsManagerRequestFromNetwork>,
partial_witness_adapter: PartialWitnessSenderForNetwork,
genesis_id: GenesisId,
rayon_spawner: Arc<dyn AsyncComputationSpawner>,
) -> anyhow::Result<actix::Addr<Self>> {
let config = config.verify().context("config")?;
let store = store::Store::from(store);
Expand Down Expand Up @@ -353,6 +357,7 @@ impl PeerManagerActor {
started_connect_attempts: false,
state,
clock,
rayon_spawner,
}))
}

Expand Down Expand Up @@ -1070,35 +1075,42 @@ impl PeerManagerActor {
);
NetworkResponses::NoResponse
}
NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple) => {
NetworkRequests::PartialEncodedStateWitness(chunk_validator, partial_witness) => {
let arbiter = actix::Arbiter::current();
validator_witness_tuple.into_par_iter().for_each(
|(chunk_validator, partial_witness)| {
self.state.send_message_to_account_with_arbiter(
&self.clock,
&chunk_validator,
RoutedMessageBody::PartialEncodedStateWitness(partial_witness),
arbiter.clone(),
);
},
);
let state = self.state.clone();
let clock = self.clock.clone();
self.rayon_spawner.spawn("Peer_Manager_PartialEncodedStateWitness", move || {
state.send_message_to_account_with_arbiter(
&clock,
&chunk_validator,
RoutedMessageBody::PartialEncodedStateWitness(partial_witness),
arbiter,
);
});
NetworkResponses::NoResponse
}
NetworkRequests::PartialEncodedStateWitnessForward(
chunk_validators,
partial_witness,
) => {
let arbiter = actix::Arbiter::current();
chunk_validators.into_par_iter().for_each(|chunk_validator| {
self.state.send_message_to_account_with_arbiter(
&self.clock,
&chunk_validator,
RoutedMessageBody::PartialEncodedStateWitnessForward(
partial_witness.clone(),
),
arbiter.clone(),
);
});
let state = self.state.clone();
let clock = self.clock.clone();
self.rayon_spawner.spawn(
"Peer_Manager_PartialEncodedStateWitnessForward",
move || {
chunk_validators.into_par_iter().for_each(|chunk_validator| {
state.send_message_to_account_with_arbiter(
&clock,
&chunk_validator,
RoutedMessageBody::PartialEncodedStateWitnessForward(
partial_witness.clone(),
),
arbiter.clone(),
);
});
},
);
NetworkResponses::NoResponse
}
NetworkRequests::EpochSyncRequest { peer_id } => {
Expand Down
2 changes: 2 additions & 0 deletions chain/network/src/peer_manager/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::types::{
};
use crate::PeerManagerActor;
use futures::FutureExt;
use near_async::futures::StdThreadAsyncComputationSpawnerForTest;
use near_async::messaging::IntoMultiSender;
use near_async::messaging::Sender;
use near_async::time;
Expand Down Expand Up @@ -656,6 +657,7 @@ pub(crate) async fn start(
shards_manager_sender,
state_witness_sender.break_apart().into_multi_sender(),
genesis_id,
Arc::new(StdThreadAsyncComputationSpawnerForTest),
)
.unwrap()
}
Expand Down
12 changes: 5 additions & 7 deletions chain/network/src/test_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,13 +348,11 @@ fn network_message_to_partial_witness_handler(
None
}

NetworkRequests::PartialEncodedStateWitness(validator_witness_tuple) => {
for (target, partial_witness) in validator_witness_tuple.into_iter() {
shared_state
.senders_for_account(&target)
.partial_witness_sender
.send(PartialEncodedStateWitnessMessage(partial_witness));
}
NetworkRequests::PartialEncodedStateWitness(target, partial_witness) => {
shared_state
.senders_for_account(&target)
.partial_witness_sender
.send(PartialEncodedStateWitnessMessage(partial_witness));
None
}
NetworkRequests::PartialEncodedStateWitnessForward(chunk_validators, partial_witness) => {
Expand Down
2 changes: 1 addition & 1 deletion chain/network/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ pub enum NetworkRequests {
/// Message for a chunk endorsement, sent by a chunk validator to the block producer.
ChunkEndorsement(AccountId, ChunkEndorsement),
/// Message from chunk producer to a single chunk validator to send it a state witness part.
PartialEncodedStateWitness(Vec<(AccountId, PartialEncodedStateWitness)>),
PartialEncodedStateWitness(AccountId, PartialEncodedStateWitness),
/// Message from chunk validator to all other chunk validators to forward state witness part.
PartialEncodedStateWitnessForward(Vec<AccountId>, PartialEncodedStateWitness),
/// Requests an epoch sync
Expand Down
2 changes: 2 additions & 0 deletions integration-tests/src/tests/network/peer_handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ fn make_peer_manager(
peer_max_count: u32,
) -> actix::Addr<PeerManagerActor> {
use near_async::messaging::{noop, IntoMultiSender, IntoSender};
use near_chain::rayon_spawner::RayonAsyncComputationSpawner;

let mut config = config::NetworkConfig::from_seed(seed, node_addr);
config.peer_store.boot_nodes = convert_boot_nodes(boot_nodes);
Expand All @@ -40,6 +41,7 @@ fn make_peer_manager(
noop().into_sender(),
noop().into_multi_sender(),
GenesisId::default(),
Arc::new(RayonAsyncComputationSpawner),
)
.unwrap()
}
Expand Down
1 change: 1 addition & 0 deletions integration-tests/src/tests/network/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ fn setup_network_node(
shards_manager_adapter.as_sender(),
partial_witness_actor.with_auto_span_context().into_multi_sender(),
genesis_id,
Arc::new(RayonAsyncComputationSpawner),
)
.unwrap();
network_adapter.bind(peer_manager.clone().with_auto_span_context());
Expand Down
2 changes: 2 additions & 0 deletions integration-tests/src/tests/network/stress_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::time::Duration;
use actix::{Actor, AsyncContext, System};
use futures::FutureExt;
use near_async::messaging::{noop, IntoMultiSender, IntoSender};
use near_chain::rayon_spawner::RayonAsyncComputationSpawner;
use tracing::info;

use near_actix_test_utils::run_actix;
Expand Down Expand Up @@ -34,6 +35,7 @@ fn make_peer_manager(
noop().into_sender(),
noop().into_multi_sender(),
GenesisId::default(),
Arc::new(RayonAsyncComputationSpawner),
)
.unwrap()
}
Expand Down
1 change: 1 addition & 0 deletions nearcore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,7 @@ pub fn start_with_config_and_synchronization(
shards_manager_adapter.as_sender(),
partial_witness_actor.with_auto_span_context().into_multi_sender(),
genesis_id,
Arc::new(RayonAsyncComputationSpawner),
)
.context("PeerManager::spawn()")?;
network_adapter.bind(network_actor.clone().with_auto_span_context());
Expand Down
1 change: 1 addition & 0 deletions tools/chainsync-loadtest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ rand.workspace = true
time.workspace = true
tokio.workspace = true

near-chain.workspace = true
near-async.workspace = true
near-chain-configs.workspace = true
near-config-utils.workspace = true
Expand Down
Loading

0 comments on commit c085f2a

Please sign in to comment.