Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blocklist #118

Merged
merged 2 commits into from
Oct 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- Add network blocklist implementation [#117]

### Changed
- Change `Peer::new` to return a Result [#115]
- Change `blake2` dependency from `0.9` to `0.10` [#115]
Expand Down Expand Up @@ -106,6 +109,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
[#110]: https://github.com/dusk-network/kadcast/issues/110
[#112]: https://github.com/dusk-network/kadcast/issues/112
[#115]: https://github.com/dusk-network/kadcast/issues/115
[#117]: https://github.com/dusk-network/kadcast/issues/117

<!-- Releases -->

Expand Down
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ arrayvec = "0.7"
blake2 = "0.10"
rand = "0.8"
tokio = { version = "1", features = ["rt", "net", "sync", "time", "io-std", "rt-multi-thread", "macros"] }
raptorq = "1.6"
raptorq = { version = "1.6", optional = true }
tracing = "0.1"
itertools = "0.10"
konst = "0.2"
Expand All @@ -32,6 +32,9 @@ rustc_tools_util = "0.2"
tracing-subscriber = "0.2"
toml = "0.5"

[features]
default = [ "raptorq" ]

[[example]]
name = "kadcast"
path = "examples/main.rs"
6 changes: 6 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub const DEFAULT_CHANNEL_SIZE: usize = 1000;

pub const DEFAULT_SEND_RETRY_COUNT: u8 = 3;
pub const DEFAULT_SEND_RETRY_SLEEP_MILLIS: u64 = 5;
pub const DEFAULT_BLOCKLIST_REFRESH_SECS: u64 = 10;

#[derive(Clone, Serialize, Deserialize)]
pub struct Config {
Expand Down Expand Up @@ -146,6 +147,8 @@ pub struct NetworkConfig {
#[serde(with = "humantime_serde")]
pub udp_send_retry_interval: Duration,
pub udp_send_retry_count: u8,
#[serde(with = "humantime_serde")]
pub blocklist_refresh_interval: Duration,
}

impl Default for FECConfig {
Expand All @@ -165,6 +168,9 @@ impl Default for NetworkConfig {
DEFAULT_SEND_RETRY_SLEEP_MILLIS,
),
udp_send_retry_count: DEFAULT_SEND_RETRY_COUNT,
blocklist_refresh_interval: Duration::from_secs(
DEFAULT_BLOCKLIST_REFRESH_SECS,
),
}
}
}
9 changes: 9 additions & 0 deletions src/kbucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,14 @@ impl<V> Tree<V> {
}
}

pub(crate) fn remove_peer(&mut self, peer: &BinaryKey) -> Option<Node<V>> {
self.root.id().calculate_distance(peer).and_then(|height| {
self.buckets
.get_mut(&height)
.and_then(|bucket| bucket.remove_id(peer))
})
}

pub(crate) fn remove_idle_nodes(&mut self) {
self.buckets
.iter_mut()
Expand All @@ -147,6 +155,7 @@ impl<V> Tree<V> {
.get(&height)
.map_or(false, |bucket| bucket.is_full())
}

pub(crate) fn new(root: Node<V>, config: BucketConfig) -> Tree<V> {
info!(
"Building table [K={}] with root: {:?}",
Expand Down
23 changes: 12 additions & 11 deletions src/kbucket/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,18 @@ impl<V> Bucket<V> {
pub(crate) fn is_full(&self) -> bool {
self.nodes.is_full()
}

pub(crate) fn remove_id(&mut self, id: &[u8]) -> Option<Node<V>> {
let node_idx =
self.nodes.iter().position(|s| s.id().as_binary() == id)?;

self.nodes.pop_at(node_idx).map(|removed| {
if let Some(pending) = self.pending_node.take() {
self.nodes.push(pending);
}
removed
})
}
}

#[cfg(test)]
Expand All @@ -234,17 +246,6 @@ mod tests {
pub fn least_used_id(&self) -> Option<&BinaryKey> {
self.nodes.first().map(|n| n.id().as_binary())
}

pub fn remove_id(&mut self, id: &[u8]) -> Option<Node<V>> {
let update_idx =
self.nodes.iter().position(|s| s.id().as_binary() == id)?;

let removed = self.nodes.pop_at(update_idx);
if let Some(pending) = self.pending_node.take() {
self.nodes.push(pending);
}
removed
}
}

impl<V> crate::kbucket::Tree<V> {
Expand Down
17 changes: 16 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use std::collections::HashSet;
use std::net::AddrParseError;
use std::net::SocketAddr;
use std::time::Duration;
Expand Down Expand Up @@ -62,6 +63,7 @@ pub struct Peer {
outbound_sender: Sender<MessageBeanOut>,
ktable: RwLock<Tree<PeerInfo>>,
header: Header,
blocklist: RwLock<HashSet<SocketAddr>>,
goshawk-3 marked this conversation as resolved.
Show resolved Hide resolved
}

/// [NetworkListen] is notified each time a broadcasted
Expand Down Expand Up @@ -94,10 +96,12 @@ impl Peer {

let header = tree.root().as_header();
let table = RwLock::new(tree, Duration::from_secs(1));
let blocklist = RwLock::new(HashSet::new(), Duration::from_secs(1));
let peer = Peer {
outbound_sender: outbound_channel_tx.clone(),
ktable: table.clone(),
header,
blocklist: blocklist.clone(),
};
let bootstrapping_nodes = config.bootstrapping_nodes.clone();
MessageHandler::start(
Expand All @@ -107,7 +111,12 @@ impl Peer {
notification_channel_tx,
&config,
);
WireNetwork::start(inbound_channel_tx, outbound_channel_rx, config);
WireNetwork::start(
inbound_channel_tx,
outbound_channel_rx,
config,
blocklist,
);
TableMantainer::start(bootstrapping_nodes, table, outbound_channel_tx);
task::spawn(Peer::notifier(listener_channel_rx, listener));
Ok(peer)
Expand Down Expand Up @@ -238,6 +247,12 @@ impl Peer {
error!("Unable to send from send method {}", e)
});
}

pub async fn block_source(&self, source: SocketAddr) {
self.blocklist.write().await.insert(source);
let binary_key = PeerNode::compute_id(&source.ip(), source.port());
self.ktable.write().await.remove_peer(&binary_key);
}
}

#[cfg(test)]
Expand Down
20 changes: 19 additions & 1 deletion src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use std::collections::HashSet;
use std::net::SocketAddr;
use std::time::Instant;

use socket2::SockRef;
use tokio::io;
Expand All @@ -16,6 +18,7 @@ use crate::config::Config;
use crate::encoding::message::Message;
use crate::encoding::Marshallable;
use crate::peer::PeerNode;
use crate::rwlock::RwLock;
use crate::transport::encoding::{
Configurable, Decoder, Encoder, TransportDecoder, TransportEncoder,
};
Expand All @@ -36,6 +39,7 @@ impl WireNetwork {
inbound_channel_tx: Sender<MessageBeanIn>,
outbound_channel_rx: Receiver<MessageBeanOut>,
conf: Config,
blocklist: RwLock<HashSet<SocketAddr>>,
) {
let c = conf.clone();
let (dec_chan_tx, dec_chan_rx) = mpsc::channel(conf.channel_size);
Expand All @@ -51,7 +55,7 @@ impl WireNetwork {
});

tokio::spawn(async move {
WireNetwork::listen_in(dec_chan_tx.clone(), c1)
WireNetwork::listen_in(dec_chan_tx.clone(), c1, blocklist)
.await
.unwrap_or_else(|op| error!("Error in listen_in {:?}", op));
});
Expand All @@ -60,6 +64,7 @@ impl WireNetwork {
async fn listen_in(
dec_chan_tx: Sender<UDPChunk>,
conf: Config,
blocklist: RwLock<HashSet<SocketAddr>>,
) -> io::Result<()> {
debug!("WireNetwork::listen_in started");

Expand All @@ -74,19 +79,32 @@ impl WireNetwork {
};
info!("Listening on: {}", socket.local_addr()?);

// Using a local blocklist prevent the library to constantly requests a
goshawk-3 marked this conversation as resolved.
Show resolved Hide resolved
// read access to the RwLock
let last_blocklist_refresh = Instant::now();
let blocklist_refresh = conf.network.blocklist_refresh_interval;
let mut local_blocklist = blocklist.read().await.clone();

// Try to extend socket recv buffer size
WireNetwork::configure_socket(&socket, conf)?;

// Read UDP socket recv buffer and delegate the processing to decode
// task
loop {
if last_blocklist_refresh.elapsed() > blocklist_refresh {
local_blocklist = blocklist.read().await.clone();
}
let mut bytes = [0; MAX_DATAGRAM_SIZE];
let (len, remote_address) =
socket.recv_from(&mut bytes).await.map_err(|e| {
error!("Error receiving from socket {}", e);
e
})?;

if local_blocklist.contains(&remote_address) {
continue;
}

dec_chan_tx
.send((bytes[0..len].to_vec(), remote_address))
.await
Expand Down
16 changes: 14 additions & 2 deletions src/transport/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,26 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

#[cfg(not(feature = "raptorq"))]
mod plain_encoder;
mod raptorq;

use std::io;
#[cfg(not(feature = "raptorq"))]
pub(crate) use plain_encoder::PlainEncoder as TransportDecoder;

#[cfg(not(feature = "raptorq"))]
pub(crate) use plain_encoder::PlainEncoder as TransportEncoder;

#[cfg(feature = "raptorq")]
mod raptorq;

#[cfg(feature = "raptorq")]
pub(crate) use self::raptorq::RaptorQDecoder as TransportDecoder;

#[cfg(feature = "raptorq")]
pub(crate) use self::raptorq::RaptorQEncoder as TransportEncoder;

use crate::encoding::message::Message;
use std::io;

pub type TransportEncoderConfig =
<self::TransportEncoder as Configurable>::TConf;
Expand Down
15 changes: 4 additions & 11 deletions src/transport/encoding/plain_encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,16 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use std::collections::HashMap;
use std::io;

use super::{Configurable, Decoder, Encoder};
use crate::encoding::message::Message;

pub(crate) struct PlainEncoder {}
pub struct PlainEncoder {}

impl Configurable for PlainEncoder {
type TConf = HashMap<String, String>;
fn default_configuration() -> Self::TConf {
HashMap::new()
}
type TConf = ();
fn default_configuration() -> Self::TConf {}
fn configure(_: &Self::TConf) -> Self {
PlainEncoder {}
}
Expand All @@ -30,10 +27,6 @@ impl Encoder for PlainEncoder {

impl Decoder for PlainEncoder {
fn decode(&mut self, chunk: Message) -> io::Result<Option<Message>> {
if let Message::Broadcast(header, payload) = chunk {
Ok(Some(Message::Broadcast(header, payload)))
} else {
Ok(Some(chunk))
}
Ok(Some(chunk))
}
}