Skip to content

Commit

Permalink
rsc: Delete multiple blob files per task by config (#1652)
Browse files Browse the repository at this point in the history
* rsc: Delete multiple blob files per task by config

* cleanup

* Update rust/rsc/src/bin/rsc/config.rs

Co-authored-by: Colin Schmidt <[email protected]>

---------

Co-authored-by: Colin Schmidt <[email protected]>
  • Loading branch information
V-FEXrt and colinschmidt authored Sep 25, 2024
1 parent b7f811b commit 52eb7bb
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 19 deletions.
3 changes: 2 additions & 1 deletion rust/rsc/.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"blob_eviction": {
"tick_rate": 60,
"ttl": 3600,
"chunk_size": 16000
"chunk_size": 16000,
"file_chunk_size": 100
},
"job_eviction": {
"ttl": {
Expand Down
16 changes: 14 additions & 2 deletions rust/rsc/src/bin/rsc/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,20 @@ pub struct RSCTTLConfig {
pub tick_rate: u64,
// How long an object is allowed to live
pub ttl: u64,
// Maximum number of objects to delete at a time. Must be 1 >= x <= 16000
// Maximum number of objects to delete from the db at a time. Must be 1 >= x <= 16000
pub chunk_size: u32,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct RSCBlobTTLConfig {
// How often to run the eviction check in seconds
pub tick_rate: u64,
// How long an object is allowed to live
pub ttl: u64,
// Maximum number of blobs to delete from the db at a time. Must be 1 >= x <= 64000
pub chunk_size: u32,
// Maximum number of files to delete from the disk per task
pub file_chunk_size: usize,
}

#[derive(Debug, Deserialize, Serialize)]
Expand Down Expand Up @@ -62,7 +74,7 @@ pub struct RSCConfig {
// The directory that server logs should be written to. If None logs are written to stdout
pub log_directory: Option<String>,
// The config to control blob eviction
pub blob_eviction: RSCTTLConfig,
pub blob_eviction: RSCBlobTTLConfig,
// The config to control job eviction
pub job_eviction: RSCJobEvictionConfig,
// The config to control job size calculation
Expand Down
45 changes: 29 additions & 16 deletions rust/rsc/src/bin/rsc/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use axum::{
};
use chrono::Utc;
use clap::Parser;
use itertools::Itertools;
use migration::{Migrator, MigratorTrait};
use rlimit::Resource;
use rsc::database;
Expand Down Expand Up @@ -279,23 +280,34 @@ fn launch_blob_eviction(
tracing::info!(%deleted, "N blobs deleted for eviction");

// Delete blobs from blob store
for blob in blobs {
let store = match blob_stores.get(&blob.store_id) {
Some(s) => s.clone(),
None => {
let blob = blob.clone();
tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::error!(%blob.store_id, "Blob's store id missing from activated stores");
continue;
}
};
let chunked: Vec<Vec<database::DeletedBlob>> = blobs
.into_iter()
.chunks(config.blob_eviction.file_chunk_size)
.into_iter()
.map(|chunk| chunk.collect())
.collect();

for chunk in chunked {
let thread_store = blob_stores.clone();

tokio::spawn(async move {
store.delete_key(blob.key.clone()).await.unwrap_or_else(|err| {
let blob = blob.clone();
tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::error!(%err, "Failed to delete blob from store for eviction. See above for blob info");
});
for blob in chunk {
let store = match thread_store.get(&blob.store_id) {
Some(s) => s.clone(),
None => {
let blob = blob.clone();
tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::error!(%blob.store_id, "Blob's store id missing from activated stores");
continue;
}
};

store.delete_key(blob.key.clone()).await.unwrap_or_else(|err| {
let blob = blob.clone();
tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!");
tracing::error!(%err, "Failed to delete blob from store for eviction. See above for blob info");
});
}
});
}
}
Expand Down Expand Up @@ -481,10 +493,11 @@ mod tests {
active_store: store_id.to_string(),
connection_pool_timeout: 10,
log_directory: None,
blob_eviction: config::RSCTTLConfig {
blob_eviction: config::RSCBlobTTLConfig {
tick_rate: 10,
ttl: 100,
chunk_size: 100,
file_chunk_size: 1,
},
job_eviction: config::RSCJobEvictionConfig::TTL(config::RSCTTLConfig {
tick_rate: 10,
Expand Down

0 comments on commit 52eb7bb

Please sign in to comment.