From 0b74480dc3c82280a72b1fd3c62fcf54e7ca76fc Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Fri, 19 Apr 2024 18:33:26 -0700 Subject: [PATCH 1/3] sleep instead of drop when stream rate exceeded limit; Consider connection count of staked nodes when calculating allowed PPS remove rtt from throttle_duration calculation removed connection count in StreamerCounter -- we do not need it at this point --- streamer/src/nonblocking/quic.rs | 49 ++++++++++------- streamer/src/nonblocking/stream_throttle.rs | 58 ++++++++++++++++----- 2 files changed, 75 insertions(+), 32 deletions(-) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 12361b22f3b345..2367c5d63b331e 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -1,7 +1,7 @@ use { crate::{ nonblocking::stream_throttle::{ - ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_STOP_CODE_THROTTLING, + ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_THROTTLING_INTERVAL, STREAM_THROTTLING_INTERVAL_MS, }, quic::{configure_server, QuicServerError, StreamStats}, @@ -55,7 +55,7 @@ use { // introduce any other awaits while holding the RwLock. sync::{Mutex, MutexGuard}, task::JoinHandle, - time::timeout, + time::{sleep, timeout}, }, }; @@ -825,25 +825,36 @@ async fn handle_connection( params.total_stake, ); - stream_counter.reset_throttling_params_if_needed(); - if stream_counter.stream_count.load(Ordering::Relaxed) - >= max_streams_per_throttling_interval - { - stats.throttled_streams.fetch_add(1, Ordering::Relaxed); - match params.peer_type { - ConnectionPeerType::Unstaked => { - stats - .throttled_unstaked_streams - .fetch_add(1, Ordering::Relaxed); - } - ConnectionPeerType::Staked(_) => { - stats - .throttled_staked_streams - .fetch_add(1, Ordering::Relaxed); + let throttle_interval_start = + stream_counter.reset_throttling_params_if_needed(); + let streams_read_in_throttle_interval = + stream_counter.stream_count.load(Ordering::Relaxed); + if streams_read_in_throttle_interval >= max_streams_per_throttling_interval { + // The peer is sending faster than we're willing to read. Sleep for what's + // left of this read interval so the peer backs off. + let throttle_duration = STREAM_THROTTLING_INTERVAL + .saturating_sub(throttle_interval_start.elapsed()); + + if !throttle_duration.is_zero() { + debug!("Throttling stream from {remote_addr:?}, peer type: {:?}, total stake: {}, \ + max_streams_per_interval: {max_streams_per_throttling_interval}, read_interval_streams: {streams_read_in_throttle_interval} \ + throttle_duration: {throttle_duration:?}", + params.peer_type, params.total_stake); + stats.throttled_streams.fetch_add(1, Ordering::Relaxed); + match params.peer_type { + ConnectionPeerType::Unstaked => { + stats + .throttled_unstaked_streams + .fetch_add(1, Ordering::Relaxed); + } + ConnectionPeerType::Staked(_) => { + stats + .throttled_staked_streams + .fetch_add(1, Ordering::Relaxed); + } } + sleep(throttle_duration).await; } - let _ = stream.stop(VarInt::from_u32(STREAM_STOP_CODE_THROTTLING)); - continue; } stream_load_ema.increment_load(params.peer_type); stream_counter.stream_count.fetch_add(1, Ordering::Relaxed); diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index 0497c6993d12e2..51bed53b6a4835 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -13,7 +13,8 @@ use { const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 20; pub const STREAM_THROTTLING_INTERVAL_MS: u64 = 100; -pub const STREAM_STOP_CODE_THROTTLING: u32 = 15; +pub const STREAM_THROTTLING_INTERVAL: Duration = + Duration::from_millis(STREAM_THROTTLING_INTERVAL_MS); const STREAM_LOAD_EMA_INTERVAL_MS: u64 = 5; const STREAM_LOAD_EMA_INTERVAL_COUNT: u64 = 10; const EMA_WINDOW_MS: u64 = STREAM_LOAD_EMA_INTERVAL_MS * STREAM_LOAD_EMA_INTERVAL_COUNT; @@ -208,19 +209,24 @@ impl ConnectionStreamCounter { } } - pub(crate) fn reset_throttling_params_if_needed(&self) { - const THROTTLING_INTERVAL: Duration = Duration::from_millis(STREAM_THROTTLING_INTERVAL_MS); - if tokio::time::Instant::now().duration_since(*self.last_throttling_instant.read().unwrap()) - > THROTTLING_INTERVAL + /// Reset the counter and last throttling instant and + /// return last_throttling_instant regardless it is reset or not. + pub(crate) fn reset_throttling_params_if_needed(&self) -> tokio::time::Instant { + let last_throttling_instant = *self.last_throttling_instant.read().unwrap(); + if tokio::time::Instant::now().duration_since(last_throttling_instant) + > STREAM_THROTTLING_INTERVAL { let mut last_throttling_instant = self.last_throttling_instant.write().unwrap(); // Recheck as some other thread might have done throttling since this thread tried to acquire the write lock. if tokio::time::Instant::now().duration_since(*last_throttling_instant) - > THROTTLING_INTERVAL + > STREAM_THROTTLING_INTERVAL { *last_throttling_instant = tokio::time::Instant::now(); self.stream_count.store(0, Ordering::Relaxed); } + *last_throttling_instant + } else { + last_throttling_instant } } } @@ -253,6 +259,7 @@ pub mod test { load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Unstaked, 10000, + 1, ), 10 ); @@ -281,6 +288,7 @@ pub mod test { load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(15), 10000, + 1, ), 30 ); @@ -291,6 +299,7 @@ pub mod test { load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1000), 10000, + 1, ), 2000 ); @@ -302,6 +311,7 @@ pub mod test { load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(15), 10000, + 1, ), 120 ); @@ -312,6 +322,7 @@ pub mod test { load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1000), 10000, + 1, ), 8000 ); @@ -324,6 +335,7 @@ pub mod test { load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(15), 10000, + 1, ), 120 ); @@ -333,6 +345,7 @@ pub mod test { load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1000), 10000, + 1, ), 8000 ); @@ -343,6 +356,7 @@ pub mod test { load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1), 40000, + 1, ), load_ema .max_unstaked_load_in_throttling_window @@ -372,7 +386,8 @@ pub mod test { assert!( (46u64..=47).contains(&load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(15), - 10000 + 10000, + 1, )) ); @@ -381,7 +396,8 @@ pub mod test { assert!((3124u64..=3125).contains( &load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1000), - 10000 + 10000, + 1, ) )); @@ -391,7 +407,8 @@ pub mod test { assert!( (92u64..=94).contains(&load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(15), - 10000 + 10000, + 1, )) ); @@ -400,7 +417,8 @@ pub mod test { assert!((6248u64..=6250).contains( &load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1000), - 10000 + 10000, + 1, ) )); @@ -411,7 +429,8 @@ pub mod test { assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(15), - 10000 + 10000, + 1, ), 150 ); @@ -420,17 +439,30 @@ pub mod test { assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1000), - 10000 + 10000, + 1, ), 10000 ); + // function = ((12.5K * 12.5K) / 25% of 12.5K) * stake / total_stake / connection_count + // as there are more than 1 connection from the staked node + assert_eq!( + load_ema.available_load_capacity_in_throttling_duration( + ConnectionPeerType::Staked(1000), + 10000, + 4, + ), + 10000 / 4 + ); + // At 1/400000 stake weight, and minimum load, it should still allow // max_unstaked_load_in_throttling_window + 1 streams. assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1), - 400000 + 400000, + 1, ), load_ema .max_unstaked_load_in_throttling_window From f94bde2fb4acef1dfa48e621399ac4ce5a417c02 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Mon, 22 Apr 2024 13:08:04 -0700 Subject: [PATCH 2/3] remove connection count related changes -- they are unrelated to this PR --- streamer/src/nonblocking/stream_throttle.rs | 56 ++++++--------------- 1 file changed, 15 insertions(+), 41 deletions(-) diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index 51bed53b6a4835..6656752bca1f7c 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -258,8 +258,7 @@ pub mod test { assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Unstaked, - 10000, - 1, + 10000 ), 10 ); @@ -287,8 +286,7 @@ pub mod test { assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(15), - 10000, - 1, + 10000 ), 30 ); @@ -298,8 +296,7 @@ pub mod test { assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1000), - 10000, - 1, + 10000 ), 2000 ); @@ -310,8 +307,7 @@ pub mod test { assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(15), - 10000, - 1, + 10000 ), 120 ); @@ -321,8 +317,7 @@ pub mod test { assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1000), - 10000, - 1, + 10000 ), 8000 ); @@ -334,8 +329,7 @@ pub mod test { assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(15), - 10000, - 1, + 10000 ), 120 ); @@ -344,8 +338,7 @@ pub mod test { assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1000), - 10000, - 1, + 10000 ), 8000 ); @@ -355,8 +348,7 @@ pub mod test { assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1), - 40000, - 1, + 40000 ), load_ema .max_unstaked_load_in_throttling_window @@ -386,8 +378,7 @@ pub mod test { assert!( (46u64..=47).contains(&load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(15), - 10000, - 1, + 10000 )) ); @@ -396,8 +387,7 @@ pub mod test { assert!((3124u64..=3125).contains( &load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1000), - 10000, - 1, + 10000 ) )); @@ -407,8 +397,7 @@ pub mod test { assert!( (92u64..=94).contains(&load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(15), - 10000, - 1, + 10000 )) ); @@ -417,8 +406,7 @@ pub mod test { assert!((6248u64..=6250).contains( &load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1000), - 10000, - 1, + 10000 ) )); @@ -429,8 +417,7 @@ pub mod test { assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(15), - 10000, - 1, + 10000 ), 150 ); @@ -439,30 +426,17 @@ pub mod test { assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1000), - 10000, - 1, + 10000 ), 10000 ); - // function = ((12.5K * 12.5K) / 25% of 12.5K) * stake / total_stake / connection_count - // as there are more than 1 connection from the staked node - assert_eq!( - load_ema.available_load_capacity_in_throttling_duration( - ConnectionPeerType::Staked(1000), - 10000, - 4, - ), - 10000 / 4 - ); - // At 1/400000 stake weight, and minimum load, it should still allow // max_unstaked_load_in_throttling_window + 1 streams. assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1), - 400000, - 1, + 400000 ), load_ema .max_unstaked_load_in_throttling_window From 28469a9dd9fe55cde63102d3f6b5431b9999b504 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Mon, 22 Apr 2024 13:29:34 -0700 Subject: [PATCH 3/3] revert unintended changes --- streamer/src/nonblocking/stream_throttle.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs index 6656752bca1f7c..699d8d7faf33fb 100644 --- a/streamer/src/nonblocking/stream_throttle.rs +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -258,7 +258,7 @@ pub mod test { assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Unstaked, - 10000 + 10000, ), 10 ); @@ -286,7 +286,7 @@ pub mod test { assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(15), - 10000 + 10000, ), 30 ); @@ -296,7 +296,7 @@ pub mod test { assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1000), - 10000 + 10000, ), 2000 ); @@ -307,7 +307,7 @@ pub mod test { assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(15), - 10000 + 10000, ), 120 ); @@ -317,7 +317,7 @@ pub mod test { assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1000), - 10000 + 10000, ), 8000 ); @@ -329,7 +329,7 @@ pub mod test { assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(15), - 10000 + 10000, ), 120 ); @@ -338,7 +338,7 @@ pub mod test { assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1000), - 10000 + 10000, ), 8000 ); @@ -348,7 +348,7 @@ pub mod test { assert_eq!( load_ema.available_load_capacity_in_throttling_duration( ConnectionPeerType::Staked(1), - 40000 + 40000, ), load_ema .max_unstaked_load_in_throttling_window