Skip to content

Commit

Permalink
[Feat]: Dymanic TCP Receiving Window Resizing (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
XOR-op authored Jul 12, 2024
2 parents 51da53d + 277f418 commit 7183c9d
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 10 deletions.
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ debug = true

[patch.crates-io]
rustls = { git = "https://github.com/XOR-op/rustls.delta.git", branch = "unofficial-rel-0.23" }
smoltcp = { git = "https://github.com/XOR-op/smoltcp.git", branch = "resize-recv-buffer" }
6 changes: 3 additions & 3 deletions boltconn/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ brotli = "3.4.0"
flate2 = "1.0.28"
http-body = "1.0.0"
http-body-util = "0.1.1"
hyper = { version = "1.2.0", features = ["server", "client", "http1", "http2" ] }
hyper = { version = "1.2.0", features = ["server", "client", "http1", "http2"] }
hyper-util = "0.1.3"
ja-tools = {git = "https://github.com/XOR-op/ja-tools.git", branch = "main"}
ja-tools = { git = "https://github.com/XOR-op/ja-tools.git", branch = "main" }
rcgen = { version = "0.11.0", features = ["pem", "x509-parser"] }
rquickjs = { version = "0.4.0-beta.4", features = ["bindgen", "futures", "macro", "classes"] }
rusqlite = { version = "0.29.0", features = ["bundled"] }
Expand All @@ -78,7 +78,7 @@ fast-socks5 = "0.9.1"
boringtun = "0.6.0"
sha2 = "0.10.8"
shadowsocks = { version = "1.16.0", default-features = false }
smoltcp = "0.11.0"
smoltcp = { version = "0.11.0", features = ["socket-tcp-cubic"] }
# Command line
clap = { version = "4.4.6", features = ["derive"] }
clap_complete = "4.4.3"
Expand Down
114 changes: 109 additions & 5 deletions boltconn/src/transport/smol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,82 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, Mutex, Notify};

const TCP_RECV_BUF_SIZE: usize = 1024 * 1024;
const TCP_SEND_BUF_SIZE: usize = 256 * 1024;
struct TcpTuning {
estimated_rtt: Duration,
last_time: Instant,
accum_bytes: usize,
// saturated usage of bandwidth in recent RTTs
recent_business: u32,
}
impl TcpTuning {
pub const TCP_REV_BUF_INIT: usize = 128 * 1024;
pub const TCP_RCV_BUF_MAX: usize = 4 * 1024 * 1024;
pub const TCP_SND_BUF_INIT: usize = 128 * 1024;
pub const DEFAULT_WINDOW_SCALE: u8 = 8;
const MULTI_GROW_THRESHOLD: usize = 1024 * 1024;
const LINEAR_GROW_STEP: usize = 1024 * 1024;
const RECENCY_THRESHOLD: u32 = 6;

/// Based on current size and usage, compute whether there is a need to increase the buffer.
pub fn increase_tcp_rev_buf(
&mut self,
transferred_bytes: usize,
recv_win_size: usize,
) -> Option<usize> {
let elapsed = Instant::now() - self.last_time;
let last_rtt_usage = self.accum_bytes + transferred_bytes;
if elapsed < self.estimated_rtt && last_rtt_usage < recv_win_size {
self.accum_bytes += transferred_bytes;
return None;
}

self.accum_bytes = 0;
self.last_time = Instant::now();

// (1) Update estimated RTT
// We only decrease the estimated RTT here since the estimation from DRS is a lower-bound.
if elapsed < self.estimated_rtt {
debug_assert!(last_rtt_usage >= recv_win_size);
// smooth the RTT estimation
self.estimated_rtt = self.estimated_rtt.mul_f32(0.3) + elapsed.mul_f32(0.7);
}

// (2) Update receive window size
if recv_win_size < Self::MULTI_GROW_THRESHOLD {
// (2.1) if buffer is small enough, grow it by 2x
if 3 * last_rtt_usage > recv_win_size {
let new_size = recv_win_size * 2;
return Some(new_size);
}
} else if recv_win_size < Self::TCP_RCV_BUF_MAX {
if last_rtt_usage + Self::LINEAR_GROW_STEP > recv_win_size
&& self.recent_business > Self::RECENCY_THRESHOLD
{
// (2.2) if buffer is large enough, grow it only by 1MB instead of 2x
let new_size = recv_win_size + Self::LINEAR_GROW_STEP;
self.recent_business = 0;
return Some(new_size);
} else if last_rtt_usage * 2 > recv_win_size {
// (2.3) enough bandwidth usage means higher possibility of heavy traffic
if self.recent_business < (Self::RECENCY_THRESHOLD + 2) {
self.recent_business += 1;
}
} else {
self.recent_business = self.recent_business.saturating_sub(3);
}
}
None
}

pub fn new(rtt: Duration) -> Self {
Self {
estimated_rtt: rtt,
last_time: Instant::now(),
accum_bytes: 0,
recent_business: 0,
}
}
}

#[derive(Debug, Copy, Clone)]
enum SmolError {
Expand All @@ -50,6 +124,7 @@ struct TcpConnTask {
remain_to_send: Option<(Bytes, usize)>, // buffer, start_offset
start_timestamp: Instant,
half_close_timeout: Option<Instant>,
tcp_tuning: TcpTuning,
}

impl TcpConnTask {
Expand Down Expand Up @@ -79,6 +154,7 @@ impl TcpConnTask {
remain_to_send: None,
start_timestamp: Instant::now(),
half_close_timeout: None,
tcp_tuning: TcpTuning::new(Duration::from_millis(200)),
}
}

Expand Down Expand Up @@ -118,18 +194,42 @@ impl TcpConnTask {
Ok(has_activity)
}

pub async fn try_recv(&self, socket: &mut SmolTcpSocket<'_>) -> bool {
pub async fn try_recv(&mut self, socket: &mut SmolTcpSocket<'_>) -> bool {
// Receive data
let mut has_activity = false;
let mut accum_bytes = 0;
while socket.can_recv() && self.back_tx.capacity() > 0 {
let mut buf = BytesMut::with_capacity(MAX_PKT_SIZE);
if let Ok(size) = socket.recv_slice(unsafe { mut_buf(&mut buf) }) {
unsafe { buf.advance_mut(size) };
// must not fail because there is only 1 sender
let _ = self.back_tx.send(buf.freeze()).await;
accum_bytes += size;
has_activity = true;
}
}
if has_activity {
// try to increase rcv_buf size when possibilities of full buffer become significant
let cur_capacity = socket.recv_capacity();
if let Some(new_cap) = self
.tcp_tuning
.increase_tcp_rev_buf(accum_bytes, cur_capacity)
{
let new_buf = vec![0u8; new_cap];
if socket
.replace_recv_buffer(TcpSocketBuffer::new(new_buf))
.is_err()
{
tracing::error!(
"smol failed to increase TCP recv buffer size: old={}, new={}, window_scale={}",
cur_capacity,
new_cap,
socket.local_recv_win_scale()
);
}
}
}

has_activity
}
}
Expand Down Expand Up @@ -356,13 +456,17 @@ impl SmolStack {
remote_addr: SocketAddr,
) -> io::Result<SocketHandle> {
// create socket resource
let tx_buf = TcpSocketBuffer::new(vec![0u8; TCP_SEND_BUF_SIZE]);
let rx_buf = TcpSocketBuffer::new(vec![0u8; TCP_RECV_BUF_SIZE]);
let tx_buf = TcpSocketBuffer::new(vec![0u8; TcpTuning::TCP_SND_BUF_INIT]);
let rx_buf = TcpSocketBuffer::new(vec![0u8; TcpTuning::TCP_REV_BUF_INIT]);
let mut client_socket = SmolTcpSocket::new(rx_buf, tx_buf);
// Since we are behind kernel's TCP/IP stack, no second Nagle is needed.
client_socket.set_nagle_enabled(false);
client_socket.set_ack_delay(None);

client_socket
.set_local_recv_win_scale(TcpTuning::DEFAULT_WINDOW_SCALE)
.expect("set_local_recv_win_scale");

// connect to remote
client_socket
.connect(
Expand Down

0 comments on commit 7183c9d

Please sign in to comment.