diff --git a/Cargo.lock b/Cargo.lock index dd6c8fa..ddc6b67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -242,6 +242,100 @@ dependencies = [ "version_check", ] +[[package]] +name = "futures" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a12aa0eb539080d55c3f2d45a67c3b58b6b0773c1a3ca2dfec66d58c97fd66ca" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5da6ba8c3bb3c165d3c7319fc1cc8304facf1fb8db99c5de877183c08a273888" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88d1c26957f23603395cd326b0ffe64124b818f4449552f960d815cfba83a53d" + +[[package]] +name = "futures-executor" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45025be030969d763025784f7f355043dc6bc74093e4ecc5000ca4dc50d8745c" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "522de2a0fe3e380f1bc577ba0474108faf3f6b18321dbf60b3b9c39a75073377" + +[[package]] +name = "futures-macro" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e4a4b95cea4b4ccbcf1c5675ca7c4ee4e9e75eb79944d07defde18068f79bb" +dependencies = [ + "autocfg", + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36ea153c13024fe480590b3e3d4cad89a0cfacecc24577b68f86c6ced9c2bc11" + +[[package]] +name = "futures-task" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d3d00f4eddb73e498a54394f228cd55853bdf059259e8e7bc6e69d408892e99" + +[[package]] +name = "futures-util" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "36568465210a3a6ee45e1f165136d68671471a501e632e9a98d96872222b5481" +dependencies = [ + "autocfg", + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "proc-macro-hack", + "proc-macro-nested", + "slab", +] + [[package]] name = "getrandom" version = "0.2.3" @@ -481,11 +575,12 @@ checksum = "692fcb63b64b1758029e0a96ee63e049ce8c5948587f2f7208df04625e5f6b56" [[package]] name = "onetun" -version = "0.1.0" +version = "0.1.2" dependencies = [ "anyhow", "boringtun", "clap", + "futures", "lockfree", "log", "pretty_env_logger", @@ -530,6 +625,12 @@ version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d31d11c69a6b52a174b42bdc0c30e5e11670f90788b2c471c31c1d17d449443" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "pretty_env_logger" version = "0.3.1" @@ -541,6 +642,18 @@ dependencies = [ "log", ] +[[package]] +name = "proc-macro-hack" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" + +[[package]] +name = "proc-macro-nested" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc881b2c22681370c6a780e47af9840ef841837bc98118431d4e1868bd0c1086" + [[package]] name = "proc-macro2" version = "1.0.29" @@ -652,6 +765,12 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5" + [[package]] name = "slog" version = "2.7.0" diff --git a/Cargo.toml b/Cargo.toml index aa09475..da107af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "onetun" -version = "0.1.0" +version = "0.1.2" edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -14,3 +14,4 @@ anyhow = "1" smoltcp = { git = "https://github.com/smoltcp-rs/smoltcp", branch = "master" } tokio = { version = "1", features = ["full"] } lockfree = "0.5.1" +futures = "0.3.17" diff --git a/src/main.rs b/src/main.rs index 9811071..4913d82 100644 --- a/src/main.rs +++ b/src/main.rs @@ -47,6 +47,12 @@ async fn main() -> anyhow::Result<()> { tokio::spawn(async move { wg.consume_task().await }); } + { + // Start IP broadcast drain task for WireGuard + let wg = wg.clone(); + tokio::spawn(async move { wg.broadcast_drain_task().await }); + } + info!( "Tunnelling [{}]->[{}] (via [{}] as peer {})", &config.source_addr, &config.dest_addr, &config.endpoint_addr, &config.source_peer_ip diff --git a/src/wg.rs b/src/wg.rs index 893318f..dbc1948 100644 --- a/src/wg.rs +++ b/src/wg.rs @@ -4,6 +4,7 @@ use std::time::Duration; use anyhow::Context; use boringtun::noise::{Tunn, TunnResult}; +use futures::lock::Mutex; use log::Level; use smoltcp::phy::ChecksumCapabilities; use smoltcp::wire::{ @@ -11,6 +12,7 @@ use smoltcp::wire::{ TcpPacket, TcpRepr, TcpSeqNumber, }; use tokio::net::UdpSocket; +use tokio::sync::broadcast::error::RecvError; use crate::config::Config; use crate::port_pool::PortPool; @@ -32,8 +34,8 @@ pub struct WireGuardTunnel { endpoint: SocketAddr, /// Broadcast sender for received IP packets. ip_broadcast_tx: tokio::sync::broadcast::Sender>, - /// Placeholder so that the broadcaster doesn't close. - _ip_broadcast_rx: tokio::sync::broadcast::Receiver>, + /// Sink so that the broadcaster doesn't close. A repeating task should drain this as much as possible. + ip_broadcast_rx_sink: Mutex>>, /// Port pool. port_pool: Arc, } @@ -47,7 +49,7 @@ impl WireGuardTunnel { .await .with_context(|| "Failed to create UDP socket for WireGuard connection")?; let endpoint = config.endpoint_addr; - let (ip_broadcast_tx, ip_broadcast_rx) = + let (ip_broadcast_tx, ip_broadcast_rx_sink) = tokio::sync::broadcast::channel(BROADCAST_CAPACITY); Ok(Self { @@ -56,7 +58,7 @@ impl WireGuardTunnel { udp, endpoint, ip_broadcast_tx, - _ip_broadcast_rx: ip_broadcast_rx, + ip_broadcast_rx_sink: Mutex::new(ip_broadcast_rx_sink), port_pool, }) } @@ -228,6 +230,33 @@ impl WireGuardTunnel { } } + /// A repeating task that drains the default IP broadcast channel receiver. + /// It is necessary to keep this receiver alive to prevent the overall channel from closing, + /// so draining its backlog regularly is required to avoid memory leaks. + pub async fn broadcast_drain_task(&self) { + trace!("Starting IP broadcast sink drain task"); + + loop { + let mut sink = self.ip_broadcast_rx_sink.lock().await; + match sink.recv().await { + Ok(_) => { + trace!("Drained a packet from IP broadcast sink"); + } + Err(e) => match e { + RecvError::Closed => { + trace!("IP broadcast sink finished draining: channel closed"); + break; + } + RecvError::Lagged(_) => { + warn!("IP broadcast sink is falling behind"); + } + }, + } + } + + trace!("Stopped IP broadcast sink drain"); + } + fn create_tunnel(config: &Config) -> anyhow::Result> { Tunn::new( config.private_key.clone(),