Skip to content

Commit

Permalink
Drain IP broadcast sink receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
aramperes committed Oct 16, 2021
1 parent d0497e4 commit 25bcedc
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 6 deletions.
121 changes: 120 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "onetun"
version = "0.1.0"
version = "0.1.2"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand All @@ -14,3 +14,4 @@ anyhow = "1"
smoltcp = { git = "https://github.com/smoltcp-rs/smoltcp", branch = "master" }
tokio = { version = "1", features = ["full"] }
lockfree = "0.5.1"
futures = "0.3.17"
6 changes: 6 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ async fn main() -> anyhow::Result<()> {
tokio::spawn(async move { wg.consume_task().await });
}

{
// Start IP broadcast drain task for WireGuard
let wg = wg.clone();
tokio::spawn(async move { wg.broadcast_drain_task().await });
}

info!(
"Tunnelling [{}]->[{}] (via [{}] as peer {})",
&config.source_addr, &config.dest_addr, &config.endpoint_addr, &config.source_peer_ip
Expand Down
37 changes: 33 additions & 4 deletions src/wg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ use std::time::Duration;

use anyhow::Context;
use boringtun::noise::{Tunn, TunnResult};
use futures::lock::Mutex;
use log::Level;
use smoltcp::phy::ChecksumCapabilities;
use smoltcp::wire::{
IpAddress, IpProtocol, IpVersion, Ipv4Packet, Ipv4Repr, Ipv6Packet, Ipv6Repr, TcpControl,
TcpPacket, TcpRepr, TcpSeqNumber,
};
use tokio::net::UdpSocket;
use tokio::sync::broadcast::error::RecvError;

use crate::config::Config;
use crate::port_pool::PortPool;
Expand All @@ -32,8 +34,8 @@ pub struct WireGuardTunnel {
endpoint: SocketAddr,
/// Broadcast sender for received IP packets.
ip_broadcast_tx: tokio::sync::broadcast::Sender<Vec<u8>>,
/// Placeholder so that the broadcaster doesn't close.
_ip_broadcast_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
/// Sink so that the broadcaster doesn't close. A repeating task should drain this as much as possible.
ip_broadcast_rx_sink: Mutex<tokio::sync::broadcast::Receiver<Vec<u8>>>,
/// Port pool.
port_pool: Arc<PortPool>,
}
Expand All @@ -47,7 +49,7 @@ impl WireGuardTunnel {
.await
.with_context(|| "Failed to create UDP socket for WireGuard connection")?;
let endpoint = config.endpoint_addr;
let (ip_broadcast_tx, ip_broadcast_rx) =
let (ip_broadcast_tx, ip_broadcast_rx_sink) =
tokio::sync::broadcast::channel(BROADCAST_CAPACITY);

Ok(Self {
Expand All @@ -56,7 +58,7 @@ impl WireGuardTunnel {
udp,
endpoint,
ip_broadcast_tx,
_ip_broadcast_rx: ip_broadcast_rx,
ip_broadcast_rx_sink: Mutex::new(ip_broadcast_rx_sink),
port_pool,
})
}
Expand Down Expand Up @@ -228,6 +230,33 @@ impl WireGuardTunnel {
}
}

/// A repeating task that drains the default IP broadcast channel receiver.
/// It is necessary to keep this receiver alive to prevent the overall channel from closing,
/// so draining its backlog regularly is required to avoid memory leaks.
pub async fn broadcast_drain_task(&self) {
trace!("Starting IP broadcast sink drain task");

loop {
let mut sink = self.ip_broadcast_rx_sink.lock().await;
match sink.recv().await {
Ok(_) => {
trace!("Drained a packet from IP broadcast sink");
}
Err(e) => match e {
RecvError::Closed => {
trace!("IP broadcast sink finished draining: channel closed");
break;
}
RecvError::Lagged(_) => {
warn!("IP broadcast sink is falling behind");
}
},
}
}

trace!("Stopped IP broadcast sink drain");
}

fn create_tunnel(config: &Config) -> anyhow::Result<Box<Tunn>> {
Tunn::new(
config.private_key.clone(),
Expand Down

0 comments on commit 25bcedc

Please sign in to comment.