From 6c63e9af7395224cc00ae07667cb809ea922c95b Mon Sep 17 00:00:00 2001 From: XOR-op <17672363+XOR-op@users.noreply.github.com> Date: Fri, 12 Jul 2024 01:34:44 -0400 Subject: [PATCH 1/3] feat: dynamic receive window resizing --- Cargo.lock | 3 +- Cargo.toml | 1 + boltconn/Cargo.toml | 6 +- boltconn/src/transport/smol.rs | 121 +++++++++++++++++++++++++++++++-- 4 files changed, 121 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e2a1cc6..4b9e0c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3481,8 +3481,7 @@ checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "smoltcp" version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a1a996951e50b5971a2c8c0fa05a381480d70a933064245c4a223ddc87ccc97" +source = "git+https://github.com/XOR-op/smoltcp.git?branch=resize-recv-buffer#c92074f7c3572fe11472cf783fbe11e0abe7470a" dependencies = [ "bitflags 1.3.2", "byteorder", diff --git a/Cargo.toml b/Cargo.toml index c4bb1c7..a38ff2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,3 +16,4 @@ debug = true [patch.crates-io] rustls = { git = "https://github.com/XOR-op/rustls.delta.git", branch = "unofficial-rel-0.23" } +smoltcp = { git = "https://github.com/XOR-op/smoltcp.git", branch = "resize-recv-buffer" } diff --git a/boltconn/Cargo.toml b/boltconn/Cargo.toml index 03197f8..40ee2ce 100644 --- a/boltconn/Cargo.toml +++ b/boltconn/Cargo.toml @@ -64,9 +64,9 @@ brotli = "3.4.0" flate2 = "1.0.28" http-body = "1.0.0" http-body-util = "0.1.1" -hyper = { version = "1.2.0", features = ["server", "client", "http1", "http2" ] } +hyper = { version = "1.2.0", features = ["server", "client", "http1", "http2"] } hyper-util = "0.1.3" -ja-tools = {git = "https://github.com/XOR-op/ja-tools.git", branch = "main"} +ja-tools = { git = "https://github.com/XOR-op/ja-tools.git", branch = "main" } rcgen = { version = "0.11.0", features = ["pem", "x509-parser"] } rquickjs = { version = "0.4.0-beta.4", features = ["bindgen", "futures", "macro", "classes"] } rusqlite = { version = "0.29.0", features = ["bundled"] } @@ -78,7 +78,7 @@ fast-socks5 = "0.9.1" boringtun = "0.6.0" sha2 = "0.10.8" shadowsocks = { version = "1.16.0", default-features = false } -smoltcp = "0.11.0" +smoltcp = { version = "0.11.0", features = ["socket-tcp-cubic"] } # Command line clap = { version = "4.4.6", features = ["derive"] } clap_complete = "4.4.3" diff --git a/boltconn/src/transport/smol.rs b/boltconn/src/transport/smol.rs index 3025a22..a62da8c 100644 --- a/boltconn/src/transport/smol.rs +++ b/boltconn/src/transport/smol.rs @@ -33,8 +33,89 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use tokio::sync::{mpsc, Mutex, Notify}; -const TCP_RECV_BUF_SIZE: usize = 1024 * 1024; -const TCP_SEND_BUF_SIZE: usize = 256 * 1024; +struct TcpTuning { + estimated_rtt: Duration, + last_time: Instant, + accum_bytes: usize, + window_size: usize, + // saturated usage of bandwidth in recent RTTs + recent_business: u32, +} +impl TcpTuning { + pub const TCP_REV_BUF_INIT: usize = 128 * 1024; + pub const TCP_RCV_BUF_MAX: usize = 4 * 1024 * 1024; + pub const TCP_SND_BUF_INIT: usize = 128 * 1024; + pub const DEFAULT_WINDOW_SCALE: u8 = 8; + const MULTI_GROW_THRESHOLD: usize = 1024 * 1024; + const LINEAR_GROW_STEP: usize = 1024 * 1024; + + /// Based on current size and usage, compute whether there is a need to increase the buffer. + pub fn increase_tcp_rev_buf( + &mut self, + transferred_bytes: usize, + recv_buf_size: usize, + ) -> Option { + let elapsed = Instant::now() - self.last_time; + if elapsed < self.estimated_rtt { + self.accum_bytes += transferred_bytes; + return None; + } + let last_rtt_usage = self.accum_bytes; + self.accum_bytes = 0; + self.last_time = Instant::now(); + + // if buffer is small enough, grow it by 2x + if recv_buf_size < Self::MULTI_GROW_THRESHOLD { + if 3 * last_rtt_usage > recv_buf_size { + let new_size = recv_buf_size * 2; + tracing::debug!( + "rwnd {}->{}, rtt={}", + recv_buf_size, + new_size, + self.estimated_rtt.as_millis() + ); + self.window_size = new_size; + return Some(new_size); + } + } else if recv_buf_size < Self::TCP_RCV_BUF_MAX { + if last_rtt_usage + Self::LINEAR_GROW_STEP > recv_buf_size && self.recent_business > 2 { + // if buffer is large enough, grow it only by 1MB instead of 2x + let new_size = recv_buf_size + Self::LINEAR_GROW_STEP; + tracing::debug!( + "rwnd {}->{}, rtt={}", + recv_buf_size, + new_size, + self.estimated_rtt.as_millis() + ); + self.window_size = new_size; + self.recent_business = 0; + return Some(new_size); + } else if last_rtt_usage > recv_buf_size * 1 / 3 && self.recent_business < 4 { + // enough bandwidth usage means higher possibility of heavy traffic + self.recent_business += 1; + } else { + self.recent_business = self.recent_business.saturating_sub(2); + } + } + tracing::trace!( + "last_usage={}, rwnd {}, rtt={}", + last_rtt_usage, + recv_buf_size, + self.estimated_rtt.as_millis() + ); + None + } + + pub fn new(rtt: Duration) -> Self { + Self { + estimated_rtt: rtt, + last_time: Instant::now(), + accum_bytes: 0, + window_size: Self::TCP_REV_BUF_INIT, + recent_business: 0, + } + } +} #[derive(Debug, Copy, Clone)] enum SmolError { @@ -50,6 +131,7 @@ struct TcpConnTask { remain_to_send: Option<(Bytes, usize)>, // buffer, start_offset start_timestamp: Instant, half_close_timeout: Option, + tcp_tuning: TcpTuning, } impl TcpConnTask { @@ -79,6 +161,7 @@ impl TcpConnTask { remain_to_send: None, start_timestamp: Instant::now(), half_close_timeout: None, + tcp_tuning: TcpTuning::new(Duration::from_millis(50)), } } @@ -118,18 +201,42 @@ impl TcpConnTask { Ok(has_activity) } - pub async fn try_recv(&self, socket: &mut SmolTcpSocket<'_>) -> bool { + pub async fn try_recv(&mut self, socket: &mut SmolTcpSocket<'_>) -> bool { // Receive data let mut has_activity = false; + let mut accum_bytes = 0; while socket.can_recv() && self.back_tx.capacity() > 0 { let mut buf = BytesMut::with_capacity(MAX_PKT_SIZE); if let Ok(size) = socket.recv_slice(unsafe { mut_buf(&mut buf) }) { unsafe { buf.advance_mut(size) }; // must not fail because there is only 1 sender let _ = self.back_tx.send(buf.freeze()).await; + accum_bytes += size; has_activity = true; } } + if has_activity { + // try to increase rcv_buf size when possibilities of full buffer become significant + let cur_capacity = socket.recv_capacity(); + if let Some(new_cap) = self + .tcp_tuning + .increase_tcp_rev_buf(accum_bytes, cur_capacity) + { + let new_buf = vec![0u8; new_cap]; + if socket + .replace_recv_buffer(TcpSocketBuffer::new(new_buf)) + .is_err() + { + tracing::error!( + "smol failed to increase TCP recv buffer size: old={}, new={}, window_scale={}", + cur_capacity, + new_cap, + socket.local_recv_win_scale() + ); + } + } + } + has_activity } } @@ -356,13 +463,17 @@ impl SmolStack { remote_addr: SocketAddr, ) -> io::Result { // create socket resource - let tx_buf = TcpSocketBuffer::new(vec![0u8; TCP_SEND_BUF_SIZE]); - let rx_buf = TcpSocketBuffer::new(vec![0u8; TCP_RECV_BUF_SIZE]); + let tx_buf = TcpSocketBuffer::new(vec![0u8; TcpTuning::TCP_SND_BUF_INIT]); + let rx_buf = TcpSocketBuffer::new(vec![0u8; TcpTuning::TCP_REV_BUF_INIT]); let mut client_socket = SmolTcpSocket::new(rx_buf, tx_buf); // Since we are behind kernel's TCP/IP stack, no second Nagle is needed. client_socket.set_nagle_enabled(false); client_socket.set_ack_delay(None); + client_socket + .set_local_recv_win_scale(TcpTuning::DEFAULT_WINDOW_SCALE) + .expect("set_local_recv_win_scale"); + // connect to remote client_socket .connect( From ee3c0cb0d02f58ee0a605e83ee8342752e18d93c Mon Sep 17 00:00:00 2001 From: XOR-op <17672363+XOR-op@users.noreply.github.com> Date: Fri, 12 Jul 2024 02:31:00 -0400 Subject: [PATCH 2/3] tune: improve rwnd resizing algorithm --- boltconn/src/transport/smol.rs | 69 +++++++++++++++------------------- 1 file changed, 31 insertions(+), 38 deletions(-) diff --git a/boltconn/src/transport/smol.rs b/boltconn/src/transport/smol.rs index a62da8c..818b56c 100644 --- a/boltconn/src/transport/smol.rs +++ b/boltconn/src/transport/smol.rs @@ -37,7 +37,6 @@ struct TcpTuning { estimated_rtt: Duration, last_time: Instant, accum_bytes: usize, - window_size: usize, // saturated usage of bandwidth in recent RTTs recent_business: u32, } @@ -48,61 +47,56 @@ impl TcpTuning { pub const DEFAULT_WINDOW_SCALE: u8 = 8; const MULTI_GROW_THRESHOLD: usize = 1024 * 1024; const LINEAR_GROW_STEP: usize = 1024 * 1024; + const RECENCY_THRESHOLD: u32 = 6; /// Based on current size and usage, compute whether there is a need to increase the buffer. pub fn increase_tcp_rev_buf( &mut self, transferred_bytes: usize, - recv_buf_size: usize, + recv_win_size: usize, ) -> Option { let elapsed = Instant::now() - self.last_time; - if elapsed < self.estimated_rtt { + let last_rtt_usage = self.accum_bytes + transferred_bytes; + if elapsed < self.estimated_rtt && last_rtt_usage < recv_win_size { self.accum_bytes += transferred_bytes; return None; } - let last_rtt_usage = self.accum_bytes; + self.accum_bytes = 0; self.last_time = Instant::now(); - // if buffer is small enough, grow it by 2x - if recv_buf_size < Self::MULTI_GROW_THRESHOLD { - if 3 * last_rtt_usage > recv_buf_size { - let new_size = recv_buf_size * 2; - tracing::debug!( - "rwnd {}->{}, rtt={}", - recv_buf_size, - new_size, - self.estimated_rtt.as_millis() - ); - self.window_size = new_size; + // (1) Update estimated RTT + // We only decrease the estimated RTT here since the estimation from DRS is a lower-bound. + if elapsed < self.estimated_rtt { + debug_assert!(last_rtt_usage >= recv_win_size); + // smooth the RTT estimation + self.estimated_rtt = self.estimated_rtt.mul_f32(0.3) + elapsed.mul_f32(0.7); + } + + // (2) Update receive window size + if recv_win_size < Self::MULTI_GROW_THRESHOLD { + // (2.1) if buffer is small enough, grow it by 2x + if 3 * last_rtt_usage > recv_win_size { + let new_size = recv_win_size * 2; return Some(new_size); } - } else if recv_buf_size < Self::TCP_RCV_BUF_MAX { - if last_rtt_usage + Self::LINEAR_GROW_STEP > recv_buf_size && self.recent_business > 2 { - // if buffer is large enough, grow it only by 1MB instead of 2x - let new_size = recv_buf_size + Self::LINEAR_GROW_STEP; - tracing::debug!( - "rwnd {}->{}, rtt={}", - recv_buf_size, - new_size, - self.estimated_rtt.as_millis() - ); - self.window_size = new_size; + } else if recv_win_size < Self::TCP_RCV_BUF_MAX { + if last_rtt_usage + Self::LINEAR_GROW_STEP > recv_win_size + && self.recent_business > Self::RECENCY_THRESHOLD + { + // (2.2) if buffer is large enough, grow it only by 1MB instead of 2x + let new_size = recv_win_size + Self::LINEAR_GROW_STEP; self.recent_business = 0; return Some(new_size); - } else if last_rtt_usage > recv_buf_size * 1 / 3 && self.recent_business < 4 { - // enough bandwidth usage means higher possibility of heavy traffic - self.recent_business += 1; + } else if last_rtt_usage * 2 > recv_win_size { + // (2.3) enough bandwidth usage means higher possibility of heavy traffic + if self.recent_business < (Self::RECENCY_THRESHOLD + 2) { + self.recent_business += 1; + } } else { - self.recent_business = self.recent_business.saturating_sub(2); + self.recent_business = self.recent_business.saturating_sub(3); } } - tracing::trace!( - "last_usage={}, rwnd {}, rtt={}", - last_rtt_usage, - recv_buf_size, - self.estimated_rtt.as_millis() - ); None } @@ -111,7 +105,6 @@ impl TcpTuning { estimated_rtt: rtt, last_time: Instant::now(), accum_bytes: 0, - window_size: Self::TCP_REV_BUF_INIT, recent_business: 0, } } @@ -161,7 +154,7 @@ impl TcpConnTask { remain_to_send: None, start_timestamp: Instant::now(), half_close_timeout: None, - tcp_tuning: TcpTuning::new(Duration::from_millis(50)), + tcp_tuning: TcpTuning::new(Duration::from_millis(200)), } } From 277f4187cc7f144cd4556416e055fe4aaf3a26b0 Mon Sep 17 00:00:00 2001 From: XOR-op <17672363+XOR-op@users.noreply.github.com> Date: Fri, 12 Jul 2024 13:32:51 -0400 Subject: [PATCH 3/3] chore: Cargo.lock --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 4b9e0c6..39ba8ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3481,7 +3481,7 @@ checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "smoltcp" version = "0.11.0" -source = "git+https://github.com/XOR-op/smoltcp.git?branch=resize-recv-buffer#c92074f7c3572fe11472cf783fbe11e0abe7470a" +source = "git+https://github.com/XOR-op/smoltcp.git?branch=resize-recv-buffer#1555b5e0c1145df92bfbbb4221d550d73efb3b34" dependencies = [ "bitflags 1.3.2", "byteorder",