Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rsc: Optimize blob eviction deletion #1615

Merged
merged 3 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions rust/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod m20231128_000751_normalize_uses_table;
mod m20240509_163905_add_label_to_job;
mod m20240517_195757_add_updated_at_to_blob;
mod m20240522_185420_create_job_history;
mod m20240731_201632_create_job_blob_timestamp_index;

/// Entry point type for running this crate's database migrations;
/// the `MigratorTrait` impl below supplies the ordered migration list.
pub struct Migrator;

Expand All @@ -27,6 +28,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240509_163905_add_label_to_job::Migration),
Box::new(m20240517_195757_add_updated_at_to_blob::Migration),
Box::new(m20240522_185420_create_job_history::Migration),
Box::new(m20240731_201632_create_job_blob_timestamp_index::Migration),
]
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use sea_orm_migration::prelude::*;

/// Migration adding the timestamp indexes used by blob eviction
/// (`blob(updated_at)` and `job(created_at)` — see `up`/`down` below).
/// `DeriveMigrationName` derives the migration's name from the module path.
#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    /// Creates the two timestamp indexes consulted by blob eviction:
    /// `blob_updated_at_idx` on `blob(updated_at)` and
    /// `job_created_at_idx` on `job(created_at)`.
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        let conn = manager.get_connection();

        // Each statement is idempotent (IF NOT EXISTS), so re-running the
        // migration is harmless. Statements run in the same order as before:
        // blob index first, then job index.
        for stmt in [
            "CREATE INDEX IF NOT EXISTS blob_updated_at_idx ON blob(updated_at)",
            "CREATE INDEX IF NOT EXISTS job_created_at_idx ON job(created_at)",
        ] {
            conn.execute_unprepared(stmt).await?;
        }

        Ok(())
    }

    /// Drops the indexes created by `up`, in reverse creation order.
    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        let conn = manager.get_connection();

        // IF EXISTS keeps the rollback idempotent as well.
        for stmt in [
            "DROP INDEX IF EXISTS job_created_at_idx",
            "DROP INDEX IF EXISTS blob_updated_at_idx",
        ] {
            conn.execute_unprepared(stmt).await?;
        }

        Ok(())
    }
}
26 changes: 7 additions & 19 deletions rust/rsc/src/bin/rsc/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ fn launch_blob_eviction(
// This gives clients time to reference a blob before it gets evicted.
let ttl = (Utc::now() - Duration::from_secs(config.blob_eviction.ttl)).naive_utc();

let blobs = match database::read_unreferenced_blobs(
let blobs = match database::delete_unreferenced_blobs(
conn.as_ref(),
ttl,
config.blob_eviction.chunk_size,
Expand All @@ -258,37 +258,25 @@ fn launch_blob_eviction(
{
Ok(b) => b,
Err(err) => {
tracing::error!(%err, "Failed to fetch blobs for eviction");
tracing::error!(%err, "Failed to delete blobs for eviction");
should_sleep = true;
continue; // Try again on the next tick
}
};

let blob_ids: Vec<Uuid> = blobs.iter().map(|blob| blob.id).collect();
let eligible = blob_ids.len();
should_sleep = eligible == 0;
let deleted = blobs.len();

tracing::info!(%eligible, "At least N blobs eligible for eviction");
should_sleep = deleted == 0;

// Delete blobs from database
match database::delete_blobs_by_ids(conn.as_ref(), blob_ids).await {
Ok(deleted) => tracing::info!(%deleted, "Deleted blobs from database"),
Err(err) => {
tracing::error!(%err, "Failed to delete blobs from db for eviction");
should_sleep = true;
continue; // Try again on the next tick
}
};

tracing::info!("Spawning blob deletion from stores");
tracing::info!(%deleted, "N blobs deleted for eviction");

// Delete blobs from blob store
for blob in blobs {
let store = match blob_stores.get(&blob.store_id) {
Some(s) => s.clone(),
None => {
let blob = blob.clone();
tracing::info!(%blob.id, %blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::error!(%blob.store_id, "Blob's store id missing from activated stores");
continue;
}
Expand All @@ -297,7 +285,7 @@ fn launch_blob_eviction(
tokio::spawn(async move {
store.delete_key(blob.key.clone()).await.unwrap_or_else(|err| {
let blob = blob.clone();
tracing::info!(%blob.id, %blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::error!(%err, "Failed to delete blob from store for eviction. See above for blob info");
});
});
Expand Down
104 changes: 37 additions & 67 deletions rust/rsc/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,44 +583,6 @@ pub async fn upsert_blob<T: ConnectionTrait>(

// ---------- Read ----------

// Reads blobs from the database that are unreferenced and have surpassed the allocated grace
// period to be referenced.
//
// For new blobs this allows the client to create several blobs and then reference them all at
// once. Existing blobs whose job was just evicted will likely be well past the grace period and
// thus quickly evicted themselves.
pub async fn read_unreferenced_blobs<T: ConnectionTrait>(
    db: &T,
    ttl: NaiveDateTime, // cutoff: only blobs with updated_at <= ttl are eligible
    chunk: u32,         // maximum number of rows returned per call
) -> Result<Vec<blob::Model>, DbErr> {
    // Limit = 16k as the query is also subject to parameter max.
    // Blob has 4 params so (2^16)/4 = 16384. Also generally best to chunk blob eviction
    // to avoid large eviction stalls.
    Blob::find()
        .from_raw_sql(Statement::from_sql_and_values(
            DbBackend::Postgres,
            // The inner EXCEPT computes the set of blob ids with no referencing
            // row in output_file, job.stdout_blob_id, or job.stderr_blob_id;
            // the outer query then keeps only those past the TTL cutoff and
            // caps the batch at `chunk` rows.
            r#"
            SELECT * FROM blob
            WHERE updated_at <= $1
            AND id IN
            (
                SELECT id FROM blob
                EXCEPT
                (
                    SELECT blob_id FROM output_file
                    UNION SELECT stdout_blob_id FROM job
                    UNION SELECT stderr_blob_id FROM job
                )
            )
            LIMIT $2
            "#,
            [ttl.into(), chunk.into()],
        ))
        .all(db)
        .await
}

/// Returns the total number of rows in the `blob` table.
pub async fn count_blobs<T: ConnectionTrait>(db: &T) -> Result<u64, DbErr> {
    Blob::find().count(db).await
}
Expand All @@ -645,37 +607,45 @@ pub async fn total_blob_size<T: ConnectionTrait>(db: &T) -> Result<Option<TotalB
// ---------- Update ----------

// ---------- Delete ----------
pub async fn delete_blobs_by_ids<T: ConnectionTrait>(db: &T, ids: Vec<Uuid>) -> Result<u64, DbErr> {
if ids.len() == 0 {
return Ok(0);
}

let mut affected = 0;

let chunked: Vec<Vec<Uuid>> = ids
.into_iter()
.chunks((MAX_SQLX_PARAMS / 1).into())
.into_iter()
.map(|chunk| chunk.collect())
.collect();
/// Projection of a deleted `blob` row produced by the eviction query's
/// `RETURNING` clause: the (store_id, key) pair the caller needs to remove
/// the underlying object from its blob store afterwards.
#[derive(Clone, Debug, FromQueryResult)]
pub struct DeletedBlob {
    // Id of the blob store that holds the object.
    pub store_id: Uuid,
    // Key of the object within that store.
    pub key: String,
}

for chunk in chunked {
let result = Blob::delete_many()
.filter(
entity::blob::Column::Id.in_subquery(
migration::Query::select()
.column(migration::Asterisk)
.from_values(chunk, migration::Alias::new("foo"))
.take(),
),
// Deletes blobs from the database that are unreferenced and have surpassed the allocated grace
// period to be referenced.
//
// For new blobs this allows the client to create several blobs and then reference them all at
// once. Existing blobs whose job was just evicted will likely be well past the grace period and
// thus quickly evicted themselves.
pub async fn delete_unreferenced_blobs<T: ConnectionTrait>(
db: &T,
ttl: NaiveDateTime,
chunk: u32,
) -> Result<Vec<DeletedBlob>, DbErr> {
DeletedBlob::find_by_statement(Statement::from_sql_and_values(
DbBackend::Postgres,
r#"
WITH
eligible_blob_ids as (
SELECT id FROM blob
WHERE updated_at <= $1
EXCEPT (
SELECT blob_id FROM output_file
UNION SELECT stdout_blob_id FROM job
UNION SELECT stderr_blob_id FROM job
)
LIMIT $2
)
.exec(db)
.await?;

affected += result.rows_affected;
}

Ok(affected)
DELETE from blob b
WHERE b.id IN (SELECT id FROM eligible_blob_ids)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it not possible to just do IN (eligible_blob_ids.id) or something like that? I'm surprised to see a select is needed again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope :( I tried that. As far as I could tell this didn't actually affect the query plan, since WITH is kind of just writing an alias for a subquery

RETURNING b.store_id, b.key
"#,
[ttl.into(), chunk.into()],
))
.all(db)
.await
}

// --------------------------------------------------
Expand Down
Loading