Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix find_new_nodes to query the proper buckets #156

Merged
merged 4 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add network version to handshake messages
- Add Ray-ID to MessageInfo for message tracking
- Add warning when discarding incomplete messages
- Add tracing when broadcasting to an eclipsed network

### Fixed

- Fix raptorQ cache default config
- Fix ObjectTransmissionInformation deserialization
- Fix duplicate processing for messages with different RaptorQ configurations
- Fix idle nodes removal on maintainance
- Fix `find_new_nodes` to query the proper buckets

### Changed

Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "kadcast"
authors = ["herr-seppia <[email protected]>"]
version = "0.7.0-rc.10"
version = "0.7.0-rc.11"
edition = "2018"
description = "Implementation of the Kadcast Network Protocol."
categories = ["network-programming"]
Expand Down
34 changes: 14 additions & 20 deletions src/kbucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub use bucket::InsertError;
pub use bucket::InsertOk;
pub use bucket::{NodeInsertError, NodeInsertOk};
use itertools::Itertools;
pub use key::MAX_BUCKET_HEIGHT;
pub use key::{BinaryID, BinaryKey, BinaryNonce};
pub use node::Node;
use std::collections::hash_map::Entry;
Expand All @@ -20,7 +21,6 @@ mod bucket;
mod key;
mod node;
use crate::config::BucketConfig;
use crate::K_ALPHA;
use crate::K_BETA;

pub type BucketHeight = u8;
Expand Down Expand Up @@ -118,25 +118,12 @@ impl<V> Tree<V> {
.map(|(&height, bucket)| (height, bucket.peers()))
}

#[allow(dead_code)]
pub(crate) fn idle_or_empty_heigth(
&'static self,
) -> impl Iterator<Item = BucketHeight> {
let max_buckets = (crate::K_ID_LEN_BYTES * 8) as BucketHeight;
(0..max_buckets).filter(move |h| {
self.buckets.get(h).map_or_else(|| true, |b| b.has_idle())
})
}

