Skip to content

Commit

Permalink
refactor(iroh): simplify RTT actor (#3072)
Browse files Browse the repository at this point in the history
## Description

also removes the `futures-concurrency` dep, which was adding a noticable
slow down during compiles.

The cleanup now happens automatically when the stream ends, finished
streams get removed and cleand up from `MergeUnbounded`.


## Breaking Changes

<!-- Optional, if there are any breaking changes document them,
including how to migrate older code. -->

## Notes & open questions

<!-- Any notes, remarks or open questions you have to make about the PR.
-->

## Change checklist

- [ ] Self-review.
- [ ] Documentation updates following the [style
guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text),
if relevant.
- [ ] Tests if relevant.
- [ ] All breaking changes documented.
  • Loading branch information
dignifiedquire authored Dec 24, 2024
1 parent bc4f3ca commit 1cd0e96
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 167 deletions.
70 changes: 9 additions & 61 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion iroh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ derive_more = { version = "1.0.0", features = [
] }
ed25519-dalek = "2.0"
futures-buffered = "0.2.8"
futures-concurrency = "7.6"
futures-lite = "2.5"
futures-sink = "0.3"
futures-util = "0.3"
Expand Down
165 changes: 60 additions & 105 deletions iroh/src/endpoint/rtt_actor.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
//! Actor which coordinates the congestion controller for the magic socket
use std::collections::HashMap;
use std::{pin::Pin, task::Poll};

use futures_concurrency::stream::stream_group;
use futures_lite::StreamExt;
use futures_buffered::MergeUnbounded;
use futures_lite::{Stream, StreamExt};
use iroh_base::NodeId;
use iroh_metrics::inc;
use tokio::{
sync::{mpsc, Notify},
time::Duration,
};
use tokio::sync::mpsc;
use tokio_util::task::AbortOnDropHandle;
use tracing::{debug, error, info_span, trace, Instrument};
use tracing::{debug, info_span, Instrument};

use crate::{magicsock::ConnectionType, metrics::MagicsockMetrics, watchable::WatcherStream};

Expand All @@ -25,9 +22,7 @@ pub(super) struct RttHandle {
impl RttHandle {
pub(super) fn new() -> Self {
let mut actor = RttActor {
connection_events: stream_group::StreamGroup::new().keyed(),
connections: HashMap::new(),
tick: Notify::new(),
connection_events: Default::default(),
};
let (msg_tx, msg_rx) = mpsc::channel(16);
let handle = tokio::spawn(
Expand Down Expand Up @@ -61,30 +56,64 @@ pub(super) enum RttMessage {
///
/// The magic socket can change the underlying network path, between two nodes. If we can
/// inform the QUIC congestion controller of this event it will work much more efficiently.
#[derive(Debug)]
#[derive(derive_more::Debug)]
struct RttActor {
/// Stream of connection type changes.
connection_events: stream_group::Keyed<WatcherStream<ConnectionType>>,
/// References to the connections.
///
/// These are weak references so not to keep the connections alive. The key allows
/// removing the corresponding stream from `conn_type_changes`.
/// The boolean is an indiciator of whether this connection was direct before.
#[debug("MergeUnbounded<WatcherStream<ConnectionType>>")]
connection_events: MergeUnbounded<MappedStream>,
}

#[derive(Debug)]
struct MappedStream {
stream: WatcherStream<ConnectionType>,
node_id: NodeId,
/// Reference to the connection.
connection: quinn::WeakConnectionHandle,
/// This an indiciator of whether this connection was direct before.
/// This helps establish metrics on number of connections that became direct.
connections: HashMap<stream_group::Key, (quinn::WeakConnectionHandle, NodeId, bool)>,
/// A way to notify the main actor loop to run over.
was_direct_before: bool,
}

impl Stream for MappedStream {
type Item = ConnectionType;

/// Performs the congestion controller reset for a magic socket path change.
///
/// E.g. when a new stream was added.
tick: Notify,
/// Regardless of which kind of path we are changed to, the congestion controller needs
/// resetting. Even when switching to mixed we should reset the state as e.g. switching
/// from direct to mixed back to direct should be a rare exception and is a bug if this
/// happens commonly.
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
match Pin::new(&mut self.stream).poll_next(cx) {
Poll::Ready(Some(new_conn_type)) => {
if self.connection.network_path_changed() {
debug!(
node_id = %self.node_id.fmt_short(),
new_type = ?new_conn_type,
"Congestion controller state reset",
);
if !self.was_direct_before && matches!(new_conn_type, ConnectionType::Direct(_))
{
self.was_direct_before = true;
inc!(MagicsockMetrics, connection_became_direct);
}
}
Poll::Ready(Some(new_conn_type))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}

impl RttActor {
/// Runs the actor main loop.
///
/// The main loop will finish when the sender is dropped.
async fn run(&mut self, mut msg_rx: mpsc::Receiver<RttMessage>) {
let mut cleanup_interval = tokio::time::interval(Duration::from_secs(5));
cleanup_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
biased;
Expand All @@ -94,11 +123,7 @@ impl RttActor {
None => break,
}
}
item = self.connection_events.next(), if !self.connection_events.is_empty() => {
self.do_reset_rtt(item);
}
_ = cleanup_interval.tick() => self.do_connections_cleanup(),
() = self.tick.notified() => continue,
_item = self.connection_events.next(), if !self.connection_events.is_empty() => {}
}
}
debug!("rtt-actor finished");
Expand All @@ -124,82 +149,12 @@ impl RttActor {
conn_type_changes: WatcherStream<ConnectionType>,
node_id: NodeId,
) {
let key = self.connection_events.insert(conn_type_changes);
self.connections.insert(key, (connection, node_id, false));
self.tick.notify_one();
inc!(MagicsockMetrics, connection_handshake_success);
}

/// Performs the congestion controller reset for a magic socket path change.
///
/// Regardless of which kind of path we are changed to, the congestion controller needs
/// resetting. Even when switching to mixed we should reset the state as e.g. switching
/// from direct to mixed back to direct should be a rare exception and is a bug if this
/// happens commonly.
fn do_reset_rtt(&mut self, item: Option<(stream_group::Key, ConnectionType)>) {
match item {
Some((key, new_conn_type)) => match self.connections.get_mut(&key) {
Some((handle, node_id, was_direct_before)) => {
if handle.network_path_changed() {
debug!(
node_id = %node_id.fmt_short(),
new_type = ?new_conn_type,
"Congestion controller state reset",
);
if !*was_direct_before && matches!(new_conn_type, ConnectionType::Direct(_))
{
*was_direct_before = true;
inc!(MagicsockMetrics, connection_became_direct);
}
} else {
debug!(
node_id = %node_id.fmt_short(),
"removing dropped connection",
);
self.connection_events.remove(key);
}
}
None => error!("No connection found for stream item"),
},
None => {
trace!("No more connections");
}
}
}

/// Performs cleanup for closed connection.
fn do_connections_cleanup(&mut self) {
for (key, (handle, node_id, _)) in self.connections.iter() {
if !handle.is_alive() {
trace!(node_id = %node_id.fmt_short(), "removing stale connection");
self.connection_events.remove(*key);
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test]
async fn test_actor_mspc_close() {
let mut actor = RttActor {
connection_events: stream_group::StreamGroup::new().keyed(),
connections: HashMap::new(),
tick: Notify::new(),
};
let (msg_tx, msg_rx) = mpsc::channel(16);
let handle = tokio::spawn(async move {
actor.run(msg_rx).await;
self.connection_events.push(MappedStream {
stream: conn_type_changes,
connection,
node_id,
was_direct_before: false,
});

// Dropping the msg_tx should stop the actor
drop(msg_tx);

let task_res = tokio::time::timeout(Duration::from_secs(5), handle)
.await
.expect("timeout - actor did not finish");
assert!(task_res.is_ok());
inc!(MagicsockMetrics, connection_handshake_success);
}
}

0 comments on commit 1cd0e96

Please sign in to comment.