Skip to content

Commit

Permalink
proxy: cull http connections (#7632)
Browse files Browse the repository at this point in the history
## Problem

Some HTTP client connections can stay open for quite a long time.

## Summary of changes

When there are too many HTTP client connections, pick a random
connection and gracefully cancel it.
  • Loading branch information
conradludgate authored May 7, 2024
1 parent 0af66a6 commit 0c99e5e
Show file tree
Hide file tree
Showing 10 changed files with 164 additions and 18 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ humantime = "2.1"
humantime-serde = "1.1.1"
hyper = "0.14"
hyper-tungstenite = "0.13.0"
indexmap = "2"
inotify = "0.10.2"
ipnet = "2.9.0"
itertools = "0.10"
Expand Down
9 changes: 9 additions & 0 deletions libs/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,15 @@ impl<A: CounterPairAssoc> CounterPairVec<A> {
let id = self.vec.with_labels(labels);
self.vec.remove_metric(id)
}

pub fn sample(&self, labels: <A::LabelGroupSet as LabelGroupSet>::Group<'_>) -> u64 {
let id = self.vec.with_labels(labels);
let metric = self.vec.get_metric(id);

let inc = metric.inc.count.load(std::sync::atomic::Ordering::Relaxed);
let dec = metric.dec.count.load(std::sync::atomic::Ordering::Relaxed);
inc.saturating_sub(dec)
}
}

impl<T, A> ::measured::metric::group::MetricGroup<T> for CounterPairVec<A>
Expand Down
1 change: 1 addition & 0 deletions proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ hyper.workspace = true
hyper1 = { package = "hyper", version = "1.2", features = ["server"] }
hyper-util = { version = "0.1", features = ["server", "http1", "http2", "tokio"] }
http-body-util = { version = "0.1" }
indexmap.workspace = true
ipnet.workspace = true
itertools.workspace = true
lasso = { workspace = true, features = ["multi-threaded"] }
Expand Down
9 changes: 9 additions & 0 deletions proxy/src/bin/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use proxy::redis::cancellation_publisher::RedisPublisherClient;
use proxy::redis::connection_with_credentials_provider::ConnectionWithCredentialsProvider;
use proxy::redis::elasticache;
use proxy::redis::notifications;
use proxy::serverless::cancel_set::CancelSet;
use proxy::serverless::GlobalConnPoolOptions;
use proxy::usage_metrics;

Expand Down Expand Up @@ -243,6 +244,12 @@ struct SqlOverHttpArgs {
/// increase memory used by the pool
#[clap(long, default_value_t = 128)]
sql_over_http_pool_shards: usize,

#[clap(long, default_value_t = 10000)]
sql_over_http_client_conn_threshold: u64,

#[clap(long, default_value_t = 64)]
sql_over_http_cancel_set_shards: usize,
}

