Skip to content

Commit

Permalink
Connection rate limiting (#948)
Browse files Browse the repository at this point in the history
* use rate limit on connectings

use rate limit on connectings; missing file

* Change connection rate limit to 8/min instead of 4/s

* Addressed some feedback from Trent

* removed some comments

* fix test failures which are opening connections more frequently

* moved the flag up

* turn off rate limiting to debug CI

* Fix CI test failures

* differentiate of the two throttling cases in stats: across connections or per ip addr

* fmt issues

* Addressed some feedback from Trent

* Added unit tests

Cleanup connection cache rate limiter if exceeding certain threshold

missing files

CONNECITON_RATE_LIMITER_CLEANUP_THRESHOLD to 100_000

clippy issue

clippy issue

sort crates

* revert Cargo.lock changes

* Addressed some feedback from Pankaj

(cherry picked from commit f54c120)

# Conflicts:
#	client/src/connection_cache.rs
#	quic-client/tests/quic_client.rs
#	streamer/src/nonblocking/quic.rs
#	streamer/src/quic.rs
#	validator/src/cli.rs
  • Loading branch information
lijunwangs authored and mergify[bot] committed May 15, 2024
1 parent 254eccd commit bd624af
Show file tree
Hide file tree
Showing 18 changed files with 471 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions bench-tps/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use {
pubkey::Pubkey,
signature::{read_keypair_file, Keypair},
},
solana_streamer::nonblocking::quic::DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
solana_tpu_client::tpu_client::{DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_USE_QUIC},
std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
Expand Down Expand Up @@ -74,6 +75,7 @@ pub struct Config {
pub external_client_type: ExternalClientType,
pub use_quic: bool,
pub tpu_connection_pool_size: usize,
pub tpu_max_connections_per_ipaddr_per_minute: u64,
pub compute_unit_price: Option<ComputeUnitPrice>,
pub use_durable_nonce: bool,
pub instruction_padding_config: Option<InstructionPaddingConfig>,
Expand Down Expand Up @@ -109,6 +111,8 @@ impl Default for Config {
external_client_type: ExternalClientType::default(),
use_quic: DEFAULT_TPU_USE_QUIC,
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE,
tpu_max_connections_per_ipaddr_per_minute:
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
compute_unit_price: None,
use_durable_nonce: false,
instruction_padding_config: None,
Expand Down
9 changes: 9 additions & 0 deletions client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,15 @@ mod tests {
crossbeam_channel::unbounded,
solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair},
solana_streamer::{
nonblocking::quic::{
    DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS,
    DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
quic::SpawnServerResult,
streamer::StakedNodes,
},
std::{
Expand Down Expand Up @@ -258,6 +266,7 @@ mod tests {
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down
3 changes: 3 additions & 0 deletions core/src/tpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ impl Tpu {
banking_tracer: Arc<BankingTracer>,
tracer_thread_hdl: TracerThread,
tpu_enable_udp: bool,
tpu_max_connections_per_ipaddr_per_minute: u64,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
block_production_method: BlockProductionMethod,
_generator_config: Option<GeneratorConfig>, /* vestigial code for replay invalidator */
Expand Down Expand Up @@ -164,6 +165,7 @@ impl Tpu {
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
tpu_max_connections_per_ipaddr_per_minute,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
)
Expand All @@ -185,6 +187,7 @@ impl Tpu {
MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
0, // Prevent unstaked nodes from forwarding transactions
DEFAULT_MAX_STREAMS_PER_MS,
tpu_max_connections_per_ipaddr_per_minute,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
)
Expand Down
4 changes: 4 additions & 0 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ impl Validator {
use_quic: bool,
tpu_connection_pool_size: usize,
tpu_enable_udp: bool,
tpu_max_connections_per_ipaddr_per_minute: u64,
admin_rpc_service_post_init: Arc<RwLock<Option<AdminRpcRequestMetadataPostInit>>>,
) -> Result<Self, String> {
let id = identity_keypair.pubkey();
Expand Down Expand Up @@ -1315,6 +1316,7 @@ impl Validator {
banking_tracer,
tracer_thread,
tpu_enable_udp,
tpu_max_connections_per_ipaddr_per_minute,
&prioritization_fee_cache,
config.block_production_method.clone(),
config.generator_config.clone(),
Expand Down Expand Up @@ -2485,6 +2487,7 @@ mod tests {
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
32, // max connections per IpAddr per minute for test
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down Expand Up @@ -2570,6 +2573,7 @@ mod tests {
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
32, // max connections per IpAddr per minute for test
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start")
Expand Down
3 changes: 3 additions & 0 deletions local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ impl LocalCluster {
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
32, // max connections per IpAddr per minute
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down Expand Up @@ -541,6 +542,7 @@ impl LocalCluster {
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
32, // max connections per IpAddr per minute
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down Expand Up @@ -938,6 +940,7 @@ impl Cluster for LocalCluster {
DEFAULT_TPU_USE_QUIC,
DEFAULT_TPU_CONNECTION_POOL_SIZE,
DEFAULT_TPU_ENABLE_UDP,
32, // max connections per IpAddr per minute, use higher value because of tests
Arc::new(RwLock::new(None)),
)
.expect("assume successful validator start");
Expand Down
1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions quic-client/tests/quic_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,15 @@ mod tests {
},
solana_sdk::{net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE, signature::Keypair},
solana_streamer::{
nonblocking::quic::{
    DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE, DEFAULT_MAX_STREAMS_PER_MS,
    DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
quic::SpawnServerResult,
streamer::StakedNodes,
tls_certificates::new_self_signed_tls_certificate,
},
Expand Down Expand Up @@ -81,6 +89,7 @@ mod tests {
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -162,6 +171,7 @@ mod tests {
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
Duration::from_secs(1), // wait_for_chunk_timeout
DEFAULT_TPU_COALESCE,
)
Expand Down Expand Up @@ -221,6 +231,7 @@ mod tests {
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand All @@ -246,6 +257,7 @@ mod tests {
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
Expand Down
1 change: 1 addition & 0 deletions streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ edition = { workspace = true }
async-channel = { workspace = true }
bytes = { workspace = true }
crossbeam-channel = { workspace = true }
dashmap = { workspace = true }
futures-util = { workspace = true }
histogram = { workspace = true }
indexmap = { workspace = true }
Expand Down
99 changes: 99 additions & 0 deletions streamer/src/nonblocking/connection_rate_limiter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use {
crate::nonblocking::{keyed_rate_limiter::KeyedRateLimiter, rate_limiter::RateLimiter},
std::{net::IpAddr, time::Duration},
};

/// Limits the rate of inbound connections on a per-IP-address basis.
///
/// Backed by a [`KeyedRateLimiter`] keyed on [`IpAddr`]: each distinct
/// client address gets its own budget per interval (see [`ConnectionRateLimiter::new`]).
pub struct ConnectionRateLimiter {
    // One rate-limit entry per client IP address.
    limiter: KeyedRateLimiter<IpAddr>,
}

impl ConnectionRateLimiter {
    /// Build a per-[`IpAddr`] rate limiter. The limit is expressed as a count
    /// per minute so that even infrequent connection attempts can be policed.
    pub fn new(limit_per_minute: u64) -> Self {
        Self {
            limiter: KeyedRateLimiter::new(limit_per_minute, Duration::from_secs(60)),
        }
    }

    /// Returns `true` when a new connection from `ip` is within that
    /// address's budget, `false` when it should be throttled.
    pub fn is_allowed(&self, ip: &IpAddr) -> bool {
        // One token is consumed from this IP's budget on every call.
        let allowed = self.limiter.check_and_update(*ip);
        if allowed {
            debug!("Request from IP {:?} allowed", ip);
        } else {
            debug!("Request from IP {:?} blocked", ip);
        }
        allowed
    }

    /// Drop keys whose throttle window has already elapsed; such entries are
    /// inactive and only consume memory.
    pub fn retain_recent(&self) {
        self.limiter.retain_recent()
    }

    /// Number of "live" keys currently tracked by the rate limiter.
    pub fn len(&self) -> usize {
        self.limiter.len()
    }

    /// `true` when the rate limiter is tracking no keys at all.
    pub fn is_empty(&self) -> bool {
        self.limiter.is_empty()
    }
}

/// Connection rate limiter for enforcing connection rates from
/// all clients.
///
/// Unlike [`ConnectionRateLimiter`], this applies one global per-second
/// budget shared across every client, rather than a per-IP budget.
pub struct TotalConnectionRateLimiter {
    // Single shared limiter covering all inbound connections.
    limiter: RateLimiter,
}

impl TotalConnectionRateLimiter {
    /// Build a global limiter; `limit_per_second` is the number of
    /// connections admitted per one-second window across all clients.
    pub fn new(limit_per_second: u64) -> Self {
        let limiter = RateLimiter::new(limit_per_second, Duration::from_secs(1));
        Self { limiter }
    }

    /// Returns `true` when one more connection fits in the current window.
    ///
    /// NOTE(review): takes `&mut self`, unlike the per-IP limiter's
    /// `is_allowed(&self, ..)` — presumably `RateLimiter::check_and_update`
    /// needs exclusive access; confirm against its definition.
    pub fn is_allowed(&mut self) -> bool {
        self.limiter.check_and_update()
    }
}

#[cfg(test)]
pub mod test {
    use {super::*, std::net::Ipv4Addr};

    /// The global limiter admits exactly the configured count (2), then
    /// rejects the next attempt within the same one-second window.
    #[tokio::test]
    async fn test_total_connection_rate_limiter() {
        let mut limiter = TotalConnectionRateLimiter::new(2);
        assert!(limiter.is_allowed());
        assert!(limiter.is_allowed());
        assert!(!limiter.is_allowed());
    }

    /// Each IP address gets its own budget (4/min here): exhausting ip1's
    /// budget must not affect ip2, and `len()` reflects the tracked keys.
    #[tokio::test]
    async fn test_connection_rate_limiter() {
        let limiter = ConnectionRateLimiter::new(4);
        let ip1 = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1));
        assert!(limiter.is_allowed(&ip1));
        assert!(limiter.is_allowed(&ip1));
        assert!(limiter.is_allowed(&ip1));
        assert!(limiter.is_allowed(&ip1));
        assert!(!limiter.is_allowed(&ip1));

        // assert_eq! reports both sides on failure, unlike `assert!(a == b)`.
        assert_eq!(limiter.len(), 1);
        let ip2 = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 2));
        assert!(limiter.is_allowed(&ip2));
        assert_eq!(limiter.len(), 2);
        assert!(limiter.is_allowed(&ip2));
        assert!(limiter.is_allowed(&ip2));
        assert!(limiter.is_allowed(&ip2));
        assert!(!limiter.is_allowed(&ip2));
    }
}
Loading

0 comments on commit bd624af

Please sign in to comment.