diff --git a/rust/migration/src/lib.rs b/rust/migration/src/lib.rs
index 807d2b741..8a3809c9e 100644
--- a/rust/migration/src/lib.rs
+++ b/rust/migration/src/lib.rs
@@ -11,6 +11,7 @@ mod m20240509_163905_add_label_to_job;
 mod m20240517_195757_add_updated_at_to_blob;
 mod m20240522_185420_create_job_history;
 mod m20240731_152842_create_job_size_proc;
+mod m20240731_201632_create_job_blob_timestamp_index;
 
 pub struct Migrator;
 
@@ -29,6 +30,7 @@ impl MigratorTrait for Migrator {
             Box::new(m20240517_195757_add_updated_at_to_blob::Migration),
             Box::new(m20240522_185420_create_job_history::Migration),
             Box::new(m20240731_152842_create_job_size_proc::Migration),
+            Box::new(m20240731_201632_create_job_blob_timestamp_index::Migration),
         ]
     }
 }
diff --git a/rust/migration/src/m20240731_201632_create_job_blob_timestamp_index.rs b/rust/migration/src/m20240731_201632_create_job_blob_timestamp_index.rs
new file mode 100644
index 000000000..8b9044d63
--- /dev/null
+++ b/rust/migration/src/m20240731_201632_create_job_blob_timestamp_index.rs
@@ -0,0 +1,48 @@
+use sea_orm_migration::prelude::*;
+
+/// Adds indexes on the timestamp columns `blob.updated_at` and `job.created_at`.
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait::async_trait]
+impl MigrationTrait for Migration {
+    /// Creates both timestamp indexes; `IF NOT EXISTS` skips any index already present.
+    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        let db = manager.get_connection();
+        let statements = [
+            "
+                CREATE INDEX IF NOT EXISTS blob_updated_at_idx
+                ON blob(updated_at)
+                ",
+            "
+                CREATE INDEX IF NOT EXISTS job_created_at_idx
+                ON job(created_at)
+                ",
+        ];
+
+        for sql in &statements {
+            db.execute_unprepared(sql).await?;
+        }
+
+        Ok(())
+    }
+
+    /// Drops both timestamp indexes, in the reverse order of `up`.
+    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        let db = manager.get_connection();
+        let statements = [
+            "
+                DROP INDEX IF EXISTS job_created_at_idx
+                ",
+            "
+                DROP INDEX IF EXISTS blob_updated_at_idx
+                ",
+        ];
+
+        for sql in &statements {
+            db.execute_unprepared(sql).await?;
+        }
+
+        Ok(())
+    }
+}
diff --git a/rust/rsc/src/bin/rsc/main.rs b/rust/rsc/src/bin/rsc/main.rs
index 7fc315ad8..c8d71147d 100644 --- a/rust/rsc/src/bin/rsc/main.rs +++ b/rust/rsc/src/bin/rsc/main.rs @@ -246,7 +246,7 @@ fn launch_blob_eviction( // This gives clients time to reference a blob before it gets evicted. let ttl = (Utc::now() - Duration::from_secs(config.blob_eviction.ttl)).naive_utc(); - let blobs = match database::read_unreferenced_blobs( + let blobs = match database::delete_unreferenced_blobs( conn.as_ref(), ttl, config.blob_eviction.chunk_size, @@ -255,27 +255,17 @@ fn launch_blob_eviction( { Ok(b) => b, Err(err) => { - tracing::error!(%err, "Failed to fetch blobs for eviction"); + tracing::error!(%err, "Failed to delete blobs for eviction"); should_sleep = true; continue; // Try again on the next tick } }; - let blob_ids: Vec = blobs.iter().map(|blob| blob.id).collect(); - let eligible = blob_ids.len(); - should_sleep = eligible == 0; + let deleted = blobs.len(); - tracing::info!(%eligible, "At least N blobs eligible for eviction"); + should_sleep = deleted == 0; - // Delete blobs from database - match database::delete_blobs_by_ids(conn.as_ref(), blob_ids).await { - Ok(deleted) => tracing::info!(%deleted, "Deleted blobs from database"), - Err(err) => { - tracing::error!(%err, "Failed to delete blobs from db for eviction"); - should_sleep = true; - continue; // Try again on the next tick - } - }; + tracing::info!(%deleted, "N blobs deleted for eviction"); // Delete blobs from blob store for blob in blobs { @@ -283,7 +273,7 @@ fn launch_blob_eviction( Some(s) => s.clone(), None => { let blob = blob.clone(); - tracing::info!(%blob.id, %blob.store_id, %blob.key, "Blob has been orphaned!"); + tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!"); tracing::error!(%blob.store_id, "Blob's store id missing from activated stores"); continue; } @@ -292,7 +282,7 @@ fn launch_blob_eviction( tokio::spawn(async move { store.delete_key(blob.key.clone()).await.unwrap_or_else(|err| { let blob = blob.clone(); - tracing::info!(%blob.id, 
%blob.store_id, %blob.key, "Blob has been orphaned!"); + tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!"); tracing::error!(%err, "Failed to delete blob from store for eviction. See above for blob info"); }); }); diff --git a/rust/rsc/src/database.rs b/rust/rsc/src/database.rs index 6b0193ba9..504ce7893 100644 --- a/rust/rsc/src/database.rs +++ b/rust/rsc/src/database.rs @@ -577,44 +577,6 @@ pub async fn upsert_blob( // ---------- Read ---------- -// Reads blobs from the database that are unreferenced and have surpassed the allocated grace -// period to be referenced. -// -// For new blobs this allows the client to create several blobs and then reference them all at -// once. Existing blobs whose job was just evicted will likely be well past the grace period and -// thus quickly evicted themselves. -pub async fn read_unreferenced_blobs( - db: &T, - ttl: NaiveDateTime, - chunk: u32, -) -> Result, DbErr> { - // Limit = 16k as the query is also subject to parameter max. - // Blob has 4 params so (2^16)/4 = 16384. Also generally best to chunk blob eviction - // to avoid large eviction stalls. 
- Blob::find() - .from_raw_sql(Statement::from_sql_and_values( - DbBackend::Postgres, - r#" - SELECT * FROM blob - WHERE updated_at <= $1 - AND id IN - ( - SELECT id FROM blob - EXCEPT - ( - SELECT blob_id FROM output_file - UNION SELECT stdout_blob_id FROM job - UNION SELECT stderr_blob_id FROM job - ) - ) - LIMIT $2 - "#, - [ttl.into(), chunk.into()], - )) - .all(db) - .await -} - pub async fn count_blobs(db: &T) -> Result { Blob::find().count(db).await } @@ -639,37 +601,45 @@ pub async fn total_blob_size(db: &T) -> Result(db: &T, ids: Vec) -> Result { - if ids.len() == 0 { - return Ok(0); - } - - let mut affected = 0; - - let chunked: Vec> = ids - .into_iter() - .chunks((MAX_SQLX_PARAMS / 1).into()) - .into_iter() - .map(|chunk| chunk.collect()) - .collect(); +#[derive(Clone, Debug, FromQueryResult)] +pub struct DeletedBlob { + pub store_id: Uuid, + pub key: String, +} - for chunk in chunked { - let result = Blob::delete_many() - .filter( - entity::blob::Column::Id.in_subquery( - migration::Query::select() - .column(migration::Asterisk) - .from_values(chunk, migration::Alias::new("foo")) - .take(), - ), +// Deletes blobs from the database that are unreferenced and have surpassed the allocated grace +// period to be referenced. +// +// For new blobs this allows the client to create several blobs and then reference them all at +// once. Existing blobs whose job was just evicted will likely be well past the grace period and +// thus quickly evicted themselves. 
+pub async fn delete_unreferenced_blobs( + db: &T, + ttl: NaiveDateTime, + chunk: u32, +) -> Result, DbErr> { + DeletedBlob::find_by_statement(Statement::from_sql_and_values( + DbBackend::Postgres, + r#" + WITH + eligible_blob_ids as ( + SELECT id FROM blob + WHERE updated_at <= $1 + EXCEPT ( + SELECT blob_id FROM output_file + UNION SELECT stdout_blob_id FROM job + UNION SELECT stderr_blob_id FROM job + ) + LIMIT $2 ) - .exec(db) - .await?; - - affected += result.rows_affected; - } - - Ok(affected) + DELETE from blob b + WHERE b.id IN (SELECT id FROM eligible_blob_ids) + RETURNING b.store_id, b.key + "#, + [ttl.into(), chunk.into()], + )) + .all(db) + .await } // --------------------------------------------------