#[tokio::main]
Expand Down Expand Up @@ -599,6 +606,8 @@ fn build_config(args: &ProxyCliArgs) -> anyhow::Result<&'static ProxyConfig> {
opt_in: args.sql_over_http.sql_over_http_pool_opt_in,
max_total_conns: args.sql_over_http.sql_over_http_pool_max_total_conns,
},
cancel_set: CancelSet::new(args.sql_over_http.sql_over_http_cancel_set_shards),
client_conn_threshold: args.sql_over_http.sql_over_http_client_conn_threshold,
};
let authentication_config = AuthenticationConfig {
scram_protocol_timeout: args.scram_protocol_timeout,
Expand Down
4 changes: 3 additions & 1 deletion proxy/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
auth::{self, backend::AuthRateLimiter},
console::locks::ApiLocks,
rate_limiter::RateBucketInfo,
serverless::GlobalConnPoolOptions,
serverless::{cancel_set::CancelSet, GlobalConnPoolOptions},
Host,
};
use anyhow::{bail, ensure, Context, Ok};
Expand Down Expand Up @@ -56,6 +56,8 @@ pub struct TlsConfig {
pub struct HttpConfig {
pub request_timeout: tokio::time::Duration,
pub pool_options: GlobalConnPoolOptions,
pub cancel_set: CancelSet,
pub client_conn_threshold: u64,
}

pub struct AuthenticationConfig {
Expand Down
47 changes: 33 additions & 14 deletions proxy/src/serverless.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! Handles both SQL over HTTP and SQL over Websockets.
mod backend;
pub mod cancel_set;
mod conn_pool;
mod http_util;
mod json;
Expand Down Expand Up @@ -109,20 +110,37 @@ pub async fn task_main(
let conn_id = uuid::Uuid::new_v4();
let http_conn_span = tracing::info_span!("http_conn", ?conn_id);

connections.spawn(
connection_handler(
config,
backend.clone(),
connections.clone(),
cancellation_handler.clone(),
cancellation_token.clone(),
server.clone(),
tls_acceptor.clone(),
conn,
peer_addr,
)
.instrument(http_conn_span),
);
let n_connections = Metrics::get()
.proxy
.client_connections
.sample(crate::metrics::Protocol::Http);
tracing::trace!(?n_connections, threshold = ?config.http_config.client_conn_threshold, "check");
if n_connections > config.http_config.client_conn_threshold {
tracing::trace!("attempting to cancel a random connection");
if let Some(token) = config.http_config.cancel_set.take() {
tracing::debug!("cancelling a random connection");
token.cancel()
}
}

let conn_token = cancellation_token.child_token();
let conn = connection_handler(
config,
backend.clone(),
connections.clone(),
cancellation_handler.clone(),
conn_token.clone(),
server.clone(),
tls_acceptor.clone(),
conn,
peer_addr,
)
.instrument(http_conn_span);

connections.spawn(async move {
let _cancel_guard = config.http_config.cancel_set.insert(conn_id, conn_token);
conn.await
});
}

connections.wait().await;
Expand Down Expand Up @@ -243,6 +261,7 @@ async fn connection_handler(
// On cancellation, trigger the HTTP connection handler to shut down.
let res = match select(pin!(cancellation_token.cancelled()), pin!(conn)).await {
Either::Left((_cancelled, mut conn)) => {
tracing::debug!(%peer_addr, "cancelling connection");
conn.as_mut().graceful_shutdown();
conn.await
}
Expand Down
102 changes: 102 additions & 0 deletions proxy/src/serverless/cancel_set.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
//! A set for cancelling random http connections
use std::{
hash::{BuildHasher, BuildHasherDefault},
num::NonZeroUsize,
time::Duration,
};

use indexmap::IndexMap;
use parking_lot::Mutex;
use rand::{thread_rng, Rng};
use rustc_hash::FxHasher;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

type Hasher = BuildHasherDefault<FxHasher>;

pub struct CancelSet {
shards: Box<[Mutex<CancelShard>]>,
// keyed by random uuid, fxhasher is fine
hasher: Hasher,
}

pub struct CancelShard {
tokens: IndexMap<uuid::Uuid, (Instant, CancellationToken), Hasher>,
}

impl CancelSet {
pub fn new(shards: usize) -> Self {
CancelSet {
shards: (0..shards)
.map(|_| {
Mutex::new(CancelShard {
tokens: IndexMap::with_hasher(Hasher::default()),
})
})
.collect(),
hasher: Hasher::default(),
}
}

pub fn take(&self) -> Option<CancellationToken> {
for _ in 0..4 {
if let Some(token) = self.take_raw(thread_rng().gen()) {
return Some(token);
}
tracing::trace!("failed to get cancel token");
}
None
}

pub fn take_raw(&self, rng: usize) -> Option<CancellationToken> {
NonZeroUsize::new(self.shards.len())
.and_then(|len| self.shards[rng % len].lock().take(rng / len))
}

pub fn insert(&self, id: uuid::Uuid, token: CancellationToken) -> CancelGuard<'_> {
let shard = NonZeroUsize::new(self.shards.len()).map(|len| {
let hash = self.hasher.hash_one(id) as usize;
let shard = &self.shards[hash % len];
shard.lock().insert(id, token);
shard
});
CancelGuard { shard, id }
}
}

impl CancelShard {
fn take(&mut self, rng: usize) -> Option<CancellationToken> {
NonZeroUsize::new(self.tokens.len()).and_then(|len| {
// 10 second grace period so we don't cancel new connections
if self.tokens.get_index(rng % len)?.1 .0.elapsed() < Duration::from_secs(10) {
return None;
}

let (_key, (_insert, token)) = self.tokens.swap_remove_index(rng % len)?;
Some(token)
})
}

fn remove(&mut self, id: uuid::Uuid) {
self.tokens.swap_remove(&id);
}

fn insert(&mut self, id: uuid::Uuid, token: CancellationToken) {
self.tokens.insert(id, (Instant::now(), token));
}
}

pub struct CancelGuard<'a> {
shard: Option<&'a Mutex<CancelShard>>,
id: Uuid,
}

impl Drop for CancelGuard<'_> {
fn drop(&mut self) {
if let Some(shard) = self.shard {
shard.lock().remove(self.id);
}
}
}
4 changes: 3 additions & 1 deletion proxy/src/serverless/conn_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ impl<C: ClientInnerExt> Drop for Client<C> {
mod tests {
use std::{mem, sync::atomic::AtomicBool};

use crate::{BranchId, EndpointId, ProjectId};
use crate::{serverless::cancel_set::CancelSet, BranchId, EndpointId, ProjectId};

use super::*;

Expand Down Expand Up @@ -767,6 +767,8 @@ mod tests {
max_total_conns: 3,
},
request_timeout: Duration::from_secs(1),
cancel_set: CancelSet::new(0),
client_conn_threshold: u64::MAX,
}));
let pool = GlobalConnPool::new(config);
let conn_info = ConnInfo {
Expand Down
4 changes: 2 additions & 2 deletions proxy/src/serverless/sql_over_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,8 @@ pub enum SqlOverHttpCancel {
impl ReportableError for SqlOverHttpCancel {
fn get_error_kind(&self) -> ErrorKind {
match self {
SqlOverHttpCancel::Postgres => ErrorKind::RateLimit,
SqlOverHttpCancel::Connect => ErrorKind::ServiceRateLimit,
SqlOverHttpCancel::Postgres => ErrorKind::ClientDisconnect,
SqlOverHttpCancel::Connect => ErrorKind::ClientDisconnect,
}
}
}
Expand Down

1 comment on commit 0c99e5e

@github-actions
Copy link

@github-actions github-actions bot commented on 0c99e5e May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3135 tests run: 2988 passed, 1 failed, 146 skipped (full report)


Failures on Postgres 14

  • test_storage_controller_many_tenants[github-actions-selfhosted]: release
# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_storage_controller_many_tenants[release-pg14-github-actions-selfhosted]"
Flaky tests (3)

Postgres 16

  • test_gc_aggressive: debug
  • test_vm_bit_clear_on_heap_lock: debug

Postgres 15

  • test_vm_bit_clear_on_heap_lock: debug

Code coverage* (full report)

  • functions: 31.2% (6258 of 20043 functions)
  • lines: 46.7% (46950 of 100468 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
0c99e5e at 2024-05-08T06:35:30.570Z :recycle:

Please sign in to comment.