Skip to content

Commit

Permalink
signs prune messages after dropping the lock on gossip CRDS table (#3697
Browse files Browse the repository at this point in the history
)

Signing prune messages while holding a lock on gossip CRDS table can
cause lock contentions:
https://github.com/anza-xyz/agave/blob/c2880bbb3/gossip/src/cluster_info.rs#L2061-L2083
  • Loading branch information
behzadnouri authored Nov 20, 2024
1 parent bf33b8c commit a8fd26f
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 43 deletions.
103 changes: 60 additions & 43 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2038,49 +2038,7 @@ impl ClusterInfo {
self.gossip.process_push_message(messages, now)
};
// Generate prune messages.
let self_pubkey = self.id();
let prunes = {
let _st = ScopedTimer::from(&self.stats.prune_received_cache);
self.gossip
.prune_received_cache(&self_pubkey, origins, stakes)
};
let prunes: Vec<(Pubkey /*from*/, Vec<Pubkey> /*origins*/)> = prunes
.into_iter()
.flat_map(|(from, prunes)| {
repeat(from).zip(
prunes
.into_iter()
.chunks(MAX_PRUNE_DATA_NODES)
.into_iter()
.map(Iterator::collect)
.collect::<Vec<_>>(),
)
})
.collect();

let prune_messages: Vec<_> = {
let gossip_crds = self.gossip.crds.read().unwrap();
let wallclock = timestamp();
thread_pool.install(|| {
prunes
.into_par_iter()
.with_min_len(256)
.filter_map(|(from, prunes)| {
let peer: &ContactInfo = gossip_crds.get(from)?;
let mut prune_data = PruneData {
pubkey: self_pubkey,
prunes,
signature: Signature::default(),
destination: from,
wallclock,
};
prune_data.sign(&self.keypair());
let prune_message = Protocol::PruneMessage(self_pubkey, prune_data);
Some((peer.gossip().ok()?, prune_message))
})
.collect()
})
};
let prune_messages = self.generate_prune_messages(thread_pool, origins, stakes);
let mut packet_batch = PacketBatch::new_unpinned_with_recycler_data_and_dests(
recycler,
"handle_batch_push_messages",
Expand Down Expand Up @@ -2115,6 +2073,65 @@ impl ClusterInfo {
}
}

fn generate_prune_messages(
&self,
thread_pool: &ThreadPool,
// Unique origin pubkeys of upserted CRDS values from push messages.
origins: impl IntoIterator<Item = Pubkey>,
stakes: &HashMap<Pubkey, u64>,
) -> Vec<(SocketAddr, Protocol /*::PruneMessage*/)> {
let _st = ScopedTimer::from(&self.stats.generate_prune_messages);
let self_pubkey = self.id();
// Obtain redundant gossip links which can be pruned.
let prunes: HashMap</*gossip peer:*/ Pubkey, /*origins:*/ Vec<Pubkey>> = {
let _st = ScopedTimer::from(&self.stats.prune_received_cache);
self.gossip
.prune_received_cache(&self_pubkey, origins, stakes)
};
// Look up gossip addresses of destination nodes.
let prunes: Vec<(
Pubkey, // gossip peer to be pruned
SocketAddr, // gossip socket-addr of peer
Vec<Pubkey>, // CRDS value origins
)> = {
let gossip_crds = self.gossip.crds.read().unwrap();
thread_pool.install(|| {
prunes
.into_par_iter()
.filter_map(|(pubkey, prunes)| {
let addr = gossip_crds.get::<&ContactInfo>(pubkey)?.gossip().ok()?;
Some((pubkey, addr, prunes))
})
.collect()
})
};
// Create and sign Protocol::PruneMessages.
thread_pool.install(|| {
let wallclock = timestamp();
let keypair: Arc<Keypair> = self.keypair().clone();
prunes
.into_par_iter()
.flat_map(|(destination, addr, prunes)| {
// Chunk up origins so that each chunk fits into a packet.
let prunes = prunes.into_par_iter().chunks(MAX_PRUNE_DATA_NODES);
rayon::iter::repeat((destination, addr)).zip(prunes)
})
.map(|((destination, addr), prunes)| {
let mut prune_data = PruneData {
pubkey: self_pubkey,
prunes,
signature: Signature::default(),
destination,
wallclock,
};
prune_data.sign(&keypair);
let prune_message = Protocol::PruneMessage(self_pubkey, prune_data);
(addr, prune_message)
})
.collect()
})
}

fn require_stake_for_gossip(&self, stakes: &HashMap<Pubkey, u64>) -> bool {
if stakes.len() < MIN_NUM_STAKED_NODES {
self.stats
Expand Down
6 changes: 6 additions & 0 deletions gossip/src/cluster_info_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ pub struct GossipStats {
pub(crate) filter_crds_values_dropped_requests: Counter,
pub(crate) filter_crds_values_dropped_values: Counter,
pub(crate) filter_pull_response: Counter,
pub(crate) generate_prune_messages: Counter,
pub(crate) generate_pull_responses: Counter,
pub(crate) get_epoch_duration_no_working_bank: Counter,
pub(crate) get_votes: Counter,
Expand Down Expand Up @@ -473,6 +474,11 @@ pub(crate) fn submit_gossip_stats(
stats.get_epoch_duration_no_working_bank.clear(),
i64
),
(
"generate_prune_messages",
stats.generate_prune_messages.clear(),
i64
),
);
datapoint_info!(
"cluster_info_stats5",
Expand Down

0 comments on commit a8fd26f

Please sign in to comment.