rsc: Precalculate job size as part of cron job
V-FEXrt committed Jul 31, 2024
1 parent 35a642b commit 7113eee
Showing 9 changed files with 161 additions and 39 deletions.
1 change: 1 addition & 0 deletions rust/entity/src/job.rs
@@ -28,6 +28,7 @@ pub struct Model {
pub memory: i64,
pub i_bytes: i64,
pub o_bytes: i64,
pub size: Option<i64>,
pub created_at: DateTime,
pub label: String,
}
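The column is nullable (Option<i64>): a NULL size marks a job that the new cron pass has not yet measured, and the stored procedure introduced below selects on size IS NULL.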
2 changes: 2 additions & 0 deletions rust/migration/src/lib.rs
@@ -10,6 +10,7 @@ mod m20231128_000751_normalize_uses_table;
mod m20240509_163905_add_label_to_job;
mod m20240517_195757_add_updated_at_to_blob;
mod m20240522_185420_create_job_history;
mod m20240731_152842_create_job_size_proc;

pub struct Migrator;

@@ -27,6 +28,7 @@ impl MigratorTrait for Migrator {
Box::new(m20240509_163905_add_label_to_job::Migration),
Box::new(m20240517_195757_add_updated_at_to_blob::Migration),
Box::new(m20240522_185420_create_job_history::Migration),
Box::new(m20240731_152842_create_job_size_proc::Migration),
]
}
}
2 changes: 2 additions & 0 deletions rust/migration/src/m20220101_000002_create_table.rs
@@ -44,6 +44,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Job::Memory).big_unsigned().not_null())
.col(ColumnDef::new(Job::IBytes).big_unsigned().not_null())
.col(ColumnDef::new(Job::OBytes).big_unsigned().not_null())
.col(ColumnDef::new(Job::Size).big_unsigned())
.foreign_key(
ForeignKeyCreateStatement::new()
.name("fk-stdout_blob_id-blob")
@@ -240,6 +241,7 @@ pub enum Job {
Memory,
IBytes,
OBytes,
Size,
}

#[derive(DeriveIden)]
76 changes: 76 additions & 0 deletions rust/migration/src/m20240731_152842_create_job_size_proc.rs
@@ -0,0 +1,76 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared(
"
CREATE OR REPLACE PROCEDURE calculate_job_size(
job_lim int,
INOUT updated_count int
)
language plpgsql
as $$
BEGIN
-- Find the eligible jobs, calculate their sizes, and update the table
WITH
eligible_jobs as (
SELECT id, stdout_blob_id, stderr_blob_id
FROM job
WHERE size IS NULL
ORDER BY created_at ASC
LIMIT job_lim
),
job_blob_size as (
SELECT ej.id, SUM(COALESCE(b.size,0)) as size
FROM eligible_jobs ej
LEFT JOIN output_file o
ON ej.id = o.job_id
LEFT JOIN blob b
ON o.blob_id = b.id
GROUP BY ej.id
),
full_size as (
SELECT
ej.id,
CAST(jb.size + stdout.size + stderr.size as BIGINT) as size
FROM eligible_jobs ej
INNER JOIN job_blob_size jb
ON ej.id = jb.id
INNER JOIN blob stdout
ON ej.stdout_blob_id = stdout.id
INNER JOIN blob stderr
ON ej.stderr_blob_id = stderr.id
)
UPDATE job j
SET size = f.size
FROM full_size f
WHERE j.id = f.id;
-- Grab the rows affected count
GET DIAGNOSTICS updated_count = ROW_COUNT;
END;
$$;
",
)
.await?;
Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared("DROP PROCEDURE IF EXISTS calculate_job_size(int, int)")
.await?;
Ok(())
}
}
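As a quick sanity check, the procedure can be invoked by hand. This is a hypothetical psql session (the 100 is an arbitrary chunk size, not a value from this commit); Postgres echoes the INOUT parameter back as a one-row result set:

-- Size up to 100 not-yet-sized jobs; NULL seeds the INOUT updated_count parameter.
CALL calculate_job_size(100, NULL);
-- Returns a single row whose updated_count column holds the number of jobs sized.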
6 changes: 5 additions & 1 deletion rust/rsc/.config.json
@@ -3,7 +3,7 @@
"server_address": "0.0.0.0:3002",
"connection_pool_timeout": 60,
"standalone": false,
"active_store": "3446d287-1d6f-439f-bc8a-9e73ab34065d",
"active_store": "6a6ea9c9-a261-44b1-8ef7-305a12b04eab",
"log_directory": null,
"blob_eviction": {
"tick_rate": 60,
@@ -16,5 +16,9 @@
"ttl": 86400,
"chunk_size": 16000
}
},
"job_size_calculate": {
"tick_rate": 60,
"chunk_size": 100
}
}
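With the values above, the calculator wakes every 60 seconds and sizes at most 100 jobs per pass. As launch_job_size_calculate in main.rs below shows, the loop only sleeps between ticks once a pass finds no work, so a large backlog drains continuously rather than at one chunk per minute.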
1 change: 1 addition & 0 deletions rust/rsc/src/bin/rsc/add_job.rs
@@ -40,6 +40,7 @@ pub async fn add_job(
i_bytes: Set(payload.ibytes as i64),
o_bytes: Set(payload.obytes as i64),
label: Set(payload.label.unwrap_or("".to_string())),
size: NotSet,
};

// Now perform the insert as a single transaction
10 changes: 10 additions & 0 deletions rust/rsc/src/bin/rsc/config.rs
@@ -1,6 +1,14 @@
use config::{Config, ConfigError, Environment, File};
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize)]
pub struct RSCCronLoopConfig {
// How often to run the loop in seconds
pub tick_rate: u64,
// Maximum number of objects to process per tick. Must satisfy 1 <= x <= 16000
pub chunk_size: i32,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct RSCTTLConfig {
// How often to run the eviction check in seconds
@@ -47,6 +55,8 @@ pub struct RSCConfig {
pub blob_eviction: RSCTTLConfig,
// The config to control job eviction
pub job_eviction: RSCJobEvictionConfig,
// The config to control job size calculation
pub job_size_calculate: RSCCronLoopConfig,
}

impl RSCConfig {
44 changes: 38 additions & 6 deletions rust/rsc/src/bin/rsc/main.rs
@@ -218,8 +218,6 @@ fn launch_job_eviction(conn: Arc<DatabaseConnection>, tick_interval: u64, ttl: u
let mut interval = tokio::time::interval(Duration::from_secs(tick_interval));
loop {
interval.tick().await;
tracing::info!("Job TTL eviction tick");

let ttl = (Utc::now() - Duration::from_secs(ttl)).naive_utc();

match database::evict_jobs_ttl(conn.clone(), ttl).await {
@@ -240,7 +238,6 @@ fn launch_blob_eviction(
tokio::time::interval(Duration::from_secs(config.blob_eviction.tick_rate));
let mut should_sleep = false;
loop {
tracing::info!("Blob TTL eviction tick");
if should_sleep {
interval.tick().await;
}
@@ -280,8 +277,6 @@ }
}
};

tracing::info!("Spawning blob deletion from stores");

// Delete blobs from blob store
for blob in blobs {
let store = match blob_stores.get(&blob.store_id) {
@@ -306,6 +301,42 @@ }
});
}

fn launch_job_size_calculate(conn: Arc<DatabaseConnection>, config: Arc<config::RSCConfig>) {
tokio::spawn(async move {
let mut interval =
tokio::time::interval(Duration::from_secs(config.job_size_calculate.tick_rate));
let mut should_sleep = false;
loop {
if should_sleep {
interval.tick().await;
}

let count = match database::calculate_job_size(
conn.as_ref(),
config.job_size_calculate.chunk_size,
)
.await
{
Ok(Some(c)) => c.updated_count,
Ok(None) => {
tracing::error!("Failed to extract result from calculating job size");
should_sleep = true;
continue; // Try again on the next tick
}
Err(err) => {
tracing::error!(%err, "Failed to calculate and update job size");
should_sleep = true;
continue; // Try again on the next tick
}
};

should_sleep = count == 0;

tracing::info!(%count, "Calculated and updated size for jobs");
}
});
}
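Note the inverted sleep: should_sleep starts false and is reset to count == 0 each pass, so the task re-runs immediately while the procedure keeps finding unsized jobs, and only falls back to the tick interval once the backlog is empty or an error occurs.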

fn request_max_fileno_limit() {
let Ok((current, max)) = Resource::NOFILE.get() else {
tracing::warn!("Unable to discover fileno limits. Using default");
@@ -360,7 +391,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Activate blob stores
let stores = activate_stores(connection.clone()).await;

// Launch evictions threads
// Launch long running concurrent threads
match &config.job_eviction {
config::RSCJobEvictionConfig::TTL(ttl) => {
launch_job_eviction(connection.clone(), ttl.tick_rate, ttl.ttl);
@@ -369,6 +400,7 @@ }
}

launch_blob_eviction(connection.clone(), config.clone(), stores.clone());
launch_job_size_calculate(connection.clone(), config.clone());

// Launch the server
let router = create_router(connection.clone(), config.clone(), &stores);
58 changes: 26 additions & 32 deletions rust/rsc/src/database.rs
@@ -209,6 +209,24 @@ pub async fn count_jobs<T: ConnectionTrait>(db: &T) -> Result<u64, DbErr> {
Job::find().count(db).await
}

#[derive(Debug, FromQueryResult)]
pub struct ProcRowsUpdated {
pub updated_count: i32,
}
// Finds at most `chunk` jobs with unknown size, then calculates and stores each job's size
pub async fn calculate_job_size<T: ConnectionTrait>(
db: &T,
chunk: i32,
) -> Result<Option<ProcRowsUpdated>, DbErr> {
ProcRowsUpdated::find_by_statement(Statement::from_sql_and_values(
DbBackend::Postgres,
"call calculate_job_size($1, NULL)",
[chunk.into()],
))
.one(db)
.await
}
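A minimal call-site sketch (assuming a live SeaORM DatabaseConnection named conn and an enclosing function that can propagate DbErr; the names are illustrative, not part of this commit):

// Hypothetical caller: ask the procedure to size up to 100 jobs,
// then read the INOUT parameter back from the returned row.
match calculate_job_size(&conn, 100).await? {
    Some(row) => tracing::info!(count = row.updated_count, "sized jobs"),
    None => tracing::warn!("calculate_job_size returned no result row"),
}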

#[derive(Debug, FromQueryResult)]
pub struct TimeSaved {
pub savings: i64,
@@ -327,22 +345,10 @@ pub async fn most_space_efficient_jobs<T: ConnectionTrait>(
SELECT
j.label,
CAST(round(j.runtime) as BIGINT) as runtime,
CAST(b.blob_size + stdout.size + stderr.size as BIGINT) as disk_usage,
CAST(round(j.runtime / (b.blob_size + stdout.size + stderr.size) * 1000) as BIGINT) as ms_saved_per_byte
FROM (
SELECT o.job_id, sum(b.size) as blob_size
FROM output_file o
INNER JOIN blob b
ON o.blob_id = b.id
GROUP BY o.job_id
) b
INNER JOIN job j
ON j.id = b.job_id
INNER JOIN blob stdout
ON j.stdout_blob_id = stdout.id
INNER JOIN blob stderr
ON j.stderr_blob_id = stderr.id
WHERE (b.blob_size + stdout.size + stderr.size) > 0
j.size as disk_usage,
CAST(round(j.runtime / (j.size) * 1000) as BIGINT) as ms_saved_per_byte
FROM job j
WHERE j.size IS NOT NULL AND j.size > 0
ORDER BY ms_saved_per_byte DESC
LIMIT 30;
"#,
@@ -360,22 +366,10 @@ pub async fn most_space_use_jobs<T: ConnectionTrait>(
SELECT
j.label,
CAST(round(j.runtime) as BIGINT) as runtime,
CAST(b.blob_size + stdout.size + stderr.size as BIGINT) as disk_usage,
CAST(round(j.runtime / (b.blob_size + stdout.size + stderr.size) * 1000) as BIGINT) as ms_saved_per_byte
FROM (
SELECT o.job_id, sum(b.size) as blob_size
FROM output_file o
INNER JOIN blob b
ON o.blob_id = b.id
GROUP BY o.job_id
) b
INNER JOIN job j
ON j.id = b.job_id
INNER JOIN blob stdout
ON j.stdout_blob_id = stdout.id
INNER JOIN blob stderr
ON j.stderr_blob_id = stderr.id
WHERE (b.blob_size + stdout.size + stderr.size) > 0
j.size as disk_usage,
CAST(round(j.runtime / (j.size) * 1000) as BIGINT) as ms_saved_per_byte
FROM job j
WHERE j.size IS NOT NULL AND j.size > 0
ORDER BY disk_usage DESC
LIMIT 30;
"#,
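With the precalculated column in place, both report queries drop their per-request joins against output_file and blob and read j.size directly; the aggregation cost moves into the cron pass instead of being paid on every request.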
