proxy: Refactor http conn pool (#9785)
- Use the same ConnPoolEntry for the HTTP connection pool.
- Rename EndpointConnPool to HttpConnPool.
- Narrow the Clone bound for the client.

Fixes #9284
awarus authored Nov 20, 2024
1 parent 313ebfd commit 2d6bf17
Showing 7 changed files with 172 additions and 223 deletions.
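
For orientation before the diff: the refactor makes the global pool generic over the per-endpoint pool type as well as the client type, so the tokio_postgres pool and the HTTP pool can share one GlobalConnPool. A minimal sketch of that shape follows; the type and method names mirror the diff below, while the simplified bodies and the marker field are illustrative only.

// Sketch only -- simplified stand-ins, not the real proxy types.
use std::marker::PhantomData;

pub trait ClientInnerExt: Sync + Send + 'static {}

// Per-endpoint pools expose just enough for the global GC loop.
pub trait EndpointConnPoolExt<C: ClientInnerExt> {
    fn clear_closed(&mut self) -> usize; // drop closed clients, return how many were removed
    fn total_conns(&self) -> usize;      // live connections still held by this endpoint pool
}

// The global pool is now parameterized over both the client type C and the
// per-endpoint pool type P: EndpointConnPool<C> for tokio_postgres clients,
// HttpConnPool<Send> for the HTTP client pool (see backend.rs below).
pub struct GlobalConnPool<C, P>
where
    C: ClientInnerExt,
    P: EndpointConnPoolExt<C>,
{
    // The real struct holds DashMap<EndpointCacheKey, Arc<RwLock<P>>> plus
    // counters and config; they are elided here, so P only appears in the marker.
    _marker: PhantomData<(C, P)>,
}

fn main() {}
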
13 changes: 7 additions & 6 deletions proxy/src/serverless/backend.rs
@@ -12,8 +12,8 @@ use tracing::field::display
use tracing::{debug, info};

use super::conn_pool::poll_client;
use super::conn_pool_lib::{Client, ConnInfo, GlobalConnPool};
use super::http_conn_pool::{self, poll_http2_client, Send};
use super::conn_pool_lib::{Client, ConnInfo, EndpointConnPool, GlobalConnPool};
use super::http_conn_pool::{self, poll_http2_client, HttpConnPool, Send};
use super::local_conn_pool::{self, LocalConnPool, EXT_NAME, EXT_SCHEMA, EXT_VERSION};
use crate::auth::backend::local::StaticAuthRules;
use crate::auth::backend::{ComputeCredentials, ComputeUserInfo};
@@ -36,9 +36,10 @@ use crate::rate_limiter::EndpointRateLimiter
use crate::types::{EndpointId, Host, LOCAL_PROXY_SUFFIX};

pub(crate) struct PoolingBackend {
pub(crate) http_conn_pool: Arc<super::http_conn_pool::GlobalConnPool<Send>>,
pub(crate) http_conn_pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
pub(crate) local_pool: Arc<LocalConnPool<tokio_postgres::Client>>,
pub(crate) pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
pub(crate) pool:
Arc<GlobalConnPool<tokio_postgres::Client, EndpointConnPool<tokio_postgres::Client>>>,

pub(crate) config: &'static ProxyConfig,
pub(crate) auth_backend: &'static crate::auth::Backend<'static, ()>,
@@ -474,7 +475,7 @@ impl ShouldRetryWakeCompute for LocalProxyConnError {
}

struct TokioMechanism {
pool: Arc<GlobalConnPool<tokio_postgres::Client>>,
pool: Arc<GlobalConnPool<tokio_postgres::Client, EndpointConnPool<tokio_postgres::Client>>>,
conn_info: ConnInfo,
conn_id: uuid::Uuid,

@@ -524,7 +525,7 @@ impl ConnectMechanism for TokioMechanism {
}

struct HyperMechanism {
pool: Arc<http_conn_pool::GlobalConnPool<Send>>,
pool: Arc<GlobalConnPool<Send, HttpConnPool<Send>>>,
conn_info: ConnInfo,
conn_id: uuid::Uuid,

6 changes: 4 additions & 2 deletions proxy/src/serverless/conn_pool.rs
@@ -19,7 +19,8 @@ use {
};

use super::conn_pool_lib::{
Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, GlobalConnPool,
Client, ClientDataEnum, ClientInnerCommon, ClientInnerExt, ConnInfo, EndpointConnPool,
GlobalConnPool,
};
use crate::context::RequestContext;
use crate::control_plane::messages::MetricsAuxInfo;
@@ -52,7 +53,7 @@ impl fmt::Display for ConnInfo {
}

pub(crate) fn poll_client<C: ClientInnerExt>(
global_pool: Arc<GlobalConnPool<C>>,
global_pool: Arc<GlobalConnPool<C, EndpointConnPool<C>>>,
ctx: &RequestContext,
conn_info: ConnInfo,
client: C,
@@ -167,6 +168,7 @@ pub(crate) fn poll_client<C: ClientInnerExt>(
Client::new(inner, conn_info, pool_clone)
}

#[derive(Clone)]
pub(crate) struct ClientDataRemote {
session: tokio::sync::watch::Sender<uuid::Uuid>,
cancel: CancellationToken,
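
The `#[derive(Clone)]` on ClientDataRemote above pairs with the derives added in conn_pool_lib.rs below: once ClientInnerCommon<C> and ClientDataEnum derive Clone, every variant payload has to be Clone as well. A toy example of that requirement; the names mirror the diff, while the field types and the Clone bound on C are simplified stand-ins.

// Derived Clone on an outer type requires Clone on everything it contains,
// including each enum variant's payload -- hence the new derives.
#[derive(Clone)]
struct ClientDataRemote;              // stand-in for the real struct

#[derive(Clone)]
enum ClientDataEnum {
    Remote(ClientDataRemote),         // would not compile if ClientDataRemote were not Clone
}

#[derive(Clone)]
struct ClientInnerCommon<C: Clone> {  // the real bound is ClientInnerExt; Clone is this sketch's simplification
    inner: C,
    data: ClientDataEnum,
}

fn main() {
    let client = ClientInnerCommon {
        inner: 42u32,
        data: ClientDataEnum::Remote(ClientDataRemote),
    };
    let _copy = client.clone();       // works because every field is Clone
}
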
171 changes: 99 additions & 72 deletions proxy/src/serverless/conn_pool_lib.rs
@@ -1,4 +1,5 @@
use std::collections::HashMap;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::{self, AtomicUsize};
use std::sync::{Arc, Weak};
@@ -43,13 +44,14 @@ impl ConnInfo {
}
}

#[derive(Clone)]
pub(crate) enum ClientDataEnum {
Remote(ClientDataRemote),
Local(ClientDataLocal),
#[allow(dead_code)]
Http(ClientDataHttp),
}

#[derive(Clone)]
pub(crate) struct ClientInnerCommon<C: ClientInnerExt> {
pub(crate) inner: C,
pub(crate) aux: MetricsAuxInfo,
@@ -91,6 +93,7 @@ pub(crate) struct ConnPoolEntry<C: ClientInnerExt> {
pub(crate) struct EndpointConnPool<C: ClientInnerExt> {
pools: HashMap<(DbName, RoleName), DbUserConnPool<C>>,
total_conns: usize,
/// max # connections per endpoint
max_conns: usize,
_guard: HttpEndpointPoolsGuard<'static>,
global_connections_count: Arc<AtomicUsize>,
@@ -317,24 +320,49 @@ impl<C: ClientInnerExt> DbUserConn<C> for DbUserConnPool<C> {
}
}

pub(crate) struct GlobalConnPool<C: ClientInnerExt> {
pub(crate) trait EndpointConnPoolExt<C: ClientInnerExt> {
fn clear_closed(&mut self) -> usize;
fn total_conns(&self) -> usize;
}

impl<C: ClientInnerExt> EndpointConnPoolExt<C> for EndpointConnPool<C> {
fn clear_closed(&mut self) -> usize {
let mut clients_removed: usize = 0;
for db_pool in self.pools.values_mut() {
clients_removed += db_pool.clear_closed_clients(&mut self.total_conns);
}
clients_removed
}

fn total_conns(&self) -> usize {
self.total_conns
}
}

pub(crate) struct GlobalConnPool<C, P>
where
C: ClientInnerExt,
P: EndpointConnPoolExt<C>,
{
// endpoint -> per-endpoint connection pool
//
// That should be a fairly contended map, so return a reference to the per-endpoint
// pool as early as possible and release the lock.
global_pool: DashMap<EndpointCacheKey, Arc<RwLock<EndpointConnPool<C>>>>,
pub(crate) global_pool: DashMap<EndpointCacheKey, Arc<RwLock<P>>>,

/// Number of endpoint-connection pools
///
/// [`DashMap::len`] iterates over all inner pools and acquires a read lock on each.
/// That seems like far too much effort, so we're using a relaxed increment counter instead.
/// It's only used for diagnostics.
global_pool_size: AtomicUsize,
pub(crate) global_pool_size: AtomicUsize,

/// Total number of connections in the pool
global_connections_count: Arc<AtomicUsize>,
pub(crate) global_connections_count: Arc<AtomicUsize>,

pub(crate) config: &'static crate::config::HttpConfig,

config: &'static crate::config::HttpConfig,
_marker: PhantomData<C>,
}

#[derive(Debug, Clone, Copy)]
@@ -357,14 +385,19 @@ pub struct GlobalConnPoolOptions {
pub max_total_conns: usize,
}

impl<C: ClientInnerExt> GlobalConnPool<C> {
impl<C, P> GlobalConnPool<C, P>
where
C: ClientInnerExt,
P: EndpointConnPoolExt<C>,
{
pub(crate) fn new(config: &'static crate::config::HttpConfig) -> Arc<Self> {
let shards = config.pool_options.pool_shards;
Arc::new(Self {
global_pool: DashMap::with_shard_amount(shards),
global_pool_size: AtomicUsize::new(0),
config,
global_connections_count: Arc::new(AtomicUsize::new(0)),
_marker: PhantomData,
})
}

@@ -378,60 +411,6 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
self.config.pool_options.idle_timeout
}

pub(crate) fn get(
self: &Arc<Self>,
ctx: &RequestContext,
conn_info: &ConnInfo,
) -> Result<Option<Client<C>>, HttpConnError> {
let mut client: Option<ClientInnerCommon<C>> = None;
let Some(endpoint) = conn_info.endpoint_cache_key() else {
return Ok(None);
};

let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
if let Some(entry) = endpoint_pool
.write()
.get_conn_entry(conn_info.db_and_user())
{
client = Some(entry.conn);
}
let endpoint_pool = Arc::downgrade(&endpoint_pool);

// ok return cached connection if found and establish a new one otherwise
if let Some(mut client) = client {
if client.inner.is_closed() {
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
return Ok(None);
}
tracing::Span::current()
.record("conn_id", tracing::field::display(client.get_conn_id()));
tracing::Span::current().record(
"pid",
tracing::field::display(client.inner.get_process_id()),
);
debug!(
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"pool: reusing connection '{conn_info}'"
);

match client.get_data() {
ClientDataEnum::Local(data) => {
data.session().send(ctx.session_id())?;
}

ClientDataEnum::Remote(data) => {
data.session().send(ctx.session_id())?;
}
ClientDataEnum::Http(_) => (),
}

ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
ctx.success();
return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
}
Ok(None)
}

pub(crate) fn shutdown(&self) {
// drops all strong references to endpoint-pools
self.global_pool.clear();
@@ -464,17 +443,10 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
// if the current endpoint pool is unique (no other strong or weak references)
// then it is currently not in use by any connections.
if let Some(pool) = Arc::get_mut(x.get_mut()) {
let EndpointConnPool {
pools, total_conns, ..
} = pool.get_mut();

// ensure that closed clients are removed
for db_pool in pools.values_mut() {
clients_removed += db_pool.clear_closed_clients(total_conns);
}
let endpoints = pool.get_mut();
clients_removed = endpoints.clear_closed();

// we only remove this pool if it has no active connections
if *total_conns == 0 {
if endpoints.total_conns() == 0 {
info!("pool: discarding pool for endpoint {endpoint}");
return false;
}
@@ -510,6 +482,62 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
info!("pool: performed global pool gc. size now {global_pool_size}");
}
}
}

impl<C: ClientInnerExt> GlobalConnPool<C, EndpointConnPool<C>> {
pub(crate) fn get(
self: &Arc<Self>,
ctx: &RequestContext,
conn_info: &ConnInfo,
) -> Result<Option<Client<C>>, HttpConnError> {
let mut client: Option<ClientInnerCommon<C>> = None;
let Some(endpoint) = conn_info.endpoint_cache_key() else {
return Ok(None);
};

let endpoint_pool = self.get_or_create_endpoint_pool(&endpoint);
if let Some(entry) = endpoint_pool
.write()
.get_conn_entry(conn_info.db_and_user())
{
client = Some(entry.conn);
}
let endpoint_pool = Arc::downgrade(&endpoint_pool);

// ok return cached connection if found and establish a new one otherwise
if let Some(mut client) = client {
if client.inner.is_closed() {
info!("pool: cached connection '{conn_info}' is closed, opening a new one");
return Ok(None);
}
tracing::Span::current()
.record("conn_id", tracing::field::display(client.get_conn_id()));
tracing::Span::current().record(
"pid",
tracing::field::display(client.inner.get_process_id()),
);
debug!(
cold_start_info = ColdStartInfo::HttpPoolHit.as_str(),
"pool: reusing connection '{conn_info}'"
);

match client.get_data() {
ClientDataEnum::Local(data) => {
data.session().send(ctx.session_id())?;
}

ClientDataEnum::Remote(data) => {
data.session().send(ctx.session_id())?;
}
ClientDataEnum::Http(_) => (),
}

ctx.set_cold_start_info(ColdStartInfo::HttpPoolHit);
ctx.success();
return Ok(Some(Client::new(client, conn_info.clone(), endpoint_pool)));
}
Ok(None)
}

pub(crate) fn get_or_create_endpoint_pool(
self: &Arc<Self>,
@@ -556,7 +584,6 @@ impl<C: ClientInnerExt> GlobalConnPool<C> {
pool
}
}

pub(crate) struct Client<C: ClientInnerExt> {
span: Span,
inner: Option<ClientInnerCommon<C>>,
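
The EndpointConnPoolExt trait introduced in this file is what lets the GC loop treat the tokio_postgres and HTTP per-endpoint pools uniformly. A self-contained toy below shows a type plugging into that trait and a sweep in the spirit of the GC pass; only the trait's method names come from the diff above, while ToyPool, sweep, and their behavior are illustrative assumptions.

// Toy per-endpoint pool implementing the two methods the GC sweep relies on.
trait ClientInnerExt {}
impl ClientInnerExt for u32 {}

trait EndpointConnPoolExt<C: ClientInnerExt> {
    fn clear_closed(&mut self) -> usize;
    fn total_conns(&self) -> usize;
}

struct ToyPool {
    open: Vec<u32>,   // "live" connections
    closed: Vec<u32>, // connections already closed by the server
}

impl EndpointConnPoolExt<u32> for ToyPool {
    fn clear_closed(&mut self) -> usize {
        let removed = self.closed.len();
        self.closed.clear();
        removed
    }
    fn total_conns(&self) -> usize {
        self.open.len()
    }
}

// Mirrors the shape of the GC pass: evict closed clients, then report
// whether the whole per-endpoint pool can be discarded.
fn sweep<C: ClientInnerExt, P: EndpointConnPoolExt<C>>(pool: &mut P) -> (usize, bool) {
    let removed = pool.clear_closed();
    (removed, pool.total_conns() == 0)
}

fn main() {
    let mut pool = ToyPool { open: vec![], closed: vec![1, 2] };
    let (removed, discard) = sweep::<u32, _>(&mut pool);
    println!("removed {removed} closed clients, discard pool: {discard}");
}
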

1 comment on commit 2d6bf17

@github-actions

5626 tests run: 5388 passed, 2 failed, 236 skipped (full report)


Failures on Postgres 16

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_sharded_ingest[release-pg16-github-actions-selfhosted-1] or test_compaction_l0_memory[release-pg16-github-actions-selfhosted]"
Flaky tests (1): Postgres 14

Code coverage* (full report)

  • functions: 31.4% (7948 of 25319 functions)
  • lines: 49.3% (63079 of 127901 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
2d6bf17 at 2024-11-20T21:18:46.739Z :recycle:
