Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(net-report)!: add net_report::Options to specify which probes you want to run #3032

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion iroh-dns-server/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub enum PacketSource {

/// A store for pkarr signed packets.
///
/// Packets are stored in the persistent [`SignedPacketStore`], and cached on-demand in an in-memory LRU
/// Packets are stored in the persistent `SignedPacketStore`, and cached on-demand in an in-memory LRU
/// cache used for resolving DNS queries.
#[derive(Debug, Clone)]
pub struct ZoneStore {
Expand Down
250 changes: 188 additions & 62 deletions iroh-net-report/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#![cfg_attr(not(test), deny(clippy::unwrap_used))]

use std::{
collections::{BTreeMap, HashMap},
collections::{BTreeMap, BTreeSet, HashMap},
fmt::{self, Debug},
net::{SocketAddr, SocketAddrV4, SocketAddrV6},
sync::Arc,
Expand Down Expand Up @@ -38,6 +38,7 @@ mod ping;
mod reportgen;

pub use metrics::Metrics;
use reportgen::ProbeProto;
pub use reportgen::QuicConfig;

const FULL_REPORT_INTERVAL: Duration = Duration::from_secs(5 * 60);
Expand Down Expand Up @@ -206,6 +207,140 @@ impl Default for Reports {
}
}

/// Options for running probes
///
/// By default, will run icmp over IPv4, icmp over IPv6, and Https probes.
///
/// Use [`Options::stun_v4`], [`Options::stun_v6`], and [`Options::quic_config`]
/// to enable STUN over IPv4, STUN over IPv6, and QUIC address discovery.
#[derive(Debug)]
pub struct Options {
/// Socket to send IPv4 STUN probes from.
///
/// Responses are never read from this socket, they must be passed in via internal
/// messaging since, when used internally in iroh, the socket is also used to receive
/// other packets from in the magicsocket (`MagicSock`).
///
/// If not provided, STUN probes will not be sent over IPv4.
stun_sock_v4: Option<Arc<UdpSocket>>,
/// Socket to send IPv6 STUN probes from.
///
/// Responses are never read from this socket, they must be passed in via internal
/// messaging since, when used internally in iroh, the socket is also used to receive
/// other packets from in the magicsocket (`MagicSock`).
///
/// If not provided, STUN probes will not be sent over IPv6.
stun_sock_v6: Option<Arc<UdpSocket>>,
/// The configuration needed to launch QUIC address discovery probes.
///
/// If not provided, will not run QUIC address discovery.
quic_config: Option<QuicConfig>,
/// Enable icmp_v4 probes
///
/// On by default
icmp_v4: bool,
/// Enable icmp_v6 probes
///
/// On by default
icmp_v6: bool,
/// Enable https probes
///
/// On by default
https: bool,
}

impl Default for Options {
fn default() -> Self {
Self {
stun_sock_v4: None,
stun_sock_v6: None,
quic_config: None,
icmp_v4: true,
icmp_v6: true,
https: true,
}
}
}

impl Options {
/// Create an empty Options that enables no probes
pub fn empty() -> Self {
Self {
stun_sock_v4: None,
stun_sock_v6: None,
quic_config: None,
icmp_v4: false,
icmp_v6: false,
https: false,
}
}

/// Set the ipv4 stun socket and enable ipv4 stun probes
pub fn stun_v4(mut self, sock: Option<Arc<UdpSocket>>) -> Self {
self.stun_sock_v4 = sock;
self
}

/// Set the ipv6 stun socket and enable ipv6 stun probes
pub fn stun_v6(mut self, sock: Option<Arc<UdpSocket>>) -> Self {
self.stun_sock_v6 = sock;
self
}

/// Enable quic probes
pub fn quic_config(mut self, quic_config: Option<QuicConfig>) -> Self {
self.quic_config = quic_config;
self
}

/// Enable icmp_v4 probe
pub fn enable_icmp_v4(mut self) -> Self {
self.icmp_v4 = true;
self
}

/// Enable icmp_v6 probe
pub fn enable_icmp_v6(mut self) -> Self {
self.icmp_v6 = true;
self
}

/// Enable https probe
pub fn enable_https(mut self) -> Self {
self.https = true;
self
}

/// Turn the options into set of valid protocols
fn to_protocols(&self) -> BTreeSet<ProbeProto> {
let mut protocols = BTreeSet::new();
if self.stun_sock_v4.is_some() {
protocols.insert(ProbeProto::StunIpv4);
}
if self.stun_sock_v6.is_some() {
protocols.insert(ProbeProto::StunIpv6);
}
if let Some(ref quic) = self.quic_config {
if quic.ipv4 {
protocols.insert(ProbeProto::QuicIpv4);
}
if quic.ipv6 {
protocols.insert(ProbeProto::QuicIpv6);
}
}
if self.icmp_v4 {
protocols.insert(ProbeProto::IcmpV4);
}
if self.icmp_v6 {
protocols.insert(ProbeProto::IcmpV6);
}
if self.https {
protocols.insert(ProbeProto::Https);
}
protocols
}
}

impl Client {
/// Creates a new net_report client.
///
Expand Down Expand Up @@ -252,39 +387,56 @@ impl Client {
/// using QUIC address discovery.
///
/// When `None`, it will disable the QUIC address discovery probes.
///
/// This will attempt to use *all* probe protocols.
pub async fn get_report(
&mut self,
dm: RelayMap,
stun_conn4: Option<Arc<UdpSocket>>,
stun_conn6: Option<Arc<UdpSocket>>,
relay_map: RelayMap,
stun_sock_v4: Option<Arc<UdpSocket>>,
stun_sock_v6: Option<Arc<UdpSocket>>,
quic_config: Option<QuicConfig>,
) -> Result<Arc<Report>> {
let rx = self
.get_report_channel(dm, stun_conn4, stun_conn6, quic_config)
.await?;
let opts = Options::default()
.stun_v4(stun_sock_v4)
.stun_v6(stun_sock_v6)
.quic_config(quic_config);
let rx = self.get_report_channel(relay_map.clone(), opts).await?;
match rx.await {
Ok(res) => res,
Err(_) => Err(anyhow!("channel closed, actor awol")),
}
}

/// Runs a net_report, returning the report.
///
/// It may not be called concurrently with itself, `&mut self` takes care of that.
///
/// Look at [`Options`] for the different configuration options.
pub async fn get_report_with_opts(
&mut self,
relay_map: RelayMap,
opts: Options,
) -> Result<Arc<Report>> {
let rx = self.get_report_channel(relay_map, opts).await?;
match rx.await {
Ok(res) => res,
Err(_) => Err(anyhow!("channel closed, actor awol")),
}
}

/// Get report with channel
///
/// Look at [`Options`] for the different configuration options.
pub async fn get_report_channel(
&mut self,
dm: RelayMap,
stun_conn4: Option<Arc<UdpSocket>>,
stun_conn6: Option<Arc<UdpSocket>>,
quic_config: Option<QuicConfig>,
relay_map: RelayMap,
opts: Options,
) -> Result<oneshot::Receiver<Result<Arc<Report>>>> {
// TODO: consider if RelayMap should be made to easily clone? It seems expensive
// right now.
let (tx, rx) = oneshot::channel();
self.addr
.send(Message::RunCheck {
relay_map: dm,
stun_sock_v4: stun_conn4,
stun_sock_v6: stun_conn6,
quic_config,
relay_map,
opts,
response_tx: tx,
})
.await?;
Expand All @@ -310,25 +462,10 @@ pub(crate) enum Message {
/// Only one net_report can be run at a time, trying to run multiple concurrently will
/// fail.
RunCheck {
/// The relay configuration.
/// The map of relays we want to probe
relay_map: RelayMap,
/// Socket to send IPv4 STUN probes from.
///
/// Responses are never read from this socket, they must be passed in via the
/// [`Message::StunPacket`] message since the socket is also used to receive
/// other packets from in the magicsocket (`MagicSock`).
///
/// If not provided this will attempt to bind a suitable socket itself.
stun_sock_v4: Option<Arc<UdpSocket>>,
/// Socket to send IPv6 STUN probes from.
///
/// Like `stun_sock_v4` but for IPv6.
stun_sock_v6: Option<Arc<UdpSocket>>,
/// Endpoint and client configuration to create a QUIC
/// connection to do QUIC address discovery.
///
/// If not provided, will not do QUIC address discovery.
quic_config: Option<QuicConfig>,
/// Options for the report
opts: Options,
/// Channel to receive the response.
response_tx: oneshot::Sender<Result<Arc<Report>>>,
},
Expand Down Expand Up @@ -466,18 +603,10 @@ impl Actor {
match msg {
Message::RunCheck {
relay_map,
stun_sock_v4,
stun_sock_v6,
quic_config,
opts,
response_tx,
} => {
self.handle_run_check(
relay_map,
stun_sock_v4,
stun_sock_v6,
quic_config,
response_tx,
);
self.handle_run_check(relay_map, opts, response_tx);
}
Message::ReportReady { report } => {
self.handle_report_ready(report);
Expand All @@ -503,11 +632,16 @@ impl Actor {
fn handle_run_check(
&mut self,
relay_map: RelayMap,
stun_sock_v4: Option<Arc<UdpSocket>>,
stun_sock_v6: Option<Arc<UdpSocket>>,
quic_config: Option<QuicConfig>,
opts: Options,
response_tx: oneshot::Sender<Result<Arc<Report>>>,
) {
let protocols = opts.to_protocols();
let Options {
stun_sock_v4,
stun_sock_v6,
quic_config,
..
} = opts;
if self.current_report_run.is_some() {
response_tx
.send(Err(anyhow!(
Expand All @@ -519,15 +653,6 @@ impl Actor {

let now = Instant::now();

let cancel_token = CancellationToken::new();
let stun_sock_v4 = match stun_sock_v4 {
Some(sock) => Some(sock),
None => bind_local_stun_socket(IpFamily::V4, self.addr(), cancel_token.clone()),
};
let stun_sock_v6 = match stun_sock_v6 {
Some(sock) => Some(sock),
None => bind_local_stun_socket(IpFamily::V6, self.addr(), cancel_token.clone()),
};
let mut do_full = self.reports.next_full
|| now.duration_since(self.reports.last_full) > FULL_REPORT_INTERVAL;

Expand Down Expand Up @@ -558,11 +683,11 @@ impl Actor {
stun_sock_v6,
quic_config,
self.dns_resolver.clone(),
protocols,
);

self.current_report_run = Some(ReportRun {
_reportgen: actor,
_drop_guard: cancel_token.drop_guard(),
report_tx: response_tx,
});
}
Expand Down Expand Up @@ -729,8 +854,6 @@ impl Actor {
struct ReportRun {
/// The handle of the [`reportgen`] actor, cancels the actor on drop.
_reportgen: reportgen::Client,
/// Drop guard to optionally kill workers started by net_report to support reportgen.
_drop_guard: tokio_util::sync::DropGuard,
/// Where to send the completed report.
report_tx: oneshot::Sender<Result<Arc<Report>>>,
}
Expand All @@ -740,7 +863,7 @@ struct ReportRun {
/// If successful this returns the bound socket and will forward STUN responses to the
/// provided *actor_addr*. The *cancel_token* serves to stop the packet forwarding when the
/// socket is no longer needed.
fn bind_local_stun_socket(
pub fn bind_local_stun_socket(
network: IpFamily,
actor_addr: Addr,
cancel_token: CancellationToken,
Expand Down Expand Up @@ -1003,8 +1126,10 @@ mod tests {

// Note that the ProbePlan will change with each iteration.
for i in 0..5 {
let cancel = CancellationToken::new();
let sock = bind_local_stun_socket(IpFamily::V4, client.addr(), cancel.clone());
println!("--round {}", i);
let r = client.get_report(dm.clone(), None, None, None).await?;
let r = client.get_report(dm.clone(), sock, None, None).await?;

assert!(r.udp, "want UDP");
assert_eq!(
Expand All @@ -1020,6 +1145,7 @@ mod tests {
);
assert!(r.global_v4.is_some(), "expected globalV4 set");
assert!(r.preferred_relay.is_some(),);
cancel.cancel();
}

assert!(
Expand Down
Loading
Loading