Skip to content

Commit

Permalink
refactor(iroh-net)!: Make netcheck::Client !Clone (#2716)
Browse files Browse the repository at this point in the history
## Description

netcheck::Client owns the actor task and when dropped it will abort
the actor task.  Making a struct owning a task Clone means it is easy
to lose track of who should be owning a task like this.  I now believe
each task should have a clear supervisor/owner in charge of it.

This cleans up the multiple-ownership of the netcheck::Client, which
is just a small step into this direction.  Later on, for e.g. #2647,
more supervision will be added.  But small changes are good.

## Breaking Changes

- `iroh_net::netcheck::Client::receive_stun_packet` is no longer
available.
- `iroh_net::netcheck::Client` is not longer `Clone`.

## Notes & open questions

I don't want to make `Addr` public, in fact I'd like all of netcheck to
be
private. So I made some docs not links to avoid the warnings resulting
to
linking to private items from public docs.

Removing `Client::receive_stun_packet` is a bit harsh. I'd like to make
all
of netcheck private, but the cli uses it for the doctor so I'm a bit
stuck.
In any case, the way that uses it does not need `receive_stun_packet`.

## Change checklist

- [x] Self-review.
- [x] Documentation updates following the [style
guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text),
if relevant.
- ~~[ ] Tests if relevant.~~
- ~~[ ] All breaking changes documented.~~
  • Loading branch information
flub authored Sep 9, 2024
1 parent 1df0813 commit ce2cfee
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 42 deletions.
4 changes: 2 additions & 2 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ pub(crate) struct MagicSock {
/// UDP IPv6 socket
pconn6: Option<UdpConn>,
/// Netcheck client
net_checker: netcheck::Client,
net_checker: netcheck::Addr,
/// The state for an active DiscoKey.
disco_secrets: DiscoSecrets,

Expand Down Expand Up @@ -1440,7 +1440,7 @@ impl Handle {
my_relay: Default::default(),
pconn4: pconn4.clone(),
pconn6: pconn6.clone(),
net_checker: net_checker.clone(),
net_checker: net_checker.addr(),
disco_secrets: DiscoSecrets::default(),
node_map,
relay_actor_sender: relay_actor_sender.clone(),
Expand Down
73 changes: 34 additions & 39 deletions iroh-net/src/netcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ impl RelayLatencies {
/// If all [`Client`]s are dropped the actor stops running.
///
/// While running the netcheck actor expects to be passed all received stun packets using
/// [`Client::receive_stun_packet`].
#[derive(Debug, Clone)]
/// `Addr::receive_stun_packet`.
#[derive(Debug)]
pub struct Client {
/// Channel to send message to the [`Actor`].
///
Expand Down Expand Up @@ -217,27 +217,12 @@ impl Client {
})
}

/// Pass a received STUN packet to the netchecker.
///
/// Normally the UDP sockets to send STUN messages from are passed in so that STUN
/// packets are sent from the sockets that carry the real traffic. However because
/// these sockets carry real traffic they will also receive non-STUN traffic, thus the
/// netcheck actor does not read from the sockets directly. If you receive a STUN
/// packet on the socket you should pass it to this method.
///
/// It is safe to call this even when the netcheck actor does not currently have any
/// in-flight STUN probes. The actor will simply ignore any stray STUN packets.
/// Returns a new address to send messages to this actor.
///
/// There is an implicit queue here which may drop packets if the actor does not keep up
/// consuming them.
pub fn receive_stun_packet(&self, payload: Bytes, src: SocketAddr) {
if let Err(mpsc::error::TrySendError::Full(_)) = self.addr.try_send(Message::StunPacket {
payload,
from_addr: src,
}) {
inc!(NetcheckMetrics, stun_packets_dropped);
warn!("dropping stun packet from {}", src);
}
/// Unlike the client itself the returned [`Addr`] does not own the actor task, it only
/// allows sending messages to the actor.
pub(crate) fn addr(&self) -> Addr {
self.addr.clone()
}

/// Runs a netcheck, returning the report.
Expand All @@ -248,7 +233,7 @@ impl Client {
/// STUN packets. This function **will not read from the sockets**, as they may be
/// receiving other traffic as well, normally they are the sockets carrying the real
/// traffic. Thus all stun packets received on those sockets should be passed to
/// [`Client::receive_stun_packet`] in order for this function to receive the stun
/// `Addr::receive_stun_packet` in order for this function to receive the stun
/// responses and function correctly.
///
/// If these are not passed in this will bind sockets for STUN itself, though results
Expand Down Expand Up @@ -346,29 +331,39 @@ pub(crate) enum Message {
/// Unlike [`Client`] this is the raw channel to send messages over. Keeping this alive
/// will not keep the actor alive, which makes this handy to pass to internal tasks.
#[derive(Debug, Clone)]
struct Addr {
pub(crate) struct Addr {
sender: mpsc::Sender<Message>,
}

impl Addr {
/// Pass a received STUN packet to the netchecker.
///
/// Normally the UDP sockets to send STUN messages from are passed in so that STUN
/// packets are sent from the sockets that carry the real traffic. However because
/// these sockets carry real traffic they will also receive non-STUN traffic, thus the
/// netcheck actor does not read from the sockets directly. If you receive a STUN
/// packet on the socket you should pass it to this method.
///
/// It is safe to call this even when the netcheck actor does not currently have any
/// in-flight STUN probes. The actor will simply ignore any stray STUN packets.
///
/// There is an implicit queue here which may drop packets if the actor does not keep up
/// consuming them.
pub fn receive_stun_packet(&self, payload: Bytes, src: SocketAddr) {
if let Err(mpsc::error::TrySendError::Full(_)) = self.sender.try_send(Message::StunPacket {
payload,
from_addr: src,
}) {
inc!(NetcheckMetrics, stun_packets_dropped);
warn!("dropping stun packet from {}", src);
}
}

async fn send(&self, msg: Message) -> Result<(), mpsc::error::SendError<Message>> {
self.sender.send(msg).await.inspect_err(|_| {
error!("netcheck actor lost");
})
}

fn try_send(&self, msg: Message) -> Result<(), mpsc::error::TrySendError<Message>> {
self.sender.try_send(msg).inspect_err(|err| {
match err {
mpsc::error::TrySendError::Full(_) => {
// TODO: metrics, though the only place that uses this already does its
// own metrics.
warn!("netcheck actor inbox full");
}
mpsc::error::TrySendError::Closed(_) => error!("netcheck actor lost"),
}
})
}
}

/// The netcheck actor.
Expand Down Expand Up @@ -1143,7 +1138,7 @@ mod tests {
info!(addr=?sock.local_addr().unwrap(), "Using local addr");
let task = {
let sock = sock.clone();
let client = client.clone();
let addr = client.addr();
tokio::spawn(
async move {
let mut buf = BytesMut::zeroed(64 << 10);
Expand All @@ -1155,7 +1150,7 @@ mod tests {
"Forwarding payload to netcheck client",
);
let payload = buf.split_to(count).freeze();
client.receive_stun_packet(payload, src);
addr.receive_stun_packet(payload, src);
}
}
.instrument(info_span!("pkt-fwd")),
Expand Down
2 changes: 1 addition & 1 deletion iroh-net/src/netcheck/reportgen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ struct Actor {
// Internal state.
/// The report being built.
report: Report,
/// The hairping actor.
/// The hairpin actor.
hairpin_actor: hairpin::Client,
/// Which tasks the [`Actor`] is still waiting on.
///
Expand Down

0 comments on commit ce2cfee

Please sign in to comment.