Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rsc: Delete multiple blob files per task by config #1652

Merged
merged 3 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion rust/rsc/.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"blob_eviction": {
"tick_rate": 60,
"ttl": 3600,
"chunk_size": 16000
"chunk_size": 16000,
"file_chunk_size": 100
},
"job_eviction": {
"ttl": {
Expand Down
16 changes: 14 additions & 2 deletions rust/rsc/src/bin/rsc/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,20 @@ pub struct RSCTTLConfig {
pub tick_rate: u64,
// How long an object is allowed to live
pub ttl: u64,
// Maximum number of objects to delete at a time. Must be 1 >= x <= 16000
// Maximum number of objects to delete from the db at a time. Must be 1 >= x <= 16000
pub chunk_size: u32,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct RSCBlobTTLConfig {
// How often to run the eviction check in seconds
pub tick_rate: u64,
// How long an object is allowed to live
pub ttl: u64,
// Maximum number of objects to delete from the db at a time. Must be 1 >= x <= 16000
V-FEXrt marked this conversation as resolved.
Show resolved Hide resolved
pub chunk_size: u32,
// Maximum number of files to delete from the disk per task
pub file_chunk_size: usize,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is usize always going to be less than u32?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no usize is the number of bits a pointer on the host has (so pretty much u64 everywhere). The API that the value goes into expects a usize so it was easier to let the config system do validation instead of pushing it all the way into the eviction task

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure thats fine.

}

#[derive(Debug, Deserialize, Serialize)]
Expand Down Expand Up @@ -62,7 +74,7 @@ pub struct RSCConfig {
// The directory that server logs should be written to. If None logs are written to stdout
pub log_directory: Option<String>,
// The config to control blob eviction
pub blob_eviction: RSCTTLConfig,
pub blob_eviction: RSCBlobTTLConfig,
// The config to control job eviction
pub job_eviction: RSCJobEvictionConfig,
// The config to control job size calculation
Expand Down
45 changes: 29 additions & 16 deletions rust/rsc/src/bin/rsc/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use axum::{
};
use chrono::Utc;
use clap::Parser;
use itertools::Itertools;
use migration::{Migrator, MigratorTrait};
use rlimit::Resource;
use rsc::database;
Expand Down Expand Up @@ -279,23 +280,34 @@ fn launch_blob_eviction(
tracing::info!(%deleted, "N blobs deleted for eviction");

// Delete blobs from blob store
for blob in blobs {
let store = match blob_stores.get(&blob.store_id) {
Some(s) => s.clone(),
None => {
let blob = blob.clone();
tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::error!(%blob.store_id, "Blob's store id missing from activated stores");
continue;
}
};
let chunked: Vec<Vec<database::DeletedBlob>> = blobs
.into_iter()
.chunks(config.blob_eviction.file_chunk_size)
.into_iter()
.map(|chunk| chunk.collect())
.collect();

for chunk in chunked {
let thread_store = blob_stores.clone();

tokio::spawn(async move {
store.delete_key(blob.key.clone()).await.unwrap_or_else(|err| {
let blob = blob.clone();
tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::error!(%err, "Failed to delete blob from store for eviction. See above for blob info");
});
for blob in chunk {
let store = match thread_store.get(&blob.store_id) {
Some(s) => s.clone(),
None => {
let blob = blob.clone();
tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::error!(%blob.store_id, "Blob's store id missing from activated stores");
continue;
}
};

store.delete_key(blob.key.clone()).await.unwrap_or_else(|err| {
let blob = blob.clone();
tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::error!(%err, "Failed to delete blob from store for eviction. See above for blob info");
});
}
});
}
}
Expand Down Expand Up @@ -481,10 +493,11 @@ mod tests {
active_store: store_id.to_string(),
connection_pool_timeout: 10,
log_directory: None,
blob_eviction: config::RSCTTLConfig {
blob_eviction: config::RSCBlobTTLConfig {
tick_rate: 10,
ttl: 100,
chunk_size: 100,
file_chunk_size: 1,
},
job_eviction: config::RSCJobEvictionConfig::TTL(config::RSCTTLConfig {
tick_rate: 10,
Expand Down
Loading