diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index b6293da0487729..e4c48e03b2aac0 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2038,49 +2038,7 @@ impl ClusterInfo { self.gossip.process_push_message(messages, now) }; // Generate prune messages. - let self_pubkey = self.id(); - let prunes = { - let _st = ScopedTimer::from(&self.stats.prune_received_cache); - self.gossip - .prune_received_cache(&self_pubkey, origins, stakes) - }; - let prunes: Vec<(Pubkey /*from*/, Vec /*origins*/)> = prunes - .into_iter() - .flat_map(|(from, prunes)| { - repeat(from).zip( - prunes - .into_iter() - .chunks(MAX_PRUNE_DATA_NODES) - .into_iter() - .map(Iterator::collect) - .collect::>(), - ) - }) - .collect(); - - let prune_messages: Vec<_> = { - let gossip_crds = self.gossip.crds.read().unwrap(); - let wallclock = timestamp(); - thread_pool.install(|| { - prunes - .into_par_iter() - .with_min_len(256) - .filter_map(|(from, prunes)| { - let peer: &ContactInfo = gossip_crds.get(from)?; - let mut prune_data = PruneData { - pubkey: self_pubkey, - prunes, - signature: Signature::default(), - destination: from, - wallclock, - }; - prune_data.sign(&self.keypair()); - let prune_message = Protocol::PruneMessage(self_pubkey, prune_data); - Some((peer.gossip().ok()?, prune_message)) - }) - .collect() - }) - }; + let prune_messages = self.generate_prune_messages(thread_pool, origins, stakes); let mut packet_batch = PacketBatch::new_unpinned_with_recycler_data_and_dests( recycler, "handle_batch_push_messages", @@ -2115,6 +2073,65 @@ impl ClusterInfo { } } + fn generate_prune_messages( + &self, + thread_pool: &ThreadPool, + // Unique origin pubkeys of upserted CRDS values from push messages. + origins: impl IntoIterator, + stakes: &HashMap, + ) -> Vec<(SocketAddr, Protocol /*::PruneMessage*/)> { + let _st = ScopedTimer::from(&self.stats.generate_prune_messages); + let self_pubkey = self.id(); + // Obtain redundant gossip links which can be pruned. + let prunes: HashMap> = { + let _st = ScopedTimer::from(&self.stats.prune_received_cache); + self.gossip + .prune_received_cache(&self_pubkey, origins, stakes) + }; + // Look up gossip addresses of destination nodes. + let prunes: Vec<( + Pubkey, // gossip peer to be pruned + SocketAddr, // gossip socket-addr of peer + Vec, // CRDS value origins + )> = { + let gossip_crds = self.gossip.crds.read().unwrap(); + thread_pool.install(|| { + prunes + .into_par_iter() + .filter_map(|(pubkey, prunes)| { + let addr = gossip_crds.get::<&ContactInfo>(pubkey)?.gossip().ok()?; + Some((pubkey, addr, prunes)) + }) + .collect() + }) + }; + // Create and sign Protocol::PruneMessages. + thread_pool.install(|| { + let wallclock = timestamp(); + let keypair: Arc = self.keypair().clone(); + prunes + .into_par_iter() + .flat_map(|(destination, addr, prunes)| { + // Chunk up origins so that each chunk fits into a packet. + let prunes = prunes.into_par_iter().chunks(MAX_PRUNE_DATA_NODES); + rayon::iter::repeat((destination, addr)).zip(prunes) + }) + .map(|((destination, addr), prunes)| { + let mut prune_data = PruneData { + pubkey: self_pubkey, + prunes, + signature: Signature::default(), + destination, + wallclock, + }; + prune_data.sign(&keypair); + let prune_message = Protocol::PruneMessage(self_pubkey, prune_data); + (addr, prune_message) + }) + .collect() + }) + } + fn require_stake_for_gossip(&self, stakes: &HashMap) -> bool { if stakes.len() < MIN_NUM_STAKED_NODES { self.stats diff --git a/gossip/src/cluster_info_metrics.rs b/gossip/src/cluster_info_metrics.rs index a5d70c024eb79f..9be73f19759979 100644 --- a/gossip/src/cluster_info_metrics.rs +++ b/gossip/src/cluster_info_metrics.rs @@ -98,6 +98,7 @@ pub struct GossipStats { pub(crate) filter_crds_values_dropped_requests: Counter, pub(crate) filter_crds_values_dropped_values: Counter, pub(crate) filter_pull_response: Counter, + pub(crate) generate_prune_messages: Counter, pub(crate) generate_pull_responses: Counter, pub(crate) get_epoch_duration_no_working_bank: Counter, pub(crate) get_votes: Counter, @@ -473,6 +474,11 @@ pub(crate) fn submit_gossip_stats( stats.get_epoch_duration_no_working_bank.clear(), i64 ), + ( + "generate_prune_messages", + stats.generate_prune_messages.clear(), + i64 + ), ); datapoint_info!( "cluster_info_stats5",