Skip to content

Commit

Permalink
Merge pull request #13 from aramperes/12-simultaneous
Browse files Browse the repository at this point in the history
  • Loading branch information
aramperes authored Oct 16, 2021
2 parents b6739a5 + 4b370b1 commit ce3b23e
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 215 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ forward it to the server's port, which handles the TCP segment. The server respo
the peer's local WireGuard interface, gets encrypted, forwarded to the WireGuard endpoint, and then finally back to onetun's UDP port.

When onetun receives an encrypted packet from the WireGuard endpoint, it decrypts it using boringtun.
The resulting IP packet is broadcasted to all virtual interfaces running inside onetun; once the corresponding
interface is matched, the IP packet is read and unpacked, and the virtual client's TCP state is updated.
The resulting IP packet is dispatched to the corresponding virtual interface running inside onetun;
the IP packet is then read and processed by the virtual interface, and the virtual client's TCP state is updated.

Whenever data is sent by the real client, it is simply "sent" by the virtual client, which kicks off the whole IP encapsulation
and WireGuard encryption again. When data is sent by the real server, it ends up routed in the virtual interface, which allows
Expand Down
35 changes: 35 additions & 0 deletions src/ip_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use crate::virtual_device::VirtualIpDevice;
use crate::wg::WireGuardTunnel;
use smoltcp::iface::InterfaceBuilder;
use smoltcp::socket::SocketSet;
use std::sync::Arc;
use tokio::time::Duration;

/// A repeating task that processes unroutable IP packets.
pub async fn run_ip_sink_interface(wg: Arc<WireGuardTunnel>) -> ! {
// Initialize interface
let device = VirtualIpDevice::new_sink(wg)
.await
.expect("Failed to initialize VirtualIpDevice for sink interface");

// No sockets on sink interface
let mut socket_set_entries: [_; 0] = Default::default();
let mut socket_set = SocketSet::new(&mut socket_set_entries[..]);
let mut virtual_interface = InterfaceBuilder::new(device).ip_addrs([]).finalize();

loop {
let loop_start = smoltcp::time::Instant::now();
match virtual_interface.poll(&mut socket_set, loop_start) {
Ok(processed) if processed => {
trace!("[SINK] Virtual interface polled some packets to be processed",);
tokio::time::sleep(Duration::from_millis(1)).await;
}
Err(e) => {
error!("[SINK] Virtual interface poll error: {:?}", e);
}
_ => {
tokio::time::sleep(Duration::from_millis(5)).await;
}
}
}
}
23 changes: 15 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::virtual_device::VirtualIpDevice;
use crate::wg::WireGuardTunnel;

pub mod config;
pub mod ip_sink;
pub mod port_pool;
pub mod virtual_device;
pub mod wg;
Expand All @@ -30,7 +31,7 @@ async fn main() -> anyhow::Result<()> {
init_logger(&config)?;
let port_pool = Arc::new(PortPool::new());

let wg = WireGuardTunnel::new(&config, port_pool.clone())
let wg = WireGuardTunnel::new(&config)
.await
.with_context(|| "Failed to initialize WireGuard tunnel")?;
let wg = Arc::new(wg);
Expand All @@ -48,9 +49,9 @@ async fn main() -> anyhow::Result<()> {
}

{
// Start IP broadcast drain task for WireGuard
// Start IP sink task for incoming IP packets
let wg = wg.clone();
tokio::spawn(async move { wg.broadcast_drain_task().await });
tokio::spawn(async move { ip_sink::run_ip_sink_interface(wg).await });
}

info!(
Expand Down Expand Up @@ -106,9 +107,14 @@ async fn tcp_proxy_server(

tokio::spawn(async move {
let port_pool = Arc::clone(&port_pool);
let result =
handle_tcp_proxy_connection(socket, virtual_port, source_peer_ip, dest_addr, wg)
.await;
let result = handle_tcp_proxy_connection(
socket,
virtual_port,
source_peer_ip,
dest_addr,
wg.clone(),
)
.await;

if let Err(e) = result {
error!(
Expand All @@ -120,6 +126,7 @@ async fn tcp_proxy_server(
}

// Release port when connection drops
wg.release_virtual_interface(virtual_port);
port_pool.release(virtual_port);
});
}
Expand Down Expand Up @@ -270,14 +277,14 @@ async fn virtual_tcp_interface(

// Consumer for IP packets to send through the virtual interface
// Initialize the interface
let device = VirtualIpDevice::new(wg);
let device = VirtualIpDevice::new(virtual_port, wg)
.with_context(|| "Failed to initialize VirtualIpDevice")?;
let mut virtual_interface = InterfaceBuilder::new(device)
.ip_addrs([
// Interface handles IP packets for the sender and recipient
IpCidr::new(IpAddress::from(source_peer_ip), 32),
IpCidr::new(IpAddress::from(dest_addr.ip()), 32),
])
.any_ip(true)
.finalize();

// Server socket: this is a placeholder for the interface to route new connections to.
Expand Down
2 changes: 1 addition & 1 deletion src/port_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const PORT_RANGE: Range<u16> = MIN_PORT..MAX_PORT;
pub struct PortPool {
/// Remaining ports
inner: lockfree::queue::Queue<u16>,
/// Ports in use
/// Ports in use, with their associated IP channel sender.
taken: lockfree::set::Set<u16>,
}

Expand Down
28 changes: 18 additions & 10 deletions src/virtual_device.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,34 @@
use crate::wg::WireGuardTunnel;
use anyhow::Context;
use smoltcp::phy::{Device, DeviceCapabilities, Medium};
use smoltcp::time::Instant;
use std::sync::Arc;

/// A virtual device that processes IP packets. IP packets received from the WireGuard endpoint
/// are made available to this device using a broadcast channel receiver. IP packets sent from this device
/// are made available to this device using a channel receiver. IP packets sent from this device
/// are asynchronously sent out to the WireGuard tunnel.
pub struct VirtualIpDevice {
/// Tunnel to send IP packets to.
wg: Arc<WireGuardTunnel>,
/// Broadcast channel receiver for received IP packets.
ip_broadcast_rx: tokio::sync::broadcast::Receiver<Vec<u8>>,
/// Channel receiver for received IP packets.
ip_dispatch_rx: tokio::sync::mpsc::Receiver<Vec<u8>>,
}

impl VirtualIpDevice {
pub fn new(wg: Arc<WireGuardTunnel>) -> Self {
let ip_broadcast_rx = wg.subscribe();
pub fn new(virtual_port: u16, wg: Arc<WireGuardTunnel>) -> anyhow::Result<Self> {
let ip_dispatch_rx = wg
.register_virtual_interface(virtual_port)
.with_context(|| "Failed to register IP dispatch for virtual interface")?;

Self {
wg,
ip_broadcast_rx,
}
Ok(Self { wg, ip_dispatch_rx })
}

pub async fn new_sink(wg: Arc<WireGuardTunnel>) -> anyhow::Result<Self> {
let ip_dispatch_rx = wg
.register_sink_interface()
.await
.with_context(|| "Failed to register IP dispatch for sink virtual interface")?;
Ok(Self { wg, ip_dispatch_rx })
}
}

Expand All @@ -29,7 +37,7 @@ impl<'a> Device<'a> for VirtualIpDevice {
type TxToken = TxToken;

fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> {
match self.ip_broadcast_rx.try_recv() {
match self.ip_dispatch_rx.try_recv() {
Ok(buffer) => Some((
Self::RxToken { buffer },
Self::TxToken {
Expand Down
Loading

0 comments on commit ce3b23e

Please sign in to comment.