fix(overlay_service): don't call ping_node while holding kbuckets lock (
morph-dev authored Sep 18, 2024
1 parent ae3e21d commit c17c9d5
Showing 1 changed file with 55 additions and 36 deletions.
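For readers skimming the diff below: the whole change boils down to one locking rule. What follows is a minimal, self-contained sketch of that rule (not part of the commit), using hypothetical stand-in types — a plain `HashMap` as the routing table and a `String`-backed `Enr` — instead of the real overlay service, and assuming the `parking_lot` crate the file already depends on: copy what you need out of `kbuckets` while the write guard is held briefly, drop the guard, and only then call the potentially blocking `ping_node`.

```rust
use std::collections::HashMap;

use parking_lot::RwLock;

/// Hypothetical stand-in for the real `Enr` type.
#[derive(Clone)]
struct Enr(String);

/// Hypothetical, heavily simplified stand-in for the overlay service:
/// the routing table becomes a plain `node_id -> ENR` map.
struct Service {
    kbuckets: RwLock<HashMap<u64, Enr>>,
}

impl Service {
    /// Stand-in for `ping_node`: in the real service this can block
    /// (it needs the store lock to read the data radius), so it must
    /// not run while `kbuckets` is held.
    fn ping_node(&self, enr: &Enr) {
        println!("pinging {}", enr.0);
    }

    /// Shape of the old code: the temporary write guard returned by
    /// `self.kbuckets.write()` lives for the whole `if let` body, so
    /// `ping_node` runs with the routing table locked.
    fn ping_holding_lock(&self, node_id: u64) {
        if let Some(enr) = self.kbuckets.write().get(&node_id) {
            self.ping_node(enr); // blocking work while the lock is held
        }
    }

    /// Shape of the new code: clone the ENR out, let the guard drop at
    /// the end of the statement, then do the blocking work lock-free.
    fn ping_after_releasing_lock(&self, node_id: u64) {
        // Hold the lock only long enough to copy the ENR.
        let optional_enr = self.kbuckets.write().get(&node_id).cloned();
        if let Some(enr) = optional_enr {
            self.ping_node(&enr);
        }
    }

    /// Same idea for the batch case: collect the peers to ping while the
    /// guard is held, drop it explicitly, then ping.
    fn ping_disconnected(&self, node_ids: &[u64]) {
        let kbuckets = self.kbuckets.write();
        let to_ping: Vec<Enr> = node_ids
            .iter()
            .filter_map(|id| kbuckets.get(id).cloned())
            .collect();
        drop(kbuckets); // release the routing-table lock first
        for enr in to_ping {
            self.ping_node(&enr);
        }
    }
}

fn main() {
    let service = Service {
        kbuckets: RwLock::new(HashMap::from([(1, Enr("node-1".into()))])),
    };
    service.ping_holding_lock(1);
    service.ping_after_releasing_lock(1);
    service.ping_disconnected(&[1, 2]);
}
```

The first method mirrors the pre-commit shape, the second mirrors the single-peer fix, and the third mirrors the batch fix below that collects `disconnected_peers_to_ping` and calls `drop(kbuckets)` before pinging.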
91 changes: 55 additions & 36 deletions portalnet/src/overlay/service.rs
@@ -20,7 +20,7 @@ use discv5::{
};
use futures::{channel::oneshot, future::join_all, prelude::*};
use parking_lot::RwLock;
use rand::seq::SliceRandom;
use rand::seq::IteratorRandom;
use smallvec::SmallVec;
use ssz::Encode;
use ssz_types::BitList;
@@ -386,8 +386,13 @@ where
Some(Ok(node_id)) = self.peers_to_ping.next() => {
// If the node is in the routing table, then ping and re-queue the node.
let key = kbucket::Key::from(node_id);
if let kbucket::Entry::Present(ref mut entry, _) = self.kbuckets.write().entry(&key) {
self.ping_node(&entry.value().enr());
// Hold the lock as little as possible
let optional_enr = match self.kbuckets.write().entry(&key) {
kbucket::Entry::Present(entry, _) => Some(entry.value().enr()),
_ => None
};
if let Some(enr) = optional_enr {
self.ping_node(&enr);
self.peers_to_ping.insert(node_id);
}
}
@@ -413,25 +418,21 @@ where
// We only need the 17 bits furthest from our own node ID, because the closest 239 bits of
// buckets are going to be empty-ish.
let target_node_id = {
let buckets = self.kbuckets.read();
let buckets = buckets.buckets_iter().enumerate().collect::<Vec<_>>();
let buckets = &buckets[256 - EXPECTED_NON_EMPTY_BUCKETS..];

// Randomly pick one of these buckets.
let target_bucket = buckets.choose(&mut rand::thread_rng());
match target_bucket {
Some(bucket) => {
trace!(protocol = %self.protocol, bucket = %bucket.0, "Refreshing routing table bucket");
match u8::try_from(bucket.0) {
Ok(idx) => generate_random_node_id(idx, self.local_enr().into()),
Err(err) => {
error!(error = %err, "Error downcasting bucket index");
return;
}
}
}
None => {
error!("Error choosing random bucket index for refresh");
// This should be 256
let buckets_count = self.kbuckets.read().buckets_iter().count();
// Randomly pick one of the buckets.
let Some(bucket) = (buckets_count - EXPECTED_NON_EMPTY_BUCKETS..buckets_count)
.choose(&mut rand::thread_rng())
else {
error!("Error choosing random bucket index for refresh");
return;
};

trace!(protocol = %self.protocol, bucket = %bucket, "Refreshing routing table bucket");
match u8::try_from(bucket) {
Ok(idx) => generate_random_node_id(idx, self.local_enr().into()),
Err(err) => {
error!(error = %err, "Error downcasting bucket index");
return;
}
}
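As an aside on the bucket-selection rewrite in this hunk: a small standalone sketch (stand-in constant and bucket count; assumes the `rand` crate) of why the import switches from `SliceRandom` to `IteratorRandom` — `choose` can now run directly on the index `Range` instead of on a `Vec` collected from `buckets_iter()`.

```rust
use rand::seq::IteratorRandom;

// Stand-in; the real EXPECTED_NON_EMPTY_BUCKETS constant lives elsewhere
// in the crate. 17 matches the "17 bits furthest from our own node ID"
// reasoning in the comment above.
const EXPECTED_NON_EMPTY_BUCKETS: usize = 17;

/// Pick a random index among the furthest (and therefore most likely
/// populated) buckets. Assumes `buckets_count >= EXPECTED_NON_EMPTY_BUCKETS`,
/// which holds for the routing table's fixed 256 buckets.
fn pick_refresh_bucket(buckets_count: usize) -> Option<usize> {
    (buckets_count - EXPECTED_NON_EMPTY_BUCKETS..buckets_count)
        .choose(&mut rand::thread_rng())
}

fn main() {
    // With 256 buckets this yields an index in 239..=255.
    if let Some(bucket) = pick_refresh_bucket(256) {
        println!("refreshing bucket {bucket}");
    }
}
```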
@@ -446,6 +447,9 @@ where
}

/// Returns the data radius of the node.
///
/// This requires the store lock and can block the thread, so it shouldn't be called while
/// another lock is already held.
fn data_radius(&self) -> Distance {
self.store.read().radius()
}
@@ -1318,7 +1322,7 @@ where
// Look up the node in the routing table.
let key = kbucket::Key::from(source);
let optional_node = match self.kbuckets.write().entry(&key) {
kbucket::Entry::Present(ref mut entry, _) => Some(entry.value().clone()),
kbucket::Entry::Present(ref entry, _) => Some(entry.value().clone()),
kbucket::Entry::Pending(ref mut entry, _) => Some(entry.value().clone()),
_ => None,
};
@@ -1379,7 +1383,7 @@ where
// node is responding to our request.
let key = kbucket::Key::from(source.node_id());
let (node, status) = match self.kbuckets.write().entry(&key) {
kbucket::Entry::Present(ref mut entry, status) => (entry.value().clone(), status),
kbucket::Entry::Present(ref entry, status) => (entry.value().clone(), status),
kbucket::Entry::Pending(ref mut entry, status) => (entry.value().clone(), status),
_ => {
// TODO: Decide default data radius, and define a constant.
@@ -1731,7 +1735,7 @@ where
// TODO: Perform update on non-ENR node entry state. See note in `process_ping`.
let key = kbucket::Key::from(node_id);
let optional_node = match self.kbuckets.write().entry(&key) {
kbucket::Entry::Present(ref mut entry, _) => Some(entry.value().clone()),
kbucket::Entry::Present(ref entry, _) => Some(entry.value().clone()),
kbucket::Entry::Pending(ref mut entry, _) => Some(entry.value().clone()),
_ => None,
};
@@ -1933,10 +1937,14 @@ where
let local_node_id = self.local_enr().node_id();

// Acquire write lock here so that we can perform node lookup and insert/update atomically.
// Once we acquire the write lock for the routing table, there are no other locks that we
// need to acquire, so we should not create a deadlock.
// Once we acquire the write lock for the routing table, we shouldn't try to acquire any
// other locks.
// Because `self.ping_node()` can block (it requires the store lock), we save disconnected
// peers to ping and ping them once we release the lock.
let mut kbuckets = self.kbuckets.write();

let mut disconnected_peers_to_ping: Vec<Enr> = vec![];

for enr in enrs {
let node_id = enr.node_id();

@@ -1998,7 +2006,7 @@
if let kbucket::Entry::Present(node_to_ping, _) =
kbuckets.entry(&disconnected)
{
self.ping_node(&node_to_ping.value().enr());
disconnected_peers_to_ping.push(node_to_ping.value().enr());
}
}
other => {
@@ -2011,6 +2019,12 @@
}
}
}
// Releases the self.kbuckets lock.
drop(kbuckets);

for enr in disconnected_peers_to_ping {
self.ping_node(&enr);
}
}

/// Provide the requested content key and content value for the acceptor
@@ -2160,6 +2174,8 @@ where
}

/// Submits a request to ping a destination (target) node.
///
/// This can block the thread, so make sure you are not holding any lock while calling this.
fn ping_node(&self, destination: &Enr) {
trace!(
protocol = %self.protocol,
@@ -2258,8 +2274,13 @@ where

// Ping node to check for connectivity. See comment above for reasoning.
if let Some(key) = node_to_ping {
if let kbucket::Entry::Present(ref mut entry, _) = self.kbuckets.write().entry(&key) {
self.ping_node(&entry.value().enr());
// Hold the lock as little as possible
let optional_enr = match self.kbuckets.write().entry(&key) {
kbucket::Entry::Present(entry, _) => Some(entry.value().enr()),
_ => None,
};
if let Some(enr) = optional_enr {
self.ping_node(&enr);
}
}
}
@@ -2512,12 +2533,10 @@ where
pub fn find_enr(&self, node_id: &NodeId) -> Option<Enr> {
// Check whether we know this node id in our X's Portal Network's routing table.
let key = kbucket::Key::from(*node_id);
if let kbucket::Entry::Present(entry, _) = self.kbuckets.write().entry(&key) {
return Some(entry.value().clone().enr());
}

if let kbucket::Entry::Pending(mut entry, _) = self.kbuckets.write().entry(&key) {
return Some(entry.value().clone().enr());
match self.kbuckets.write().entry(&key) {
kbucket::Entry::Present(ref entry, _) => return Some(entry.value().enr()),
kbucket::Entry::Pending(ref mut entry, _) => return Some(entry.value().enr()),
_ => {}
}

// Check whether this node id is in our discv5 routing table