Skip to content

Commit

Permalink
Merge branch 'master' into rsc-tier-cache-dirs
Browse files Browse the repository at this point in the history
  • Loading branch information
V-FEXrt authored Aug 1, 2024
2 parents 03d7d9c + ba2afd5 commit b66eb69
Show file tree
Hide file tree
Showing 11 changed files with 271 additions and 132 deletions.
4 changes: 2 additions & 2 deletions rust/entity/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ pub struct Model {
pub cwd: String,
pub stdin: String,
pub is_atty: bool,
#[sea_orm(column_type = "Binary(BlobSize::Blob(None))")]
pub hidden_info: Vec<u8>,
pub hidden_info: String,
pub stdout_blob_id: Uuid,
pub stderr_blob_id: Uuid,
pub status: i32,
Expand All @@ -28,6 +27,7 @@ pub struct Model {
pub memory: i64,
pub i_bytes: i64,
pub o_bytes: i64,
pub size: Option<i64>,
pub created_at: DateTime,
pub label: String,
}
Expand Down
4 changes: 4 additions & 0 deletions rust/migration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ mod m20231128_000751_normalize_uses_table;
mod m20240509_163905_add_label_to_job;
mod m20240517_195757_add_updated_at_to_blob;
mod m20240522_185420_create_job_history;
mod m20240731_152842_create_job_size_proc;
mod m20240731_201632_create_job_blob_timestamp_index;

pub struct Migrator;

Expand All @@ -27,6 +29,8 @@ impl MigratorTrait for Migrator {
Box::new(m20240509_163905_add_label_to_job::Migration),
Box::new(m20240517_195757_add_updated_at_to_blob::Migration),
Box::new(m20240522_185420_create_job_history::Migration),
Box::new(m20240731_152842_create_job_size_proc::Migration),
Box::new(m20240731_201632_create_job_blob_timestamp_index::Migration),
]
}
}
4 changes: 3 additions & 1 deletion rust/migration/src/m20220101_000002_create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Job::Cwd).string().not_null())
.col(ColumnDef::new(Job::Stdin).string().not_null())
.col(ColumnDef::new(Job::IsAtty).boolean().not_null())
.col(ColumnDef::new(Job::HiddenInfo).ezblob())
.col(ColumnDef::new(Job::HiddenInfo).string().not_null())
.col(ColumnDef::new(Job::StdoutBlobId).uuid().not_null())
.col(ColumnDef::new(Job::StderrBlobId).uuid().not_null())
.col(ColumnDef::new(Job::Status).integer().not_null())
Expand All @@ -44,6 +44,7 @@ impl MigrationTrait for Migration {
.col(ColumnDef::new(Job::Memory).big_unsigned().not_null())
.col(ColumnDef::new(Job::IBytes).big_unsigned().not_null())
.col(ColumnDef::new(Job::OBytes).big_unsigned().not_null())
.col(ColumnDef::new(Job::Size).big_unsigned())
.foreign_key(
ForeignKeyCreateStatement::new()
.name("fk-stdout_blob_id-blob")
Expand Down Expand Up @@ -240,6 +241,7 @@ pub enum Job {
Memory,
IBytes,
OBytes,
Size,
}

#[derive(DeriveIden)]
Expand Down
75 changes: 75 additions & 0 deletions rust/migration/src/m20240731_152842_create_job_size_proc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared(
"
CREATE OR REPLACE PROCEDURE calculate_job_size(
job_lim int,
INOUT updated_count int
)
language plpgsql
as $$
BEGIN
-- Run the query that find the jobs, calcs their sizes, and then updates the table
WITH
eligible_jobs as (
SELECT id, stdout_blob_id, stderr_blob_id
FROM job
WHERE size IS NULL
ORDER BY created_at
ASC
LIMIT job_lim
),
job_blob_size as (
SELECT ej.id, SUM(COALESCE(b.size,0)) as size
FROM eligible_jobs ej
LEFT JOIN output_file o
ON ej.id = o.job_id
LEFT JOIN blob b
ON o.blob_id = b.id
GROUP BY ej.id
),
full_size as (
SELECT
ej.id,
CAST(jb.size + stdout.size + stderr.size as BIGINT) as size
FROM eligible_jobs ej
INNER JOIN job_blob_size jb
ON ej.id = jb.id
INNER JOIN blob stdout
ON ej.stdout_blob_id = stdout.id
INNER JOIN blob stderr
ON ej.stderr_blob_id = stderr.id
)
UPDATE job j
SET size = f.size
FROM full_size f
WHERE j.id = f.id;
-- Grab the rows affected count
GET DIAGNOSTICS updated_count = ROW_COUNT;
END;
$$;
",
)
.await?;
Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared("DROP PROCEDURE IF EXISTS calculate_job_size(int, int)")
.await?;
Ok(())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared(
"
CREATE INDEX IF NOT EXISTS blob_updated_at_idx
ON blob(updated_at)
",
)
.await?;

manager
.get_connection()
.execute_unprepared(
"
CREATE INDEX IF NOT EXISTS job_created_at_idx
ON job(created_at)
",
)
.await?;

Ok(())
}

async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.get_connection()
.execute_unprepared(
"
DROP INDEX IF EXISTS job_created_at_idx
",
)
.await?;

manager
.get_connection()
.execute_unprepared(
"
DROP INDEX IF EXISTS blob_updated_at_idx
",
)
.await?;

Ok(())
}
}
6 changes: 5 additions & 1 deletion rust/rsc/.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"server_address": "0.0.0.0:3002",
"connection_pool_timeout": 60,
"standalone": false,
"active_store": "3446d287-1d6f-439f-bc8a-9e73ab34065d",
"active_store": "6a6ea9c9-a261-44b1-8ef7-305a12b04eab",
"log_directory": null,
"blob_eviction": {
"tick_rate": 60,
Expand All @@ -16,5 +16,9 @@
"ttl": 86400,
"chunk_size": 16000
}
},
"job_size_calculate": {
"tick_rate": 60,
"chunk_size": 100
}
}
1 change: 1 addition & 0 deletions rust/rsc/src/bin/rsc/add_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub async fn add_job(
i_bytes: Set(payload.ibytes as i64),
o_bytes: Set(payload.obytes as i64),
label: Set(payload.label.unwrap_or("".to_string())),
size: NotSet,
};

// Now perform the insert as a single transaction
Expand Down
10 changes: 10 additions & 0 deletions rust/rsc/src/bin/rsc/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
use config::{Config, ConfigError, Environment, File};
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize)]
pub struct RSCCronLoopConfig {
// How often to run the loop in seconds
pub tick_rate: u64,
// Maximum number of objects to procss per tick. Must be 1 >= x <= 16000
pub chunk_size: i32,
}

#[derive(Debug, Deserialize, Serialize)]
pub struct RSCTTLConfig {
// How often to run the eviction check in seconds
Expand Down Expand Up @@ -47,6 +55,8 @@ pub struct RSCConfig {
pub blob_eviction: RSCTTLConfig,
// The config to control job eviction
pub job_eviction: RSCJobEvictionConfig,
// The config to control job size calculation
pub job_size_calculate: RSCCronLoopConfig,
}

impl RSCConfig {
Expand Down
Loading

0 comments on commit b66eb69

Please sign in to comment.