diff --git a/Cargo.lock b/Cargo.lock
index 33c94d9b8e4cbc..e216f098be22de 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7286,6 +7286,7 @@ dependencies = [
  "async-channel",
  "bytes",
  "crossbeam-channel",
+ "dashmap",
  "futures-util",
  "histogram",
  "indexmap 2.2.5",
diff --git a/bench-tps/src/cli.rs b/bench-tps/src/cli.rs
index 04bb869c2626bb..5f0191742ac068 100644
--- a/bench-tps/src/cli.rs
+++ b/bench-tps/src/cli.rs
@@ -11,6 +11,7 @@ use {
         pubkey::Pubkey,
         signature::{read_keypair_file, Keypair},
     },
+    solana_streamer::nonblocking::quic::DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
     solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC},
     std::{
         net::{IpAddr, Ipv4Addr},
@@ -68,6 +69,7 @@ pub struct Config {
     pub external_client_type: ExternalClientType,
     pub use_quic: bool,
     pub tpu_connection_pool_size: usize,
+    pub tpu_max_connections_per_ipaddr_per_minute: u64,
     pub compute_unit_price: Option<ComputeUnitPrice>,
     pub skip_tx_account_data_size: bool,
     pub use_durable_nonce: bool,
@@ -103,6 +105,8 @@ impl Default for Config {
             external_client_type: ExternalClientType::default(),
             use_quic: DEFAULT_TPU_USE_QUIC,
             tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
+            tpu_max_connections_per_ipaddr_per_minute:
+                DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
             compute_unit_price: None,
             skip_tx_account_data_size: false,
             use_durable_nonce: false,
diff --git a/client/src/connection_cache.rs b/client/src/connection_cache.rs
index 74e1c8d344c618..a682b6e6db2247 100644
--- a/client/src/connection_cache.rs
+++ b/client/src/connection_cache.rs
@@ -227,7 +227,10 @@ mod tests {
         crossbeam_channel::unbounded,
         solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair},
         solana_streamer::{
-            nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
+            nonblocking::quic::{
+                DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS,
+                DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
+            },
             quic::SpawnServerResult,
             streamer::StakedNodes,
         },
@@ -272,6 +275,7 @@ mod tests {
             10,
             10,
             DEFAULT_MAX_STREAMS_PER_MS,
+            DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
             DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
             DEFAULT_TPU_COALESCE,
         )
diff --git a/core/src/tpu.rs b/core/src/tpu.rs
index b594e04e5d18ad..abe575ab10d1bf 100644
--- a/core/src/tpu.rs
+++ b/core/src/tpu.rs
@@ -108,6 +108,7 @@ impl Tpu {
         banking_tracer: Arc<BankingTracer>,
         tracer_thread_hdl: TracerThread,
         tpu_enable_udp: bool,
+        tpu_max_connections_per_ipaddr_per_minute: u64,
         prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
         block_production_method: BlockProductionMethod,
         _generator_config: Option<GeneratorConfig>, /* vestigial code for replay invalidator */
@@ -164,6 +165,7 @@ impl Tpu {
             MAX_STAKED_CONNECTIONS,
             MAX_UNSTAKED_CONNECTIONS,
             DEFAULT_MAX_STREAMS_PER_MS,
+            tpu_max_connections_per_ipaddr_per_minute,
             DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
             tpu_coalesce,
         )
@@ -185,6 +187,7 @@ impl Tpu {
             MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
             0, // Prevent unstaked nodes from forwarding transactions
             DEFAULT_MAX_STREAMS_PER_MS,
+            tpu_max_connections_per_ipaddr_per_minute,
             DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
             tpu_coalesce,
         )
diff --git a/core/src/validator.rs b/core/src/validator.rs
index 34ccbf59eb9cc9..c107b8e24ac323 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -510,6 +510,7 @@ impl Validator {
         use_quic: bool,
         tpu_connection_pool_size: usize,
         tpu_enable_udp: bool,
+        tpu_max_connections_per_ipaddr_per_minute: u64,
         admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
     ) -> Result<Self> {
         let start_time = Instant::now();
@@ -1399,6 +1400,7 @@ impl Validator {
             banking_tracer,
             tracer_thread,
             tpu_enable_udp,
+            tpu_max_connections_per_ipaddr_per_minute,
             &prioritization_fee_cache,
             config.block_production_method.clone(),
             config.generator_config.clone(),
@@ -2581,6 +2583,7 @@ mod tests {
             DEFAULT_TPU_USE_QUIC,
             DEFAULT_TPU_CONNECTION_POOL_SIZE,
             DEFAULT_TPU_ENABLE_UDP,
+            32, // max connections per IpAddr per minute for test
             Arc::new(RwLock::new(None)),
         )
         .expect("assume successful validator start");
@@ -2666,6 +2669,7 @@ mod tests {
             DEFAULT_TPU_USE_QUIC,
             DEFAULT_TPU_CONNECTION_POOL_SIZE,
             DEFAULT_TPU_ENABLE_UDP,
+            32, // max connections per IpAddr per minute for test
             Arc::new(RwLock::new(None)),
         )
         .expect("assume successful validator start")
diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs
index fc511f83d5296d..e07b11677b181a 100644
--- a/local-cluster/src/local_cluster.rs
+++ b/local-cluster/src/local_cluster.rs
@@ -338,6 +338,7 @@ impl LocalCluster {
             DEFAULT_TPU_USE_QUIC,
             DEFAULT_TPU_CONNECTION_POOL_SIZE,
             DEFAULT_TPU_ENABLE_UDP,
+            32, // max connections per IpAddr per minute
             Arc::new(RwLock::new(None)),
         )
         .expect("assume successful validator start");
@@ -543,6 +544,7 @@ impl LocalCluster {
             DEFAULT_TPU_USE_QUIC,
             DEFAULT_TPU_CONNECTION_POOL_SIZE,
             DEFAULT_TPU_ENABLE_UDP,
+            32, // max connections per IpAddr per minute
             Arc::new(RwLock::new(None)),
         )
         .expect("assume successful validator start");
@@ -1013,6 +1015,7 @@ impl Cluster for LocalCluster {
             DEFAULT_TPU_USE_QUIC,
             DEFAULT_TPU_CONNECTION_POOL_SIZE,
             DEFAULT_TPU_ENABLE_UDP,
+            32, // max connections per IpAddr per minute; use a higher value because of tests
             Arc::new(RwLock::new(None)),
         )
         .expect("assume successful validator start");
diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock
index ab1aab93517c9c..2f32165827ab3a 100644
--- a/programs/sbf/Cargo.lock
+++ b/programs/sbf/Cargo.lock
@@ -6330,6 +6330,7 @@ dependencies = [
  "async-channel",
  "bytes",
  "crossbeam-channel",
+ "dashmap",
  "futures-util",
  "histogram",
  "indexmap 2.2.5",
diff --git a/quic-client/tests/quic_client.rs b/quic-client/tests/quic_client.rs
index 1e7a189d73a4c7..7f97786da6668b 100644
--- a/quic-client/tests/quic_client.rs
+++ b/quic-client/tests/quic_client.rs
@@ -10,7 +10,10 @@ mod tests {
         },
         solana_sdk::{net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE, signature::Keypair},
         solana_streamer::{
-            nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
+            nonblocking::quic::{
+                DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS,
+                DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
+            },
             quic::SpawnServerResult,
             streamer::StakedNodes,
             tls_certificates::new_dummy_x509_certificate,
@@ -85,6 +88,7 @@ mod tests {
             10,
             10,
             DEFAULT_MAX_STREAMS_PER_MS,
+            DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
             DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
             DEFAULT_TPU_COALESCE,
         )
@@ -170,6 +174,7 @@ mod tests {
             10,
             10,
             DEFAULT_MAX_STREAMS_PER_MS,
+            DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
             Duration::from_secs(1), // wait_for_chunk_timeout
             DEFAULT_TPU_COALESCE,
         )
@@ -233,6 +238,7 @@ mod tests {
             10,
             10,
             DEFAULT_MAX_STREAMS_PER_MS,
+            DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
             DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
             DEFAULT_TPU_COALESCE,
         )
@@ -262,6 +268,7 @@ mod tests {
             10,
             10,
             DEFAULT_MAX_STREAMS_PER_MS,
+            DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
             DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
             DEFAULT_TPU_COALESCE,
         )
diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml
index ebd7c7401a494a..d173da1a568c8c 100644
--- a/streamer/Cargo.toml
+++ b/streamer/Cargo.toml
@@ -13,6 +13,7 @@ edition = { workspace = true }
 async-channel = { workspace = true }
 bytes = { workspace = true }
 crossbeam-channel = { workspace = true }
+dashmap = { workspace = true }
 futures-util = { workspace = true }
 histogram = { workspace = true }
 indexmap = { workspace = true }
diff --git a/streamer/src/nonblocking/connection_rate_limiter.rs b/streamer/src/nonblocking/connection_rate_limiter.rs
new file mode 100644
index 00000000000000..bc53e49e201a97
--- /dev/null
+++ b/streamer/src/nonblocking/connection_rate_limiter.rs
@@ -0,0 +1,99 @@
+use {
+    crate::nonblocking::{keyed_rate_limiter::KeyedRateLimiter, rate_limiter::RateLimiter},
+    std::{net::IpAddr, time::Duration},
+};
+
+pub struct ConnectionRateLimiter {
+    limiter: KeyedRateLimiter<IpAddr>,
+}
+
+impl ConnectionRateLimiter {
+    /// Create a new per-IpAddr rate limiter. The rate is specified as a count
+    /// per minute to allow for less frequent connections.
+    pub fn new(limit_per_minute: u64) -> Self {
+        Self {
+            limiter: KeyedRateLimiter::new(limit_per_minute, Duration::from_secs(60)),
+        }
+    }
+
+    /// Check if a connection from the given `ip` is allowed.
+    pub fn is_allowed(&self, ip: &IpAddr) -> bool {
+        // Acquire a permit from the rate limiter for the given IP address
+        if self.limiter.check_and_update(*ip) {
+            debug!("Request from IP {:?} allowed", ip);
+            true // Request allowed
+        } else {
+            debug!("Request from IP {:?} blocked", ip);
+            false // Request blocked
+        }
+    }
+
+    /// Retain only keys whose throttle start instant is within the throttle
+    /// interval; drop the rest as inactive.
+    pub fn retain_recent(&self) {
+        self.limiter.retain_recent()
+    }
+
+    /// Returns the number of "live" keys in the rate limiter.
+    pub fn len(&self) -> usize {
+        self.limiter.len()
+    }
+
+    /// Returns `true` if the rate limiter has no keys in it.
+    pub fn is_empty(&self) -> bool {
+        self.limiter.is_empty()
+    }
+}
+
+/// Connection rate limiter for enforcing connection rates from
+/// all clients.
+pub struct TotalConnectionRateLimiter {
+    limiter: RateLimiter,
+}
+
+impl TotalConnectionRateLimiter {
+    /// Create a new rate limiter. The rate is specified as a count per second.
+    pub fn new(limit_per_second: u64) -> Self {
+        Self {
+            limiter: RateLimiter::new(limit_per_second, Duration::from_secs(1)),
+        }
+    }
+
+    /// Check if a connection is allowed.
+    pub fn is_allowed(&mut self) -> bool {
+        self.limiter.check_and_update()
+    }
+}
+
+#[cfg(test)]
+pub mod test {
+    use {super::*, std::net::Ipv4Addr};
+
+    #[tokio::test]
+    async fn test_total_connection_rate_limiter() {
+        let mut limiter = TotalConnectionRateLimiter::new(2);
+        assert!(limiter.is_allowed());
+        assert!(limiter.is_allowed());
+        assert!(!limiter.is_allowed());
+    }
+
+    #[tokio::test]
+    async fn test_connection_rate_limiter() {
+        let limiter = ConnectionRateLimiter::new(4);
+        let ip1 = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1));
+        assert!(limiter.is_allowed(&ip1));
+        assert!(limiter.is_allowed(&ip1));
+        assert!(limiter.is_allowed(&ip1));
+        assert!(limiter.is_allowed(&ip1));
+        assert!(!limiter.is_allowed(&ip1));
+
+        assert!(limiter.len() == 1);
+        let ip2 = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2));
+        assert!(limiter.is_allowed(&ip2));
+        assert!(limiter.len() == 2);
+        assert!(limiter.is_allowed(&ip2));
+        assert!(limiter.is_allowed(&ip2));
+        assert!(limiter.is_allowed(&ip2));
+        assert!(!limiter.is_allowed(&ip2));
+    }
+}
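// Editor's note -- a minimal usage sketch of the per-IP limiter above, not part of
// the patch. With the default of 8 connections per minute, the ninth attempt from
// the same address inside one 60 s window is rejected; a different address gets
// its own independent budget.
use {
    solana_streamer::nonblocking::connection_rate_limiter::ConnectionRateLimiter,
    std::net::{IpAddr, Ipv4Addr},
};

fn per_ip_limit_sketch() {
    let limiter = ConnectionRateLimiter::new(8);
    let ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1));
    for _ in 0..8 {
        assert!(limiter.is_allowed(&ip)); // first 8 in the window pass
    }
    assert!(!limiter.is_allowed(&ip)); // the 9th is throttled
    let other = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2));
    assert!(limiter.is_allowed(&other)); // other IPs are unaffected
}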
diff --git a/streamer/src/nonblocking/keyed_rate_limiter.rs b/streamer/src/nonblocking/keyed_rate_limiter.rs
new file mode 100644
index 00000000000000..790fda72701081
--- /dev/null
+++ b/streamer/src/nonblocking/keyed_rate_limiter.rs
@@ -0,0 +1,104 @@
+use {
+    crate::nonblocking::rate_limiter::RateLimiter,
+    dashmap::DashMap,
+    std::{hash::Hash, time::Duration},
+};
+
+pub struct KeyedRateLimiter<K> {
+    limiters: DashMap<K, RateLimiter>,
+    interval: Duration,
+    limit: u64,
+}
+
+impl<K> KeyedRateLimiter<K>
+where
+    K: Eq + Hash,
+{
+    /// Create a keyed rate limiter that allows `limit` requests per `interval`.
+    pub fn new(limit: u64, interval: Duration) -> Self {
+        Self {
+            limiters: DashMap::default(),
+            interval,
+            limit,
+        }
+    }
+
+    /// Check if a request for the given `key` is allowed to pass through the rate
+    /// limiter. When it is allowed, the rate limiter state is updated to reflect
+    /// that it has been allowed. For a unique request, the caller should call it
+    /// only once when it is allowed.
+    pub fn check_and_update(&self, key: K) -> bool {
+        match self.limiters.entry(key) {
+            dashmap::mapref::entry::Entry::Occupied(mut entry) => {
+                entry.get_mut().check_and_update()
+            }
+            dashmap::mapref::entry::Entry::Vacant(entry) => entry
+                .insert(RateLimiter::new(self.limit, self.interval))
+                .value_mut()
+                .check_and_update(),
+        }
+    }
+
+    /// Retain only keys whose throttle start instant is within the throttle
+    /// interval; drop the rest as inactive.
+    pub fn retain_recent(&self) {
+        let now = tokio::time::Instant::now();
+        self.limiters.retain(|_key, limiter| {
+            now.duration_since(*limiter.throttle_start_instant()) <= self.interval
+        });
+    }
+
+    /// Returns the number of "live" keys in the rate limiter.
+    pub fn len(&self) -> usize {
+        self.limiters.len()
+    }
+
+    /// Returns `true` if the rate limiter has no keys in it.
+    pub fn is_empty(&self) -> bool {
+        self.limiters.is_empty()
+    }
+}
+
+#[cfg(test)]
+pub mod test {
+    use {super::*, tokio::time::sleep};
+
+    #[allow(clippy::len_zero)]
+    #[tokio::test]
+    async fn test_rate_limiter() {
+        let limiter = KeyedRateLimiter::<u64>::new(2, Duration::from_millis(100));
+        assert!(limiter.len() == 0);
+        assert!(limiter.is_empty());
+        assert!(limiter.check_and_update(1));
+        assert!(limiter.check_and_update(1));
+        assert!(!limiter.check_and_update(1));
+        assert!(limiter.len() == 1);
+        assert!(limiter.check_and_update(2));
+        assert!(limiter.check_and_update(2));
+        assert!(!limiter.check_and_update(2));
+        assert!(limiter.len() == 2);
+
+        // sleep 150 ms; the throttle parameters should have been reset.
+        sleep(Duration::from_millis(150)).await;
+        assert!(limiter.len() == 2);
+
+        assert!(limiter.check_and_update(1));
+        assert!(limiter.check_and_update(1));
+        assert!(!limiter.check_and_update(1));
+
+        assert!(limiter.check_and_update(2));
+        assert!(limiter.check_and_update(2));
+        assert!(!limiter.check_and_update(2));
+        assert!(limiter.len() == 2);
+
+        // sleep another 150 ms and clean up outdated entries; key 2 will be removed
+        sleep(Duration::from_millis(150)).await;
+        assert!(limiter.check_and_update(1));
+        assert!(limiter.check_and_update(1));
+        assert!(!limiter.check_and_update(1));
+
+        limiter.retain_recent();
+        assert!(limiter.len() == 1);
+    }
+}
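// Editor's note -- design sketch, not part of the patch. `KeyedRateLimiter` keeps
// one fixed-window `RateLimiter` per key inside a `DashMap`, so concurrent checks
// for unrelated keys contend only on a map shard rather than a global lock.
// Entries are created on first use and never expire on their own, which is why the
// QUIC server below calls `retain_recent()` once the map grows past a threshold:
use {solana_streamer::nonblocking::keyed_rate_limiter::KeyedRateLimiter, std::net::IpAddr};

fn maybe_cleanup(limiter: &KeyedRateLimiter<IpAddr>, cleanup_threshold: usize) {
    // `cleanup_threshold` is a caller-chosen bound (the server uses 100_000).
    if limiter.len() > cleanup_threshold {
        // Drops every key whose current window started more than one interval ago.
        limiter.retain_recent();
    }
}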
diff --git a/streamer/src/nonblocking/mod.rs b/streamer/src/nonblocking/mod.rs
index fa627e0944dc77..9eed5e402c5f25 100644
--- a/streamer/src/nonblocking/mod.rs
+++ b/streamer/src/nonblocking/mod.rs
@@ -1,4 +1,7 @@
+pub mod connection_rate_limiter;
+pub mod keyed_rate_limiter;
 pub mod quic;
+pub mod rate_limiter;
 pub mod recvmmsg;
 pub mod sendmmsg;
 mod stream_throttle;
diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs
index 2367c5d63b331e..c4ad535bd41c89 100644
--- a/streamer/src/nonblocking/quic.rs
+++ b/streamer/src/nonblocking/quic.rs
@@ -1,8 +1,11 @@
 use {
     crate::{
-        nonblocking::stream_throttle::{
-            ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_THROTTLING_INTERVAL,
-            STREAM_THROTTLING_INTERVAL_MS,
+        nonblocking::{
+            connection_rate_limiter::{ConnectionRateLimiter, TotalConnectionRateLimiter},
+            stream_throttle::{
+                ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_THROTTLING_INTERVAL,
+                STREAM_THROTTLING_INTERVAL_MS,
+            },
         },
         quic::{configure_server, QuicServerError, StreamStats},
         streamer::StakedNodes,
@@ -79,6 +82,21 @@ const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";
 /// Limit to 250K PPS
 pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 250;
 
+/// The maximum number of new connections per minute from a particular IP address.
+/// Heuristically set to the default maximum concurrent connections
+/// per IP address. Might be adjusted later.
+pub const DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE: u64 = 8;
+
+/// Total new connection count per second. Heuristically taken from
+/// the default staked and unstaked connection limits. Might be adjusted
+/// later.
+const TOTAL_CONNECTIONS_PER_SECOND: u64 = 2500;
+
+/// The threshold of the size of the connection rate limiter map. When
+/// the map size exceeds this, we trigger a cleanup of older
+/// entries used by past requests.
+const CONNECTION_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD: usize = 100_000;
+
 // A sequence of bytes that is part of a packet
 // along with where in the packet it is
 struct PacketChunk {
@@ -135,6 +153,7 @@ pub fn spawn_server(
     max_staked_connections: usize,
     max_unstaked_connections: usize,
     max_streams_per_ms: u64,
+    max_connections_per_ipaddr_per_min: u64,
     wait_for_chunk_timeout: Duration,
     coalesce: Duration,
 ) -> Result<(Endpoint, Arc<StreamStats>, JoinHandle<()>), QuicServerError> {
@@ -162,6 +181,7 @@ pub fn spawn_server(
         max_staked_connections,
         max_unstaked_connections,
         max_streams_per_ms,
+        max_connections_per_ipaddr_per_min,
         stats.clone(),
         wait_for_chunk_timeout,
         coalesce,
@@ -185,10 +205,15 @@ async fn run_server(
     max_staked_connections: usize,
     max_unstaked_connections: usize,
     max_streams_per_ms: u64,
+    max_connections_per_ipaddr_per_min: u64,
     stats: Arc<StreamStats>,
     wait_for_chunk_timeout: Duration,
     coalesce: Duration,
 ) {
+    let rate_limiter = ConnectionRateLimiter::new(max_connections_per_ipaddr_per_min);
+    let mut overall_connection_rate_limiter =
+        TotalConnectionRateLimiter::new(TOTAL_CONNECTIONS_PER_SECOND);
+
     const WAIT_FOR_CONNECTION_TIMEOUT: Duration = Duration::from_secs(1);
     debug!("spawn quic server");
     let mut last_datapoint = Instant::now();
@@ -218,7 +243,37 @@ async fn run_server(
         }
 
         if let Ok(Some(connection)) = timeout_connection {
-            info!("Got a connection {:?}", connection.remote_address());
+            let remote_address = connection.remote_address();
+
+            // First check the overall connection rate limit:
+            if !overall_connection_rate_limiter.is_allowed() {
+                debug!(
+                    "Reject connection from {:?} -- total rate limiting exceeded",
+                    remote_address.ip()
+                );
+                stats
+                    .connection_throttled_across_all
+                    .fetch_add(1, Ordering::Relaxed);
+                continue;
+            }
+
+            if rate_limiter.len() > CONNECTION_RATE_LIMITER_CLEANUP_SIZE_THRESHOLD {
+                rate_limiter.retain_recent();
+            }
+            stats
+                .connection_rate_limiter_length
+                .store(rate_limiter.len(), Ordering::Relaxed);
+            info!("Got a connection {remote_address:?}");
+            if !rate_limiter.is_allowed(&remote_address.ip()) {
+                info!(
+                    "Reject connection from {:?} -- rate limiting exceeded",
+                    remote_address
+                );
+                stats
+                    .connection_throttled_per_ipaddr
+                    .fetch_add(1, Ordering::Relaxed);
+                continue;
+            }
             tokio::spawn(setup_connection(
                 connection,
                 unstaked_connection_table.clone(),
@@ -1362,6 +1417,7 @@ pub mod test {
             MAX_STAKED_CONNECTIONS,
             MAX_UNSTAKED_CONNECTIONS,
             DEFAULT_MAX_STREAMS_PER_MS,
+            DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
             Duration::from_secs(2),
             DEFAULT_TPU_COALESCE,
         )
@@ -1803,6 +1859,7 @@ pub mod test {
             MAX_STAKED_CONNECTIONS,
             0, // Do not allow any connection from unstaked clients/nodes
             DEFAULT_MAX_STREAMS_PER_MS,
+            DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
             DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
             DEFAULT_TPU_COALESCE,
         )
@@ -1838,6 +1895,7 @@ pub mod test {
             MAX_STAKED_CONNECTIONS,
             MAX_UNSTAKED_CONNECTIONS,
             DEFAULT_MAX_STREAMS_PER_MS,
+            DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
             DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
             DEFAULT_TPU_COALESCE,
         )
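// Editor's note, not part of the patch: the accept path above runs two gates in
// sequence -- the global TotalConnectionRateLimiter first, then the per-IP
// ConnectionRateLimiter -- so one flooding address spends global budget before its
// per-IP check rejects it. The 2500/s heuristic appears to be the sum of the
// default connection caps; the values below are assumptions matching the usual
// streamer defaults:
const MAX_STAKED_CONNECTIONS: usize = 2000; // assumed default staked cap
const MAX_UNSTAKED_CONNECTIONS: usize = 500; // assumed default unstaked cap
const TOTAL_CONNECTIONS_PER_SECOND: usize = MAX_STAKED_CONNECTIONS + MAX_UNSTAKED_CONNECTIONS;
const _: () = assert!(TOTAL_CONNECTIONS_PER_SECOND == 2500);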
diff --git a/streamer/src/nonblocking/rate_limiter.rs b/streamer/src/nonblocking/rate_limiter.rs
new file mode 100644
index 00000000000000..e9e7e126545a48
--- /dev/null
+++ b/streamer/src/nonblocking/rate_limiter.rs
@@ -0,0 +1,74 @@
+use {std::time::Duration, tokio::time::Instant};
+
+#[derive(Debug)]
+pub struct RateLimiter {
+    /// count of requests in an interval
+    pub(crate) count: u64,
+
+    /// Throttle start time
+    throttle_start_instant: Instant,
+    interval: Duration,
+    limit: u64,
+}
+
+/// A naive rate limiter, to be replaced by `governor`, which gives a more even
+/// distribution of requests passing through using the GCRA algorithm.
+impl RateLimiter {
+    pub fn new(limit: u64, interval: Duration) -> Self {
+        Self {
+            count: 0,
+            throttle_start_instant: Instant::now(),
+            interval,
+            limit,
+        }
+    }
+
+    /// Reset the counter and the throttle start instant if needed.
+    pub fn reset_throttling_params_if_needed(&mut self) {
+        if Instant::now().duration_since(self.throttle_start_instant) > self.interval {
+            self.throttle_start_instant = Instant::now();
+            self.count = 0;
+        }
+    }
+
+    /// Check if a single request should be allowed to pass through the rate limiter.
+    /// When it is allowed, the rate limiter state is updated to reflect that it has
+    /// been allowed. For a unique request, the caller should call it only once when
+    /// it is allowed.
+    pub fn check_and_update(&mut self) -> bool {
+        self.reset_throttling_params_if_needed();
+        if self.count >= self.limit {
+            return false;
+        }
+
+        self.count = self.count.saturating_add(1);
+        true
+    }
+
+    /// Return the start instant of the current throttle interval.
+    pub fn throttle_start_instant(&self) -> &Instant {
+        &self.throttle_start_instant
+    }
+}
+
+#[cfg(test)]
+pub mod test {
+    use {super::*, tokio::time::sleep};
+
+    #[tokio::test]
+    async fn test_rate_limiter() {
+        let mut limiter = RateLimiter::new(2, Duration::from_millis(100));
+        assert!(limiter.check_and_update());
+        assert!(limiter.check_and_update());
+        assert!(!limiter.check_and_update());
+        let instant1 = *limiter.throttle_start_instant();
+
+        // sleep 150 ms; the throttle parameters should have been reset.
+        sleep(Duration::from_millis(150)).await;
+        assert!(limiter.check_and_update());
+        assert!(limiter.check_and_update());
+        assert!(!limiter.check_and_update());
+
+        let instant2 = *limiter.throttle_start_instant();
+        assert!(instant2 > instant1);
+    }
+}
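// Editor's note -- a sketch (not part of the patch) of the fixed-window caveat that
// motivates the doc comment's pointer to governor/GCRA: up to 2x `limit` requests
// can pass in a short span straddling a window boundary, because the counter
// resets wholesale instead of draining smoothly.
use solana_streamer::nonblocking::rate_limiter::RateLimiter;

#[tokio::test(start_paused = true)] // requires tokio's "test-util" feature
async fn fixed_window_allows_edge_bursts() {
    let mut limiter = RateLimiter::new(2, std::time::Duration::from_millis(100));
    // Exhaust the first window.
    assert!(limiter.check_and_update());
    assert!(limiter.check_and_update());
    assert!(!limiter.check_and_update());
    // Step just past the interval: the window resets and a fresh burst passes.
    tokio::time::advance(std::time::Duration::from_millis(101)).await;
    assert!(limiter.check_and_update());
    assert!(limiter.check_and_update());
}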
diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs
index 66537b1ad8dcc4..bdc0c889224706 100644
--- a/streamer/src/quic.rs
+++ b/streamer/src/quic.rs
@@ -170,6 +170,8 @@ pub struct StreamStats {
     pub(crate) connection_setup_error_locally_closed: AtomicUsize,
     pub(crate) connection_removed: AtomicUsize,
     pub(crate) connection_remove_failed: AtomicUsize,
+    pub(crate) connection_throttled_across_all: AtomicUsize,
+    pub(crate) connection_throttled_per_ipaddr: AtomicUsize,
     pub(crate) throttled_streams: AtomicUsize,
     pub(crate) stream_load_ema: AtomicUsize,
     pub(crate) stream_load_ema_overflow: AtomicUsize,
@@ -180,6 +182,7 @@ pub struct StreamStats {
     pub(crate) total_unstaked_packets_sent_for_batching: AtomicUsize,
     pub(crate) throttled_staked_streams: AtomicUsize,
     pub(crate) throttled_unstaked_streams: AtomicUsize,
+    pub(crate) connection_rate_limiter_length: AtomicUsize,
 }
 
 impl StreamStats {
@@ -314,6 +317,18 @@ impl StreamStats {
                     .swap(0, Ordering::Relaxed),
                 i64
             ),
+            (
+                "connection_throttled_across_all",
+                self.connection_throttled_across_all
+                    .swap(0, Ordering::Relaxed),
+                i64
+            ),
+            (
+                "connection_throttled_per_ipaddr",
+                self.connection_throttled_per_ipaddr
+                    .swap(0, Ordering::Relaxed),
+                i64
+            ),
             (
                 "invalid_chunk",
                 self.total_invalid_chunks.swap(0, Ordering::Relaxed),
                 i64
             ),
@@ -491,6 +506,11 @@ impl StreamStats {
                 self.perf_track_overhead_us.swap(0, Ordering::Relaxed),
                 i64
             ),
+            (
+                "connection_rate_limiter_length",
+                self.connection_rate_limiter_length.load(Ordering::Relaxed),
+                i64
+            ),
         );
     }
 }
@@ -508,6 +528,7 @@ pub fn spawn_server(
     max_staked_connections: usize,
     max_unstaked_connections: usize,
     max_streams_per_ms: u64,
+    max_connections_per_ipaddr_per_min: u64,
     wait_for_chunk_timeout: Duration,
     coalesce: Duration,
 ) -> Result<SpawnServerResult, QuicServerError> {
@@ -525,6 +546,7 @@ pub fn spawn_server(
         max_staked_connections,
         max_unstaked_connections,
         max_streams_per_ms,
+        max_connections_per_ipaddr_per_min,
         wait_for_chunk_timeout,
         coalesce,
     )
@@ -553,7 +575,8 @@ mod test {
     use {
         super::*,
        crate::nonblocking::quic::{
-            test::*, DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
+            test::*, DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS,
+            DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
         },
         crossbeam_channel::unbounded,
         solana_sdk::net::DEFAULT_TPU_COALESCE,
@@ -588,6 +611,7 @@ mod test {
             MAX_STAKED_CONNECTIONS,
             MAX_UNSTAKED_CONNECTIONS,
             DEFAULT_MAX_STREAMS_PER_MS,
+            DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
             DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
             DEFAULT_TPU_COALESCE,
         )
@@ -648,6 +672,7 @@ mod test {
             MAX_STAKED_CONNECTIONS,
             MAX_UNSTAKED_CONNECTIONS,
             DEFAULT_MAX_STREAMS_PER_MS,
+            DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
             DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
             DEFAULT_TPU_COALESCE,
         )
@@ -695,6 +720,7 @@ mod test {
             MAX_STAKED_CONNECTIONS,
             0, // Do not allow any connection from unstaked clients/nodes
             DEFAULT_MAX_STREAMS_PER_MS,
+            DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
             DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
             DEFAULT_TPU_COALESCE,
         )
diff --git a/test-validator/src/lib.rs b/test-validator/src/lib.rs
index e0e4c17e866932..90edff8d36ff7c 100644
--- a/test-validator/src/lib.rs
+++ b/test-validator/src/lib.rs
@@ -1053,6 +1053,7 @@ impl TestValidator {
             DEFAULT_TPU_USE_QUIC,
             DEFAULT_TPU_CONNECTION_POOL_SIZE,
             config.tpu_enable_udp,
+            32, // max connections per IpAddr per minute for test
             config.admin_rpc_service_post_init.clone(),
         )?);
diff --git a/validator/src/cli.rs b/validator/src/cli.rs
index 002d7797cba3cf..e2085d458d7bd9 100644
--- a/validator/src/cli.rs
+++ b/validator/src/cli.rs
@@ -53,7 +53,10 @@ use {
 };
 
 pub mod thread_args;
-use thread_args::{thread_args, DefaultThreadArgs};
+use {
+    solana_streamer::nonblocking::quic::DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
+    thread_args::{thread_args, DefaultThreadArgs},
+};
 
 const EXCLUDE_KEY: &str = "account-index-exclude-key";
 const INCLUDE_KEY: &str = "account-index-include-key";
@@ -881,6 +884,15 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
                 .validator(is_parsable::<usize>)
                 .help("Controls the TPU connection pool size per remote address"),
         )
+        .arg(
+            Arg::with_name("tpu_max_connections_per_ipaddr_per_minute")
+                .long("tpu-max-connections-per-ipaddr-per-minute")
+                .takes_value(true)
+                .default_value(&default_args.tpu_max_connections_per_ipaddr_per_minute)
+                .validator(is_parsable::<u64>)
+                .hidden(hidden_unless_forced())
+                .help("Controls the rate of client connections per IpAddr per minute"),
+        )
         .arg(
             Arg::with_name("staked_nodes_overrides")
                 .long("staked-nodes-overrides")
@@ -2206,6 +2218,7 @@ pub struct DefaultArgs {
     pub accounts_shrink_optimize_total_space: String,
     pub accounts_shrink_ratio: String,
     pub tpu_connection_pool_size: String,
+    pub tpu_max_connections_per_ipaddr_per_minute: String,
 
     // Exit subcommand
     pub exit_min_idle_time: String,
@@ -2295,6 +2308,8 @@ impl DefaultArgs {
                 .to_string(),
             accounts_shrink_ratio: DEFAULT_ACCOUNTS_SHRINK_RATIO.to_string(),
             tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE.to_string(),
+            tpu_max_connections_per_ipaddr_per_minute:
+                DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE.to_string(),
             rpc_max_request_body_size: MAX_REQUEST_BODY_SIZE.to_string(),
             exit_min_idle_time: "10".to_string(),
             exit_max_delinquent_stake: "5".to_string(),
diff --git a/validator/src/main.rs b/validator/src/main.rs
index 9f021bb46310b7..7257e3dfe81698 100644
--- a/validator/src/main.rs
+++ b/validator/src/main.rs
@@ -1085,6 +1085,8 @@ pub fn main() {
     };
 
     let tpu_connection_pool_size = value_t_or_exit!(matches, "tpu_connection_pool_size", usize);
+    let tpu_max_connections_per_ipaddr_per_minute =
+        value_t_or_exit!(matches, "tpu_max_connections_per_ipaddr_per_minute", u64);
 
     let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64);
     if !(0.0..=1.0).contains(&shrink_ratio) {
@@ -1980,6 +1982,7 @@ pub fn main() {
         tpu_use_quic,
         tpu_connection_pool_size,
         tpu_enable_udp,
+        tpu_max_connections_per_ipaddr_per_minute,
         admin_service_post_init,
     )
     .unwrap_or_else(|e| {
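// Editor's note -- a sketch of the end-to-end plumbing, not part of the patch: the
// hidden --tpu-max-connections-per-ipaddr-per-minute flag flows from main() through
// Validator::new and Tpu::new down to the streamer's spawn_server. The helper below
// is hypothetical; it only illustrates falling back to the streamer default.
use solana_streamer::nonblocking::quic::DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE;

fn connection_rate_limit_or_default(cli_value: Option<u64>) -> u64 {
    // Hypothetical helper: use the operator-supplied value when present,
    // otherwise the streamer default (8 connections per IP per minute).
    cli_value.unwrap_or(DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE)
}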