Skip to content

Commit

Permalink
fix(portal-bridge): re-initialize census if no peers are available (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
njgheorghita authored Sep 25, 2024
1 parent 24ed772 commit 0914534
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 10 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions portal-bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ serde = { workspace = true, features = ["rc"] }
serde_json.workspace = true
serde_yaml.workspace = true
ssz_types.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true
trin-evm.workspace = true
Expand Down
45 changes: 36 additions & 9 deletions portal-bridge/src/census.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use anyhow::anyhow;
use delay_map::HashMapDelay;
use discv5::enr::NodeId;
use futures::{channel::oneshot, StreamExt};
use thiserror::Error;
use tokio::{
sync::mpsc,
time::{Duration, Instant},
Expand All @@ -20,6 +21,14 @@ use ethportal_api::{
OverlayContentKey, StateContentKey, StateNetworkApiClient,
};

#[derive(Error, Debug)]
pub enum CensusError {
#[error("No peers found in Census")]
NoPeers,
#[error("Failed to initialize Census")]
FailedInitialization,
}

/// Ping delay for liveness check of peers in census
/// Two minutes was chosen somewhat arbitrarily, and can be adjusted
/// in the future based on performance
Expand Down Expand Up @@ -52,11 +61,15 @@ impl Census {
}

impl Census {
pub async fn init(&mut self) {
pub async fn init(&mut self) -> Result<(), CensusError> {
// currently, the census is only initialized for the state network
// only initialized networks will yield inside `run()` loop
info!("Initializing state network census");
self.state.init().await;
if self.state.peers.is_empty() {
return Err(CensusError::FailedInitialization);
}
Ok(())
}

pub async fn run(&mut self) {
Expand All @@ -67,9 +80,19 @@ impl Census {
tokio::select! {
// handle enrs request
Some(request) = self.census_rx.recv() => {
let enrs = self.get_interested_enrs(request.content_key).await;
if let Err(err) = request.resp_tx.send(enrs) {
error!("Error sending enrs response: {err:?}");
match self.get_interested_enrs(request.content_key).await {
Ok(enrs) => {
if let Err(err) = request.resp_tx.send(enrs) {
error!("Error sending enrs response: {err:?}");
}
}
Err(_) => {
error!("No peers found in census, restarting initialization.");
self.state.init().await;
if let Err(err) = request.resp_tx.send(Vec::new()) {
error!("Error sending enrs response: {err:?}");
}
}
}
}
Some(Ok(known_enr)) = self.history.peers.next() => {
Expand All @@ -89,7 +112,10 @@ impl Census {
}
}

pub async fn get_interested_enrs(&self, content_key: ContentKey) -> Vec<Enr> {
pub async fn get_interested_enrs(
&self,
content_key: ContentKey,
) -> Result<Vec<Enr>, CensusError> {
match content_key {
ContentKey::History(content_key) => {
self.history
Expand Down Expand Up @@ -211,15 +237,16 @@ impl Network {
}

// Look up all known interested enrs for a given content id
async fn get_interested_enrs(&self, content_id: [u8; 32]) -> Vec<Enr> {
async fn get_interested_enrs(&self, content_id: [u8; 32]) -> Result<Vec<Enr>, CensusError> {
if self.peers.is_empty() {
error!(
"No known peers in {} census, unable to offer.",
self.subnetwork
);
return Vec::new();
return Err(CensusError::NoPeers);
}
self.peers
Ok(self
.peers
.iter()
.filter_map(|(node_id, (enr, data_radius))| {
let distance = XorMetric::distance(node_id, &content_id);
Expand All @@ -230,7 +257,7 @@ impl Network {
}
})
.take(ENRS_RESPONSE_LIMIT)
.collect()
.collect())
}
}

Expand Down
2 changes: 1 addition & 1 deletion portal-bridge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (census_tx, census_rx) = mpsc::unbounded_channel();
let mut census = Census::new(portal_client.clone(), census_rx);
// initialize the census to acquire critical threshold view of network before gossiping
census.init().await;
census.init().await?;
census_handle = Some(tokio::spawn(async move {
census
.run()
Expand Down

0 comments on commit 0914534

Please sign in to comment.