Commit

chore: ..

zeeshanlakhani committed Feb 2, 2024
1 parent 02f07f4 commit 94c0e72
Showing 7 changed files with 159 additions and 47 deletions.
1 change: 1 addition & 0 deletions homestar-runtime/config/defaults.toml
@@ -27,6 +27,7 @@ transport_connection_timeout = 60
max_connected_peers = 32
max_announce_addresses = 10
dial_interval = 30
bootstrap_interval = 30

[node.network.libp2p.mdns]
enable = true
43 changes: 31 additions & 12 deletions homestar-runtime/src/event_handler.rs
@@ -34,6 +34,17 @@ pub(crate) use event::Event;

type P2PSender = channel::AsyncChannelSender<ResponseEvent>;

struct Quorum {
/// Minimum number of peers required to receive a receipt.
receipt: usize,
/// Minimum number of peers required to receive workflow information.
workflow: usize,
}

/// Bootstrap configuration.
struct Bootstrap {
/// Interval at which to re-run the Kademlia bootstrap.
interval: Duration,
}

/// Handler trait for [EventHandler] events.
#[async_trait]
pub(crate) trait Handler<DB>
@@ -52,10 +63,7 @@ where
#[cfg_attr(docsrs, doc(cfg(feature = "websocket-notify")))]
#[allow(missing_debug_implementations, dead_code)]
pub(crate) struct EventHandler<DB: Database> {
/// Minimum number of peers required to receive a receipt.
receipt_quorum: usize,
/// Minimum number of peers required to receive workflow information.
workflow_quorum: usize,
quorum: Quorum,
/// Timeout for p2p workflow info record requests.
p2p_workflow_info_timeout: Duration,
/// Timeout for p2p workflow info record requests from a provider.
@@ -94,16 +102,15 @@ pub(crate) struct EventHandler<DB: Database> {
external_address_limit: u32,
/// Interval for polling the cache for expired entries.
poll_cache_interval: Duration,
/// Bootstrap configuration.
bootstrap: Bootstrap,
}

/// Event loop handler for libp2p network events and commands.
#[cfg(not(feature = "websocket-notify"))]
#[allow(missing_debug_implementations, dead_code)]
pub(crate) struct EventHandler<DB: Database> {
/// Minimum number of peers required to receive a receipt.
receipt_quorum: usize,
/// Minimum number of peers required to receive workflow information.
workflow_quorum: usize,
quorum: Quorum,
/// Timeout for p2p workflow info record requests.
p2p_workflow_info_timeout: Duration,
/// Timeout for p2p workflow info record requests from a provider.
@@ -136,6 +143,8 @@ pub(crate) struct EventHandler<DB: Database> {
external_address_limit: u32,
/// Interval for polling the cache for expired entries.
poll_cache_interval: Duration,
/// Bootstrap configuration.
bootstrap: Bootstrap,
}

/// Rendezvous protocol configurations and state
@@ -179,8 +188,10 @@ where
let (sender, receiver) = Self::setup_channel(settings);
let sender = Arc::new(sender);
Self {
receipt_quorum: settings.libp2p.dht.receipt_quorum,
workflow_quorum: settings.libp2p.dht.workflow_quorum,
quorum: Quorum {
receipt: settings.libp2p.dht.receipt_quorum,
workflow: settings.libp2p.dht.workflow_quorum,
},
p2p_workflow_info_timeout: settings.libp2p.dht.p2p_workflow_info_timeout,
p2p_provider_timeout: settings.libp2p.dht.p2p_provider_timeout,
db,
@@ -208,6 +219,9 @@ where
announce_addresses: settings.libp2p.announce_addresses.clone(),
external_address_limit: settings.libp2p.max_announce_addresses,
poll_cache_interval: settings.poll_cache_interval,
bootstrap: Bootstrap {
interval: settings.libp2p.bootstrap_interval,
},
}
}

@@ -221,8 +235,10 @@ where
let (sender, receiver) = Self::setup_channel(settings);
let sender = Arc::new(sender);
Self {
receipt_quorum: settings.libp2p.dht.receipt_quorum,
workflow_quorum: settings.libp2p.dht.workflow_quorum,
quorum: Quorum {
receipt: settings.libp2p.dht.receipt_quorum,
workflow: settings.libp2p.dht.workflow_quorum,
},
p2p_workflow_info_timeout: settings.libp2p.dht.p2p_workflow_info_timeout,
p2p_provider_timeout: settings.libp2p.dht.p2p_provider_timeout,
db,
@@ -248,6 +264,9 @@ where
announce_addresses: settings.libp2p.announce_addresses.clone(),
external_address_limit: settings.libp2p.max_announce_addresses,
poll_cache_interval: settings.poll_cache_interval,
bootstrap: Bootstrap {
interval: settings.libp2p.bootstrap_interval,
},
}
}

69 changes: 43 additions & 26 deletions homestar-runtime/src/event_handler/cache.rs
@@ -3,8 +3,11 @@
use crate::{channel, event_handler::Event};
use libp2p::PeerId;
use moka::{
future::Cache,
notification::RemovalCause::{self, Expired},
future::{Cache, FutureExt},
notification::{
ListenerFuture,
RemovalCause::{self, Expired},
},
Expiry as ExpiryBase,
};
use std::{
@@ -49,6 +52,7 @@ pub(crate) enum CacheData {
/// Events to be dispatched on cache expiration.
#[derive(Clone, Debug)]
pub(crate) enum DispatchEvent {
Bootstrap,
RegisterPeer,
DiscoverPeers,
DialPeer,
@@ -58,38 +62,51 @@ pub(crate) fn setup_cache(
pub(crate) fn setup_cache(
sender: Arc<channel::AsyncChannelSender<Event>>,
) -> Cache<String, CacheValue> {
let eviction_listener = move |_key: Arc<String>, val: CacheValue, cause: RemovalCause| {
let eviction_listener = move |_key: Arc<String>,
val: CacheValue,
cause: RemovalCause|
-> ListenerFuture {
let tx = Arc::clone(&sender);

if let Some(CacheData::OnExpiration(event)) = val.data.get("on_expiration") {
if cause != Expired {
return;
}

match event {
DispatchEvent::RegisterPeer => {
if let Some(CacheData::Peer(rendezvous_node)) = val.data.get("rendezvous_node")
{
let _ = tx.send(Event::RegisterPeer(rendezvous_node.to_owned()));
};
}
DispatchEvent::DiscoverPeers => {
if let Some(CacheData::Peer(rendezvous_node)) = val.data.get("rendezvous_node")
{
let _ = tx.send(Event::DiscoverPeers(rendezvous_node.to_owned()));
};
}
DispatchEvent::DialPeer => {
if let Some(CacheData::Peer(node)) = val.data.get("node") {
let _ = tx.send(Event::DialPeer(node.to_owned()));
};
async move {
if let Some(CacheData::OnExpiration(event)) = val.data.get("on_expiration") {
if cause == Expired {
match event {
DispatchEvent::Bootstrap => {
let _ = tx.send_async(Event::Bootstrap).await;
}
DispatchEvent::RegisterPeer => {
if let Some(CacheData::Peer(rendezvous_node)) =
val.data.get("rendezvous_node")
{
let _ = tx
.send_async(Event::RegisterPeer(rendezvous_node.to_owned()))
.await;
};
}
DispatchEvent::DiscoverPeers => {
if let Some(CacheData::Peer(rendezvous_node)) =
val.data.get("rendezvous_node")
{
let _ = tx
.send_async(Event::DiscoverPeers(rendezvous_node.to_owned()))
.await;
};
}
DispatchEvent::DialPeer => {
if let Some(CacheData::Peer(node)) = val.data.get("node") {
let _ = tx.send_async(Event::DialPeer(node.to_owned())).await;
};
}
}
}
}
}
.boxed()
};

Cache::builder()
.expire_after(Expiry)
.eviction_listener(eviction_listener)
.async_eviction_listener(eviction_listener)
.build()
}
38 changes: 34 additions & 4 deletions homestar-runtime/src/event_handler/event.rs
@@ -134,6 +134,8 @@ pub(crate) enum Event {
GetNodeInfo(AsyncChannelSender<DynamicNodeInfo>),
/// Dial a peer.
DialPeer(PeerId),
/// Bootstrap the node to join the DHT.
Bootstrap,
}

#[allow(unreachable_patterns)]
@@ -302,6 +304,34 @@ impl Event {
.map_err(anyhow::Error::new)?;
}
_ => {}
Event::Bootstrap => {
if event_handler
.swarm
.connected_peers()
.peekable()
.peek()
.is_some()
{
let _ = event_handler
.swarm
.behaviour_mut()
.kademlia
.bootstrap()
.map(|_| {
debug!(
subject = "libp2p.kad.bootstrap",
category = "handle_event",
"bootstrapped kademlia"
)
})
.map_err(|err| {
warn!(subject = "libp2p.kad.bootstrap.err",
category = "handle_event",
err=?err,
"error bootstrapping kademlia")
});
}
}
}
Ok(())
}
@@ -391,14 +421,14 @@ impl Captured {
}
}

let receipt_quorum = if event_handler.receipt_quorum > 0 {
unsafe { Quorum::N(NonZeroUsize::new_unchecked(event_handler.receipt_quorum)) }
let receipt_quorum = if event_handler.quorum.receipt > 0 {
unsafe { Quorum::N(NonZeroUsize::new_unchecked(event_handler.quorum.receipt)) }
} else {
Quorum::One
};

let workflow_quorum = if event_handler.workflow_quorum > 0 {
unsafe { Quorum::N(NonZeroUsize::new_unchecked(event_handler.receipt_quorum)) }
let workflow_quorum = if event_handler.quorum.workflow > 0 {
unsafe { Quorum::N(NonZeroUsize::new_unchecked(event_handler.quorum.workflow)) }
} else {
Quorum::One
};
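
The quorum conversion above guards NonZeroUsize::new_unchecked with a > 0 check before entering the unsafe block. A safe equivalent is sketched below (not part of this commit; the import path for Quorum may differ by libp2p version):

use libp2p::kad::Quorum;
use std::num::NonZeroUsize;

// Sketch of a safe alternative to the guarded `new_unchecked` pattern:
// `NonZeroUsize::new` returns None for 0, which falls back to Quorum::One.
fn to_quorum(n: usize) -> Quorum {
    NonZeroUsize::new(n).map(Quorum::N).unwrap_or(Quorum::One)
}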
50 changes: 46 additions & 4 deletions homestar-runtime/src/event_handler/swarm_event.rs
@@ -733,7 +733,7 @@ async fn handle_swarm_event<DB: Database>(
),
btreemap! {
"cid" => Ipld::String(key.cid.to_string()),
"quorum" => Ipld::Integer(event_handler.receipt_quorum as i128),
"quorum" => Ipld::Integer(event_handler.quorum.receipt as i128),
},
),
CapsuleTag::Workflow => notification::emit_event(
Expand All @@ -743,7 +743,7 @@ async fn handle_swarm_event<DB: Database>(
),
btreemap! {
"cid" => Ipld::String(key.cid.to_string()),
"quorum" => Ipld::Integer(event_handler.workflow_quorum as i128),
"quorum" => Ipld::Integer(event_handler.quorum.workflow as i128),
},
),
}
@@ -775,7 +775,7 @@ async fn handle_swarm_event<DB: Database>(
),
btreemap! {
"cid" => Ipld::String(key.cid.to_string()),
"quorum" => Ipld::Integer(event_handler.receipt_quorum as i128),
"quorum" => Ipld::Integer(event_handler.quorum.receipt as i128),
"connectedPeers" => Ipld::Integer(event_handler.connections.peers.len() as i128),
"storedToPeers" => Ipld::List(success.iter().map(|cid| Ipld::String(cid.to_string())).collect())
},
Expand All @@ -787,7 +787,7 @@ async fn handle_swarm_event<DB: Database>(
),
btreemap! {
"cid" => Ipld::String(key.cid.to_string()),
"quorum" => Ipld::Integer(event_handler.workflow_quorum as i128),
"quorum" => Ipld::Integer(event_handler.quorum.workflow as i128),
"connectedPeers" => Ipld::Integer(event_handler.connections.peers.len() as i128),
"storedToPeers" => Ipld::List(success.iter().map(|cid| Ipld::String(cid.to_string())).collect())
},
@@ -1089,6 +1089,48 @@ async fn handle_swarm_event<DB: Database>(
"address" => Ipld::String(address.to_string())
},
);

// Bootstrap the DHT
if event_handler
.swarm
.connected_peers()
.peekable()
.peek()
.is_some()
{
let _ = event_handler
.swarm
.behaviour_mut()
.kademlia
.bootstrap()
.map(|_| {
debug!(
subject = "libp2p.kad.bootstrap",
category = "handle_swarm_event",
"bootstrapped kademlia"
)
})
.map_err(|err| {
warn!(subject = "libp2p.kad.bootstrap.err",
category = "handle_swarm_event",
err=?err,
"error bootstrapping kademlia")
});
}

event_handler
.cache
.insert(
"bootstrap".to_string(),
CacheValue::new(
event_handler.bootstrap.interval,
HashMap::from([(
"on_expiration".to_string(),
CacheData::OnExpiration(cache::DispatchEvent::Bootstrap),
)]),
),
)
.await;
}
SwarmEvent::IncomingConnection { .. } => {}
SwarmEvent::ConnectionEstablished {
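
The connected-peers check and kademlia.bootstrap() call above now appear both here and in the Event::Bootstrap handler in event.rs. A possible shared helper is sketched below (not part of this commit; ComposedBehaviour is a stand-in name for the node's actual network behaviour type, and the tracing macros are assumed to be in scope):

use libp2p::Swarm;
use tracing::{debug, warn};

// Hypothetical helper to deduplicate the bootstrap logic used in both handlers.
fn bootstrap_if_connected(swarm: &mut Swarm<ComposedBehaviour>) {
    // Only attempt a bootstrap when at least one peer is connected.
    if swarm.connected_peers().next().is_some() {
        match swarm.behaviour_mut().kademlia.bootstrap() {
            Ok(_) => debug!(
                subject = "libp2p.kad.bootstrap",
                category = "handle_swarm_event",
                "bootstrapped kademlia"
            ),
            Err(err) => warn!(
                subject = "libp2p.kad.bootstrap.err",
                category = "handle_swarm_event",
                err = ?err,
                "error bootstrapping kademlia"
            ),
        }
    }
}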
4 changes: 4 additions & 0 deletions homestar-runtime/src/settings/libp2p_config.rs
@@ -42,6 +42,9 @@ pub(crate) struct Libp2p {
/// Dial interval.
#[serde_as(as = "DurationSeconds<u64>")]
pub(crate) dial_interval: Duration,
/// Bootstrap dial interval.
#[serde_as(as = "DurationSeconds<u64>")]
pub(crate) bootstrap_interval: Duration,
}

/// DHT settings.
@@ -142,6 +145,7 @@ impl Default for Libp2p {
rendezvous: Rendezvous::default(),
transport_connection_timeout: Duration::new(60, 0),
dial_interval: Duration::new(30, 0),
bootstrap_interval: Duration::new(30, 0),
}
}
}
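
The new setting relies on serde_with's DurationSeconds adapter, so the plain integer written in defaults.toml (bootstrap_interval = 30) deserializes into a std::time::Duration. A minimal sketch of that mapping (illustrative only; Libp2pSketch is a stand-in for the real Libp2p settings struct, and parsing here uses the toml crate):

use serde::Deserialize;
use serde_with::{serde_as, DurationSeconds};
use std::time::Duration;

// Illustrative sketch: `DurationSeconds<u64>` maps integer seconds to a Duration.
#[serde_as]
#[derive(Deserialize)]
struct Libp2pSketch {
    #[serde_as(as = "DurationSeconds<u64>")]
    bootstrap_interval: Duration,
}

fn main() {
    let cfg: Libp2pSketch = toml::from_str("bootstrap_interval = 30").unwrap();
    assert_eq!(cfg.bootstrap_interval, Duration::from_secs(30));
}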
1 change: 0 additions & 1 deletion homestar-runtime/tests/network.rs
@@ -572,7 +572,6 @@ fn test_libp2p_disconnect_known_peers_integration() -> Result<()> {
Ok(())
}

//
#[test]
#[serial_test::parallel]
fn test_libp2p_configured_with_known_dns_multiaddr_integration() -> Result<()> {
