diff --git a/Cargo.lock b/Cargo.lock index c40edaf..bdbafae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -645,8 +645,9 @@ dependencies = [ "serde_yaml", "sha2", "shadowsocks", + "sharded-slab", "smoltcp", - "socket2 0.5.4", + "socket2 0.5.7", "tabular", "tarpc", "thiserror", @@ -1851,12 +1852,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.2.6" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" -dependencies = [ - "libc", -] +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" [[package]] name = "hex" @@ -2054,7 +2052,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.5.4", + "socket2 0.4.9", "tokio", "tower-service", "tracing", @@ -2124,7 +2122,7 @@ dependencies = [ "http-body 1.0.0", "hyper 1.2.0", "pin-project-lite", - "socket2 0.5.4", + "socket2 0.5.7", "tokio", "tower", "tower-service", @@ -2530,14 +2528,14 @@ dependencies = [ [[package]] name = "mio" -version = "0.8.6" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ + "hermit-abi 0.3.9", "libc", - "log", "wasi", - "windows-sys 0.45.0", + "windows-sys 0.52.0", ] [[package]] @@ -2736,16 +2734,6 @@ dependencies = [ "libm", ] -[[package]] -name = "num_cpus" -version = "1.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fac9e2da13b5eb447a6ce3d392f23a29d8694bff781bf03a16cd9ac8697593b" -dependencies = [ - "hermit-abi 0.2.6", - "libc", -] - [[package]] name = "object" version = "0.32.1" @@ -3995,7 +3983,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "shadowsocks-crypto", - "socket2 0.5.4", + "socket2 0.5.7", "spin 0.9.8", "thiserror", "tokio", @@ -4022,9 +4010,9 @@ dependencies = [ [[package]] name = "sharded-slab" -version = "0.1.4" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" dependencies = [ "lazy_static", ] @@ -4081,7 +4069,7 @@ checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "smoltcp" version = "0.11.0" -source = "git+https://github.com/XOR-op/smoltcp.git?branch=resize-recv-buffer#1555b5e0c1145df92bfbbb4221d550d73efb3b34" +source = "git+https://github.com/XOR-op/smoltcp.git?branch=rcv-buf-pinned#60ede1b4ef49426c794b1b25c5c7e39ac3a0f97f" dependencies = [ "bitflags 1.3.2", "byteorder", @@ -4105,12 +4093,12 @@ dependencies = [ [[package]] name = "socket2" -version = "0.5.4" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" +checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" dependencies = [ "libc", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -4400,22 +4388,21 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.32.0" +version = "1.40.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" dependencies = [ "backtrace", "bytes", "libc", "mio", - "num_cpus", "parking_lot", "pin-project-lite", "signal-hook-registry", - "socket2 0.5.4", + "socket2 0.5.7", "tokio-macros", "tracing", - "windows-sys 0.48.0", + "windows-sys 0.52.0", ] [[package]] @@ -4430,9 +4417,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.1.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", @@ -5157,15 +5144,6 @@ dependencies = [ "windows_x86_64_msvc 0.36.1", ] -[[package]] -name = "windows-sys" -version = "0.45.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" -dependencies = [ - "windows-targets 0.42.1", -] - [[package]] name = "windows-sys" version = "0.48.0" @@ -5184,21 +5162,6 @@ dependencies = [ "windows-targets 0.52.6", ] -[[package]] -name = "windows-targets" -version = "0.42.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7" -dependencies = [ - "windows_aarch64_gnullvm 0.42.1", - "windows_aarch64_msvc 0.42.1", - "windows_i686_gnu 0.42.1", - "windows_i686_msvc 0.42.1", - "windows_x86_64_gnu 0.42.1", - "windows_x86_64_gnullvm 0.42.1", - "windows_x86_64_msvc 0.42.1", -] - [[package]] name = "windows-targets" version = "0.48.5" @@ -5230,12 +5193,6 @@ dependencies = [ "windows_x86_64_msvc 0.52.6", ] -[[package]] -name = "windows_aarch64_gnullvm" -version = "0.42.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c9864e83243fdec7fc9c5444389dcbbfd258f745e7853198f365e3c4968a608" - [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -5254,12 +5211,6 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" -[[package]] -name = "windows_aarch64_msvc" -version = "0.42.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c8b1b673ffc16c47a9ff48570a9d85e25d265735c503681332589af6253c6c7" - [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -5278,12 +5229,6 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" -[[package]] -name = "windows_i686_gnu" -version = "0.42.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de3887528ad530ba7bdbb1faa8275ec7a1155a45ffa57c37993960277145d640" - [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -5308,12 +5253,6 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" -[[package]] -name = "windows_i686_msvc" -version = "0.42.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf4d1122317eddd6ff351aa852118a2418ad4214e6613a50e0191f7004372605" - [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -5332,12 +5271,6 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" -[[package]] -name = "windows_x86_64_gnu" -version = "0.42.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1040f221285e17ebccbc2591ffdc2d44ee1f9186324dd3e84e99ac68d699c45" - [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -5350,12 +5283,6 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" -[[package]] -name = "windows_x86_64_gnullvm" -version = "0.42.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "628bfdf232daa22b0d64fdb62b09fcc36bb01f05a3939e20ab73aaf9470d0463" - [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -5374,12 +5301,6 @@ version = "0.36.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" -[[package]] -name = "windows_x86_64_msvc" -version = "0.42.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "447660ad36a13288b1db4d4248e857b510e8c3a225c822ba4fb748c0aafecffd" - [[package]] name = "windows_x86_64_msvc" version = "0.48.5" diff --git a/Cargo.toml b/Cargo.toml index c039c21..4fe1698 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,6 @@ debug = true [patch.crates-io] rustls = { git = "https://github.com/XOR-op/rustls.delta.git", branch = "unofficial-rel-0.23" } -smoltcp = { git = "https://github.com/XOR-op/smoltcp.git", branch = "resize-recv-buffer" } +smoltcp = { git = "https://github.com/XOR-op/smoltcp.git", branch = "rcv-buf-pinned" } # Only used to bump x25519-dalek; will be removed once 0.6.1 is released boringtun = { git = "https://github.com/XOR-op/boringtun", branch = "master"} diff --git a/boltconn/Cargo.toml b/boltconn/Cargo.toml index 0221d42..59fd448 100644 --- a/boltconn/Cargo.toml +++ b/boltconn/Cargo.toml @@ -39,9 +39,10 @@ network-interface = "2.0.0" nix = { version = "0.29.0", features = ["user", "fs"] } rand = { version = "0.8.5", features = ["small_rng"] } regex = "1.7.0" +sharded-slab = "0.1.7" socket2 = { version = "0.5.1", features = ["all"] } thiserror = "1.0.37" -tokio = { version = "1.32.0", features = [ +tokio = { version = "1.40.0", features = [ "rt", "rt-multi-thread", "net", diff --git a/boltconn/src/common/id_gen.rs b/boltconn/src/common/id_gen.rs deleted file mode 100644 index 7285754..0000000 --- a/boltconn/src/common/id_gen.rs +++ /dev/null @@ -1,10 +0,0 @@ -use std::sync::atomic::{AtomicU64, Ordering}; - -#[derive(Default, Debug)] -pub struct IdGenerator(AtomicU64); - -impl IdGenerator { - pub fn get(&self) -> u64 { - self.0.fetch_add(1, Ordering::Relaxed) - } -} diff --git a/boltconn/src/common/mod.rs b/boltconn/src/common/mod.rs index 1a19576..6247497 100644 --- a/boltconn/src/common/mod.rs +++ b/boltconn/src/common/mod.rs @@ -19,8 +19,8 @@ pub mod client_hello; pub mod duplex_chan; pub mod evictable_vec; pub mod host_matcher; -pub mod id_gen; mod sync; +pub mod utils; pub use sync::{local_async_run, AbortCanary}; @@ -45,6 +45,7 @@ impl StreamOutboundTrait for tokio::net::windows::named_pipe::NamedPipeServer {} impl StreamOutboundTrait for tokio::net::UnixStream {} pub const MAX_PKT_SIZE: usize = 65576; +pub const MAX_UDP_PKT_SIZE: usize = 1518; pub async fn read_to_bytes_mut( buf: &mut BytesMut, diff --git a/boltconn/src/common/utils.rs b/boltconn/src/common/utils.rs new file mode 100644 index 0000000..4d51e8b --- /dev/null +++ b/boltconn/src/common/utils.rs @@ -0,0 +1,53 @@ +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{Duration, Instant}; + +#[derive(Default, Debug)] +pub struct IdGenerator(AtomicU64); + +impl IdGenerator { + pub fn get(&self) -> u64 { + self.0.fetch_add(1, Ordering::Relaxed) + } +} + +#[allow(unused)] +pub struct TputMeasurement { + inner: std::sync::Mutex, +} + +struct TputMeasurementInner { + pub accum_bytes: u64, + pub last_time: Instant, + pub interval: Duration, +} + +impl TputMeasurement { + pub fn new(interval: Duration) -> Self { + Self { + inner: std::sync::Mutex::new(TputMeasurementInner { + accum_bytes: 0, + last_time: Instant::now(), + interval, + }), + } + } + + pub fn update(&self, bytes: usize) -> Option { + let mut inner = self.inner.lock().unwrap(); + inner.accum_bytes += bytes as u64; + let now = Instant::now(); + let elapsed = now - inner.last_time; + if elapsed >= inner.interval { + let tput = inner.accum_bytes as f64 / elapsed.as_secs_f64(); + inner.accum_bytes = 0; + inner.last_time = now; + Some(tput) + } else { + None + } + } + + pub fn update_to_mbps(&self, bytes: usize) -> Option { + self.update(bytes).map(|tput| tput * 8.0 / 1_000_000.0) + } +} diff --git a/boltconn/src/intercept/http_intercept.rs b/boltconn/src/intercept/http_intercept.rs index b293961..ce657e7 100644 --- a/boltconn/src/intercept/http_intercept.rs +++ b/boltconn/src/intercept/http_intercept.rs @@ -1,6 +1,6 @@ use crate::adapter::{Connector, Outbound}; use crate::common::duplex_chan::DuplexChan; -use crate::common::id_gen::IdGenerator; +use crate::common::utils::IdGenerator; use crate::intercept::modifier::Modifier; use crate::intercept::{HyperBody, ModifierContext}; use crate::proxy::error::InterceptError; diff --git a/boltconn/src/intercept/https_intercept.rs b/boltconn/src/intercept/https_intercept.rs index 8f58a80..0f0127c 100644 --- a/boltconn/src/intercept/https_intercept.rs +++ b/boltconn/src/intercept/https_intercept.rs @@ -2,7 +2,7 @@ use crate::adapter::{Connector, Outbound}; use crate::common::client_hello::get_overrider; use crate::common::create_tls_connector; use crate::common::duplex_chan::DuplexChan; -use crate::common::id_gen::IdGenerator; +use crate::common::utils::IdGenerator; use crate::intercept::modifier::Modifier; use crate::intercept::{sign_site_cert, HyperBody, ModifierContext}; use crate::proxy::error::InterceptError; diff --git a/boltconn/src/transport/smol.rs b/boltconn/src/transport/smol.rs index ea289d5..f1e9cdd 100644 --- a/boltconn/src/transport/smol.rs +++ b/boltconn/src/transport/smol.rs @@ -15,7 +15,7 @@ use hickory_proto::TokioTime; use hickory_resolver::name_server::RuntimeProvider; use hickory_resolver::TokioHandle; use rand::Rng; -use smoltcp::iface::{Interface, SocketHandle, SocketSet}; +use smoltcp::iface::{Interface, PollResult, SocketHandle, SocketSet}; use smoltcp::phy::{Device, DeviceCapabilities, Medium, RxToken, TxToken}; use smoltcp::socket::{ tcp::Socket as SmolTcpSocket, udp::PacketBuffer as UdpSocketBuffer, @@ -427,7 +427,7 @@ impl SmolStack { } } - pub fn drive_iface(&mut self) -> bool { + pub fn drive_iface(&mut self) -> PollResult { self.iface.poll( SmolInstant::now(), &mut self.ip_device, @@ -750,6 +750,7 @@ impl Device for VirtualIpDevice { VirtualRxToken { buf: item }, VirtualTxToken { sender: self.outbound.clone(), + buf: BytesMut::with_capacity(self.mtu), }, )), Err(_) => None, @@ -764,8 +765,10 @@ impl Device for VirtualIpDevice { { Some(VirtualTxToken { sender: self.outbound.clone(), + buf: BytesMut::with_capacity(self.mtu), }) } else { + tracing::trace!("smol: VirtIP Device allocate TxToken failed"); None } } @@ -783,24 +786,29 @@ pub struct VirtualRxToken { } impl RxToken for VirtualRxToken { - fn consume R>(mut self, f: F) -> R { + fn consume R>(mut self, f: F) -> R { f(&mut self.buf) } } pub struct VirtualTxToken { sender: flume::Sender, + buf: BytesMut, } impl TxToken for VirtualTxToken { fn consume R>(self, len: usize, f: F) -> R { - let mut buf = BytesMut::with_capacity(len); + let mut buf = self.buf; + debug_assert!(len <= buf.capacity()); // Safety: f exactly writes _len_ bytes, so all bytes are initialized. unsafe { buf.set_len(len); } let r = f(&mut buf); - let _ = self.sender.send(buf); + let send_result = self.sender.try_send(buf); + if send_result.is_err() { + tracing::trace!("smol: VirtIP Device Tx send failed"); + } r } } diff --git a/boltconn/src/transport/wireguard.rs b/boltconn/src/transport/wireguard.rs index 6cbf2e5..316204a 100644 --- a/boltconn/src/transport/wireguard.rs +++ b/boltconn/src/transport/wireguard.rs @@ -1,14 +1,14 @@ -use crate::common::io_err; -use crate::common::MAX_PKT_SIZE; +use crate::common::{io_err, local_async_run, MAX_PKT_SIZE, MAX_UDP_PKT_SIZE}; use crate::config::DnsPreference; use crate::network::dns::Dns; use crate::proxy::error::TransportError; use crate::proxy::NetworkAddr; -use crate::transport::AdapterOrSocket; +use crate::transport::{AdapterOrSocket, UdpSocketAdapter}; use boringtun::noise::errors::WireGuardError; use boringtun::noise::{Tunn, TunnResult}; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use hickory_resolver::config::ResolverConfig; +use sharded_slab::Pool; use std::fmt::{Debug, Formatter}; use std::hash::{Hash, Hasher}; use std::io; @@ -69,6 +69,16 @@ impl Hash for WireguardConfig { } } +enum AdapterOrChannel { + Adapter(Arc), + Channel(flume::Sender, flume::Receiver>>), +} + +enum BufferIndex { + Pool(usize), + Raw(T), +} + /// Wireguard Tunnel, with only one peer. pub struct WireguardTunnel { tunnel: tokio::sync::Mutex, @@ -76,10 +86,11 @@ pub struct WireguardTunnel { } struct WireguardTunnelInner { - outbound: AdapterOrSocket, + outbound: AdapterOrChannel, /// remote address on the genuine Internet endpoint: SocketAddr, smol_notify: Arc, + inbound_buf_pool: Arc>>, reserved: Option<[u8; 3]>, } @@ -111,12 +122,76 @@ impl WireguardTunnel { 13, None, ); + let buf_pool = Arc::new({ + let pool = Pool::>::new(); + // allocate memory in advance + const INIT_POOL_SIZE: usize = 4096; + let mut index_arr = [0; INIT_POOL_SIZE]; + for entry in index_arr.iter_mut() { + *entry = pool + .create_with(|x| { + x.resize(MAX_PKT_SIZE, 0); + }) + .expect("slab pool initialization failed"); + } + for entry in index_arr.iter() { + pool.clear(*entry); + } + pool + }); Ok(Self { tunnel: tokio::sync::Mutex::new(tunnel), inner: WireguardTunnelInner { - outbound, + outbound: match outbound { + AdapterOrSocket::Adapter(a) => AdapterOrChannel::Adapter(a), + AdapterOrSocket::Socket(s) => { + let (out_tx, out_rx) = flume::bounded::(4096); + let (in_tx, in_rx) = flume::bounded::>>(4096); + let socket = Arc::new(s); + let socket_clone = socket.clone(); + let pool = buf_pool.clone(); + local_async_run(async move { + // dedicated to poll UDP from small kernel buffer + loop { + let key = match pool.clone().create_owned() { + Some(mut buf) => { + let key = BufferIndex::Pool(buf.key()); + buf.resize(MAX_UDP_PKT_SIZE, 0); + let Ok(len) = socket.recv(&mut buf).await else { + break; + }; + buf.resize(len, 0); + key + } + None => { + let mut buf = vec![0; MAX_UDP_PKT_SIZE]; + let Ok(len) = socket.recv(&mut buf).await else { + break; + }; + buf.resize(len, 0); + BufferIndex::Raw(buf) + } + }; + if let Err(err) = in_tx.try_send(key) { + if let BufferIndex::Pool(key) = err.into_inner() { + pool.clear(key); + } + tracing::warn!("channel full, dropping packet"); + } + } + }); + tokio::spawn(async move { + while let Ok(data) = out_rx.recv_async().await { + socket_clone.send(&data).await?; + } + Ok::<(), io::Error>(()) + }); + AdapterOrChannel::Channel(out_tx, in_rx) + } + }, endpoint, smol_notify, + inbound_buf_pool: buf_pool, reserved: config.reserved, }, }) @@ -252,18 +327,43 @@ impl WireguardTunnelInner { } } match &self.outbound { - AdapterOrSocket::Adapter(a) => { + AdapterOrChannel::Adapter(a) => { a.send_to(data, NetworkAddr::Raw(self.endpoint)).await?; Ok(data.len()) } - AdapterOrSocket::Socket(s) => Ok(s.send(data).await?), + AdapterOrChannel::Channel(c, _) => { + let data = Bytes::copy_from_slice(data); + let len = data.len(); + let _ = c.send(data); + Ok(len) + } } } async fn outbound_recv(&self, data: &mut [u8]) -> Result { match &self.outbound { - AdapterOrSocket::Adapter(a) => Ok(a.recv_from(data).await?.0), - AdapterOrSocket::Socket(s) => Ok(s.recv(data).await?), + AdapterOrChannel::Adapter(a) => Ok(a.recv_from(data).await?.0), + AdapterOrChannel::Channel(_, c) => { + let index = c + .recv_async() + .await + .map_err(|_| io_err("WireGuard outbound channel closed"))?; + Ok(match index { + BufferIndex::Pool(key) => { + // panic safety: if key is invalid, there will be an obvious bug in the code, so we need to panic. + let buf = self.inbound_buf_pool.get(key).expect("pool key not found"); + let len = buf.len(); + (data[..len]).copy_from_slice(&buf); + self.inbound_buf_pool.clear(key); + len + } + BufferIndex::Raw(buf) => { + let len = buf.len(); + (data[..len]).copy_from_slice(&buf); + len + } + }) + } } } }