From 958299c439a7aaa6cafaa2fb62136881a6e0d0e6 Mon Sep 17 00:00:00 2001
From: Ashley Coleman
Date: Wed, 31 Jul 2024 18:06:02 -0600
Subject: [PATCH 1/3] rsc: Precalculate job size as part of cron job (#1614)

* rsc: Precalculate job size as part of cron job
* cleanup
* fix tests
---
 rust/entity/src/job.rs | 1 +
 rust/migration/src/lib.rs | 2 +
 .../src/m20220101_000002_create_table.rs | 2 +
 .../m20240731_152842_create_job_size_proc.rs | 75 +++++++++++++++++++
 rust/rsc/.config.json | 6 +-
 rust/rsc/src/bin/rsc/add_job.rs | 1 +
 rust/rsc/src/bin/rsc/config.rs | 10 +++
 rust/rsc/src/bin/rsc/main.rs | 50 +++++++++++--
 rust/rsc/src/database.rs | 58 +++++++-------
 9 files changed, 166 insertions(+), 39 deletions(-)
 create mode 100644 rust/migration/src/m20240731_152842_create_job_size_proc.rs

diff --git a/rust/entity/src/job.rs b/rust/entity/src/job.rs
index fc93fb91f..a1222959d 100644
--- a/rust/entity/src/job.rs
+++ b/rust/entity/src/job.rs
@@ -28,6 +28,7 @@ pub struct Model {
     pub memory: i64,
     pub i_bytes: i64,
     pub o_bytes: i64,
+    pub size: Option<i64>,
     pub created_at: DateTime,
     pub label: String,
 }
diff --git a/rust/migration/src/lib.rs b/rust/migration/src/lib.rs
index f24f9076a..807d2b741 100644
--- a/rust/migration/src/lib.rs
+++ b/rust/migration/src/lib.rs
@@ -10,6 +10,7 @@ mod m20231128_000751_normalize_uses_table;
 mod m20240509_163905_add_label_to_job;
 mod m20240517_195757_add_updated_at_to_blob;
 mod m20240522_185420_create_job_history;
+mod m20240731_152842_create_job_size_proc;
 
 pub struct Migrator;
 
@@ -27,6 +28,7 @@ impl MigratorTrait for Migrator {
             Box::new(m20240509_163905_add_label_to_job::Migration),
             Box::new(m20240517_195757_add_updated_at_to_blob::Migration),
             Box::new(m20240522_185420_create_job_history::Migration),
+            Box::new(m20240731_152842_create_job_size_proc::Migration),
         ]
     }
 }
diff --git a/rust/migration/src/m20220101_000002_create_table.rs b/rust/migration/src/m20220101_000002_create_table.rs
index 3d8c022c6..3b704ad74 100644
--- a/rust/migration/src/m20220101_000002_create_table.rs
+++ b/rust/migration/src/m20220101_000002_create_table.rs
@@ -44,6 +44,7 @@ impl MigrationTrait for Migration {
                     .col(ColumnDef::new(Job::Memory).big_unsigned().not_null())
                     .col(ColumnDef::new(Job::IBytes).big_unsigned().not_null())
                     .col(ColumnDef::new(Job::OBytes).big_unsigned().not_null())
+                    .col(ColumnDef::new(Job::Size).big_unsigned())
                     .foreign_key(
                         ForeignKeyCreateStatement::new()
                             .name("fk-stdout_blob_id-blob")
@@ -240,6 +241,7 @@ pub enum Job {
     Memory,
     IBytes,
     OBytes,
+    Size,
 }
 
 #[derive(DeriveIden)]
diff --git a/rust/migration/src/m20240731_152842_create_job_size_proc.rs b/rust/migration/src/m20240731_152842_create_job_size_proc.rs
new file mode 100644
index 000000000..9f2157a4e
--- /dev/null
+++ b/rust/migration/src/m20240731_152842_create_job_size_proc.rs
@@ -0,0 +1,75 @@
+use sea_orm_migration::prelude::*;
+
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait::async_trait]
+impl MigrationTrait for Migration {
+    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .get_connection()
+            .execute_unprepared(
+                "
+                CREATE OR REPLACE PROCEDURE calculate_job_size(
+                    job_lim int,
+                    INOUT updated_count int
+                )
+                language plpgsql
+                as $$
+                BEGIN
+
+                -- Run the query that finds the jobs, calculates their sizes, and then updates the table
+                WITH
+                eligible_jobs as (
+                    SELECT id, stdout_blob_id, stderr_blob_id
+                    FROM job
+                    WHERE size IS NULL
+                    ORDER BY created_at
+                    ASC
+                    LIMIT job_lim
+                ),
+                job_blob_size as (
+                    SELECT ej.id, SUM(COALESCE(b.size,0)) as size
+                    FROM eligible_jobs ej
+                    LEFT JOIN output_file o
+                    ON ej.id = o.job_id
+                    LEFT JOIN blob b
+                    ON o.blob_id = b.id
+                    GROUP BY ej.id
+                ),
+                full_size as (
+                    SELECT
+                        ej.id,
+                        CAST(jb.size + stdout.size + stderr.size as BIGINT) as size
+                    FROM eligible_jobs ej
+                    INNER JOIN job_blob_size jb
+                    ON ej.id = jb.id
+                    INNER JOIN blob stdout
+                    ON ej.stdout_blob_id = stdout.id
+                    INNER JOIN blob stderr
+                    ON ej.stderr_blob_id = stderr.id
+                )
+                UPDATE job j
+                SET size = f.size
+                FROM full_size f
+                WHERE j.id = f.id;
+
+                -- Grab the rows affected count
+                GET DIAGNOSTICS updated_count = ROW_COUNT;
+
+                END;
+                $$;
+                ",
+            )
+            .await?;
+        Ok(())
+    }
+
+    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .get_connection()
+            .execute_unprepared("DROP PROCEDURE IF EXISTS calculate_job_size(int, int)")
+            .await?;
+        Ok(())
+    }
+}
diff --git a/rust/rsc/.config.json b/rust/rsc/.config.json
index 51aece440..88b715c76 100644
--- a/rust/rsc/.config.json
+++ b/rust/rsc/.config.json
@@ -3,7 +3,7 @@
     "server_address": "0.0.0.0:3002",
     "connection_pool_timeout": 60,
     "standalone": false,
-    "active_store": "3446d287-1d6f-439f-bc8a-9e73ab34065d",
+    "active_store": "6a6ea9c9-a261-44b1-8ef7-305a12b04eab",
     "log_directory": null,
     "blob_eviction": {
         "tick_rate": 60,
@@ -16,5 +16,9 @@
         "ttl": 86400,
         "chunk_size": 16000
     }
+    },
+    "job_size_calculate": {
+        "tick_rate": 60,
+        "chunk_size": 100
     }
 }
diff --git a/rust/rsc/src/bin/rsc/add_job.rs b/rust/rsc/src/bin/rsc/add_job.rs
index 9d2a4e417..ab5c10904 100644
--- a/rust/rsc/src/bin/rsc/add_job.rs
+++ b/rust/rsc/src/bin/rsc/add_job.rs
@@ -40,6 +40,7 @@ pub async fn add_job(
         i_bytes: Set(payload.ibytes as i64),
         o_bytes: Set(payload.obytes as i64),
         label: Set(payload.label.unwrap_or("".to_string())),
+        size: NotSet,
     };
 
     // Now perform the insert as a single transaction
diff --git a/rust/rsc/src/bin/rsc/config.rs b/rust/rsc/src/bin/rsc/config.rs
index d4f951844..6d09b936b 100644
--- a/rust/rsc/src/bin/rsc/config.rs
+++ b/rust/rsc/src/bin/rsc/config.rs
@@ -1,6 +1,14 @@
 use config::{Config, ConfigError, Environment, File};
 use serde::{Deserialize, Serialize};
 
+#[derive(Debug, Deserialize, Serialize)]
+pub struct RSCCronLoopConfig {
+    // How often to run the loop in seconds
+    pub tick_rate: u64,
+    // Maximum number of objects to process per tick. Must be 1 <= x <= 16000
+    pub chunk_size: i32,
+}
+
 #[derive(Debug, Deserialize, Serialize)]
 pub struct RSCTTLConfig {
     // How often to run the eviction check in seconds
@@ -47,6 +55,8 @@ pub struct RSCConfig {
     pub blob_eviction: RSCTTLConfig,
     // The config to control job eviction
     pub job_eviction: RSCJobEvictionConfig,
+    // The config to control job size calculation
+    pub job_size_calculate: RSCCronLoopConfig,
 }
 
 impl RSCConfig {
diff --git a/rust/rsc/src/bin/rsc/main.rs b/rust/rsc/src/bin/rsc/main.rs
index 6b83f4803..7fc315ad8 100644
--- a/rust/rsc/src/bin/rsc/main.rs
+++ b/rust/rsc/src/bin/rsc/main.rs
@@ -218,8 +218,6 @@ fn launch_job_eviction(conn: Arc<DatabaseConnection>, tick_interval: u64, ttl: u64) {
         let mut interval = tokio::time::interval(Duration::from_secs(tick_interval));
         loop {
             interval.tick().await;
-            tracing::info!("Job TTL eviction tick");
-
             let ttl = (Utc::now() - Duration::from_secs(ttl)).naive_utc();
 
             match database::evict_jobs_ttl(conn.clone(), ttl).await {
@@ -240,7 +238,6 @@ fn launch_blob_eviction(
             tokio::time::interval(Duration::from_secs(config.blob_eviction.tick_rate));
         let mut should_sleep = false;
         loop {
-            tracing::info!("Blob TTL eviction tick");
             if should_sleep {
                 interval.tick().await;
             }
@@ -280,8 +277,6 @@ fn launch_blob_eviction(
                 }
             };
 
-            tracing::info!("Spawning blob deletion from stores");
-
             // Delete blobs from blob store
             for blob in blobs {
                 let store = match blob_stores.get(&blob.store_id) {
@@ -306,6 +301,42 @@ fn launch_blob_eviction(
     });
 }
 
+fn launch_job_size_calculate(conn: Arc<DatabaseConnection>, config: Arc<RSCConfig>) {
+    tokio::spawn(async move {
+        let mut interval =
+            tokio::time::interval(Duration::from_secs(config.job_size_calculate.tick_rate));
+        let mut should_sleep = false;
+        loop {
+            if should_sleep {
+                interval.tick().await;
+            }
+
+            let count = match database::calculate_job_size(
+                conn.as_ref(),
+                config.job_size_calculate.chunk_size,
+            )
+            .await
+            {
+                Ok(Some(c)) => c.updated_count,
+                Ok(None) => {
+                    tracing::error!("Failed to extract result from calculating job size");
+                    should_sleep = true;
+                    continue; // Try again on the next tick
+                }
+                Err(err) => {
+                    tracing::error!(%err, "Failed to calculate and update job size");
+                    should_sleep = true;
+                    continue; // Try again on the next tick
+                }
+            };
+
+            should_sleep = count == 0;
+
+            tracing::info!(%count, "Calculated and updated size for jobs");
+        }
+    });
+}
+
 fn request_max_fileno_limit() {
     let Ok((current, max)) = Resource::NOFILE.get() else {
         tracing::warn!("Unable to discover fileno limits. Using default");
Using default"); @@ -360,7 +391,7 @@ async fn main() -> Result<(), Box> { // Activate blob stores let stores = activate_stores(connection.clone()).await; - // Launch evictions threads + // Launch long running concurrent threads match &config.job_eviction { config::RSCJobEvictionConfig::TTL(ttl) => { launch_job_eviction(connection.clone(), ttl.tick_rate, ttl.ttl); @@ -369,6 +400,7 @@ async fn main() -> Result<(), Box> { } launch_blob_eviction(connection.clone(), config.clone(), stores.clone()); + launch_job_size_calculate(connection.clone(), config.clone()); // Launch the server let router = create_router(connection.clone(), config.clone(), &stores); @@ -432,6 +464,10 @@ mod tests { ttl: 100, chunk_size: 100, }), + job_size_calculate: config::RSCCronLoopConfig { + tick_rate: 10, + chunk_size: 100, + }, } } @@ -788,6 +824,7 @@ mod tests { i_bytes: Set(100000), o_bytes: Set(1000), label: Set("".to_string()), + size: NotSet, }; insert_job.save(conn.clone().as_ref()).await.unwrap(); @@ -812,6 +849,7 @@ mod tests { i_bytes: Set(100000), o_bytes: Set(1000), label: Set("".to_string()), + size: NotSet, }; insert_job.save(conn.clone().as_ref()).await.unwrap(); diff --git a/rust/rsc/src/database.rs b/rust/rsc/src/database.rs index 306da04fc..6b0193ba9 100644 --- a/rust/rsc/src/database.rs +++ b/rust/rsc/src/database.rs @@ -209,6 +209,24 @@ pub async fn count_jobs(db: &T) -> Result { Job::find().count(db).await } +#[derive(Debug, FromQueryResult)] +pub struct ProcRowsUpdated { + pub updated_count: i32, +} +// Finds at most chunk number of jobs with unknown size then calculates and sets it +pub async fn calculate_job_size( + db: &T, + chunk: i32, +) -> Result, DbErr> { + ProcRowsUpdated::find_by_statement(Statement::from_sql_and_values( + DbBackend::Postgres, + "call calculate_job_size($1, NULL)", + [chunk.into()], + )) + .one(db) + .await +} + #[derive(Debug, FromQueryResult)] pub struct TimeSaved { pub savings: i64, @@ -327,22 +345,10 @@ pub async fn most_space_efficient_jobs( SELECT j.label, CAST(round(j.runtime) as BIGINT) as runtime, - CAST(b.blob_size + stdout.size + stderr.size as BIGINT) as disk_usage, - CAST(round(j.runtime / (b.blob_size + stdout.size + stderr.size) * 1000) as BIGINT) as ms_saved_per_byte - FROM ( - SELECT o.job_id, sum(b.size) as blob_size - FROM output_file o - INNER JOIN blob b - ON o.blob_id = b.id - GROUP BY o.job_id - ) b - INNER JOIN job j - ON j.id = b.job_id - INNER JOIN blob stdout - ON j.stdout_blob_id = stdout.id - INNER JOIN blob stderr - ON j.stderr_blob_id = stderr.id - WHERE (b.blob_size + stdout.size + stderr.size) > 0 + j.size as disk_usage, + CAST(round(j.runtime / (j.size) * 1000) as BIGINT) as ms_saved_per_byte + FROM job j + WHERE size IS NOT NULL ORDER BY ms_saved_per_byte DESC LIMIT 30; "#, @@ -360,22 +366,10 @@ pub async fn most_space_use_jobs( SELECT j.label, CAST(round(j.runtime) as BIGINT) as runtime, - CAST(b.blob_size + stdout.size + stderr.size as BIGINT) as disk_usage, - CAST(round(j.runtime / (b.blob_size + stdout.size + stderr.size) * 1000) as BIGINT) as ms_saved_per_byte - FROM ( - SELECT o.job_id, sum(b.size) as blob_size - FROM output_file o - INNER JOIN blob b - ON o.blob_id = b.id - GROUP BY o.job_id - ) b - INNER JOIN job j - ON j.id = b.job_id - INNER JOIN blob stdout - ON j.stdout_blob_id = stdout.id - INNER JOIN blob stderr - ON j.stderr_blob_id = stderr.id - WHERE (b.blob_size + stdout.size + stderr.size) > 0 + j.size as disk_usage, + CAST(round(j.runtime / (j.size) * 1000) as BIGINT) as ms_saved_per_byte + FROM job j + WHERE size 
From ff0aeb3ddf7c1a71b444efd8e55437ab7e16b1df Mon Sep 17 00:00:00 2001
From: Ashley Coleman
Date: Thu, 1 Aug 2024 10:05:07 -0600
Subject: [PATCH 2/3] rsc: Optimize blob eviction deletion (#1615)

* rsc: Optimize blob eviction deletion
* cleanup
---
 rust/migration/src/lib.rs | 2 +
 ..._201632_create_job_blob_timestamp_index.rs | 53 +++++++++
 rust/rsc/src/bin/rsc/main.rs | 24 ++--
 rust/rsc/src/database.rs | 104 +++++++-----------
 4 files changed, 99 insertions(+), 84 deletions(-)
 create mode 100644 rust/migration/src/m20240731_201632_create_job_blob_timestamp_index.rs

diff --git a/rust/migration/src/lib.rs b/rust/migration/src/lib.rs
index 807d2b741..8a3809c9e 100644
--- a/rust/migration/src/lib.rs
+++ b/rust/migration/src/lib.rs
@@ -11,6 +11,7 @@ mod m20240509_163905_add_label_to_job;
 mod m20240517_195757_add_updated_at_to_blob;
 mod m20240522_185420_create_job_history;
 mod m20240731_152842_create_job_size_proc;
+mod m20240731_201632_create_job_blob_timestamp_index;
 
 pub struct Migrator;
 
@@ -29,6 +30,7 @@ impl MigratorTrait for Migrator {
             Box::new(m20240517_195757_add_updated_at_to_blob::Migration),
             Box::new(m20240522_185420_create_job_history::Migration),
             Box::new(m20240731_152842_create_job_size_proc::Migration),
+            Box::new(m20240731_201632_create_job_blob_timestamp_index::Migration),
         ]
     }
 }
diff --git a/rust/migration/src/m20240731_201632_create_job_blob_timestamp_index.rs b/rust/migration/src/m20240731_201632_create_job_blob_timestamp_index.rs
new file mode 100644
index 000000000..8b9044d63
--- /dev/null
+++ b/rust/migration/src/m20240731_201632_create_job_blob_timestamp_index.rs
@@ -0,0 +1,53 @@
+use sea_orm_migration::prelude::*;
+
+#[derive(DeriveMigrationName)]
+pub struct Migration;
+
+#[async_trait::async_trait]
+impl MigrationTrait for Migration {
+    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .get_connection()
+            .execute_unprepared(
+                "
+                CREATE INDEX IF NOT EXISTS blob_updated_at_idx
+                ON blob(updated_at)
+                ",
+            )
+            .await?;
+
+        manager
+            .get_connection()
+            .execute_unprepared(
+                "
+                CREATE INDEX IF NOT EXISTS job_created_at_idx
+                ON job(created_at)
+                ",
+            )
+            .await?;
+
+        Ok(())
+    }
+
+    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
+        manager
+            .get_connection()
+            .execute_unprepared(
+                "
+                DROP INDEX IF EXISTS job_created_at_idx
+                ",
+            )
+            .await?;
+
+        manager
+            .get_connection()
+            .execute_unprepared(
+                "
+                DROP INDEX IF EXISTS blob_updated_at_idx
+                ",
+            )
+            .await?;
+
+        Ok(())
+    }
+}
diff --git a/rust/rsc/src/bin/rsc/main.rs b/rust/rsc/src/bin/rsc/main.rs
index 7fc315ad8..c8d71147d 100644
--- a/rust/rsc/src/bin/rsc/main.rs
+++ b/rust/rsc/src/bin/rsc/main.rs
@@ -246,7 +246,7 @@ fn launch_blob_eviction(
             // This gives clients time to reference a blob before it gets evicted.
             let ttl = (Utc::now() - Duration::from_secs(config.blob_eviction.ttl)).naive_utc();
 
-            let blobs = match database::read_unreferenced_blobs(
+            let blobs = match database::delete_unreferenced_blobs(
                 conn.as_ref(),
                 ttl,
                 config.blob_eviction.chunk_size,
             )
             .await
             {
                 Ok(b) => b,
                 Err(err) => {
-                    tracing::error!(%err, "Failed to fetch blobs for eviction");
+                    tracing::error!(%err, "Failed to delete blobs for eviction");
                     should_sleep = true;
                     continue; // Try again on the next tick
                 }
             };
 
-            let blob_ids: Vec<Uuid> = blobs.iter().map(|blob| blob.id).collect();
-            let eligible = blob_ids.len();
-            should_sleep = eligible == 0;
+            let deleted = blobs.len();
 
-            tracing::info!(%eligible, "At least N blobs eligible for eviction");
+            should_sleep = deleted == 0;
 
-            // Delete blobs from database
-            match database::delete_blobs_by_ids(conn.as_ref(), blob_ids).await {
-                Ok(deleted) => tracing::info!(%deleted, "Deleted blobs from database"),
-                Err(err) => {
-                    tracing::error!(%err, "Failed to delete blobs from db for eviction");
-                    should_sleep = true;
-                    continue; // Try again on the next tick
-                }
-            };
+            tracing::info!(%deleted, "N blobs deleted for eviction");
 
             // Delete blobs from blob store
             for blob in blobs {
                 let store = match blob_stores.get(&blob.store_id) {
                     Some(s) => s.clone(),
                     None => {
                         let blob = blob.clone();
-                        tracing::info!(%blob.id, %blob.store_id, %blob.key, "Blob has been orphaned!");
+                        tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!");
                         tracing::error!(%blob.store_id, "Blob's store id missing from activated stores");
                         continue;
                     }
                 };
 
                 tokio::spawn(async move {
                     store.delete_key(blob.key.clone()).await.unwrap_or_else(|err| {
                         let blob = blob.clone();
-                        tracing::info!(%blob.id, %blob.store_id, %blob.key, "Blob has been orphaned!");
+                        tracing::info!(%blob.store_id, %blob.key, "Blob has been orphaned!");
                         tracing::error!(%err, "Failed to delete blob from store for eviction. See above for blob info");
                     });
                 });
diff --git a/rust/rsc/src/database.rs b/rust/rsc/src/database.rs
index 6b0193ba9..504ce7893 100644
--- a/rust/rsc/src/database.rs
+++ b/rust/rsc/src/database.rs
@@ -577,44 +577,6 @@ pub async fn upsert_blob(
 
 // ---------- Read ----------
 
-// Reads blobs from the database that are unreferenced and have surpassed the allocated grace
-// period to be referenced.
-//
-// For new blobs this allows the client to create several blobs and then reference them all at
-// once. Existing blobs whose job was just evicted will likely be well past the grace period and
-// thus quickly evicted themselves.
-pub async fn read_unreferenced_blobs<T: ConnectionTrait>(
-    db: &T,
-    ttl: NaiveDateTime,
-    chunk: u32,
-) -> Result<Vec<entity::blob::Model>, DbErr> {
-    // Limit = 16k as the query is also subject to parameter max.
-    // Blob has 4 params so (2^16)/4 = 16384. Also generally best to chunk blob eviction
-    // to avoid large eviction stalls.
-    Blob::find()
-        .from_raw_sql(Statement::from_sql_and_values(
-            DbBackend::Postgres,
-            r#"
-            SELECT * FROM blob
-            WHERE updated_at <= $1
-            AND id IN
-            (
-                SELECT id FROM blob
-                EXCEPT
-                (
-                    SELECT blob_id FROM output_file
-                    UNION SELECT stdout_blob_id FROM job
-                    UNION SELECT stderr_blob_id FROM job
-                )
-            )
-            LIMIT $2
-            "#,
-            [ttl.into(), chunk.into()],
-        ))
-        .all(db)
-        .await
-}
-
 pub async fn count_blobs<T: ConnectionTrait>(db: &T) -> Result<u64, DbErr> {
     Blob::find().count(db).await
 }
@@ -639,37 +601,45 @@ pub async fn total_blob_size(
-pub async fn delete_blobs_by_ids<T: ConnectionTrait>(db: &T, ids: Vec<Uuid>) -> Result<u64, DbErr> {
-    if ids.len() == 0 {
-        return Ok(0);
-    }
-
-    let mut affected = 0;
-
-    let chunked: Vec<Vec<Uuid>> = ids
-        .into_iter()
-        .chunks((MAX_SQLX_PARAMS / 1).into())
-        .into_iter()
-        .map(|chunk| chunk.collect())
-        .collect();
+#[derive(Clone, Debug, FromQueryResult)]
+pub struct DeletedBlob {
+    pub store_id: Uuid,
+    pub key: String,
+}
 
-    for chunk in chunked {
-        let result = Blob::delete_many()
-            .filter(
-                entity::blob::Column::Id.in_subquery(
-                    migration::Query::select()
-                        .column(migration::Asterisk)
-                        .from_values(chunk, migration::Alias::new("foo"))
-                        .take(),
-                ),
-            )
-            .exec(db)
-            .await?;
-
-        affected += result.rows_affected;
-    }
-
-    Ok(affected)
+// Deletes blobs from the database that are unreferenced and have surpassed the allocated grace
+// period to be referenced.
+//
+// For new blobs this allows the client to create several blobs and then reference them all at
+// once. Existing blobs whose job was just evicted will likely be well past the grace period and
+// thus quickly evicted themselves.
+pub async fn delete_unreferenced_blobs<T: ConnectionTrait>(
+    db: &T,
+    ttl: NaiveDateTime,
+    chunk: u32,
+) -> Result<Vec<DeletedBlob>, DbErr> {
+    DeletedBlob::find_by_statement(Statement::from_sql_and_values(
+        DbBackend::Postgres,
+        r#"
+        WITH
+        eligible_blob_ids as (
+            SELECT id FROM blob
+            WHERE updated_at <= $1
+            EXCEPT (
+                SELECT blob_id FROM output_file
+                UNION SELECT stdout_blob_id FROM job
+                UNION SELECT stderr_blob_id FROM job
+            )
+            LIMIT $2
+        )
+        DELETE from blob b
+        WHERE b.id IN (SELECT id FROM eligible_blob_ids)
+        RETURNING b.store_id, b.key
+        "#,
+        [ttl.into(), chunk.into()],
+    ))
+    .all(db)
+    .await
 }
 
 // --------------------------------------------------
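
Patch 2 collapses the old read-then-delete pair into a single DELETE ... RETURNING round trip, and the returned rows carry exactly what the store-side cleanup needs. A minimal sketch of one eviction pass, again in Rust; evict_once is hypothetical, assumes the database module above, and only logs where launch_blob_eviction would call delete_key on the matching store:

    use std::sync::Arc;
    use std::time::Duration;
    use chrono::Utc;
    use sea_orm::DatabaseConnection;

    // Hypothetical one-shot eviction pass: remove up to `chunk` unreferenced
    // blobs past the grace period in one statement, then report the keys that
    // still have to be deleted from the backing stores.
    async fn evict_once(
        conn: Arc<DatabaseConnection>,
        grace_secs: u64,
        chunk: u32,
    ) -> Result<usize, sea_orm::DbErr> {
        let ttl = (Utc::now() - Duration::from_secs(grace_secs)).naive_utc();
        let deleted = database::delete_unreferenced_blobs(conn.as_ref(), ttl, chunk).await?;
        for blob in &deleted {
            // Each DeletedBlob carries store_id + key: enough to route the
            // store-side delete without a second database read.
            tracing::info!(%blob.store_id, %blob.key, "evicted blob; remove from store");
        }
        Ok(deleted.len())
    }
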
From ba2afd50873bffd27dbe77a30eb8547b20f38597 Mon Sep 17 00:00:00 2001
From: Ashley Coleman
Date: Thu, 1 Aug 2024 10:05:32 -0600
Subject: [PATCH 3/3] rsc: Fix type for hidden_info (#1616)

---
 rust/entity/src/job.rs | 3 +--
 rust/migration/src/m20220101_000002_create_table.rs | 2 +-
 rust/rsc/src/bin/rsc/types.rs | 10 ++++------
 3 files changed, 6 insertions(+), 9 deletions(-)

diff --git a/rust/entity/src/job.rs b/rust/entity/src/job.rs
index a1222959d..50c6f2a28 100644
--- a/rust/entity/src/job.rs
+++ b/rust/entity/src/job.rs
@@ -16,8 +16,7 @@ pub struct Model {
     pub cwd: String,
     pub stdin: String,
     pub is_atty: bool,
-    #[sea_orm(column_type = "Binary(BlobSize::Blob(None))")]
-    pub hidden_info: Vec<u8>,
+    pub hidden_info: String,
     pub stdout_blob_id: Uuid,
     pub stderr_blob_id: Uuid,
     pub status: i32,
diff --git a/rust/migration/src/m20220101_000002_create_table.rs b/rust/migration/src/m20220101_000002_create_table.rs
index 3b704ad74..3e06a0e79 100644
--- a/rust/migration/src/m20220101_000002_create_table.rs
+++ b/rust/migration/src/m20220101_000002_create_table.rs
@@ -35,7 +35,7 @@ impl MigrationTrait for Migration {
                     .col(ColumnDef::new(Job::Cwd).string().not_null())
                     .col(ColumnDef::new(Job::Stdin).string().not_null())
                     .col(ColumnDef::new(Job::IsAtty).boolean().not_null())
-                    .col(ColumnDef::new(Job::HiddenInfo).ezblob())
+                    .col(ColumnDef::new(Job::HiddenInfo).string().not_null())
                     .col(ColumnDef::new(Job::StdoutBlobId).uuid().not_null())
                     .col(ColumnDef::new(Job::StderrBlobId).uuid().not_null())
                     .col(ColumnDef::new(Job::Status).integer().not_null())
diff --git a/rust/rsc/src/bin/rsc/types.rs b/rust/rsc/src/bin/rsc/types.rs
index 7617e957c..a96399240 100644
--- a/rust/rsc/src/bin/rsc/types.rs
+++ b/rust/rsc/src/bin/rsc/types.rs
@@ -40,8 +40,7 @@ pub struct AddJobPayload {
     pub cwd: String,
     pub stdin: String,
     pub is_atty: bool,
-    #[serde(with = "serde_bytes")]
-    pub hidden_info: Vec<u8>,
+    pub hidden_info: String,
     pub visible_files: Vec,
     pub output_dirs: Vec,
     pub output_symlinks: Vec,
@@ -73,7 +72,7 @@ impl AddJobPayload {
         hasher.update(&self.stdin.len().to_le_bytes());
         hasher.update(self.stdin.as_bytes());
         hasher.update(&self.hidden_info.len().to_le_bytes());
-        hasher.update(self.hidden_info.as_slice());
+        hasher.update(self.hidden_info.as_bytes());
         hasher.update(&[self.is_atty as u8]);
         hasher.update(&self.visible_files.len().to_le_bytes());
         for file in &self.visible_files {
@@ -93,8 +92,7 @@ pub struct ReadJobPayload {
     pub cwd: String,
     pub stdin: String,
     pub is_atty: bool,
-    #[serde(with = "serde_bytes")]
-    pub hidden_info: Vec<u8>,
+    pub hidden_info: String,
     pub visible_files: Vec,
 }
 
@@ -111,7 +109,7 @@ impl ReadJobPayload {
         hasher.update(&self.stdin.len().to_le_bytes());
         hasher.update(self.stdin.as_bytes());
         hasher.update(&self.hidden_info.len().to_le_bytes());
-        hasher.update(self.hidden_info.as_slice());
+        hasher.update(self.hidden_info.as_bytes());
         hasher.update(&[self.is_atty as u8]);
         hasher.update(&self.visible_files.len().to_le_bytes());
         for file in &self.visible_files {
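
Patch 3's Vec<u8> to String change is hash-compatible for any hidden_info that was valid UTF-8: the payload hash feeds the hasher a length prefix followed by raw bytes, and String::len / as_bytes yield exactly what Vec::len / as_slice did. A minimal sketch, using blake3 purely as a stand-in since the patch never shows which hasher sits behind hasher.update:

    // Stand-in demonstration that the length-prefix-plus-bytes scheme is
    // unchanged by storing hidden_info as String instead of Vec<u8>.
    fn hidden_info_hash_is_stable() {
        let as_vec: Vec<u8> = b"hidden".to_vec();
        let as_string = String::from_utf8(as_vec.clone()).unwrap();

        let mut h1 = blake3::Hasher::new();
        h1.update(&as_vec.len().to_le_bytes()); // length prefix
        h1.update(as_vec.as_slice()); // old representation

        let mut h2 = blake3::Hasher::new();
        h2.update(&as_string.len().to_le_bytes()); // String::len is byte length
        h2.update(as_string.as_bytes()); // new representation

        assert_eq!(h1.finalize(), h2.finalize());
    }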