Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(network): discv5 discovery #463

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 3 additions & 10 deletions bin/tools/src/bin/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

use std::{net::SocketAddr, time::Duration};
use std::net::SocketAddr;

use clap::Parser;
use ethers::types::H256;
use rundler_network::{Config as NetworkConfig, ConnectionConfig, Network};
use rundler_network::{Config as NetworkConfig, Network};
use tokio::sync::mpsc;

const PRIVATE_KEY: &str = "b0ddfec7d365b4599ff8367e960f8c4890364f99e2151beac352338cc0cfe1bc";
Expand Down Expand Up @@ -45,13 +44,7 @@ async fn main() -> anyhow::Result<()> {
private_key,
listen_address,
bootnodes,
network_config: ConnectionConfig {
max_chunk_size: 1048576,
request_timeout: Duration::from_secs(10),
ttfb_timeout: Duration::from_secs(5),
},
supported_mempools: vec![H256::random()],
metadata_seq_number: 0,
..Default::default()
};

let (_, action_recv) = mpsc::unbounded_channel();
Expand Down
4 changes: 3 additions & 1 deletion crates/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ use libp2p::{
swarm::{keep_alive, NetworkBehaviour},
};

use crate::rpc;
use crate::{discovery, rpc};

#[derive(NetworkBehaviour)]
pub(crate) struct Behaviour {
// TODO(danc): temp, remove when not needed
pub(crate) keep_alive: keep_alive::Behaviour,
// Request/response protocol
pub(crate) rpc: rpc::Behaviour,
// Discv5 based discovery protocol
pub(crate) discovery: discovery::Behaviour,
// Identity protocol
pub(crate) identify: identify::Behaviour,
}
190 changes: 190 additions & 0 deletions crates/network/src/discovery/behaviour.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// This file is part of Rundler.
//
// Rundler is free software: you can redistribute it and/or modify it under the
// terms of the GNU Lesser General Public License as published by the Free Software
// Foundation, either version 3 of the License, or (at your option) any later version.
//
// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

use std::{
pin::Pin,
task::{Context, Poll, Waker},
};

use discv5::{
enr::{CombinedKey, NodeId},
Discv5, Discv5ConfigBuilder, Discv5Error, Discv5Event, Enr, ListenConfig, QueryError,
};
use futures::Future;
use libp2p::{
core::Endpoint,
swarm::{
dummy::ConnectionHandler, ConnectionDenied, ConnectionId, DialFailure, FromSwarm,
NetworkBehaviour, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
},
Multiaddr, PeerId,
};
use tokio::{sync::mpsc, time::Interval};
use tracing::{debug, error};

use crate::{Config, Error, Result as NetworkResult};

#[derive(Debug)]
pub(crate) struct DiscoveredPeers {
pub(crate) peers: Vec<Enr>,
}

type QueryFuture = Pin<Box<dyn Future<Output = Result<Vec<Enr>, QueryError>> + Send>>;

pub(crate) struct Behaviour {
discv5: Discv5,
query_fut: Option<QueryFuture>,
event_stream: mpsc::Receiver<Discv5Event>,
waker: Option<Waker>,

// TODO move this to peer manager
tick: Interval,
target_num_peers: usize,
}

impl Behaviour {
pub(crate) async fn new(
config: &Config,
enr: Enr,
enr_key: CombinedKey,
) -> NetworkResult<Self> {
let listen_config =
ListenConfig::from_ip(config.listen_address.ip(), config.listen_address.port() + 1);
debug!("Discv5 listening on {:?}", listen_config);

let discv5_config = Discv5ConfigBuilder::new(listen_config).build();
let mut discv5 = Discv5::new(enr, enr_key, discv5_config)
.map_err(|e| Error::Discovery(Discv5Error::Custom(e)))?;

for bootnode in config.bootnodes.iter() {
discv5
.add_enr(bootnode.clone())
.map_err(|e| Error::Discovery(Discv5Error::Custom(e)))?;
}

discv5.start().await.map_err(Error::Discovery)?;
let event_stream = discv5.event_stream().await.map_err(Error::Discovery)?;

Ok(Self {
discv5,
query_fut: None,
event_stream,
waker: None,
tick: tokio::time::interval(config.tick_interval),
target_num_peers: config.target_num_peers,
})
}

pub(crate) fn start_query(&mut self, num_peers: usize) {
if self.query_fut.is_some() {
return;
}

let predicate = |enr: &Enr| enr.tcp4().is_some() || enr.tcp6().is_some();

debug!("Starting discovery query");
self.query_fut = Some(Box::pin(self.discv5.find_node_predicate(
NodeId::random(),
Box::new(predicate),
num_peers,
)));
self.wake();
}

fn wake(&mut self) {
if let Some(waker) = self.waker.take() {
waker.wake();
}
}
}

impl NetworkBehaviour for Behaviour {
type ConnectionHandler = ConnectionHandler;
type ToSwarm = DiscoveredPeers;

fn handle_established_inbound_connection(
&mut self,
_connection_id: ConnectionId,
_peer: PeerId,
_local_addr: &Multiaddr,
_remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(ConnectionHandler)
}

fn handle_established_outbound_connection(
&mut self,
_connection_id: ConnectionId,
_peer: PeerId,
_addr: &Multiaddr,
_role_override: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(ConnectionHandler)
}

// TODO manage banned peers

// TODO manage ENR updates

fn on_swarm_event(&mut self, event: FromSwarm<'_, Self::ConnectionHandler>) {
if let FromSwarm::DialFailure(DialFailure { peer_id, error, .. }) = event {
debug!("Dial failure to peer {:?}: {:?}", peer_id, error);
}
}

fn on_connection_handler_event(
&mut self,
_peer_id: PeerId,
_connection_id: ConnectionId,
_event: THandlerOutEvent<Self>,
) {
// Do nothing
}

fn poll(
&mut self,
cx: &mut Context<'_>,
_params: &mut impl PollParameters,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
if let Some(fut) = &mut self.query_fut {
match fut.as_mut().poll(cx) {
Poll::Ready(Ok(peers)) => {
self.query_fut = None;
debug!("Query finished, found {} peers", peers.len());
return Poll::Ready(ToSwarm::GenerateEvent(DiscoveredPeers { peers }));
}
Poll::Ready(Err(e)) => {
self.query_fut = None;
error!("Query error: {:?}", e);
}
Poll::Pending => {}
}
}

while let Poll::Ready(Some(event)) = self.event_stream.poll_recv(cx) {
// These aren't currently useful, peers are discovered in queries
match event {
Discv5Event::Discovered(e) => debug!("Discovered peers in event: {:?}", e),
e => debug!("Discovery event: {:?}", e),
}
}

if self.tick.poll_tick(cx).is_ready() {
self.tick.reset();
self.start_query(self.target_num_peers)
}

self.waker = Some(cx.waker().clone());
Poll::Pending
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

// Adapted from https://github.com/sigp/lighthouse/blob/stable/beacon_node/lighthouse_network/src/discovery/enr_ext.rs
//! Utilities for working with Ethereum Network Records (ENRs).
//!
//! Adapted from https://github.com/sigp/lighthouse/blob/stable/beacon_node/lighthouse_network/src/discovery/enr_ext.rs

use discv5::{
enr::{k256, CombinedKey, CombinedPublicKey, EnrBuilder},
Expand All @@ -25,7 +27,8 @@ use libp2p::{

use crate::Config;

pub(crate) fn build_enr(config: &Config) -> (Enr, CombinedKey) {
/// Build an ENR from the given configuration.
pub fn build_enr(config: &Config) -> (Enr, CombinedKey) {
let enr_key: CombinedKey =
k256::ecdsa::SigningKey::from_slice(&hex::decode(&config.private_key).unwrap())
.unwrap()
Expand All @@ -41,9 +44,12 @@ pub(crate) fn build_enr(config: &Config) -> (Enr, CombinedKey) {
(enr, enr_key)
}

pub(crate) trait EnrExt {
/// Extension trait for ENR.
pub trait EnrExt {
/// Returns the multiaddr of the ENR.
fn multiaddr(&self) -> Vec<Multiaddr>;

/// Returns the peer id of the ENR.
fn peer_id(&self) -> PeerId;
}

Expand All @@ -70,7 +76,8 @@ impl EnrExt for Enr {
}
}

pub(crate) trait CombinedKeyPublicExt {
/// Extension trait for CombinedPublicKey
pub trait CombinedKeyPublicExt {
/// Converts the publickey into a peer id, without consuming the key.
fn as_peer_id(&self) -> PeerId;
}
Expand Down
17 changes: 17 additions & 0 deletions crates/network/src/discovery/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// This file is part of Rundler.
//
// Rundler is free software: you can redistribute it and/or modify it under the
// terms of the GNU Lesser General Public License as published by the Free Software
// Foundation, either version 3 of the License, or (at your option) any later version.
//
// Rundler is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
// without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License along with Rundler.
// If not, see https://www.gnu.org/licenses/.

mod behaviour;
pub(crate) use behaviour::Behaviour;

pub mod enr;
6 changes: 5 additions & 1 deletion crates/network/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@

/// Network errors
#[derive(thiserror::Error, Debug)]
pub enum Error {}
pub enum Error {
/// Discovery error
#[error("Discovery error: {0}")]
Discovery(discv5::Discv5Error),
}

/// Network result
pub type Result<T> = std::result::Result<T, Error>;
22 changes: 11 additions & 11 deletions crates/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,23 @@
//! Lots of inspiration for the components of this implementation were taken from the Lighthouse implementation
//! of the Ethereum consensus layer p2p protocol. See [here](https://github.com/sigp/lighthouse/) for more details.

mod rpc;
pub use rpc::{
message::{
ErrorKind as ResponseErrorKind, PooledUserOpHashesRequest, PooledUserOpHashesResponse,
PooledUserOpsByHashRequest, PooledUserOpsByHashResponse, MAX_OPS_PER_REQUEST,
},
ConnectionConfig,
};

mod behaviour;
mod discovery;
pub use discovery::enr;

mod network;
pub use network::{
Action, AppRequest, AppRequestId, AppResponse, AppResponseResult, Config, Event, Network,
};

mod enr;

mod error;
pub use error::{Error, Result};

mod rpc;
pub use rpc::{
message::{
ErrorKind as ResponseErrorKind, PooledUserOpHashesRequest, PooledUserOpHashesResponse,
PooledUserOpsByHashRequest, PooledUserOpsByHashResponse, MAX_OPS_PER_REQUEST,
},
ConnectionConfig,
};
Loading
Loading