From 0f278641fc5a526d2e31358f311f979839f7adc1 Mon Sep 17 00:00:00 2001 From: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> Date: Sun, 5 May 2024 16:03:05 -0700 Subject: [PATCH] Added unit tests Cleanup connection cache rate limiter if exceeding certain threshold missing files CONNECITON_RATE_LIMITER_CLEANUP_THRESHOLD to 100_000 clippy issue clippy issue sort crates --- Cargo.lock | 73 +------------ Cargo.toml | 1 - bench-tps/src/cli.rs | 2 +- core/src/tpu.rs | 2 +- core/src/validator.rs | 2 +- programs/sbf/Cargo.lock | 73 +------------ streamer/Cargo.toml | 2 +- .../nonblocking/connection_rate_limiter.rs | 77 ++++++++++--- .../src/nonblocking/keyed_rate_limiter.rs | 102 ++++++++++++++++++ streamer/src/nonblocking/mod.rs | 2 + streamer/src/nonblocking/quic.rs | 18 +++- streamer/src/nonblocking/rate_limiter.rs | 68 ++++++++++++ streamer/src/quic.rs | 8 +- validator/src/main.rs | 2 +- 14 files changed, 259 insertions(+), 173 deletions(-) create mode 100644 streamer/src/nonblocking/keyed_rate_limiter.rs create mode 100644 streamer/src/nonblocking/rate_limiter.rs diff --git a/Cargo.lock b/Cargo.lock index e248078d13388c..ce88f65ac5b58f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2336,12 +2336,6 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" -[[package]] -name = "futures-timer" -version = "3.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" - [[package]] name = "futures-util" version = "0.3.30" @@ -2511,26 +2505,6 @@ dependencies = [ "scroll", ] -[[package]] -name = "governor" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" -dependencies = [ - "cfg-if 1.0.0", - "dashmap", - "futures 0.3.30", - "futures-timer", - "no-std-compat", - "nonzero_ext", - "parking_lot 0.12.1", - "portable-atomic", - "quanta", - "rand 0.8.5", - "smallvec", - "spinning_top", -] - [[package]] name = "h2" version = "0.3.26" @@ -3543,12 +3517,6 @@ dependencies = [ "memoffset 0.9.0", ] -[[package]] -name = "no-std-compat" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" - [[package]] name = "nom" version = "7.0.0" @@ -3560,12 +3528,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "nonzero_ext" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" - [[package]] name = "normalize-line-endings" version = "0.3.0" @@ -4344,21 +4306,6 @@ dependencies = [ "syn 2.0.52", ] -[[package]] -name = "quanta" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" -dependencies = [ - "crossbeam-utils", - "libc", - "once_cell", - "raw-cpuid", - "wasi 0.11.0+wasi-snapshot-preview1", - "web-sys", - "winapi 0.3.9", -] - [[package]] name = "quick-error" version = "1.2.3" @@ -4539,15 +4486,6 @@ dependencies = [ "rand_core 0.6.4", ] -[[package]] -name = "raw-cpuid" -version = "11.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d86a7c4638d42c44551f4791a20e687dbb4c3de1f33c43dd71e355cd429def1" -dependencies = [ - "bitflags 2.4.2", -] - [[package]] name = "rayon" version = "1.9.0" @@ -7336,8 +7274,8 @@ dependencies = [ "async-channel", "bytes", "crossbeam-channel", + "dashmap", "futures-util", - "governor", "histogram", "indexmap 2.2.5", "itertools", @@ -7842,15 +7780,6 @@ version = "0.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "511254be0c5bcf062b019a6c89c01a664aa359ded62f78aa72c6fc137c0590e5" -[[package]] -name = "spinning_top" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" -dependencies = [ - "lock_api", -] - [[package]] name = "spl-associated-token-account" version = "2.3.1" diff --git a/Cargo.toml b/Cargo.toml index 33fd34530430a6..786599d4cab5b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -214,7 +214,6 @@ generic-array = { version = "0.14.7", default-features = false } gethostname = "0.2.3" getrandom = "0.2.10" goauth = "0.13.1" -governor = "0.6.3" hex = "0.4.3" hidapi = { version = "2.6.1", default-features = false } histogram = "0.6.9" diff --git a/bench-tps/src/cli.rs b/bench-tps/src/cli.rs index 1f4c8343e82bc1..5f0191742ac068 100644 --- a/bench-tps/src/cli.rs +++ b/bench-tps/src/cli.rs @@ -69,7 +69,7 @@ pub struct Config { pub external_client_type: ExternalClientType, pub use_quic: bool, pub tpu_connection_pool_size: usize, - pub tpu_max_connections_per_ipaddr_per_minute: u32, + pub tpu_max_connections_per_ipaddr_per_minute: u64, pub compute_unit_price: Option, pub skip_tx_account_data_size: bool, pub use_durable_nonce: bool, diff --git a/core/src/tpu.rs b/core/src/tpu.rs index ae4ef179a3d70f..abe575ab10d1bf 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -108,7 +108,7 @@ impl Tpu { banking_tracer: Arc, tracer_thread_hdl: TracerThread, tpu_enable_udp: bool, - tpu_max_connections_per_ipaddr_per_minute: u32, + tpu_max_connections_per_ipaddr_per_minute: u64, prioritization_fee_cache: &Arc, block_production_method: BlockProductionMethod, _generator_config: Option, /* vestigial code for replay invalidator */ diff --git a/core/src/validator.rs b/core/src/validator.rs index 76b6af8484820d..18e50eac430158 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -510,7 +510,7 @@ impl Validator { use_quic: bool, tpu_connection_pool_size: usize, tpu_enable_udp: bool, - tpu_max_connections_per_ipaddr_per_minute: u32, + tpu_max_connections_per_ipaddr_per_minute: u64, admin_rpc_service_post_init: Arc>>, ) -> Result { let id = identity_keypair.pubkey(); diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 1bcdf0b13256b0..d4c0e64a253a59 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -1876,12 +1876,6 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" -[[package]] -name = "futures-timer" -version = "3.0.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" - [[package]] name = "futures-util" version = "0.3.30" @@ -2011,26 +2005,6 @@ dependencies = [ "scroll", ] -[[package]] -name = "governor" -version = "0.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" -dependencies = [ - "cfg-if 1.0.0", - "dashmap", - "futures 0.3.30", - "futures-timer", - "no-std-compat", - "nonzero_ext", - "parking_lot 0.12.1", - "portable-atomic", - "quanta", - "rand 0.8.5", - "smallvec", - "spinning_top", -] - [[package]] name = "h2" version = "0.3.26" @@ -3049,12 +3023,6 @@ dependencies = [ "memoffset 0.9.0", ] -[[package]] -name = "no-std-compat" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b93853da6d84c2e3c7d730d6473e8817692dd89be387eb01b94d7f108ecb5b8c" - [[package]] name = "nom" version = "7.1.1" @@ -3065,12 +3033,6 @@ dependencies = [ "minimal-lexical", ] -[[package]] -name = "nonzero_ext" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" - [[package]] name = "normalize-line-endings" version = "0.3.0" @@ -3791,21 +3753,6 @@ dependencies = [ "syn 2.0.52", ] -[[package]] -name = "quanta" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" -dependencies = [ - "crossbeam-utils", - "libc", - "once_cell", - "raw-cpuid", - "wasi 0.11.0+wasi-snapshot-preview1", - "web-sys", - "winapi 0.3.9", -] - [[package]] name = "quinn" version = "0.10.2" @@ -3943,15 +3890,6 @@ dependencies = [ "rand_core 0.6.4", ] -[[package]] -name = "raw-cpuid" -version = "11.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d86a7c4638d42c44551f4791a20e687dbb4c3de1f33c43dd71e355cd429def1" -dependencies = [ - "bitflags 2.4.2", -] - [[package]] name = "rayon" version = "1.9.0" @@ -6388,8 +6326,8 @@ dependencies = [ "async-channel", "bytes", "crossbeam-channel", + "dashmap", "futures-util", - "governor", "histogram", "indexmap 2.2.5", "itertools", @@ -6763,15 +6701,6 @@ version = "0.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c530c2b0d0bf8b69304b39fe2001993e267461948b890cd037d8ad4293fa1a0d" -[[package]] -name = "spinning_top" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d96d2d1d716fb500937168cc09353ffdc7a012be8475ac7308e1bdf0e3923300" -dependencies = [ - "lock_api", -] - [[package]] name = "spl-associated-token-account" version = "2.3.1" diff --git a/streamer/Cargo.toml b/streamer/Cargo.toml index ba0f08160fef17..d173da1a568c8c 100644 --- a/streamer/Cargo.toml +++ b/streamer/Cargo.toml @@ -13,8 +13,8 @@ edition = { workspace = true } async-channel = { workspace = true } bytes = { workspace = true } crossbeam-channel = { workspace = true } +dashmap = { workspace = true } futures-util = { workspace = true } -governor = { workspace = true } histogram = { workspace = true } indexmap = { workspace = true } itertools = { workspace = true } diff --git a/streamer/src/nonblocking/connection_rate_limiter.rs b/streamer/src/nonblocking/connection_rate_limiter.rs index 1a035fb38525df..39706905762ff8 100644 --- a/streamer/src/nonblocking/connection_rate_limiter.rs +++ b/streamer/src/nonblocking/connection_rate_limiter.rs @@ -1,26 +1,25 @@ use { - governor::{DefaultDirectRateLimiter, DefaultKeyedRateLimiter, Quota, RateLimiter}, - std::{net::IpAddr, num::NonZeroU32}, + crate::nonblocking::{keyed_rate_limiter::KeyedRateLimiter, rate_limiter::RateLimiter}, + std::{net::IpAddr, time::Duration}, }; pub struct ConnectionRateLimiter { - limiter: DefaultKeyedRateLimiter, + limiter: KeyedRateLimiter, } impl ConnectionRateLimiter { /// Create a new rate limiter per IpAddr. The rate is specified as the count per minute to allow for /// less frequent connections. - pub fn new(limit_per_minute: u32) -> Self { - let quota = Quota::per_minute(NonZeroU32::new(limit_per_minute).unwrap()); + pub fn new(limit_per_minute: u64) -> Self { Self { - limiter: DefaultKeyedRateLimiter::keyed(quota), + limiter: KeyedRateLimiter::new(limit_per_minute, Duration::from_secs(60)), } } /// Check if the connection from the said `ip` is allowed. pub fn is_allowed(&self, ip: &IpAddr) -> bool { // Acquire a permit from the rate limiter for the given IP address - if self.limiter.check_key(ip).is_ok() { + if self.limiter.is_allowed(*ip) { debug!("Request from IP {:?} allowed", ip); true // Request allowed } else { @@ -28,29 +27,73 @@ impl ConnectionRateLimiter { false // Request blocked } } + + /// retain only keys whose throttle start date is within the throttle interval. + /// Otherwise drop them as inactive + pub fn retain_recent(&self) { + self.limiter.retain_recent() + } + + /// Returns the number of "live" keys in the rate limiter. + pub fn len(&self) -> usize { + self.limiter.len() + } + + /// Returns `true` if the rate limiter has no keys in it. + pub fn is_empty(&self) -> bool { + self.limiter.is_empty() + } } /// Connection rate limiter for enforcing connection rates from /// all clients. pub struct TotalConnectionRateLimiter { - limiter: DefaultDirectRateLimiter, + limiter: RateLimiter, } impl TotalConnectionRateLimiter { /// Create a new rate limiter. The rate is specified as the count per second. - pub fn new(limit_per_second: u32) -> Self { - let quota = Quota::per_second(NonZeroU32::new(limit_per_second).unwrap()); // Adjust the rate limit as needed + pub fn new(limit_per_second: u64) -> Self { Self { - limiter: RateLimiter::direct(quota), + limiter: RateLimiter::new(limit_per_second, Duration::from_secs(1)), } } /// Check if a connection is allowed. - pub fn is_allowed(&self) -> bool { - if self.limiter.check().is_ok() { - true // Request allowed - } else { - false // Request blocked - } + pub fn is_allowed(&mut self) -> bool { + self.limiter.is_allowed() + } +} + +#[cfg(test)] +pub mod test { + use {super::*, std::net::Ipv4Addr}; + + #[tokio::test] + async fn test_total_connection_rate_limiter() { + let mut limiter = TotalConnectionRateLimiter::new(2); + assert!(limiter.is_allowed()); + assert!(limiter.is_allowed()); + assert!(!limiter.is_allowed()); + } + + #[tokio::test] + async fn test_connection_rate_limiter() { + let limiter = ConnectionRateLimiter::new(4); + let ip1 = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)); + assert!(limiter.is_allowed(&ip1)); + assert!(limiter.is_allowed(&ip1)); + assert!(limiter.is_allowed(&ip1)); + assert!(limiter.is_allowed(&ip1)); + assert!(!limiter.is_allowed(&ip1)); + + assert!(limiter.len() == 1); + let ip2 = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2)); + assert!(limiter.is_allowed(&ip2)); + assert!(limiter.len() == 2); + assert!(limiter.is_allowed(&ip2)); + assert!(limiter.is_allowed(&ip2)); + assert!(limiter.is_allowed(&ip2)); + assert!(!limiter.is_allowed(&ip2)); } } diff --git a/streamer/src/nonblocking/keyed_rate_limiter.rs b/streamer/src/nonblocking/keyed_rate_limiter.rs new file mode 100644 index 00000000000000..ab74ea0d6cff26 --- /dev/null +++ b/streamer/src/nonblocking/keyed_rate_limiter.rs @@ -0,0 +1,102 @@ +use { + crate::nonblocking::rate_limiter::RateLimiter, + dashmap::DashMap, + std::{hash::Hash, time::Duration}, +}; + +pub struct KeyedRateLimiter { + limiters: DashMap, + interval: Duration, + limit: u64, +} + +impl KeyedRateLimiter +where + K: Eq + Hash, +{ + /// Create a keyed rate limiter with `limit` count with a rate limit `interval` + pub fn new(limit: u64, interval: Duration) -> Self { + Self { + limiters: DashMap::default(), + interval, + limit, + } + } + + /// Check if the connection from the said `key` is allowed. + pub fn is_allowed(&self, key: K) -> bool { + let allowed = match self.limiters.entry(key) { + dashmap::mapref::entry::Entry::Occupied(mut entry) => { + let limiter = entry.get_mut(); + limiter.is_allowed() + } + dashmap::mapref::entry::Entry::Vacant(entry) => entry + .insert(RateLimiter::new(self.limit, self.interval)) + .value_mut() + .is_allowed(), + }; + allowed + } + + /// retain only keys whose throttle start date is within the throttle interval. + /// Otherwise drop them as inactive + pub fn retain_recent(&self) { + let now = tokio::time::Instant::now(); + self.limiters.retain(|_key, limiter| { + now.duration_since(*limiter.throttle_start_instant()) <= self.interval + }); + } + + /// Returns the number of "live" keys in the rate limiter. + pub fn len(&self) -> usize { + self.limiters.len() + } + + /// Returns `true` if the rate limiter has no keys in it. + pub fn is_empty(&self) -> bool { + self.limiters.is_empty() + } +} + +#[cfg(test)] +pub mod test { + use {super::*, tokio::time::sleep}; + + #[allow(clippy::len_zero)] + #[tokio::test] + async fn test_rate_limiter() { + let limiter = KeyedRateLimiter::::new(2, Duration::from_millis(100)); + assert!(limiter.len() == 0); + assert!(limiter.is_empty()); + assert!(limiter.is_allowed(1)); + assert!(limiter.is_allowed(1)); + assert!(!limiter.is_allowed(1)); + assert!(limiter.len() == 1); + assert!(limiter.is_allowed(2)); + assert!(limiter.is_allowed(2)); + assert!(!limiter.is_allowed(2)); + assert!(limiter.len() == 2); + + // sleep 150 ms, the throttle parameters should have been reset. + sleep(Duration::from_millis(150)).await; + assert!(limiter.len() == 2); + + assert!(limiter.is_allowed(1)); + assert!(limiter.is_allowed(1)); + assert!(!limiter.is_allowed(1)); + + assert!(limiter.is_allowed(2)); + assert!(limiter.is_allowed(2)); + assert!(!limiter.is_allowed(2)); + assert!(limiter.len() == 2); + + // sleep another 150 and clean outdatated, key 2 will be removed + sleep(Duration::from_millis(150)).await; + assert!(limiter.is_allowed(1)); + assert!(limiter.is_allowed(1)); + assert!(!limiter.is_allowed(1)); + + limiter.retain_recent(); + assert!(limiter.len() == 1); + } +} diff --git a/streamer/src/nonblocking/mod.rs b/streamer/src/nonblocking/mod.rs index 51d1f22efde165..9eed5e402c5f25 100644 --- a/streamer/src/nonblocking/mod.rs +++ b/streamer/src/nonblocking/mod.rs @@ -1,5 +1,7 @@ pub mod connection_rate_limiter; +pub mod keyed_rate_limiter; pub mod quic; +pub mod rate_limiter; pub mod recvmmsg; pub mod sendmmsg; mod stream_throttle; diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 9c7fd051496a2f..dac712a51ebf50 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -82,8 +82,10 @@ const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many"; /// Limit to 250K PPS pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 250; -pub const DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE: u32 = 8; -const TOTAL_CONNECTIONS_PER_SECOND: u32 = 2500; +pub const DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE: u64 = 8; +const TOTAL_CONNECTIONS_PER_SECOND: u64 = 2500; + +const CONNECITON_RATE_LIMITER_CLEANUP_THRESHOLD: usize = 100_000; // A sequence of bytes that is part of a packet // along with where in the packet it is @@ -141,7 +143,7 @@ pub fn spawn_server( max_staked_connections: usize, max_unstaked_connections: usize, max_streams_per_ms: u64, - max_connections_per_ipaddr_per_min: u32, + max_connections_per_ipaddr_per_min: u64, wait_for_chunk_timeout: Duration, coalesce: Duration, ) -> Result { @@ -193,13 +195,13 @@ async fn run_server( max_staked_connections: usize, max_unstaked_connections: usize, max_streams_per_ms: u64, - max_connections_per_ipaddr_per_min: u32, + max_connections_per_ipaddr_per_min: u64, stats: Arc, wait_for_chunk_timeout: Duration, coalesce: Duration, ) { let rate_limiter = ConnectionRateLimiter::new(max_connections_per_ipaddr_per_min); - let overall_connection_rate_limiter = + let mut overall_connection_rate_limiter = TotalConnectionRateLimiter::new(TOTAL_CONNECTIONS_PER_SECOND); const WAIT_FOR_CONNECTION_TIMEOUT: Duration = Duration::from_secs(1); @@ -245,6 +247,12 @@ async fn run_server( continue; } + if rate_limiter.len() > CONNECITON_RATE_LIMITER_CLEANUP_THRESHOLD { + rate_limiter.retain_recent(); + } + stats + .connection_rate_limiter_length + .store(rate_limiter.len(), Ordering::Relaxed); info!("Got a connection {remote_address:?}"); if !rate_limiter.is_allowed(&remote_address.ip()) { info!( diff --git a/streamer/src/nonblocking/rate_limiter.rs b/streamer/src/nonblocking/rate_limiter.rs new file mode 100644 index 00000000000000..2c619bab49f7ed --- /dev/null +++ b/streamer/src/nonblocking/rate_limiter.rs @@ -0,0 +1,68 @@ +use {std::time::Duration, tokio::time::Instant}; + +#[derive(Debug)] +pub struct RateLimiter { + /// count of requests in an interval + pub(crate) count: u64, + + /// Throttle start time + throttle_start_instant: Instant, + interval: Duration, + limit: u64, +} + +/// A naive rate limiter, to be replaced by using governor +impl RateLimiter { + pub fn new(limit: u64, interval: Duration) -> Self { + Self { + count: 0, + throttle_start_instant: Instant::now(), + interval, + limit, + } + } + + /// Reset the counter and throttling start instant if needed. + pub fn reset_throttling_params_if_needed(&mut self) { + if Instant::now().duration_since(self.throttle_start_instant) > self.interval { + self.throttle_start_instant = Instant::now(); + self.count = 0; + } + } + pub fn is_allowed(&mut self) -> bool { + self.reset_throttling_params_if_needed(); + if self.count >= self.limit { + return false; + } + + self.count = self.count.saturating_add(1); + true + } + + pub fn throttle_start_instant(&self) -> &Instant { + &self.throttle_start_instant + } +} + +#[cfg(test)] +pub mod test { + use {super::*, tokio::time::sleep}; + + #[tokio::test] + async fn test_rate_limiter() { + let mut limiter = RateLimiter::new(2, Duration::from_millis(100)); + assert!(limiter.is_allowed()); + assert!(limiter.is_allowed()); + assert!(!limiter.is_allowed()); + let instant1 = *limiter.throttle_start_instant(); + + // sleep 150 ms, the throttle parameters should have been reset. + sleep(Duration::from_millis(150)).await; + assert!(limiter.is_allowed()); + assert!(limiter.is_allowed()); + assert!(!limiter.is_allowed()); + + let instant2 = *limiter.throttle_start_instant(); + assert!(instant2 > instant1); + } +} diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 6cd0a006d9fb60..bdc0c889224706 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -182,6 +182,7 @@ pub struct StreamStats { pub(crate) total_unstaked_packets_sent_for_batching: AtomicUsize, pub(crate) throttled_staked_streams: AtomicUsize, pub(crate) throttled_unstaked_streams: AtomicUsize, + pub(crate) connection_rate_limiter_length: AtomicUsize, } impl StreamStats { @@ -505,6 +506,11 @@ impl StreamStats { self.perf_track_overhead_us.swap(0, Ordering::Relaxed), i64 ), + ( + "connection_rate_limiter_length", + self.connection_rate_limiter_length.load(Ordering::Relaxed), + i64 + ), ); } } @@ -522,7 +528,7 @@ pub fn spawn_server( max_staked_connections: usize, max_unstaked_connections: usize, max_streams_per_ms: u64, - max_connections_per_ipaddr_per_min: u32, + max_connections_per_ipaddr_per_min: u64, wait_for_chunk_timeout: Duration, coalesce: Duration, ) -> Result { diff --git a/validator/src/main.rs b/validator/src/main.rs index 8252792849c365..706ea305c047b2 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1086,7 +1086,7 @@ pub fn main() { let tpu_connection_pool_size = value_t_or_exit!(matches, "tpu_connection_pool_size", usize); let tpu_max_connections_per_ipaddr_per_minute = - value_t_or_exit!(matches, "tpu_max_connections_per_ipaddr_per_minute", u32); + value_t_or_exit!(matches, "tpu_max_connections_per_ipaddr_per_minute", u64); let shrink_ratio = value_t_or_exit!(matches, "accounts_shrink_ratio", f64); if !(0.0..=1.0).contains(&shrink_ratio) {