Connection rate limiting (#948)
* use rate limit on connections

use rate limit on connections; missing file

* Change connection rate limit to 8/min instead of 4/s

* Addressed some feedback from Trent

* removed some comments

* fix failures in tests that open connections more frequently

* moved the flag up

* turn off rate limiting to debug CI

* Fix CI test failures

* differentiate the two throttling cases in stats: across all connections or per IP address

* fmt issues

* Addressed some feedback from Trent

* Added unit tests

Clean up the connection cache rate limiter if it exceeds a certain threshold

missing files

CONNECTION_RATE_LIMITER_CLEANUP_THRESHOLD to 100_000

clippy issue

clippy issue

sort crates

* revert Cargo.lock changes

* Addressed some feedback from Pankaj
lijunwangs authored May 15, 2024
1 parent c0e51b4 commit f54c120
Showing 18 changed files with 419 additions and 8 deletions.
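
Taken together, these changes introduce two layers of connection throttling: a global rate limit across all incoming connections and a per-IP-address limit (8 connections per minute per the commit message). Below is a minimal sketch of how the two limiters added in this commit might be consulted when a new QUIC connection arrives; the function name, module path, and stat comments are illustrative only, since the actual call site in streamer/src/nonblocking/quic.rs is not shown in this excerpt.

use {
    solana_streamer::nonblocking::connection_rate_limiter::{
        ConnectionRateLimiter, TotalConnectionRateLimiter,
    },
    std::net::IpAddr,
};

// Hypothetical gate run on every incoming connection attempt.
fn should_accept(
    remote_ip: IpAddr,
    total_limiter: &mut TotalConnectionRateLimiter,
    per_ip_limiter: &ConnectionRateLimiter,
) -> bool {
    if !total_limiter.is_allowed() {
        // Would bump the "throttled across all connections" stat here.
        return false;
    }
    if !per_ip_limiter.is_allowed(&remote_ip) {
        // Would bump the "throttled per IP address" stat here.
        return false;
    }
    true
}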
1 change: 1 addition & 0 deletions Cargo.lock

4 changes: 4 additions & 0 deletions bench-tps/src/cli.rs
@@ -11,6 +11,7 @@ use {
pubkey::Pubkey,
signature::{read_keypair_file, Keypair},
},
solana_streamer::nonblocking::quic::DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC},
std::{
net::{IpAddr, Ipv4Addr},
@@ -68,6 +69,7 @@ pub struct Config {
pub external_client_type: ExternalClientType,
pub use_quic: bool,
pub tpu_connection_pool_size: usize,
pub tpu_max_connections_per_ipaddr_per_minute: u64,
pub compute_unit_price: Option<ComputeUnitPrice>,
pub skip_tx_account_data_size: bool,
pub use_durable_nonce: bool,
@@ -103,6 +105,8 @@ impl Default for Config {
external_client_type: ExternalClientType::default(),
use_quic: DEFAULT_TPU_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_max_connections_per_ipaddr_per_minute:
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
compute_unit_price: None,
skip_tx_account_data_size: false,
use_durable_nonce: false,
6 changes: 5 additions & 1 deletion client/src/connection_cache.rs
@@ -227,7 +227,10 @@ mod tests {
crossbeam_channel::unbounded,
solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair},
solana_streamer::{
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
nonblocking::quic::{
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
quic::SpawnServerResult,
streamer::StakedNodes,
},
@@ -272,6 +275,7 @@ mod tests {
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
3 changes: 3 additions & 0 deletions core/src/tpu.rs
@@ -108,6 +108,7 @@ impl Tpu {
banking_tracer: Arc<BankingTracer>,
tracer_thread_hdl: TracerThread,
tpu_enable_udp: bool,
tpu_max_connections_per_ipaddr_per_minute: u64,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
block_production_method: BlockProductionMethod,
_generator_config: Option<GeneratorConfig>, /* vestigial code for replay invalidator */
@@ -164,6 +165,7 @@
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
tpu_max_connections_per_ipaddr_per_minute,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
)
@@ -185,6 +187,7 @@
MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
0, // Prevent unstaked nodes from forwarding transactions
DEFAULT_MAX_STREAMS_PER_MS,
tpu_max_connections_per_ipaddr_per_minute,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
)
4 changes: 4 additions & 0 deletions core/src/validator.rs
@@ -510,6 +510,7 @@ impl Validator {
use_quic: bool,
tpu_connection_pool_size: usize,
tpu_enable_udp: bool,
tpu_max_connections_per_ipaddr_per_minute: u64,
admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
) -> Result<Self, String> {
let start_time = Instant::now();
@@ -1399,6 +1400,7 @@ impl Validator {
banking_tracer,
tracer_thread,
tpu_enable_udp,
tpu_max_connections_per_ipaddr_per_minute,
&prioritization_fee_cache,
config.block_production_method.clone(),
config.generator_config.clone(),
@@ -2581,6 +2583,7 @@ mod tests {
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
32, // max connections per IpAddr per minute for test
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
@@ -2666,6 +2669,7 @@
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
32, // max connections per IpAddr per minute for test
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start")
3 changes: 3 additions & 0 deletions local-cluster/src/local_cluster.rs
@@ -338,6 +338,7 @@ impl LocalCluster {
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
32, // max connections per IpAddr per minute
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
@@ -543,6 +544,7 @@
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
32, // max connections per IpAddr per minute
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
@@ -1013,6 +1015,7 @@ impl Cluster for LocalCluster {
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
32, // max connections per IpAddr per minute, use higher value because of tests
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock

9 changes: 8 additions & 1 deletion quic-client/tests/quic_client.rs
@@ -10,7 +10,10 @@ mod tests {
},
solana_sdk::{net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE, signature::Keypair},
solana_streamer::{
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
nonblocking::quic::{
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
quic::SpawnServerResult,
streamer::StakedNodes,
tls_certificates::new_dummy_x509_certificate,
@@ -85,6 +88,7 @@ mod tests {
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
@@ -170,6 +174,7 @@ mod tests {
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
Duration::from_secs(1), // wait_for_chunk_timeout
DEFAULT_TPU_COALESCE,
)
@@ -233,6 +238,7 @@ mod tests {
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
@@ -262,6 +268,7 @@ mod tests {
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
1 change: 1 addition & 0 deletions streamer/Cargo.toml
@@ -13,6 +13,7 @@ edition = { workspace = true }
async-channel = { workspace = true }
bytes = { workspace = true }
crossbeam-channel = { workspace = true }
dashmap = { workspace = true }
futures-util = { workspace = true }
histogram = { workspace = true }
indexmap = { workspace = true }
99 changes: 99 additions & 0 deletions streamer/src/nonblocking/connection_rate_limiter.rs
@@ -0,0 +1,99 @@
use {
crate::nonblocking::{keyed_rate_limiter::KeyedRateLimiter, rate_limiter::RateLimiter},
std::{net::IpAddr, time::Duration},
};

pub struct ConnectionRateLimiter {
limiter: KeyedRateLimiter<IpAddr>,
}

impl ConnectionRateLimiter {
/// Create a new rate limiter keyed per IpAddr. The rate is specified as a count per minute
/// so that limits lower than one connection per second can be expressed.
pub fn new(limit_per_minute: u64) -> Self {
Self {
limiter: KeyedRateLimiter::new(limit_per_minute, Duration::from_secs(60)),
}
}

/// Check if a connection from the given `ip` is allowed.
pub fn is_allowed(&self, ip: &IpAddr) -> bool {
// Acquire a permit from the rate limiter for the given IP address
if self.limiter.check_and_update(*ip) {
debug!("Request from IP {:?} allowed", ip);
true // Request allowed
} else {
debug!("Request from IP {:?} blocked", ip);
false // Request blocked
}
}

/// Retain only keys whose throttle start instant is within the throttle interval;
/// drop the rest as inactive.
pub fn retain_recent(&self) {
self.limiter.retain_recent()
}

/// Returns the number of "live" keys in the rate limiter.
pub fn len(&self) -> usize {
self.limiter.len()
}

/// Returns `true` if the rate limiter has no keys in it.
pub fn is_empty(&self) -> bool {
self.limiter.is_empty()
}
}

/// Connection rate limiter for enforcing connection rates from
/// all clients.
pub struct TotalConnectionRateLimiter {
limiter: RateLimiter,
}

impl TotalConnectionRateLimiter {
/// Create a new rate limiter. The rate is specified as the count per second.
pub fn new(limit_per_second: u64) -> Self {
Self {
limiter: RateLimiter::new(limit_per_second, Duration::from_secs(1)),
}
}

/// Check if a connection is allowed.
pub fn is_allowed(&mut self) -> bool {
self.limiter.check_and_update()
}
}

#[cfg(test)]
pub mod test {
use {super::*, std::net::Ipv4Addr};

#[tokio::test]
async fn test_total_connection_rate_limiter() {
let mut limiter = TotalConnectionRateLimiter::new(2);
assert!(limiter.is_allowed());
assert!(limiter.is_allowed());
assert!(!limiter.is_allowed());
}

#[tokio::test]
async fn test_connection_rate_limiter() {
let limiter = ConnectionRateLimiter::new(4);
let ip1 = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1));
assert!(limiter.is_allowed(&ip1));
assert!(limiter.is_allowed(&ip1));
assert!(limiter.is_allowed(&ip1));
assert!(limiter.is_allowed(&ip1));
assert!(!limiter.is_allowed(&ip1));

assert!(limiter.len() == 1);
let ip2 = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2));
assert!(limiter.is_allowed(&ip2));
assert!(limiter.len() == 2);
assert!(limiter.is_allowed(&ip2));
assert!(limiter.is_allowed(&ip2));
assert!(limiter.is_allowed(&ip2));
assert!(!limiter.is_allowed(&ip2));
}
}
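
The commit message also mentions cleaning up the per-IP limiter once it grows past a threshold (100_000 entries). A minimal sketch of such a periodic cleanup, assuming a hypothetical constant and call site; the real hook is not part of this excerpt.

// Assumed value taken from the commit message; the actual constant lives elsewhere in the commit.
const CONNECTION_RATE_LIMITER_CLEANUP_THRESHOLD: usize = 100_000;

fn maybe_cleanup(limiter: &ConnectionRateLimiter) {
    // Dropping inactive keys keeps the per-IP map bounded when many unique
    // client addresses have been seen.
    if limiter.len() > CONNECTION_RATE_LIMITER_CLEANUP_THRESHOLD {
        limiter.retain_recent();
    }
}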
104 changes: 104 additions & 0 deletions streamer/src/nonblocking/keyed_rate_limiter.rs
@@ -0,0 +1,104 @@
use {
crate::nonblocking::rate_limiter::RateLimiter,
dashmap::DashMap,
std::{hash::Hash, time::Duration},
};

pub struct KeyedRateLimiter<K> {
limiters: DashMap<K, RateLimiter>,
interval: Duration,
limit: u64,
}

impl<K> KeyedRateLimiter<K>
where
K: Eq + Hash,
{
/// Create a keyed rate limiter with `limit` count with a rate limit `interval`
pub fn new(limit: u64, interval: Duration) -> Self {
Self {
limiters: DashMap::default(),
interval,
limit,
}
}

/// Check if a connection for the given `key` is allowed to pass through the rate limiter.
/// When it is allowed, the rate limiter state is updated to reflect it has been
/// allowed. For a unique request, the caller should call it only once when it is allowed.
pub fn check_and_update(&self, key: K) -> bool {
let allowed = match self.limiters.entry(key) {
dashmap::mapref::entry::Entry::Occupied(mut entry) => {
let limiter = entry.get_mut();
limiter.check_and_update()
}
dashmap::mapref::entry::Entry::Vacant(entry) => entry
.insert(RateLimiter::new(self.limit, self.interval))
.value_mut()
.check_and_update(),
};
allowed
}

/// Retain only keys whose throttle start instant is within the throttle interval;
/// drop the rest as inactive.
pub fn retain_recent(&self) {
let now = tokio::time::Instant::now();
self.limiters.retain(|_key, limiter| {
now.duration_since(*limiter.throttle_start_instant()) <= self.interval
});
}

/// Returns the number of "live" keys in the rate limiter.
pub fn len(&self) -> usize {
self.limiters.len()
}

/// Returns `true` if the rate limiter has no keys in it.
pub fn is_empty(&self) -> bool {
self.limiters.is_empty()
}
}

#[cfg(test)]
pub mod test {
use {super::*, tokio::time::sleep};

#[allow(clippy::len_zero)]
#[tokio::test]
async fn test_rate_limiter() {
let limiter = KeyedRateLimiter::<u64>::new(2, Duration::from_millis(100));
assert!(limiter.len() == 0);
assert!(limiter.is_empty());
assert!(limiter.check_and_update(1));
assert!(limiter.check_and_update(1));
assert!(!limiter.check_and_update(1));
assert!(limiter.len() == 1);
assert!(limiter.check_and_update(2));
assert!(limiter.check_and_update(2));
assert!(!limiter.check_and_update(2));
assert!(limiter.len() == 2);

// sleep 150 ms, the throttle parameters should have been reset.
sleep(Duration::from_millis(150)).await;
assert!(limiter.len() == 2);

assert!(limiter.check_and_update(1));
assert!(limiter.check_and_update(1));
assert!(!limiter.check_and_update(1));

assert!(limiter.check_and_update(2));
assert!(limiter.check_and_update(2));
assert!(!limiter.check_and_update(2));
assert!(limiter.len() == 2);

// sleep another 150 ms and clean up outdated entries; key 2 will be removed
sleep(Duration::from_millis(150)).await;
assert!(limiter.check_and_update(1));
assert!(limiter.check_and_update(1));
assert!(!limiter.check_and_update(1));

limiter.retain_recent();
assert!(limiter.len() == 1);
}
}
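
The per-key RateLimiter from crate::nonblocking::rate_limiter is used above, but its source is not included in this excerpt. Below is a fixed-window sketch that would satisfy the interface exercised here (new, check_and_update, throttle_start_instant); the field names and reset policy are assumptions, not the committed implementation.

use {std::time::Duration, tokio::time::Instant};

pub struct RateLimiter {
    limit: u64,
    interval: Duration,
    count: u64,
    throttle_start: Instant,
}

impl RateLimiter {
    pub fn new(limit: u64, interval: Duration) -> Self {
        Self {
            limit,
            interval,
            count: 0,
            throttle_start: Instant::now(),
        }
    }

    /// Consume one slot if the current window still has budget; start a new
    /// window once `interval` has elapsed since the window began.
    pub fn check_and_update(&mut self) -> bool {
        let now = Instant::now();
        if now.duration_since(self.throttle_start) > self.interval {
            self.throttle_start = now;
            self.count = 0;
        }
        if self.count < self.limit {
            self.count = self.count.saturating_add(1);
            true
        } else {
            false
        }
    }

    pub fn throttle_start_instant(&self) -> &Instant {
        &self.throttle_start
    }
}

Under this reading, retain_recent() in KeyedRateLimiter drops any key whose window started more than `interval` ago, which matches the test above where key 2 is evicted after roughly 300 ms of inactivity.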