// pick at most Alpha nodes for each idle bucket
pub(crate) fn idle_buckets(
&self,
) -> impl Iterator<Item = (BucketHeight, impl Iterator<Item = &Node<V>>)>
{
self.buckets
.iter()
.filter(|(_, bucket)| bucket.has_idle())
.map(|(&height, bucket)| (height, bucket.pick::<K_ALPHA>()))
pub(crate) fn idle_or_empty_height(&self) -> Vec<BucketHeight> {
(0..MAX_BUCKET_HEIGHT as u8)
.filter(|h| {
self.buckets.get(h).map_or_else(|| true, |b| b.has_idle())
})
.collect()
}

// Return the height of a Peer
Expand Down Expand Up @@ -178,6 +165,13 @@ impl<V> Tree<V> {
.map_or(false, |bucket| bucket.is_full())
}

pub(crate) fn bucket_size(&self, height: BucketHeight) -> usize {
self.buckets
.get(&height)
.map(|bucket| bucket.peers().count())
.unwrap_or_default()
}

pub(crate) fn new(root: Node<V>, config: BucketConfig) -> Tree<V> {
info!(
"Building table [K={}] with root: {:?}",
Expand Down
47 changes: 46 additions & 1 deletion src/kbucket/key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ use crate::{K_DIFF_MIN_BIT, K_DIFF_PRODUCED_BIT};

use super::BucketHeight;

pub const MAX_BUCKET_HEIGHT: usize =
K_ID_LEN_BYTES * BucketHeight::BITS as usize;

const _: () = assert!(
(K_ID_LEN_BYTES * BucketHeight::BITS as usize) < BucketHeight::MAX as usize,
MAX_BUCKET_HEIGHT < BucketHeight::MAX as usize,
"K_ID_LEN_BYTES must be lower than BucketHeight::MAX"
);

Expand Down Expand Up @@ -84,6 +87,27 @@ impl BinaryID {
.map(|(i, b)| BinaryID::msb(b).expect("to be Some") + (i << 3) - 1)
}

/// Given a specific `kadcast` distance, this method generates a `BinaryKey`
/// that has the requested XOR-based distance from `self`.
///
/// The method works by flipping the bit at the specified `distance` in the
/// binary representation of the `BinaryId`. The distance is used to
/// identify both the byte (`idx`) and the bit within that byte
/// (`bit_to_change`) to be modified. The bit is toggled (flipped) using
/// an XOR operation, resulting in a new `BinaryKey` that differs from
/// `self` at exactly the requested distance.
pub fn get_at_distance(&self, distance: BucketHeight) -> BinaryKey {
let mut new_key = self.bytes;

let distance = distance as usize;
let idx = distance / 8;
let bit_to_change = distance % 8;

new_key[idx] ^= 1 << bit_to_change;

new_key
}

/// Returns the position of the most significant bit set in a byte.
///
/// Returns `None` if no bit is set.
Expand Down Expand Up @@ -168,6 +192,8 @@ impl BinaryID {
#[cfg(test)]
mod tests {

use itertools::Itertools;

use super::*;
use crate::kbucket::BucketHeight;
use crate::peer::PeerNode;
Expand Down Expand Up @@ -206,6 +232,25 @@ mod tests {
Ok(())
}

fn key_as_string(key: BinaryKey) -> String {
key.iter().map(|b| format!("{b:08b}")).join(" ")
}

#[test]
fn test_get_at_distance() -> Result<()> {
let current = PeerNode::generate("192.168.0.1:666", 0)?;
let current_str = key_as_string(current.as_peer_info().id);
for i in 0..(8 * K_ID_LEN_BYTES) {
let other = current.id().get_at_distance(i as u8);
let other_str = key_as_string(other);
println!("current {current_str}");
println!("other {other_str}");
println!("distance {i:?}");
assert_eq!(current.id().calculate_distance(&other), Some(i as u8))
}
Ok(())
}

#[test]
fn test_id_nonce() -> Result<()> {
let root = PeerNode::generate("192.168.0.1:666", 0)?;
Expand Down
32 changes: 21 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ use encoding::payload::BroadcastPayload;
use handling::MessageHandler;
pub use handling::MessageInfo;
use itertools::Itertools;
use kbucket::MAX_BUCKET_HEIGHT;
use kbucket::{BucketHeight, Tree};
use maintainer::TableMaintainer;
use peer::{PeerInfo, PeerNode};
use rand::prelude::IteratorRandom;
pub(crate) use rwlock::RwLock;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::task;
use tracing::warn;
use tracing::{error, info};
use transport::{MessageBeanOut, WireNetwork};

Expand Down Expand Up @@ -226,10 +228,24 @@ impl Peer {
return;
}

let tosend: Vec<_> = self
.ktable
.read()
.await
for i in self.extract(message, height).await {
self.outbound_sender.send(i).await.unwrap_or_else(|e| {
error!("Unable to send from broadcast {e}")
});
}
}

async fn extract(
&self,
message: &[u8],
height: Option<BucketHeight>,
) -> Vec<(Message, Vec<SocketAddr>)> {
const LAST_BUCKET_IDX: u8 = MAX_BUCKET_HEIGHT as u8 - 1;
let ktable = self.ktable.read().await;
if height.is_none() && ktable.bucket_size(LAST_BUCKET_IDX) == 0 {
warn!("Broadcasting a new message with empty bucket height {LAST_BUCKET_IDX}")
}
ktable
.extract(height)
.map(|(height, nodes)| {
let msg = Message::broadcast(
Expand All @@ -243,13 +259,7 @@ impl Peer {
nodes.map(|node| *node.value().address()).collect();
(msg, targets)
})
.collect();

for i in tosend {
self.outbound_sender.send(i).await.unwrap_or_else(|e| {
error!("Unable to send from broadcast {e}")
});
}
.collect()
}

/// Send a message to a peer in the network
Expand Down
52 changes: 32 additions & 20 deletions src/maintainer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::encoding::message::{Header, Message};
use crate::kbucket::Tree;
use crate::peer::PeerInfo;
use crate::transport::MessageBeanOut;
use crate::RwLock;
use crate::{RwLock, K_ALPHA};

pub(crate) struct TableMaintainer {
bootstrapping_nodes: Vec<String>,
Expand Down Expand Up @@ -132,27 +132,39 @@ impl TableMaintainer {
self.ktable.write().await.remove_idle_nodes();
}

/// Search for idle buckets (no message received) and try to contact some of
/// the belonging nodes
/// Searches for idle or empty buckets (those without received messages) in
/// the routing table and requests information about the nodes in these
/// buckets from active peers.
///
/// For each identified idle or empty bucket, it calculates a target binary
/// key using the `get_at_distance` method, which flips a specific bit
/// in the node's binary identifier based on the given distance. This
/// generates a new target key that is used to search for additional
/// nodes.
///
/// A set of active peers, up to `K_ALPHA`, is gathered from the current
/// routing table and combined with the bootstrapping nodes to form the
/// list of peers to contact.
///
/// The purpose of this method is to keep the routing table active and up to
/// date by finding new peers whenever buckets are empty or nodes become
/// unresponsive.
async fn find_new_nodes(&self) {
let table_lock_read = self.ktable.read().await;

let find_node_messages = table_lock_read
.idle_buckets()
.flat_map(|(_, idle_nodes)| idle_nodes)
.map(|target| {
(
Message::FindNodes(
self.header,
self.version.clone(),
*target.id().as_binary(),
),
//TODO: Extract alpha nodes
vec![*target.value().address()],
)
});
for find_node in find_node_messages {
self.send(find_node).await;
let buckets_to_refresh = table_lock_read.idle_or_empty_height();

let alive_peers = table_lock_read
.alive_nodes()
.map(|n| n.as_peer_info().to_socket_address())
.take(K_ALPHA)
.chain(self.bootstrapping_nodes_addr().into_iter())
.collect::<Vec<_>>();

for bucket_h in buckets_to_refresh {
let target = self.header.binary_id().get_at_distance(bucket_h);
let msg =
Message::FindNodes(self.header, self.version.clone(), target);
self.send((msg, alive_peers.clone())).await;
}
}
}