sleep instead of drop when stream rate exceeded limit; #939

Merged
49 changes: 30 additions & 19 deletions streamer/src/nonblocking/quic.rs
@@ -1,7 +1,7 @@
 use {
     crate::{
         nonblocking::stream_throttle::{
-            ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_STOP_CODE_THROTTLING,
+            ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_THROTTLING_INTERVAL,
             STREAM_THROTTLING_INTERVAL_MS,
         },
         quic::{configure_server, QuicServerError, StreamStats},
@@ -55,7 +55,7 @@ use {
         // introduce any other awaits while holding the RwLock.
         sync::{Mutex, MutexGuard},
         task::JoinHandle,
-        time::timeout,
+        time::{sleep, timeout},
     },
 };

@@ -825,25 +825,36 @@ async fn handle_connection(
                 params.total_stake,
             );
 
-            stream_counter.reset_throttling_params_if_needed();
-            if stream_counter.stream_count.load(Ordering::Relaxed)
-                >= max_streams_per_throttling_interval
-            {
-                stats.throttled_streams.fetch_add(1, Ordering::Relaxed);
-                match params.peer_type {
-                    ConnectionPeerType::Unstaked => {
-                        stats
-                            .throttled_unstaked_streams
-                            .fetch_add(1, Ordering::Relaxed);
-                    }
-                    ConnectionPeerType::Staked(_) => {
-                        stats
-                            .throttled_staked_streams
-                            .fetch_add(1, Ordering::Relaxed);
-                    }
-                }
-                let _ = stream.stop(VarInt::from_u32(STREAM_STOP_CODE_THROTTLING));
-                continue;
+            let throttle_interval_start =
+                stream_counter.reset_throttling_params_if_needed();
+            let streams_read_in_throttle_interval =
+                stream_counter.stream_count.load(Ordering::Relaxed);
+            if streams_read_in_throttle_interval >= max_streams_per_throttling_interval {
+                // The peer is sending faster than we're willing to read. Sleep for what's
+                // left of this read interval so the peer backs off.
+                let throttle_duration = STREAM_THROTTLING_INTERVAL
+                    .saturating_sub(throttle_interval_start.elapsed());
+
+                if !throttle_duration.is_zero() {
+                    debug!("Throttling stream from {remote_addr:?}, peer type: {:?}, total stake: {}, \
+                           max_streams_per_interval: {max_streams_per_throttling_interval}, read_interval_streams: {streams_read_in_throttle_interval} \
+                           throttle_duration: {throttle_duration:?}",
+                           params.peer_type, params.total_stake);
+                    stats.throttled_streams.fetch_add(1, Ordering::Relaxed);
+                    match params.peer_type {
+                        ConnectionPeerType::Unstaked => {
+                            stats
+                                .throttled_unstaked_streams
+                                .fetch_add(1, Ordering::Relaxed);
+                        }
+                        ConnectionPeerType::Staked(_) => {
+                            stats
+                                .throttled_staked_streams
+                                .fetch_add(1, Ordering::Relaxed);
+                        }
+                    }
+                    sleep(throttle_duration).await;
+                }
             }
             stream_load_ema.increment_load(params.peer_type);
             stream_counter.stream_count.fetch_add(1, Ordering::Relaxed);
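For orientation, the pattern this hunk adopts can be distilled into a minimal, self-contained sketch. Everything below is illustrative: the constants and the plain loop standing in for the QUIC accept loop are hypothetical, not the crate's code. When the per-interval stream budget is exhausted, the task sleeps for whatever is left of the interval so the sender backs off, then still reads the stream instead of stopping it.

use std::time::Duration;
use tokio::time::{sleep, Instant};

// Hypothetical stand-ins for the crate's throttling parameters.
const THROTTLING_INTERVAL: Duration = Duration::from_millis(100);
const MAX_STREAMS_PER_INTERVAL: u64 = 5;

#[tokio::main]
async fn main() {
    let mut interval_start = Instant::now();
    let mut streams_read: u64 = 0;

    for stream_id in 0..20u64 {
        // Start a fresh interval once the current one has elapsed.
        if interval_start.elapsed() > THROTTLING_INTERVAL {
            interval_start = Instant::now();
            streams_read = 0;
        }
        if streams_read >= MAX_STREAMS_PER_INTERVAL {
            // Budget exhausted: sleep away the rest of the interval so the
            // sender backs off, instead of stopping (dropping) the stream.
            let throttle_duration =
                THROTTLING_INTERVAL.saturating_sub(interval_start.elapsed());
            if !throttle_duration.is_zero() {
                sleep(throttle_duration).await;
            }
        }
        streams_read += 1;
        println!("read stream {stream_id} ({streams_read} in this interval)");
    }
}

Because the sleep never exceeds the remainder of the interval, a throttled stream is delayed by at most STREAM_THROTTLING_INTERVAL (100 ms) rather than being dropped and retried by the peer.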
18 changes: 12 additions & 6 deletions streamer/src/nonblocking/stream_throttle.rs
@@ -13,7 +13,8 @@ use {
 
 const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 20;
 pub const STREAM_THROTTLING_INTERVAL_MS: u64 = 100;
-pub const STREAM_STOP_CODE_THROTTLING: u32 = 15;
+pub const STREAM_THROTTLING_INTERVAL: Duration =
+    Duration::from_millis(STREAM_THROTTLING_INTERVAL_MS);
 const STREAM_LOAD_EMA_INTERVAL_MS: u64 = 5;
 const STREAM_LOAD_EMA_INTERVAL_COUNT: u64 = 10;
 const EMA_WINDOW_MS: u64 = STREAM_LOAD_EMA_INTERVAL_MS * STREAM_LOAD_EMA_INTERVAL_COUNT;
@@ -208,19 +209,24 @@ impl ConnectionStreamCounter {
         }
     }
 
-    pub(crate) fn reset_throttling_params_if_needed(&self) {
-        const THROTTLING_INTERVAL: Duration = Duration::from_millis(STREAM_THROTTLING_INTERVAL_MS);
-        if tokio::time::Instant::now().duration_since(*self.last_throttling_instant.read().unwrap())
-            > THROTTLING_INTERVAL
+    /// Resets the counter and the last throttling instant when the interval has
+    /// elapsed, and returns `last_throttling_instant` whether or not it was reset.
+    pub(crate) fn reset_throttling_params_if_needed(&self) -> tokio::time::Instant {
+        let last_throttling_instant = *self.last_throttling_instant.read().unwrap();
+        if tokio::time::Instant::now().duration_since(last_throttling_instant)
+            > STREAM_THROTTLING_INTERVAL
         {
             let mut last_throttling_instant = self.last_throttling_instant.write().unwrap();

A reviewer commented:

It was already the case before this PR, so it doesn't block this PR, but this is racy: we do the check with a read lock, then drop it and acquire a write lock. Multiple threads can find that they're past THROTTLING_INTERVAL while holding the read lock and each update the last throttling instant.

I think this connection-wide limit can be enforced without locks. Every connection task keeps its own last_throttling_instant and stream_count. The max streams allowed per connection is scaled by the number of connections opened by a peer, which can be passed as an atomic to each connection task.
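As a rough sketch of that lock-free scheme (all names and types below are hypothetical, not taken from the crate): each connection task owns its throttling state outright, and only the peer's open-connection count is shared, as an atomic used to scale the per-connection budget.

use std::{
    sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    },
    time::Duration,
};
use tokio::time::Instant;

const STREAM_THROTTLING_INTERVAL: Duration = Duration::from_millis(100);

// Hypothetical per-task throttling state; nothing here needs a lock because
// exactly one connection task owns it.
struct ConnectionThrottle {
    interval_start: Instant,
    stream_count: u64,
    // Shared count of connections the peer has open, updated elsewhere.
    peer_connection_count: Arc<AtomicU64>,
}

impl ConnectionThrottle {
    // Returns true if one more stream may be read in this interval.
    fn try_admit_stream(&mut self, max_streams_per_peer: u64) -> bool {
        if self.interval_start.elapsed() > STREAM_THROTTLING_INTERVAL {
            self.interval_start = Instant::now();
            self.stream_count = 0;
        }
        // Scale the per-peer budget down by the peer's open connections.
        let connections = self.peer_connection_count.load(Ordering::Relaxed).max(1);
        if self.stream_count >= max_streams_per_peer / connections {
            return false;
        }
        self.stream_count += 1;
        true
    }
}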

The author replied:

I created a separate issue to track this, as it is orthogonal to this PR.

@pgarg66 replied (Apr 20, 2024):

It should not be racy: we check the timeout again after acquiring the write lock. Only one thread holds the write lock at a time, and the first one to acquire it updates last_throttling_instant.

             // Recheck as some other thread might have done throttling since this thread tried to acquire the write lock.
             if tokio::time::Instant::now().duration_since(*last_throttling_instant)
-                > THROTTLING_INTERVAL
+                > STREAM_THROTTLING_INTERVAL
             {
                 *last_throttling_instant = tokio::time::Instant::now();
                 self.stream_count.store(0, Ordering::Relaxed);
             }
-        }
+            *last_throttling_instant
+        } else {
+            last_throttling_instant
+        }
     }
 }
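Distilled into a standalone example, the check/recheck pattern defended above looks roughly like this (a hypothetical sketch built on std::sync::RwLock and std::time::Instant, not the crate's code):

use std::{
    sync::{
        atomic::{AtomicU64, Ordering},
        RwLock,
    },
    time::{Duration, Instant},
};

const INTERVAL: Duration = Duration::from_millis(100);

struct Throttle {
    last_reset: RwLock<Instant>,
    count: AtomicU64,
}

impl Throttle {
    fn reset_if_needed(&self) -> Instant {
        // Cheap check under the read lock; many threads may pass it at once.
        let last_reset = *self.last_reset.read().unwrap();
        if last_reset.elapsed() > INTERVAL {
            // The write lock serializes the racers.
            let mut last_reset = self.last_reset.write().unwrap();
            // Recheck: whoever won the race has already reset, so later
            // writers see a fresh instant and skip the reset.
            if last_reset.elapsed() > INTERVAL {
                *last_reset = Instant::now();
                self.count.store(0, Ordering::Relaxed);
            }
            *last_reset
        } else {
            last_reset
        }
    }
}

The read-then-write sequence is not atomic, so several threads can pass the first check, but the recheck under the write lock ensures only the first of them resets the counter; the others observe the fresh instant and fall through. That is the property the comment above relies on.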