-
I have 700 GeoJSON files whose contents I need to insert into a Postgres database. Here's how I create the connection pool:

async fn db_connect() -> Result<PgPool, sqlx::Error> {
    let db_url = env::var("DATABASE_URL").expect("Missing DATABASE_URL env var");
    let connect_options = PgConnectOptions::from_str(&db_url)?
        .ssl_mode(sqlx::postgres::PgSslMode::Prefer)
        .log_statements(LevelFilter::Trace);
    let pool = PgPoolOptions::new()
        .max_connections(20)
        .acquire_timeout(Duration::from_secs(2))
        .connect_with(connect_options)
        .await?;
    Ok(pool)
}

The pool is stored in my AppState struct, which is shared with every insert task. The inserts are dispatched like this:

async fn load_collections(state: &AppState, table_name: &str, paths: Vec<PathBuf>) -> Result<()> {
    let collections = load_geojson_files(paths).await?;
    let mut tasks = Vec::with_capacity(collections.len());
    for coll in collections {
        tasks.push(populate_collection(state, table_name, coll)); // sqlx call happens in here
    }
    let results = join_all(tasks).await;
    println!("{:?}", results);
    Ok(())
}

The query in populate_collection (shown in a reply below) is a chunked INSERT. Everything works fine on my local machine, with no network latency. (It ALSO works when I run the queries sequentially!) However, when I instead try to send the data to a Supabase server, the first 50-ish queries run fine, but then they start intermittently failing with the error PoolTimedOut. Is this approach, where I create all the Futures at once and then join_all them, fundamentally flawed?

UPDATE: I succeeded in getting the concurrency limited by switching to a JoinSet:

async fn load_collections(
    state: &Arc<AppState>,
    table_name: &str,
    paths: Vec<PathBuf>,
) -> Result<()> {
    let collections = load_geojson_files(paths).await?;
    let mut join_set = JoinSet::new();
    let mut results = Vec::new();
    let concurrency_limit = 20;
    for coll in collections {
        let state = state.clone();
        let table_name = table_name.to_string();
        join_set.spawn(async move { populate_collection(&state, &table_name, coll).await });
        if join_set.len() >= concurrency_limit {
            if let Some(result) = join_set.join_next().await {
                results.push(result.unwrap());
            }
        }
    }
    // Await the completion of all tasks
    while let Some(result) = join_set.join_next().await {
        results.push(result.unwrap());
    }
    Ok(())
}
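
For comparison (this is only a sketch, not code from the project), the same cap on in-flight inserts can be written with futures' buffer_unordered instead of a JoinSet. AppState, NamedFeatureCollection, load_geojson_files, and populate_collection below are stub stand-ins for the real items in this post, and the limit of 10 is arbitrary:

use std::path::PathBuf;
use std::sync::Arc;

use anyhow::Result;
use futures::stream::{self, StreamExt};

// Stub stand-ins for the items defined elsewhere in this post.
struct AppState { /* pool: sqlx::PgPool, … */ }
struct NamedFeatureCollection;

async fn load_geojson_files(_paths: Vec<PathBuf>) -> Result<Vec<NamedFeatureCollection>> {
    Ok(Vec::new())
}

async fn populate_collection(
    _state: &AppState,
    _table_name: &str,
    _coll: NamedFeatureCollection,
) -> Result<()> {
    Ok(())
}

async fn load_collections(state: &Arc<AppState>, table_name: &str, paths: Vec<PathBuf>) -> Result<()> {
    let collections = load_geojson_files(paths).await?;

    // At most `concurrency_limit` futures are polled at a time, so the pool
    // never has more than that many acquirers waiting on it.
    let concurrency_limit = 10;
    let results: Vec<Result<()>> = stream::iter(collections)
        .map(|coll| populate_collection(state, table_name, coll))
        .buffer_unordered(concurrency_limit)
        .collect()
        .await;

    // Propagate the first failure instead of discarding errors.
    results.into_iter().collect::<Result<Vec<()>>>()?;
    Ok(())
}

Because nothing is spawned here, the futures can borrow state and table_name directly, so no Arc clone or to_string per task is needed.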
-
I've got a workaround which I'm really not happy with. If I just try again, it seems to work the next time through. 😬 With a 2sec acquisition timeout, I'm seeing around 20 retries needed to successfully upload everything. But it DOES eventually work. Again, this isn't needed when I do them sequentially with a 2sec timeout. I'd love some insight into how I can fix the underlying problem!

async fn populate_collection(
    state: &Arc<AppState>,
    table_name: &str,
    coll: NamedFeatureCollection,
) -> Result<()> {
    let metro_year = format!("{} {:?}", coll.utp_code, coll.year);
    let chunk_count = 500;
    'outer: for (i, (names, features)) in coll.chunks(chunk_count).enumerate() {
        loop {
            let count = names.len();
            // …
            let query = format!(
                "INSERT […]"
            );
            let result = sqlx::query(query.as_str())
                .bind([…])
                .execute(&state.pool)
                .await;
            match result {
                Ok(_) => continue 'outer,
                Err(e) => match e {
                    sqlx::Error::PoolTimedOut => println!(
                        "!!! Failed to insert {} for {}. Trying again…",
                        table_name, metro_year
                    ),
                    _ => return Err(anyhow!(e)),
                },
            }
        }
    }
    debug!("Finished inserting {} for {}", table_name, metro_year);
    Ok(())
}
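
A variation worth trying on top of this workaround is to back off between retries instead of immediately asking the pool for another connection, so the waiters already queued on the pool get a chance to drain. The helper below is only a sketch: retry_pool_timeouts is a made-up wrapper, not a sqlx API, and it assumes tokio and anyhow are in use.

use std::time::Duration;

use anyhow::{anyhow, Result};

// Hypothetical helper: retry a fallible async operation, but only when the
// failure is sqlx::Error::PoolTimedOut, sleeping with exponential backoff
// between attempts.
async fn retry_pool_timeouts<F, Fut, T>(mut op: F, max_attempts: u32) -> Result<T>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, sqlx::Error>>,
{
    let mut delay = Duration::from_millis(250);
    for attempt in 1..=max_attempts {
        match op().await {
            Ok(value) => return Ok(value),
            // Back off before retrying so other waiters can drain the pool.
            Err(sqlx::Error::PoolTimedOut) if attempt < max_attempts => {
                tokio::time::sleep(delay).await;
                delay *= 2;
            }
            // Out of attempts, or a different error: give up.
            Err(e) => return Err(anyhow!(e)),
        }
    }
    Err(anyhow!("retry_pool_timeouts called with max_attempts == 0"))
}

The execute call in the chunk loop would then be wrapped in a closure passed to this helper, e.g. retry_pool_timeouts(|| sqlx::query(query.as_str()).execute(&state.pool), 5).await? (binds omitted for brevity).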
-
I implemented a deadpool pool, instead of the built-in PgPool, as suggested by this comment. That worked, but when I used the same concurrency limit of 20, I still got timeouts. The strange thing? It was exactly 6 connection acquisition timeouts every time! This time, though, the error said "max connections exceeded". Supabase says that you get 60 connections, so this is strange.

Still, I tried lowering the number of connections to 14 et… voilà. No more connection exceeded errors. And then I reverted all my deadpool changes and just set the connection limit to 14, and now PgPool is working without any "pool timed out" errors.

So it would appear that I've found the solution. I don't understand WHY it works, but it does. Maybe it's something to do with the fact that I'm not using the Supabase pooler, although that is supposed to give me 200 connections, so 🤷.

For anyone who wants the deadpool version anyway, here it is:

pool.rs

// Adapted from performance-service by tsunyoku
// https://github.com/osuAkatsuki/performance-service/blob/9d40594d7645d38d1bde167fdadedb89cb4b4772/src/models/pool.rs
// Used under MIT license
use deadpool::managed::{Manager, Metrics, RecycleResult};
use sqlx::postgres::PgConnectOptions;
use sqlx::{ConnectOptions, Connection, Error as SqlxError, PgConnection};

#[derive(Clone, Debug)]
pub struct DbPool {
    options: PgConnectOptions,
}

impl DbPool {
    pub fn new(options: PgConnectOptions, max_size: usize) -> anyhow::Result<Pool> {
        Ok(Pool::builder(Self { options }).max_size(max_size).build()?)
    }
}

impl Manager for DbPool {
    type Type = PgConnection;
    type Error = SqlxError;

    async fn create(&self) -> Result<PgConnection, SqlxError> {
        self.options.connect().await
    }

    async fn recycle(&self, obj: &mut Self::Type, _: &Metrics) -> RecycleResult<SqlxError> {
        Ok(obj.ping().await?)
    }
}

pub type Pool = deadpool::managed::Pool<DbPool>;

macro_rules! get_conn {
    ($input:expr) => {
        $input.get().await?.deref_mut()
    };
}
pub(crate) use get_conn;

And then to use it, it's just:

main.rs

use sqlx::query;
use std::ops::DerefMut;

mod pool;
use pool::{DbPool, Pool, get_conn};

const CONNECTION_COUNT: usize = 14;

// …

async fn main() -> Result<()> {
    let pool = DbPool::new(connect_options, CONNECTION_COUNT)?;
    query!("SELECT 1+1 AS res").execute(get_conn!(pool)).await?;
}
-
I believe this is a bug. It's manifesting in
-
It might sound obvious, but it could also be that the machine doesn't have enough resources when running the program under such a high load.