diff --git a/streamer/src/nonblocking/mod.rs b/streamer/src/nonblocking/mod.rs index 31b583e9622dec..fa627e0944dc77 100644 --- a/streamer/src/nonblocking/mod.rs +++ b/streamer/src/nonblocking/mod.rs @@ -1,3 +1,4 @@ pub mod quic; pub mod recvmmsg; pub mod sendmmsg; +mod stream_throttle; diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 50c81ff509e938..6b49ab08ed4e72 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -1,4 +1,5 @@ use { + super::stream_throttle::ConnectionStreamCounter, crate::{ quic::{configure_server, QuicServerError, StreamStats}, streamer::StakedNodes, @@ -49,7 +50,7 @@ use { // introduce any other awaits while holding the RwLock. sync::{Mutex, MutexGuard}, task::JoinHandle, - time::timeout, + time::{sleep, timeout}, }, }; @@ -72,7 +73,6 @@ const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = b"exceed_max_stre const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4; const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many"; -const STREAM_STOP_CODE_THROTTLING: u32 = 15; /// Limit to 500K PPS pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 500; @@ -360,14 +360,16 @@ fn handle_and_cache_new_connection( remote_addr, ); - if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection( - ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey), - remote_addr.port(), - Some(connection.clone()), - params.stake, - timing::timestamp(), - params.max_connections_per_peer, - ) { + if let Some((last_update, stream_exit, stream_counter)) = connection_table_l + .try_add_connection( + ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey), + remote_addr.port(), + Some(connection.clone()), + params.stake, + timing::timestamp(), + params.max_connections_per_peer, + ) + { let peer_type = connection_table_l.peer_type; drop(connection_table_l); @@ -387,6 +389,7 @@ fn handle_and_cache_new_connection( wait_for_chunk_timeout, max_unstaked_connections, max_streams_per_ms, + stream_counter, )); Ok(()) } else { @@ -770,15 +773,6 @@ fn max_streams_for_connection_in_100ms( } } -fn reset_throttling_params_if_needed(last_instant: &mut tokio::time::Instant) -> bool { - if tokio::time::Instant::now().duration_since(*last_instant) > STREAM_THROTTLING_INTERVAL { - *last_instant = tokio::time::Instant::now(); - true - } else { - false - } -} - #[allow(clippy::too_many_arguments)] async fn handle_connection( connection: Connection, @@ -791,6 +785,7 @@ async fn handle_connection( wait_for_chunk_timeout: Duration, max_unstaked_connections: usize, max_streams_per_ms: u64, + stream_counter: Arc, ) { let stats = params.stats; debug!( @@ -801,41 +796,54 @@ async fn handle_connection( ); let stable_id = connection.stable_id(); stats.total_connections.fetch_add(1, Ordering::Relaxed); - let max_streams_per_100ms = max_streams_for_connection_in_100ms( + let max_streams_per_throttling_interval = max_streams_for_connection_in_100ms( peer_type, params.stake, params.total_stake, max_unstaked_connections, max_streams_per_ms, ); - let mut last_throttling_instant = tokio::time::Instant::now(); - let mut streams_in_current_interval = 0; + while !stream_exit.load(Ordering::Relaxed) { if let Ok(stream) = tokio::time::timeout(WAIT_FOR_STREAM_TIMEOUT, connection.accept_uni()).await { match stream { Ok(mut stream) => { - if reset_throttling_params_if_needed(&mut last_throttling_instant) { - streams_in_current_interval = 0; - } else if streams_in_current_interval >= max_streams_per_100ms { - stats.throttled_streams.fetch_add(1, Ordering::Relaxed); - match peer_type { - ConnectionPeerType::Unstaked => { - stats - .throttled_unstaked_streams - .fetch_add(1, Ordering::Relaxed); - } - ConnectionPeerType::Staked => { - stats - .throttled_staked_streams - .fetch_add(1, Ordering::Relaxed); + let throttle_interval_start: tokio::time::Instant = + stream_counter.reset_throttling_params_if_needed(); + let streams_read_in_throttle_interval = + stream_counter.stream_count.load(Ordering::Relaxed); + + if streams_read_in_throttle_interval >= max_streams_per_throttling_interval { + // The peer is sending faster than we're willing to read. Sleep for what's + // left of this read interval so the peer backs off. + let throttle_duration = STREAM_THROTTLING_INTERVAL + .saturating_sub(throttle_interval_start.elapsed()); + if !throttle_duration.is_zero() { + debug!("Throttling stream from {remote_addr:?}, peer type: {:?}, total stake: {}, \ + max_streams_per_interval: {max_streams_per_throttling_interval}, read_interval_streams: {streams_read_in_throttle_interval} \ + throttle_duration: {throttle_duration:?}", + peer_type, params.total_stake); + stats.throttled_streams.fetch_add(1, Ordering::Relaxed); + + match peer_type { + ConnectionPeerType::Unstaked => { + stats + .throttled_unstaked_streams + .fetch_add(1, Ordering::Relaxed); + } + ConnectionPeerType::Staked => { + stats + .throttled_staked_streams + .fetch_add(1, Ordering::Relaxed); + } } + sleep(throttle_duration).await; } - let _ = stream.stop(VarInt::from_u32(STREAM_STOP_CODE_THROTTLING)); - continue; } - streams_in_current_interval = streams_in_current_interval.saturating_add(1); + stream_counter.stream_count.fetch_add(1, Ordering::Relaxed); + stats.total_streams.fetch_add(1, Ordering::Relaxed); stats.total_new_streams.fetch_add(1, Ordering::Relaxed); let stream_exit = stream_exit.clone(); @@ -1041,6 +1049,7 @@ struct ConnectionEntry { last_update: Arc, port: u16, connection: Option, + stream_counter: Arc, } impl ConnectionEntry { @@ -1050,6 +1059,7 @@ impl ConnectionEntry { last_update: Arc, port: u16, connection: Option, + stream_counter: Arc, ) -> Self { Self { exit, @@ -1057,6 +1067,7 @@ impl ConnectionEntry { last_update, port, connection, + stream_counter, } } @@ -1167,7 +1178,11 @@ impl ConnectionTable { stake: u64, last_update: u64, max_connections_per_peer: usize, - ) -> Option<(Arc, Arc)> { + ) -> Option<( + Arc, + Arc, + Arc, + )> { let connection_entry = self.table.entry(key).or_default(); let has_connection_capacity = connection_entry .len() @@ -1177,15 +1192,20 @@ impl ConnectionTable { if has_connection_capacity { let exit = Arc::new(AtomicBool::new(false)); let last_update = Arc::new(AtomicU64::new(last_update)); + let stream_counter = connection_entry + .first() + .map(|entry| entry.stream_counter.clone()) + .unwrap_or(Arc::new(ConnectionStreamCounter::new())); connection_entry.push(ConnectionEntry::new( exit.clone(), stake, last_update.clone(), port, connection, + stream_counter.clone(), )); self.total_size += 1; - Some((last_update, exit)) + Some((last_update, exit, stream_counter)) } else { if let Some(connection) = connection { connection.close( diff --git a/streamer/src/nonblocking/stream_throttle.rs b/streamer/src/nonblocking/stream_throttle.rs new file mode 100644 index 00000000000000..3d520690ee0e44 --- /dev/null +++ b/streamer/src/nonblocking/stream_throttle.rs @@ -0,0 +1,47 @@ +use std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + RwLock, + }, + time::Duration, +}; + +pub const STREAM_THROTTLING_INTERVAL_MS: u64 = 100; +pub const STREAM_THROTTLING_INTERVAL: Duration = + Duration::from_millis(STREAM_THROTTLING_INTERVAL_MS); + +#[derive(Debug)] +pub(crate) struct ConnectionStreamCounter { + pub(crate) stream_count: AtomicU64, + last_throttling_instant: RwLock, +} + +impl ConnectionStreamCounter { + pub(crate) fn new() -> Self { + Self { + stream_count: AtomicU64::default(), + last_throttling_instant: RwLock::new(tokio::time::Instant::now()), + } + } + + /// Reset the counter and last throttling instant and + /// return last_throttling_instant regardless it is reset or not. + pub(crate) fn reset_throttling_params_if_needed(&self) -> tokio::time::Instant { + let last_throttling_instant = *self.last_throttling_instant.read().unwrap(); + if tokio::time::Instant::now().duration_since(last_throttling_instant) + > STREAM_THROTTLING_INTERVAL + { + let mut last_throttling_instant = self.last_throttling_instant.write().unwrap(); + // Recheck as some other thread might have done throttling since this thread tried to acquire the write lock. + if tokio::time::Instant::now().duration_since(*last_throttling_instant) + > STREAM_THROTTLING_INTERVAL + { + *last_throttling_instant = tokio::time::Instant::now(); + self.stream_count.store(0, Ordering::Relaxed); + } + *last_throttling_instant + } else { + last_throttling_instant + } + } +}