From 41c8340dd26c941ee1815ad3a1cc915c4361c22d Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Fri, 15 Nov 2024 22:35:20 -0800 Subject: [PATCH] Raise pool connection limit to prevent timeout on `acquire()` --- copperd/bin/edged/src/main.rs | 3 ++- copperd/bin/piper/src/main.rs | 15 ++++++++++++++- copperd/lib/itemdb/src/client/client/mod.rs | 20 ++++++++++++++++---- copperd/lib/util/src/logging.rs | 7 ++++++- 4 files changed, 38 insertions(+), 7 deletions(-) diff --git a/copperd/bin/edged/src/main.rs b/copperd/bin/edged/src/main.rs index 230e3192..58efab6d 100644 --- a/copperd/bin/edged/src/main.rs +++ b/copperd/bin/edged/src/main.rs @@ -39,7 +39,8 @@ async fn make_app(config: Arc, s3_client: Arc) -> Router trace!(message = "Connecting to itemdb"); // Connect to database - let itemdb_client = match ItemdbClient::open(&config.edged_itemdb_addr, true).await { + // TODO: configure max connections + let itemdb_client = match ItemdbClient::open(32, &config.edged_itemdb_addr, true).await { Ok(db) => Arc::new(db), Err(ItemdbOpenError::Database(e)) => { error!(message = "SQL error while opening item database", err = ?e); diff --git a/copperd/bin/piper/src/main.rs b/copperd/bin/piper/src/main.rs index ae045b0e..654f45d2 100644 --- a/copperd/bin/piper/src/main.rs +++ b/copperd/bin/piper/src/main.rs @@ -127,7 +127,20 @@ async fn main() { trace!(message = "Connecting to itemdb"); // Connect to database let itemdb_client = loop { - match ItemdbClient::open(&config.piper_itemdb_addr, false).await { + match ItemdbClient::open( + // We need at least one connection per job. + // If we use any fewer, requests to acquire new connections will time out! + // ...and add 4 extra connections, just to be safe. + // + // If piper exits with a "connection timed out" error, we need to raise this limit. + // Be careful with this, though---understand *why* you need so many connections! + // We really shouldn't need more than one per job. + u32::try_from(config.piper_parallel_jobs).unwrap() + 4, + &config.piper_itemdb_addr, + false, + ) + .await + { Ok(db) => break Arc::new(db), Err(ItemdbOpenError::Database(e)) => { error!(message = "SQL error while opening item database", err = ?e); diff --git a/copperd/lib/itemdb/src/client/client/mod.rs b/copperd/lib/itemdb/src/client/client/mod.rs index a63db3b4..9a6e3ec9 100644 --- a/copperd/lib/itemdb/src/client/client/mod.rs +++ b/copperd/lib/itemdb/src/client/client/mod.rs @@ -5,7 +5,7 @@ use sqlx::{ pool::PoolConnection, postgres::PgPoolOptions, Connection, PgConnection, PgPool, Postgres, }; use thiserror::Error; -use tracing::info; +use tracing::{info, trace}; use crate::client::migrate; @@ -39,7 +39,11 @@ pub struct ItemdbClient { impl ItemdbClient { /// Create a new [`LocalDataset`]. - pub async fn open(db_addr: &str, migrate: bool) -> Result { + pub async fn open( + max_connections: u32, + db_addr: &str, + migrate: bool, + ) -> Result { info!(message = "Opening dataset", ds_type = "postgres", ?db_addr); // Apply migrations @@ -56,8 +60,7 @@ impl ItemdbClient { drop(conn); let pool = PgPoolOptions::new() - // TODO: configure - .max_connections(5) + .max_connections(max_connections) .connect(db_addr) .await?; @@ -65,6 +68,15 @@ impl ItemdbClient { } pub async fn new_connection(&self) -> Result, sqlx::Error> { + let size = self.pool.size(); + let idle_connections = self.pool.num_idle(); + let active_connections = size - u32::try_from(idle_connections).unwrap(); + trace!( + message = "Trying to open itemdb connection", + idle_connections, + active_connections + ); + let conn = self.pool.acquire().await?; return Ok(conn); } diff --git a/copperd/lib/util/src/logging.rs b/copperd/lib/util/src/logging.rs index 3d832b44..319cb331 100644 --- a/copperd/lib/util/src/logging.rs +++ b/copperd/lib/util/src/logging.rs @@ -51,6 +51,7 @@ impl LoggingPreset { other: LogLevel::Warn, http: LogLevel::Warn, s3: LogLevel::Warn, + sqlx: LogLevel::Error, piper: LogLevel::Info, runner: LogLevel::Info, @@ -64,6 +65,7 @@ impl LoggingPreset { other: LogLevel::Warn, http: LogLevel::Warn, s3: LogLevel::Warn, + sqlx: LogLevel::Error, piper: LogLevel::Debug, runner: LogLevel::Debug, @@ -77,6 +79,7 @@ impl LoggingPreset { other: LogLevel::Debug, http: LogLevel::Warn, s3: LogLevel::Warn, + sqlx: LogLevel::Warn, piper: LogLevel::Trace, runner: LogLevel::Trace, @@ -90,6 +93,7 @@ impl LoggingPreset { other: LogLevel::Trace, http: LogLevel::Warn, s3: LogLevel::Warn, + sqlx: LogLevel::Warn, piper: LogLevel::Trace, runner: LogLevel::Trace, @@ -106,6 +110,7 @@ pub struct LoggingConfig { other: LogLevel, http: LogLevel, s3: LogLevel, + sqlx: LogLevel, piper: LogLevel, runner: LogLevel, @@ -122,7 +127,6 @@ impl From for EnvFilter { // // Non-configurable sources // - format!("sqlx={}", LogLevel::Warn), format!("aws_sdk_s3={}", LogLevel::Warn), format!("aws_smithy_runtime={}", LogLevel::Warn), format!("aws_smithy_runtime_api={}", LogLevel::Warn), @@ -133,6 +137,7 @@ impl From for EnvFilter { // format!("tower_http={}", conf.http), format!("s3={}", conf.s3), + format!("sqlx={}", conf.sqlx), // // Piper format!("piper::pipeline::runner={}", conf.runner), format!("piper::pipeline::job={}", conf.job),