diff --git a/CHANGELOG.md b/CHANGELOG.md index 97f404a..9b02dee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- Add network blocklist implementation [#117] + ### Changed - Change `Peer::new` to return a Result [#115] - Change `blake2` dependency from `0.9` to `0.10` [#115] @@ -106,6 +109,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 [#110]: https://github.com/dusk-network/kadcast/issues/110 [#112]: https://github.com/dusk-network/kadcast/issues/112 [#115]: https://github.com/dusk-network/kadcast/issues/115 +[#117]: https://github.com/dusk-network/kadcast/issues/117 diff --git a/Cargo.toml b/Cargo.toml index 30bae37..7d49b75 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ arrayvec = "0.7" blake2 = "0.10" rand = "0.8" tokio = { version = "1", features = ["rt", "net", "sync", "time", "io-std", "rt-multi-thread", "macros"] } -raptorq = "1.6" +raptorq = { version = "1.6", optional = true } tracing = "0.1" itertools = "0.10" konst = "0.2" @@ -32,6 +32,9 @@ rustc_tools_util = "0.2" tracing-subscriber = "0.2" toml = "0.5" +[features] +default = [ "raptorq" ] + [[example]] name = "kadcast" path = "examples/main.rs" diff --git a/src/config.rs b/src/config.rs index 7eff581..060e371 100644 --- a/src/config.rs +++ b/src/config.rs @@ -33,6 +33,7 @@ pub const DEFAULT_CHANNEL_SIZE: usize = 1000; pub const DEFAULT_SEND_RETRY_COUNT: u8 = 3; pub const DEFAULT_SEND_RETRY_SLEEP_MILLIS: u64 = 5; +pub const DEFAULT_BLOCKLIST_REFRESH_SECS: u64 = 10; #[derive(Clone, Serialize, Deserialize)] pub struct Config { @@ -146,6 +147,8 @@ pub struct NetworkConfig { #[serde(with = "humantime_serde")] pub udp_send_retry_interval: Duration, pub udp_send_retry_count: u8, + #[serde(with = "humantime_serde")] + pub blocklist_refresh_interval: Duration, } impl Default for FECConfig { @@ -165,6 +168,9 @@ impl Default for NetworkConfig { DEFAULT_SEND_RETRY_SLEEP_MILLIS, ), udp_send_retry_count: DEFAULT_SEND_RETRY_COUNT, + blocklist_refresh_interval: Duration::from_secs( + DEFAULT_BLOCKLIST_REFRESH_SECS, + ), } } } diff --git a/src/kbucket.rs b/src/kbucket.rs index 82c264e..71df2fe 100644 --- a/src/kbucket.rs +++ b/src/kbucket.rs @@ -130,6 +130,14 @@ impl Tree { } } + pub(crate) fn remove_peer(&mut self, peer: &BinaryKey) -> Option> { + self.root.id().calculate_distance(peer).and_then(|height| { + self.buckets + .get_mut(&height) + .and_then(|bucket| bucket.remove_id(peer)) + }) + } + pub(crate) fn remove_idle_nodes(&mut self) { self.buckets .iter_mut() @@ -147,6 +155,7 @@ impl Tree { .get(&height) .map_or(false, |bucket| bucket.is_full()) } + pub(crate) fn new(root: Node, config: BucketConfig) -> Tree { info!( "Building table [K={}] with root: {:?}", diff --git a/src/kbucket/bucket.rs b/src/kbucket/bucket.rs index 4db3540..42458fd 100644 --- a/src/kbucket/bucket.rs +++ b/src/kbucket/bucket.rs @@ -213,6 +213,18 @@ impl Bucket { pub(crate) fn is_full(&self) -> bool { self.nodes.is_full() } + + pub(crate) fn remove_id(&mut self, id: &[u8]) -> Option> { + let node_idx = + self.nodes.iter().position(|s| s.id().as_binary() == id)?; + + self.nodes.pop_at(node_idx).map(|removed| { + if let Some(pending) = self.pending_node.take() { + self.nodes.push(pending); + } + removed + }) + } } #[cfg(test)] @@ -234,17 +246,6 @@ mod tests { pub fn least_used_id(&self) -> Option<&BinaryKey> { self.nodes.first().map(|n| n.id().as_binary()) } - - pub fn remove_id(&mut self, id: &[u8]) -> Option> { - let update_idx = - self.nodes.iter().position(|s| s.id().as_binary() == id)?; - - let removed = self.nodes.pop_at(update_idx); - if let Some(pending) = self.pending_node.take() { - self.nodes.push(pending); - } - removed - } } impl crate::kbucket::Tree { diff --git a/src/lib.rs b/src/lib.rs index bd2b952..90fde7a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ // // Copyright (c) DUSK NETWORK. All rights reserved. +use std::collections::HashSet; use std::net::AddrParseError; use std::net::SocketAddr; use std::time::Duration; @@ -62,6 +63,7 @@ pub struct Peer { outbound_sender: Sender, ktable: RwLock>, header: Header, + blocklist: RwLock>, } /// [NetworkListen] is notified each time a broadcasted @@ -94,10 +96,12 @@ impl Peer { let header = tree.root().as_header(); let table = RwLock::new(tree, Duration::from_secs(1)); + let blocklist = RwLock::new(HashSet::new(), Duration::from_secs(1)); let peer = Peer { outbound_sender: outbound_channel_tx.clone(), ktable: table.clone(), header, + blocklist: blocklist.clone(), }; let bootstrapping_nodes = config.bootstrapping_nodes.clone(); MessageHandler::start( @@ -107,7 +111,12 @@ impl Peer { notification_channel_tx, &config, ); - WireNetwork::start(inbound_channel_tx, outbound_channel_rx, config); + WireNetwork::start( + inbound_channel_tx, + outbound_channel_rx, + config, + blocklist, + ); TableMantainer::start(bootstrapping_nodes, table, outbound_channel_tx); task::spawn(Peer::notifier(listener_channel_rx, listener)); Ok(peer) @@ -238,6 +247,12 @@ impl Peer { error!("Unable to send from send method {}", e) }); } + + pub async fn block_source(&self, source: SocketAddr) { + self.blocklist.write().await.insert(source); + let binary_key = PeerNode::compute_id(&source.ip(), source.port()); + self.ktable.write().await.remove_peer(&binary_key); + } } #[cfg(test)] diff --git a/src/transport.rs b/src/transport.rs index 8f170bd..a4a66d9 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -4,7 +4,9 @@ // // Copyright (c) DUSK NETWORK. All rights reserved. +use std::collections::HashSet; use std::net::SocketAddr; +use std::time::Instant; use socket2::SockRef; use tokio::io; @@ -16,6 +18,7 @@ use crate::config::Config; use crate::encoding::message::Message; use crate::encoding::Marshallable; use crate::peer::PeerNode; +use crate::rwlock::RwLock; use crate::transport::encoding::{ Configurable, Decoder, Encoder, TransportDecoder, TransportEncoder, }; @@ -36,6 +39,7 @@ impl WireNetwork { inbound_channel_tx: Sender, outbound_channel_rx: Receiver, conf: Config, + blocklist: RwLock>, ) { let c = conf.clone(); let (dec_chan_tx, dec_chan_rx) = mpsc::channel(conf.channel_size); @@ -51,7 +55,7 @@ impl WireNetwork { }); tokio::spawn(async move { - WireNetwork::listen_in(dec_chan_tx.clone(), c1) + WireNetwork::listen_in(dec_chan_tx.clone(), c1, blocklist) .await .unwrap_or_else(|op| error!("Error in listen_in {:?}", op)); }); @@ -60,6 +64,7 @@ impl WireNetwork { async fn listen_in( dec_chan_tx: Sender, conf: Config, + blocklist: RwLock>, ) -> io::Result<()> { debug!("WireNetwork::listen_in started"); @@ -74,12 +79,21 @@ impl WireNetwork { }; info!("Listening on: {}", socket.local_addr()?); + // Using a local blocklist prevent the library to constantly requests a + // read access to the RwLock + let last_blocklist_refresh = Instant::now(); + let blocklist_refresh = conf.network.blocklist_refresh_interval; + let mut local_blocklist = blocklist.read().await.clone(); + // Try to extend socket recv buffer size WireNetwork::configure_socket(&socket, conf)?; // Read UDP socket recv buffer and delegate the processing to decode // task loop { + if last_blocklist_refresh.elapsed() > blocklist_refresh { + local_blocklist = blocklist.read().await.clone(); + } let mut bytes = [0; MAX_DATAGRAM_SIZE]; let (len, remote_address) = socket.recv_from(&mut bytes).await.map_err(|e| { @@ -87,6 +101,10 @@ impl WireNetwork { e })?; + if local_blocklist.contains(&remote_address) { + continue; + } + dec_chan_tx .send((bytes[0..len].to_vec(), remote_address)) .await diff --git a/src/transport/encoding.rs b/src/transport/encoding.rs index e24c180..ef80247 100644 --- a/src/transport/encoding.rs +++ b/src/transport/encoding.rs @@ -4,14 +4,26 @@ // // Copyright (c) DUSK NETWORK. All rights reserved. +#[cfg(not(feature = "raptorq"))] mod plain_encoder; -mod raptorq; -use std::io; +#[cfg(not(feature = "raptorq"))] +pub(crate) use plain_encoder::PlainEncoder as TransportDecoder; + +#[cfg(not(feature = "raptorq"))] +pub(crate) use plain_encoder::PlainEncoder as TransportEncoder; + +#[cfg(feature = "raptorq")] +mod raptorq; +#[cfg(feature = "raptorq")] pub(crate) use self::raptorq::RaptorQDecoder as TransportDecoder; + +#[cfg(feature = "raptorq")] pub(crate) use self::raptorq::RaptorQEncoder as TransportEncoder; + use crate::encoding::message::Message; +use std::io; pub type TransportEncoderConfig = ::TConf; diff --git a/src/transport/encoding/plain_encoder.rs b/src/transport/encoding/plain_encoder.rs index 3329d17..5e2e6a8 100644 --- a/src/transport/encoding/plain_encoder.rs +++ b/src/transport/encoding/plain_encoder.rs @@ -4,19 +4,16 @@ // // Copyright (c) DUSK NETWORK. All rights reserved. -use std::collections::HashMap; use std::io; use super::{Configurable, Decoder, Encoder}; use crate::encoding::message::Message; -pub(crate) struct PlainEncoder {} +pub struct PlainEncoder {} impl Configurable for PlainEncoder { - type TConf = HashMap; - fn default_configuration() -> Self::TConf { - HashMap::new() - } + type TConf = (); + fn default_configuration() -> Self::TConf {} fn configure(_: &Self::TConf) -> Self { PlainEncoder {} } @@ -30,10 +27,6 @@ impl Encoder for PlainEncoder { impl Decoder for PlainEncoder { fn decode(&mut self, chunk: Message) -> io::Result> { - if let Message::Broadcast(header, payload) = chunk { - Ok(Some(Message::Broadcast(header, payload))) - } else { - Ok(Some(chunk)) - } + Ok(Some(chunk)) } }