From 5fa6dfa73a8ba39c12f1564873ab6b08349153b3 Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Thu, 10 Oct 2024 10:49:31 -0700 Subject: [PATCH 1/7] Added `ApiAttrData`, a seperate enum for user-provided data --- copperc/src/app/u/upload/controlpanel.tsx | 4 +- copperc/src/lib/api/openapi.ts | 9 +- copperd/Cargo.lock | 1 + copperd/bin/edged/src/api/pipeline/mod.rs | 5 +- copperd/bin/edged/src/api/pipeline/run.rs | 26 ++++- copperd/bin/pipelined/src/api/pipeline/run.rs | 5 +- copperd/lib/edged/Cargo.toml | 1 + copperd/lib/edged/src/apidata.rs | 110 ++++++++++++++++++ copperd/lib/edged/src/lib.rs | 3 + copperd/lib/pipelined/src/data.rs | 2 +- copperd/lib/storaged/src/data.rs | 6 +- copperd/nodes/storaged/src/additem.rs | 4 +- 12 files changed, 156 insertions(+), 20 deletions(-) create mode 100644 copperd/lib/edged/src/apidata.rs diff --git a/copperc/src/app/u/upload/controlpanel.tsx b/copperc/src/app/u/upload/controlpanel.tsx index 00ab4fa5..bda52012 100644 --- a/copperc/src/app/u/upload/controlpanel.tsx +++ b/copperc/src/app/u/upload/controlpanel.tsx @@ -9,7 +9,7 @@ import { components } from "@/lib/api/openapi"; import styles from "./page.module.scss"; import { uploadFiles } from "./uploadlogic"; -import { QueuedFileState, UploadState } from "./page"; +import { UploadState } from "./page"; import { ppBytes } from "@/lib/ppbytes"; function UploadBar({ @@ -312,7 +312,7 @@ export function ControlPanel(params: { input: { [input_name]: { type: "Blob", - object_key: upload_job, + key: upload_job, }, }, }, diff --git a/copperc/src/lib/api/openapi.ts b/copperc/src/lib/api/openapi.ts index 9371dc50..1c068b6d 100644 --- a/copperc/src/lib/api/openapi.ts +++ b/copperc/src/lib/api/openapi.ts @@ -358,9 +358,8 @@ export interface paths { export type webhooks = Record; export interface components { schemas: { - /** @description A value stored inside an attribute. - * Each of these corresponds to an [`AttrDataStub`] */ - AttrData: + /** @description Attribute data, provided by the user by api calls. 
*/ + ApiAttrData: | { data_type: components["schemas"]["AttrDataStub"]; /** @enum {string} */ @@ -410,7 +409,7 @@ export interface components { } | { /** @description The object's key */ - object_key: string; + key: string; /** @enum {string} */ type: "Blob"; } @@ -710,7 +709,7 @@ export interface components { }; RunPipelineRequest: { input: { - [key: string]: components["schemas"]["AttrData"]; + [key: string]: components["schemas"]["ApiAttrData"]; }; /** @description A unique id for this job */ job_id: string; diff --git a/copperd/Cargo.lock b/copperd/Cargo.lock index f588e34a..f9c5c721 100644 --- a/copperd/Cargo.lock +++ b/copperd/Cargo.lock @@ -763,6 +763,7 @@ dependencies = [ "argon2", "copper-pipelined", "copper-storaged", + "copper-util", "serde", "smartstring", "utoipa", diff --git a/copperd/bin/edged/src/api/pipeline/mod.rs b/copperd/bin/edged/src/api/pipeline/mod.rs index 2d5dc1ed..1cefc6d8 100644 --- a/copperd/bin/edged/src/api/pipeline/mod.rs +++ b/copperd/bin/edged/src/api/pipeline/mod.rs @@ -4,12 +4,11 @@ use axum::{ routing::{delete, get, patch, post}, Router, }; -use copper_edged::PipelineInfo; +use copper_edged::{ApiAttrData, PipelineInfo}; use copper_pipelined::{ base::NodeParameterValue, json::{EdgeJson, InputPort, NodeJson, NodeJsonPosition, OutputPort, PipelineJson}, }; -use copper_storaged::AttrData; use utoipa::OpenApi; mod add; @@ -50,7 +49,7 @@ use update::*; RunPipelineRequest, NodeParameterValue, PipelineInfo, - AttrData + ApiAttrData )) )] pub(super) struct PipelineApi; diff --git a/copperd/bin/edged/src/api/pipeline/run.rs b/copperd/bin/edged/src/api/pipeline/run.rs index 1323a546..3ecf0c13 100644 --- a/copperd/bin/edged/src/api/pipeline/run.rs +++ b/copperd/bin/edged/src/api/pipeline/run.rs @@ -5,6 +5,7 @@ use axum::{ Json, }; use axum_extra::extract::CookieJar; +use copper_edged::ApiAttrData; use copper_pipelined::client::PipelinedRequestError; use copper_storaged::AttrData; use serde::Deserialize; @@ -24,8 +25,8 @@ pub(super) struct RunPipelineRequest { #[schema(value_type = String)] pub job_id: SmartString, - #[schema(value_type = BTreeMap)] - pub input: BTreeMap, AttrData>, + #[schema(value_type = BTreeMap)] + pub input: BTreeMap, ApiAttrData>, } /// Start a pipeline job @@ -74,9 +75,28 @@ pub(super) async fn run_pipeline( return StatusCode::UNAUTHORIZED.into_response(); } + let mut converted_input: BTreeMap, AttrData> = BTreeMap::new(); + for (k, v) in payload.input { + // If we can automatically convert, do so + if let Ok(x) = AttrData::try_from(&v) { + converted_input.insert(k, x); + continue; + } + + // Some types need manual conversion + if let Some(x) = match &v { + ApiAttrData::Blob { key } => Some(AttrData::Blob { key: key.clone() }), + _ => None, + } { + converted_input.insert(k, x); + } + + unreachable!("User-provided data {v:?} could not be converted automatically, but was not caught by the manual conversion `match`.") + } + let res = state .pipelined_client - .run_pipeline(&pipe.data, &payload.job_id, &payload.input, user.id) + .run_pipeline(&pipe.data, &payload.job_id, &converted_input, user.id) .await; return match res { diff --git a/copperd/bin/pipelined/src/api/pipeline/run.rs b/copperd/bin/pipelined/src/api/pipeline/run.rs index 4548cbd1..5b9e099f 100644 --- a/copperd/bin/pipelined/src/api/pipeline/run.rs +++ b/copperd/bin/pipelined/src/api/pipeline/run.rs @@ -65,10 +65,10 @@ pub(super) async fn run_pipeline( let mut input = BTreeMap::new(); for (name, value) in payload.input { match value { - AttrData::Blob { object_key } => input.insert( 
+ AttrData::Blob { key } => input.insert( name, PipeData::Blob { - source: BytesSource::S3 { key: object_key }, + source: BytesSource::S3 { key }, }, ), @@ -80,6 +80,7 @@ pub(super) async fn run_pipeline( let context = CopperContext { blob_fragment_size: state.config.pipelined_blob_fragment_size, stream_channel_capacity: state.config.pipelined_stream_channel_size, + objectstore_blob_bucket: (&state.config.pipelined_objectstore_bucket).into(), objectstore_client: state.objectstore_client.clone(), storaged_client: state.storaged_client.clone(), job_id: payload.job_id.clone(), diff --git a/copperd/lib/edged/Cargo.toml b/copperd/lib/edged/Cargo.toml index 06b826aa..9e53f11e 100644 --- a/copperd/lib/edged/Cargo.toml +++ b/copperd/lib/edged/Cargo.toml @@ -19,6 +19,7 @@ workspace = true [dependencies] copper-pipelined = { workspace = true } copper-storaged = { workspace = true } +copper-util = { workspace = true } smartstring = { workspace = true } serde = { workspace = true } diff --git a/copperd/lib/edged/src/apidata.rs b/copperd/lib/edged/src/apidata.rs new file mode 100644 index 00000000..165be54f --- /dev/null +++ b/copperd/lib/edged/src/apidata.rs @@ -0,0 +1,110 @@ +use copper_storaged::{AttrData, AttrDataStub, ClassId, ItemId}; +use copper_util::HashType; +use serde::{Deserialize, Serialize}; +use smartstring::{LazyCompact, SmartString}; +use std::fmt::Debug; +use utoipa::ToSchema; + +/// Attribute data, provided by the user by api calls. +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +#[serde(tag = "type")] +pub enum ApiAttrData { + /// Typed, unset data + None { data_type: AttrDataStub }, + + /// A block of text + Text { + #[schema(value_type = String)] + value: SmartString, + }, + + /// An integer + Integer { + /// The integer + value: i64, + + /// If true, this integer must be non-negative + is_non_negative: bool, + }, + + /// A float + Float { + /// The float + value: f64, + + /// If true, this float must be non-negative + is_non_negative: bool, + }, + + /// A boolean + Boolean { value: bool }, + + /// A checksum + Hash { + /// The type of this hash + hash_type: HashType, + + /// The hash data + data: Vec, + }, + + /// Binary data stored in S3 + Blob { + /// The object's key + #[schema(value_type = String)] + key: SmartString, + }, + + /// A reference to an item in another class + Reference { + /// The item class this reference points to + #[schema(value_type = i64)] + class: ClassId, + + /// The item + #[schema(value_type = i64)] + item: ItemId, + }, +} + +impl TryFrom<&ApiAttrData> for AttrData { + type Error = (); + + fn try_from(value: &ApiAttrData) -> Result { + value.clone().try_into() + } +} + +impl TryFrom for AttrData { + type Error = (); + + fn try_from(value: ApiAttrData) -> Result { + Ok(match value { + ApiAttrData::Blob { .. 
} => return Err(()), + + ApiAttrData::None { data_type } => Self::None { + data_type: data_type.clone(), + }, + ApiAttrData::Boolean { value } => Self::Boolean { value }, + ApiAttrData::Text { value } => Self::Text { value }, + ApiAttrData::Hash { hash_type, data } => Self::Hash { hash_type, data }, + ApiAttrData::Reference { class, item } => Self::Reference { class, item }, + + ApiAttrData::Float { + value, + is_non_negative, + } => Self::Float { + value, + is_non_negative, + }, + + ApiAttrData::Integer { + value, + is_non_negative, + } => Self::Integer { + value, + is_non_negative, + }, + }) + } +} diff --git a/copperd/lib/edged/src/lib.rs b/copperd/lib/edged/src/lib.rs index e1efc7d0..a9077d50 100644 --- a/copperd/lib/edged/src/lib.rs +++ b/copperd/lib/edged/src/lib.rs @@ -3,3 +3,6 @@ pub use info::*; mod id; pub use id::*; + +mod apidata; +pub use apidata::*; diff --git a/copperd/lib/pipelined/src/data.rs b/copperd/lib/pipelined/src/data.rs index 34efd355..85a8f507 100644 --- a/copperd/lib/pipelined/src/data.rs +++ b/copperd/lib/pipelined/src/data.rs @@ -62,7 +62,7 @@ pub enum BytesSource { receiver: async_broadcast::Receiver>>, }, S3 { - key: String, + key: SmartString, }, } diff --git a/copperd/lib/storaged/src/data.rs b/copperd/lib/storaged/src/data.rs index f0ee37bc..fa2961bf 100644 --- a/copperd/lib/storaged/src/data.rs +++ b/copperd/lib/storaged/src/data.rs @@ -10,7 +10,8 @@ use utoipa::ToSchema; use super::id::{ClassId, ItemId}; /// A value stored inside an attribute. -/// Each of these corresponds to an [`AttrDataStub`] +/// These are never directly provided by users (See `ApiAttrData`), +/// but may be passed around in internal api calls. #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] #[serde(tag = "type")] pub enum AttrData { @@ -56,7 +57,8 @@ pub enum AttrData { /// Binary data stored in S3 Blob { /// The object's key - object_key: String, + #[schema(value_type = String)] + key: SmartString, }, /// A reference to an item in another class diff --git a/copperd/nodes/storaged/src/additem.rs b/copperd/nodes/storaged/src/additem.rs index 49c03dbd..1d50e27c 100644 --- a/copperd/nodes/storaged/src/additem.rs +++ b/copperd/nodes/storaged/src/additem.rs @@ -123,7 +123,7 @@ impl Node for AddItem { match data { Some(PipeData::Blob { source }) => { // TODO: recompute if exists - let new_obj_key: String = rand::thread_rng() + let new_obj_key: SmartString = rand::thread_rng() .sample_iter(&Alphanumeric) .take(32) .map(char::from) @@ -156,7 +156,7 @@ impl Node for AddItem { attr.1 = Some( AttrData::Blob { - object_key: new_obj_key, + key: new_obj_key, } .into(), ) From 79cacf37c38611154ff94d81d90e0c4c648fe60a Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Wed, 16 Oct 2024 16:12:39 -0700 Subject: [PATCH 2/7] Move & refactor S3Client (move bucket out of struct) --- copperd/Cargo.lock | 4 +- copperd/bin/edged/src/api/mod.rs | 2 +- copperd/bin/edged/src/main.rs | 4 +- copperd/bin/edged/src/uploader/errors.rs | 5 +-- copperd/bin/edged/src/uploader/mod.rs | 6 ++- copperd/bin/pipelined/src/api/mod.rs | 3 +- copperd/bin/pipelined/src/api/pipeline/run.rs | 4 +- copperd/bin/pipelined/src/main.rs | 8 ++-- copperd/lib/pipelined/Cargo.toml | 2 - copperd/lib/pipelined/src/data.rs | 1 + copperd/lib/pipelined/src/helpers/mod.rs | 3 -- copperd/lib/pipelined/src/helpers/reader.rs | 8 ++-- copperd/lib/pipelined/src/lib.rs | 22 +++++++--- copperd/lib/storaged/src/data.rs | 4 ++ copperd/lib/util/Cargo.toml | 2 + copperd/lib/util/src/lib.rs | 1 + .../src/helpers 
=> util/src}/s3client.rs | 43 +++++++++++-------- copperd/nodes/storaged/src/additem.rs | 7 ++- 18 files changed, 74 insertions(+), 55 deletions(-) rename copperd/lib/{pipelined/src/helpers => util/src}/s3client.rs (93%) diff --git a/copperd/Cargo.lock b/copperd/Cargo.lock index f9c5c721..04bf82d5 100644 --- a/copperd/Cargo.lock +++ b/copperd/Cargo.lock @@ -788,7 +788,6 @@ version = "0.1.0" dependencies = [ "async-broadcast", "async-trait", - "aws-sdk-s3", "copper-storaged", "copper-util", "reqwest", @@ -797,7 +796,6 @@ dependencies = [ "smartstring", "time", "tokio", - "tracing", "utoipa", ] @@ -819,11 +817,13 @@ dependencies = [ name = "copper-util" version = "0.1.0" dependencies = [ + "aws-sdk-s3", "dotenvy", "envy", "petgraph", "serde", "serde_with", + "smartstring", "tracing", "tracing-subscriber", "utoipa", diff --git a/copperd/bin/edged/src/api/mod.rs b/copperd/bin/edged/src/api/mod.rs index eaafe5a5..9fdfacc5 100644 --- a/copperd/bin/edged/src/api/mod.rs +++ b/copperd/bin/edged/src/api/mod.rs @@ -2,9 +2,9 @@ use axum::routing::post; use axum::{extract::DefaultBodyLimit, Router}; use copper_edged::UserInfo; use copper_pipelined::client::PipelinedClient; -use copper_pipelined::helpers::S3Client; use copper_storaged::client::StoragedClient; use copper_storaged::{AttrDataStub, AttributeInfo, AttributeOptions, ClassInfo, DatasetInfo}; +use copper_util::s3client::S3Client; use copper_util::HashType; use std::sync::Arc; use tower_http::trace::TraceLayer; diff --git a/copperd/bin/edged/src/main.rs b/copperd/bin/edged/src/main.rs index 95d53d83..6a68866e 100644 --- a/copperd/bin/edged/src/main.rs +++ b/copperd/bin/edged/src/main.rs @@ -4,9 +4,9 @@ use aws_config::{BehaviorVersion, Region}; use aws_sdk_s3::config::Credentials; use axum::Router; use config::EdgedConfig; -use copper_pipelined::{client::ReqwestPipelineClient, helpers::S3Client}; +use copper_pipelined::client::ReqwestPipelineClient; use copper_storaged::client::ReqwestStoragedClient; -use copper_util::load_env; +use copper_util::{load_env, s3client::S3Client}; use database::postgres::{PgDatabaseClient, PgDatabaseOpenError}; use std::sync::Arc; use tracing::{debug, error, info}; diff --git a/copperd/bin/edged/src/uploader/errors.rs b/copperd/bin/edged/src/uploader/errors.rs index 95498994..a16a3c4a 100644 --- a/copperd/bin/edged/src/uploader/errors.rs +++ b/copperd/bin/edged/src/uploader/errors.rs @@ -1,8 +1,7 @@ -use copper_pipelined::helpers::{ - S3CreateMultipartUploadError, S3UploadFinishError, S3UploadPartError, -}; use std::{error::Error, fmt::Display}; +use copper_util::s3client::{S3CreateMultipartUploadError, S3UploadFinishError, S3UploadPartError}; + #[derive(Debug)] pub enum NewUploadError { /// S3 client error while creating upload diff --git a/copperd/bin/edged/src/uploader/mod.rs b/copperd/bin/edged/src/uploader/mod.rs index cb978c5b..6c529b94 100644 --- a/copperd/bin/edged/src/uploader/mod.rs +++ b/copperd/bin/edged/src/uploader/mod.rs @@ -1,6 +1,8 @@ -use copper_pipelined::helpers::{MultipartUpload, S3Client}; use copper_storaged::UserId; -use copper_util::MimeType; +use copper_util::{ + s3client::{MultipartUpload, S3Client}, + MimeType, +}; use errors::{NewUploadError, UploadFinishError, UploadFragmentError}; use rand::{distributions::Alphanumeric, Rng}; use smartstring::{LazyCompact, SmartString}; diff --git a/copperd/bin/pipelined/src/api/mod.rs b/copperd/bin/pipelined/src/api/mod.rs index ecd5d193..29869f41 100644 --- a/copperd/bin/pipelined/src/api/mod.rs +++ b/copperd/bin/pipelined/src/api/mod.rs @@ -2,12 
+2,11 @@ use axum::{extract::DefaultBodyLimit, Router}; use copper_pipelined::{ base::NodeParameterValue, data::PipeData, - helpers::S3Client, json::{EdgeJson, InputPort, NodeJson, NodeJsonPosition, OutputPort, PipelineJson}, CopperContext, }; use copper_storaged::{client::StoragedClient, AttrData, AttrDataStub}; -use copper_util::HashType; +use copper_util::{s3client::S3Client, HashType}; use std::sync::Arc; use tokio::sync::Mutex; use tower_http::trace::TraceLayer; diff --git a/copperd/bin/pipelined/src/api/pipeline/run.rs b/copperd/bin/pipelined/src/api/pipeline/run.rs index 5b9e099f..7d75d512 100644 --- a/copperd/bin/pipelined/src/api/pipeline/run.rs +++ b/copperd/bin/pipelined/src/api/pipeline/run.rs @@ -65,10 +65,10 @@ pub(super) async fn run_pipeline( let mut input = BTreeMap::new(); for (name, value) in payload.input { match value { - AttrData::Blob { key } => input.insert( + AttrData::Blob { bucket, key } => input.insert( name, PipeData::Blob { - source: BytesSource::S3 { key }, + source: BytesSource::S3 { bucket, key }, }, ), diff --git a/copperd/bin/pipelined/src/main.rs b/copperd/bin/pipelined/src/main.rs index 7be66235..f09078b1 100644 --- a/copperd/bin/pipelined/src/main.rs +++ b/copperd/bin/pipelined/src/main.rs @@ -2,9 +2,9 @@ use api::RouterState; use aws_config::{BehaviorVersion, Region}; use aws_sdk_s3::config::Credentials; use config::{PipelinedConfig, ASYNC_POLL_AWAIT_MS}; -use copper_pipelined::{data::PipeData, helpers::S3Client, CopperContext}; +use copper_pipelined::{data::PipeData, CopperContext}; use copper_storaged::client::ReqwestStoragedClient; -use copper_util::load_env; +use copper_util::{load_env, s3client::S3Client}; use futures::TryFutureExt; use pipeline::runner::{PipelineRunner, PipelineRunnerOptions}; use std::{error::Error, future::IntoFuture, sync::Arc}; @@ -87,9 +87,7 @@ async fn main() { .unwrap(), ), - objectstore_client: Arc::new( - S3Client::new(client, &config.pipelined_objectstore_bucket).await, - ), + objectstore_client: Arc::new(S3Client::new(client).await), config, }; diff --git a/copperd/lib/pipelined/Cargo.toml b/copperd/lib/pipelined/Cargo.toml index 98ef5f83..194fac17 100644 --- a/copperd/lib/pipelined/Cargo.toml +++ b/copperd/lib/pipelined/Cargo.toml @@ -21,9 +21,7 @@ copper-util = { workspace = true } copper-storaged = { workspace = true } reqwest = { workspace = true } -tracing = { workspace = true } async-trait = { workspace = true } -aws-sdk-s3 = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true } diff --git a/copperd/lib/pipelined/src/data.rs b/copperd/lib/pipelined/src/data.rs index 85a8f507..85240555 100644 --- a/copperd/lib/pipelined/src/data.rs +++ b/copperd/lib/pipelined/src/data.rs @@ -63,6 +63,7 @@ pub enum BytesSource { }, S3 { key: SmartString, + bucket: SmartString, }, } diff --git a/copperd/lib/pipelined/src/helpers/mod.rs b/copperd/lib/pipelined/src/helpers/mod.rs index 0baced07..0ebb5c0b 100644 --- a/copperd/lib/pipelined/src/helpers/mod.rs +++ b/copperd/lib/pipelined/src/helpers/mod.rs @@ -1,5 +1,2 @@ -mod s3client; -pub use s3client::*; - mod reader; pub use reader::*; diff --git a/copperd/lib/pipelined/src/helpers/reader.rs b/copperd/lib/pipelined/src/helpers/reader.rs index de693a60..b68d3dfa 100644 --- a/copperd/lib/pipelined/src/helpers/reader.rs +++ b/copperd/lib/pipelined/src/helpers/reader.rs @@ -1,4 +1,4 @@ -use copper_util::MimeType; +use copper_util::{s3client::S3Reader, MimeType}; use std::sync::Arc; use crate::{ @@ -7,8 +7,6 @@ use crate::{ 
CopperContext, }; -use super::s3client::S3Reader; - pub enum BytesSourceReader { Array { data: Option>>, @@ -35,9 +33,9 @@ impl BytesSourceReader { BytesSource::Stream { receiver, mime } => Self::Stream { receiver, mime }, - BytesSource::S3 { key } => Self::S3( + BytesSource::S3 { bucket, key } => Self::S3( ctx.objectstore_client - .create_reader(&key) + .create_reader(&bucket, &key) .await .map_err(|e| RunNodeError::Other(Arc::new(e)))?, ), diff --git a/copperd/lib/pipelined/src/lib.rs b/copperd/lib/pipelined/src/lib.rs index 4485b538..7a34d5af 100644 --- a/copperd/lib/pipelined/src/lib.rs +++ b/copperd/lib/pipelined/src/lib.rs @@ -8,8 +8,8 @@ pub mod structs; use async_trait::async_trait; use base::{PipelineJobContext, RunNodeError}; use copper_storaged::{client::StoragedClient, Transaction, UserId}; +use copper_util::s3client::S3Client; use data::PipeData; -use helpers::S3Client; use smartstring::{LazyCompact, SmartString}; use std::sync::Arc; use tokio::sync::Mutex; @@ -27,18 +27,26 @@ pub struct CopperContext { /// overflowing channel, larger values use more memory. pub stream_channel_capacity: usize, - pub storaged_client: Arc, - pub objectstore_client: Arc, + /// The id of this job pub job_id: SmartString, - /// The transaction to apply once this pipeline successfully resolves. - /// A pipeline should trigger AT MOST one transaction. - pub transaction: Mutex, - /// The user running this pipeline. /// Used to make sure we have permission to do the /// actions in this pipeline. pub run_by_user: UserId, + + /// The storaged client this pipeline should use + pub storaged_client: Arc, + + /// The objectstore client this pipeline should use + pub objectstore_client: Arc, + + /// The name of the bucket to store blobs in + pub objectstore_blob_bucket: SmartString, + + /// The transaction to apply once this pipeline successfully resolves. + /// A pipeline triggers AT MOST one transaction. 
+ pub transaction: Mutex, } #[async_trait] diff --git a/copperd/lib/storaged/src/data.rs b/copperd/lib/storaged/src/data.rs index fa2961bf..aa4d5a12 100644 --- a/copperd/lib/storaged/src/data.rs +++ b/copperd/lib/storaged/src/data.rs @@ -56,6 +56,10 @@ pub enum AttrData { /// Binary data stored in S3 Blob { + /// The name of the bucket this blob is stored in + #[schema(value_type = String)] + bucket: SmartString, + /// The object's key #[schema(value_type = String)] key: SmartString, diff --git a/copperd/lib/util/Cargo.toml b/copperd/lib/util/Cargo.toml index ff3b7143..518199b1 100644 --- a/copperd/lib/util/Cargo.toml +++ b/copperd/lib/util/Cargo.toml @@ -25,3 +25,5 @@ serde = { workspace = true } serde_with = { workspace = true } petgraph = { workspace = true } utoipa = { workspace = true } +aws-sdk-s3 = { workspace = true } +smartstring = { workspace = true } diff --git a/copperd/lib/util/src/lib.rs b/copperd/lib/util/src/lib.rs index 2a0dfc7e..e769f495 100644 --- a/copperd/lib/util/src/lib.rs +++ b/copperd/lib/util/src/lib.rs @@ -12,6 +12,7 @@ pub use mime::*; pub mod graph; pub mod logging; pub mod names; +pub mod s3client; /// The types of hashes we support #[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize, Serialize, ToSchema)] diff --git a/copperd/lib/pipelined/src/helpers/s3client.rs b/copperd/lib/util/src/s3client.rs similarity index 93% rename from copperd/lib/pipelined/src/helpers/s3client.rs rename to copperd/lib/util/src/s3client.rs index 08f49af8..70928ccc 100644 --- a/copperd/lib/pipelined/src/helpers/s3client.rs +++ b/copperd/lib/util/src/s3client.rs @@ -3,7 +3,6 @@ use aws_sdk_s3::{ primitives::{ByteStream, SdkBody}, types::{CompletedMultipartUpload, CompletedPart}, }; -use copper_util::MimeType; use smartstring::{LazyCompact, SmartString}; use std::{ error::Error, @@ -12,6 +11,8 @@ use std::{ }; use tracing::error; +use crate::MimeType; + // // MARK: Errors // @@ -145,36 +146,37 @@ impl Error for S3UploadFinishError { // MARK: Implementations // +/// An interface to a specific S3 bucket #[derive(Clone)] pub struct S3Client { client: aws_sdk_s3::Client, - bucket: String, } -/// Provides an unbuffered interface to an S3 object. -/// -/// impl S3Client { - pub async fn new(client: aws_sdk_s3::Client, bucket: impl ToString) -> Self { - let bucket = bucket.to_string(); - Self { client, bucket } + pub async fn new(client: aws_sdk_s3::Client) -> Self { + Self { client } } } impl<'a> S3Client { - pub async fn create_reader(&'a self, key: &str) -> Result { + pub async fn create_reader( + &'a self, + bucket: &str, + key: &str, + ) -> Result { let b = self .client .get_object() - .bucket(&self.bucket) + .bucket(bucket) .key(key) .send() .await?; return Ok(S3Reader { client: self.clone(), - + bucket: bucket.into(), key: key.into(), + cursor: 0, // TODO: when does this fail? 
size: b.content_length.unwrap().try_into().unwrap(), @@ -184,13 +186,14 @@ impl<'a> S3Client { pub async fn create_multipart_upload( &'a self, + bucket: &str, key: &str, mime: MimeType, ) -> Result { let multipart_upload_res = self .client .create_multipart_upload() - .bucket(&self.bucket) + .bucket(bucket) .key(key) .content_type(&mime) .send() @@ -200,7 +203,9 @@ impl<'a> S3Client { return Ok(MultipartUpload { client: self.clone(), + bucket: bucket.into(), key: key.into(), + id: upload_id.into(), completed_parts: Vec::new(), }); @@ -209,8 +214,9 @@ impl<'a> S3Client { pub struct S3Reader { client: S3Client, - + bucket: SmartString, key: SmartString, + cursor: u64, size: u64, mime: MimeType, @@ -231,7 +237,7 @@ impl S3Reader { .client .client .get_object() - .bucket(&self.client.bucket) + .bucket(self.bucket.as_str()) .key(self.key.as_str()) .range(format!("bytes={start_byte}-{end_byte}")) .send() @@ -301,8 +307,9 @@ impl Seek for S3Reader { pub struct MultipartUpload { client: S3Client, - + bucket: SmartString, key: SmartString, + id: SmartString, completed_parts: Vec, } @@ -326,7 +333,7 @@ impl MultipartUpload { .client .client .upload_part() - .bucket(&self.client.bucket) + .bucket(self.bucket.as_str()) .key(self.key.as_str()) .upload_id(self.id.clone()) .body(stream) @@ -351,7 +358,7 @@ impl MultipartUpload { .client .client .abort_multipart_upload() - .bucket(&self.client.bucket) + .bucket(self.bucket.as_str()) .key(self.key.as_str()) .upload_id(self.id.clone()) .send() @@ -370,7 +377,7 @@ impl MultipartUpload { self.client .client .complete_multipart_upload() - .bucket(&self.client.bucket) + .bucket(self.bucket.as_str()) .key(self.key.as_str()) .upload_id(self.id.clone()) .multipart_upload(completed_multipart_upload) diff --git a/copperd/nodes/storaged/src/additem.rs b/copperd/nodes/storaged/src/additem.rs index 1d50e27c..33fa0c85 100644 --- a/copperd/nodes/storaged/src/additem.rs +++ b/copperd/nodes/storaged/src/additem.rs @@ -135,7 +135,11 @@ impl Node for AddItem { let mut upload = ctx .objectstore_client - .create_multipart_upload(&new_obj_key, reader.mime().clone()) + .create_multipart_upload( + &ctx.objectstore_blob_bucket, + &new_obj_key, + reader.mime().clone(), + ) .await .map_err(|e| RunNodeError::Other(Arc::new(e)))?; @@ -156,6 +160,7 @@ impl Node for AddItem { attr.1 = Some( AttrData::Blob { + bucket: ctx.objectstore_blob_bucket.clone(), key: new_obj_key, } .into(), From 4e014acd1689032f0189111da4563d70a4fbda5d Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Wed, 16 Oct 2024 22:41:06 -0700 Subject: [PATCH 3/7] Added upload bucket --- copperd/.env | 4 ++-- copperd/bin/edged/src/api/mod.rs | 4 ++-- copperd/bin/edged/src/api/pipeline/run.rs | 6 +++++- copperd/bin/edged/src/config.rs | 4 ++-- copperd/bin/edged/src/main.rs | 8 ++++---- 5 files changed, 15 insertions(+), 11 deletions(-) diff --git a/copperd/.env b/copperd/.env index 6069b5a7..f698ecca 100644 --- a/copperd/.env +++ b/copperd/.env @@ -14,7 +14,7 @@ EDGED_PIPELINED_SECRET = "impipesecret" EDGED_OBJECTSTORE_KEY_ID = "HrmycpZai3D0X23oVQ97" EDGED_OBJECTSTORE_KEY_SECRET = "jiyQKKHx7KbnlqWGPIuGwp9iBEtPqRf4HuGJHEdl" EDGED_OBJECTSTORE_URL = "http://localhost:9000" -EDGED_OBJECTSTORE_BUCKET = "copper-upload" +EDGED_OBJECTSTORE_UPLOAD_BUCKET = "copper-upload" EDGED_UPLOAD_JOB_TIMEOUT = 300 # pipelined config @@ -34,7 +34,7 @@ PIPELINED_STORAGED_SECRET = "imstoresecret" PIPELINED_OBJECTSTORE_KEY_ID = "HrmycpZai3D0X23oVQ97" PIPELINED_OBJECTSTORE_KEY_SECRET = 
"jiyQKKHx7KbnlqWGPIuGwp9iBEtPqRf4HuGJHEdl" PIPELINED_OBJECTSTORE_URL = "http://localhost:9000" -PIPELINED_OBJECTSTORE_BUCKET = "copper-upload" +PIPELINED_OBJECTSTORE_BUCKET = "copper-blobs" # storaged config STORAGED_LOGLEVEL = "Develop" diff --git a/copperd/bin/edged/src/api/mod.rs b/copperd/bin/edged/src/api/mod.rs index 9fdfacc5..fc733b5e 100644 --- a/copperd/bin/edged/src/api/mod.rs +++ b/copperd/bin/edged/src/api/mod.rs @@ -36,7 +36,7 @@ pub struct RouterState { pub storaged_client: Arc, pub pipelined_client: Arc, pub auth: Arc>, - pub objectstore_client: Arc, + pub s3_client_upload: Arc, pub uploader: Arc, } @@ -50,7 +50,7 @@ impl Clone for RouterState { auth: self.auth.clone(), storaged_client: self.storaged_client.clone(), pipelined_client: self.pipelined_client.clone(), - objectstore_client: self.objectstore_client.clone(), + s3_client_upload: self.s3_client_upload.clone(), uploader: self.uploader.clone(), } } diff --git a/copperd/bin/edged/src/api/pipeline/run.rs b/copperd/bin/edged/src/api/pipeline/run.rs index 3ecf0c13..d6ae0891 100644 --- a/copperd/bin/edged/src/api/pipeline/run.rs +++ b/copperd/bin/edged/src/api/pipeline/run.rs @@ -85,10 +85,14 @@ pub(super) async fn run_pipeline( // Some types need manual conversion if let Some(x) = match &v { - ApiAttrData::Blob { key } => Some(AttrData::Blob { key: key.clone() }), + ApiAttrData::Blob { key } => Some(AttrData::Blob { + bucket: (&state.config.edged_objectstore_upload_bucket).into(), + key: key.clone(), + }), _ => None, } { converted_input.insert(k, x); + continue; } unreachable!("User-provided data {v:?} could not be converted automatically, but was not caught by the manual conversion `match`.") diff --git a/copperd/bin/edged/src/config.rs b/copperd/bin/edged/src/config.rs index d562ca44..ddc587cd 100644 --- a/copperd/bin/edged/src/config.rs +++ b/copperd/bin/edged/src/config.rs @@ -44,8 +44,8 @@ pub struct EdgedConfig { pub edged_objectstore_key_secret: String, /// Object store url pub edged_objectstore_url: String, - /// Object store bucket - pub edged_objectstore_bucket: String, + /// The bucket to store user uploads in + pub edged_objectstore_upload_bucket: String, /// How long an upload job may idle before being deleted, in seconds pub edged_upload_job_timeout: u64, diff --git a/copperd/bin/edged/src/main.rs b/copperd/bin/edged/src/main.rs index 6a68866e..addf8a41 100644 --- a/copperd/bin/edged/src/main.rs +++ b/copperd/bin/edged/src/main.rs @@ -19,7 +19,7 @@ mod database; mod auth; mod uploader; -async fn make_app(config: Arc, objectstore_client: Arc) -> Router { +async fn make_app(config: Arc, s3_client_upload: Arc) -> Router { // Connect to database let db = match PgDatabaseClient::open(&config.edged_db_addr).await { Ok(db) => db, @@ -38,7 +38,7 @@ async fn make_app(config: Arc, objectstore_client: Arc) - config: config.clone(), db_client: Arc::new(db), auth: Arc::new(AuthHelper::new()), - uploader: Arc::new(Uploader::new(config.clone(), objectstore_client.clone())), + uploader: Arc::new(Uploader::new(config.clone(), s3_client_upload.clone())), pipelined_client: Arc::new( ReqwestPipelineClient::new( @@ -55,7 +55,7 @@ async fn make_app(config: Arc, objectstore_client: Arc) - .unwrap(), ), - objectstore_client, + s3_client_upload, }); } @@ -112,7 +112,7 @@ async fn main() { let app = make_app( config.clone(), - Arc::new(S3Client::new(client.clone(), &config.edged_objectstore_bucket).await), + Arc::new(S3Client::new(client.clone()).await), ) .await; From e034b79aa7649538594fc79b0928f83670eed89c Mon Sep 17 00:00:00 2001 
From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Wed, 16 Oct 2024 22:42:53 -0700 Subject: [PATCH 4/7] Added upload job ids --- copperc/src/app/u/upload/controlpanel.tsx | 2 +- copperc/src/lib/api/openapi.ts | 10 +- copperd/Cargo.lock | 1 - copperd/bin/edged/src/api/mod.rs | 3 +- copperd/bin/edged/src/api/pipeline/mod.rs | 4 +- copperd/bin/edged/src/api/pipeline/run.rs | 37 +++- .../edged/src/api/storage/finish_upload.rs | 4 +- .../bin/edged/src/api/storage/start_upload.rs | 5 +- .../bin/edged/src/api/storage/upload_part.rs | 4 +- copperd/{lib => bin}/edged/src/apidata.rs | 14 +- copperd/bin/edged/src/config.rs | 2 +- copperd/bin/edged/src/main.rs | 1 + copperd/bin/edged/src/uploader/mod.rs | 203 +++++++++++++----- copperd/lib/edged/Cargo.toml | 1 - copperd/lib/edged/src/lib.rs | 3 - copperd/lib/util/src/s3client.rs | 4 + 16 files changed, 213 insertions(+), 85 deletions(-) rename copperd/{lib => bin}/edged/src/apidata.rs (83%) diff --git a/copperc/src/app/u/upload/controlpanel.tsx b/copperc/src/app/u/upload/controlpanel.tsx index bda52012..2f05495f 100644 --- a/copperc/src/app/u/upload/controlpanel.tsx +++ b/copperc/src/app/u/upload/controlpanel.tsx @@ -312,7 +312,7 @@ export function ControlPanel(params: { input: { [input_name]: { type: "Blob", - key: upload_job, + upload_id: upload_job, }, }, }, diff --git a/copperc/src/lib/api/openapi.ts b/copperc/src/lib/api/openapi.ts index 1c068b6d..43213fef 100644 --- a/copperc/src/lib/api/openapi.ts +++ b/copperc/src/lib/api/openapi.ts @@ -408,10 +408,16 @@ export interface components { type: "Hash"; } | { - /** @description The object's key */ - key: string; /** @enum {string} */ type: "Blob"; + /** @description The upload id. This must only be used once, + * uploaded files are deleted once their job is done. + * + * Also, note that we _never_ send the S3 key to the + * client---only the upload id as a proxy. This makes sure + * that clients can only start jobs on uploads they own, + * and reduces the risk of other creative abuse. 
*/ + upload_id: string; } | { /** diff --git a/copperd/Cargo.lock b/copperd/Cargo.lock index 04bf82d5..299fd9cf 100644 --- a/copperd/Cargo.lock +++ b/copperd/Cargo.lock @@ -763,7 +763,6 @@ dependencies = [ "argon2", "copper-pipelined", "copper-storaged", - "copper-util", "serde", "smartstring", "utoipa", diff --git a/copperd/bin/edged/src/api/mod.rs b/copperd/bin/edged/src/api/mod.rs index fc733b5e..6c49a46e 100644 --- a/copperd/bin/edged/src/api/mod.rs +++ b/copperd/bin/edged/src/api/mod.rs @@ -12,9 +12,8 @@ use utoipa::OpenApi; use utoipa_swagger_ui::SwaggerUi; use crate::auth::AuthHelper; -use crate::database::base::client::DatabaseClient; - use crate::config::EdgedConfig; +use crate::database::base::client::DatabaseClient; use crate::uploader::Uploader; mod attribute; diff --git a/copperd/bin/edged/src/api/pipeline/mod.rs b/copperd/bin/edged/src/api/pipeline/mod.rs index 1cefc6d8..b66572e1 100644 --- a/copperd/bin/edged/src/api/pipeline/mod.rs +++ b/copperd/bin/edged/src/api/pipeline/mod.rs @@ -1,10 +1,10 @@ -use crate::database::base::client::DatabaseClient; use crate::RouterState; +use crate::{apidata::ApiAttrData, database::base::client::DatabaseClient}; use axum::{ routing::{delete, get, patch, post}, Router, }; -use copper_edged::{ApiAttrData, PipelineInfo}; +use copper_edged::PipelineInfo; use copper_pipelined::{ base::NodeParameterValue, json::{EdgeJson, InputPort, NodeJson, NodeJsonPosition, OutputPort, PipelineJson}, diff --git a/copperd/bin/edged/src/api/pipeline/run.rs b/copperd/bin/edged/src/api/pipeline/run.rs index d6ae0891..0b68ac41 100644 --- a/copperd/bin/edged/src/api/pipeline/run.rs +++ b/copperd/bin/edged/src/api/pipeline/run.rs @@ -5,7 +5,6 @@ use axum::{ Json, }; use axum_extra::extract::CookieJar; -use copper_edged::ApiAttrData; use copper_pipelined::client::PipelinedRequestError; use copper_storaged::AttrData; use serde::Deserialize; @@ -15,7 +14,9 @@ use tracing::error; use utoipa::ToSchema; use crate::{ + apidata::ApiAttrData, database::base::{client::DatabaseClient, errors::pipeline::GetPipelineError}, + uploader::GotJobKey, RouterState, }; @@ -85,10 +86,36 @@ pub(super) async fn run_pipeline( // Some types need manual conversion if let Some(x) = match &v { - ApiAttrData::Blob { key } => Some(AttrData::Blob { - bucket: (&state.config.edged_objectstore_upload_bucket).into(), - key: key.clone(), - }), + ApiAttrData::Blob { upload_id } => { + let res = state.uploader.get_job_object_key(user.id, upload_id).await; + match res { + GotJobKey::NoSuchJob => { + return ( + StatusCode::BAD_REQUEST, + Json(format!( + "Invalid input: input {k} references a job that does not exist" + )), + ) + .into_response(); + } + + GotJobKey::JobNotDone => { + return ( + StatusCode::BAD_REQUEST, + Json(format!( + "Invalid input: input {k} references a job that is not finished" + )), + ) + .into_response(); + } + + GotJobKey::HereYouGo(key) => Some(AttrData::Blob { + bucket: (&state.config.edged_objectstore_upload_bucket).into(), + key, + }), + } + } + _ => None, } { converted_input.insert(k, x); diff --git a/copperd/bin/edged/src/api/storage/finish_upload.rs b/copperd/bin/edged/src/api/storage/finish_upload.rs index a76c9bce..292a090f 100644 --- a/copperd/bin/edged/src/api/storage/finish_upload.rs +++ b/copperd/bin/edged/src/api/storage/finish_upload.rs @@ -6,8 +6,8 @@ use axum::{ use axum_extra::extract::CookieJar; use tracing::error; -use crate::database::base::client::DatabaseClient; use crate::{api::RouterState, uploader::errors::UploadFinishError}; +use 
crate::{database::base::client::DatabaseClient, uploader::UploadJobId}; /// Rename a attribute #[utoipa::path( @@ -27,7 +27,7 @@ use crate::{api::RouterState, uploader::errors::UploadFinishError}; pub(super) async fn finish_upload( jar: CookieJar, State(state): State>, - Path(job_id): Path, + Path(job_id): Path, ) -> Response { let user = match state.auth.auth_or_logout(&state, &jar).await { Err(x) => return x, diff --git a/copperd/bin/edged/src/api/storage/start_upload.rs b/copperd/bin/edged/src/api/storage/start_upload.rs index 43fc4f19..67107b75 100644 --- a/copperd/bin/edged/src/api/storage/start_upload.rs +++ b/copperd/bin/edged/src/api/storage/start_upload.rs @@ -10,8 +10,8 @@ use serde::{Deserialize, Serialize}; use tracing::error; use utoipa::ToSchema; -use crate::database::base::client::DatabaseClient; use crate::{api::RouterState, uploader::errors::NewUploadError}; +use crate::{database::base::client::DatabaseClient, uploader::UploadJobId}; #[derive(Debug, Deserialize, ToSchema)] pub(super) struct StartUploadRequest { @@ -21,7 +21,8 @@ pub(super) struct StartUploadRequest { #[derive(Debug, Serialize, ToSchema)] pub(super) struct StartUploadResponse { - job_id: String, + #[schema(value_type = String)] + job_id: UploadJobId, request_body_limit: usize, } diff --git a/copperd/bin/edged/src/api/storage/upload_part.rs b/copperd/bin/edged/src/api/storage/upload_part.rs index c3847d93..434d6915 100644 --- a/copperd/bin/edged/src/api/storage/upload_part.rs +++ b/copperd/bin/edged/src/api/storage/upload_part.rs @@ -8,8 +8,8 @@ use axum::{ use axum_extra::extract::CookieJar; use tracing::{error, warn}; -use crate::database::base::client::DatabaseClient; use crate::{api::RouterState, uploader::errors::UploadFragmentError}; +use crate::{database::base::client::DatabaseClient, uploader::UploadJobId}; /// Upload a part of a file. /// TODO: enforce 5MB minimum size @@ -30,7 +30,7 @@ use crate::{api::RouterState, uploader::errors::UploadFragmentError}; pub(super) async fn upload_part( jar: CookieJar, State(state): State>, - Path(job_id): Path, + Path(job_id): Path, mut multipart: Multipart, ) -> Response { let user = match state.auth.auth_or_logout(&state, &jar).await { diff --git a/copperd/lib/edged/src/apidata.rs b/copperd/bin/edged/src/apidata.rs similarity index 83% rename from copperd/lib/edged/src/apidata.rs rename to copperd/bin/edged/src/apidata.rs index 165be54f..946eaed5 100644 --- a/copperd/lib/edged/src/apidata.rs +++ b/copperd/bin/edged/src/apidata.rs @@ -5,6 +5,8 @@ use smartstring::{LazyCompact, SmartString}; use std::fmt::Debug; use utoipa::ToSchema; +use crate::uploader::UploadJobId; + /// Attribute data, provided by the user by api calls. #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] #[serde(tag = "type")] @@ -48,11 +50,17 @@ pub enum ApiAttrData { data: Vec, }, - /// Binary data stored in S3 + /// Binary data we uploaded previously Blob { - /// The object's key + /// The upload id. This must only be used once, + /// uploaded files are deleted once their job is done. + /// + /// Also, note that we _never_ send the S3 key to the + /// client---only the upload id as a proxy. This makes sure + /// that clients can only start jobs on uploads they own, + /// and reduces the risk of other creative abuse. 
#[schema(value_type = String)] - key: SmartString, + upload_id: UploadJobId, }, /// A reference to an item in another class diff --git a/copperd/bin/edged/src/config.rs b/copperd/bin/edged/src/config.rs index ddc587cd..4a39ccc3 100644 --- a/copperd/bin/edged/src/config.rs +++ b/copperd/bin/edged/src/config.rs @@ -47,7 +47,7 @@ pub struct EdgedConfig { /// The bucket to store user uploads in pub edged_objectstore_upload_bucket: String, - /// How long an upload job may idle before being deleted, in seconds + /// How long a pending upload job may idle before being deleted, in seconds pub edged_upload_job_timeout: u64, } diff --git a/copperd/bin/edged/src/main.rs b/copperd/bin/edged/src/main.rs index addf8a41..cd690f6b 100644 --- a/copperd/bin/edged/src/main.rs +++ b/copperd/bin/edged/src/main.rs @@ -16,6 +16,7 @@ mod api; mod config; mod database; +mod apidata; mod auth; mod uploader; diff --git a/copperd/bin/edged/src/uploader/mod.rs b/copperd/bin/edged/src/uploader/mod.rs index 6c529b94..dcee8483 100644 --- a/copperd/bin/edged/src/uploader/mod.rs +++ b/copperd/bin/edged/src/uploader/mod.rs @@ -5,8 +5,9 @@ use copper_util::{ }; use errors::{NewUploadError, UploadFinishError, UploadFragmentError}; use rand::{distributions::Alphanumeric, Rng}; +use serde::{Deserialize, Serialize}; use smartstring::{LazyCompact, SmartString}; -use std::{sync::Arc, time::Duration}; +use std::{collections::BTreeMap, fmt::Display, sync::Arc, time::Duration}; use time::OffsetDateTime; use tracing::{debug, info}; @@ -16,38 +17,92 @@ pub mod errors; const UPLOAD_ID_LENGTH: usize = 16; +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +#[serde(transparent)] +pub struct UploadJobId { + id: SmartString, +} + +impl UploadJobId { + #[inline(always)] + pub fn new() -> Self { + let id: SmartString = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(UPLOAD_ID_LENGTH) + .map(char::from) + .collect(); + + Self { id } + } + + pub fn as_str(&self) -> &str { + &self.id + } +} + +impl Display for UploadJobId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.id) + } +} + +pub enum UploadJobState { + /// This job is pending, value is upload target + Pending(MultipartUpload), + + /// This job is done, value is S3 object key. + Done(SmartString), +} + pub struct UploadJob { - pub id: SmartString, + pub id: UploadJobId, pub started_at: OffsetDateTime, pub last_activity: OffsetDateTime, pub owner: UserId, - pub uploadjob: MultipartUpload, + pub state: UploadJobState, pub mime: MimeType, } pub struct Uploader { config: Arc, - jobs: tokio::sync::Mutex>, + jobs: tokio::sync::Mutex>, objectstore_client: Arc, } +pub enum GotJobKey { + NoSuchJob, + JobNotDone, + HereYouGo(SmartString), +} + impl Uploader { pub fn new(config: Arc, objectstore_client: Arc) -> Self { Self { config, - jobs: tokio::sync::Mutex::new(Vec::new()), + jobs: tokio::sync::Mutex::new(BTreeMap::new()), objectstore_client, } } - #[inline(always)] - fn generate_id() -> SmartString { - rand::thread_rng() - .sample_iter(&Alphanumeric) - .take(UPLOAD_ID_LENGTH) - .map(char::from) - .collect() + /// Get a finished upload job's object key. 
+ pub async fn get_job_object_key(&self, as_user: UserId, job_id: &UploadJobId) -> GotJobKey { + let jobs = self.jobs.lock().await; + + let job = match jobs.get(job_id) { + Some(x) => x, + None => return GotJobKey::NoSuchJob, + }; + + // Make sure we are allowed to get this job + if job.owner != as_user { + return GotJobKey::NoSuchJob; + } + + match &job.state { + UploadJobState::Pending(_) => GotJobKey::JobNotDone, + UploadJobState::Done(x) => GotJobKey::HereYouGo(x.clone()), + } } /// Check all active jobs in this uploader, @@ -61,25 +116,33 @@ impl Uploader { let now = OffsetDateTime::now_utc(); let offset = Duration::from_secs(self.config.edged_upload_job_timeout); - let mut i = 0; - while i < jobs.len() { - let j = &jobs[i]; + let mut to_remove = Vec::new(); + for (k, j) in jobs.iter() { + let should_remove = match j.state { + UploadJobState::Pending(_) => j.last_activity + offset < now, + UploadJobState::Done(_) => false, + }; - if j.last_activity + offset < now { + if should_remove { debug!( - message = "Removing job", + message = "Job queued for removal", reason = "timeout", job_id = ?j.id, started_at = ?j.started_at ); - let job = jobs.swap_remove(i); - job.uploadjob.cancel().await; - + to_remove.push(k.clone()); continue; } + } - i += 1; + for k in to_remove { + debug!(message = "Removing job", reason = "timeout", job_id = ?k); + let job = jobs.remove(&k).unwrap(); + match job.state { + UploadJobState::Pending(uj) => uj.cancel().await, + _ => unreachable!(), + } } } } @@ -89,37 +152,39 @@ impl Uploader { &self, owner: UserId, mime: MimeType, - ) -> Result, NewUploadError> { + ) -> Result { self.check_jobs().await; let mut jobs = self.jobs.lock().await; - let id = loop { - let id = Uploader::generate_id(); - // TODO: check existing S3 objects - if jobs.iter().all(|us| us.id != id) { - break id; - } - }; + let id = UploadJobId::new(); let now = OffsetDateTime::now_utc(); - jobs.push(UploadJob { - id: id.clone(), - owner, - started_at: now, - last_activity: now, - mime: mime.clone(), - uploadjob: self - .objectstore_client - .create_multipart_upload(&id, mime) - .await?, - }); + jobs.insert( + id.clone(), + UploadJob { + id: id.clone(), + owner, + started_at: now, + last_activity: now, + mime: mime.clone(), + state: UploadJobState::Pending( + self.objectstore_client + .create_multipart_upload( + &self.config.edged_objectstore_upload_bucket, + format!("{}/{id}", i64::from(owner)).as_str(), + mime, + ) + .await?, + ), + }, + ); info!( message = "Created a new upload job", job_id = ?id, ); - return Ok(id); + return Ok(id.into()); } /// Upload one fragment of an upload job. 
@@ -129,17 +194,20 @@ impl Uploader { pub async fn upload_part( &self, as_user: UserId, - job_id: &str, + job_id: &UploadJobId, data: &[u8], part_number: Option, ) -> Result<(), UploadFragmentError> { self.check_jobs().await; let mut jobs = self.jobs.lock().await; - let job = jobs - .iter_mut() - .find(|us| us.id == job_id) - .ok_or(UploadFragmentError::BadUpload)?; + let job = jobs.get_mut(job_id).ok_or(UploadFragmentError::BadUpload)?; + + // Cannot upload parts to a finished job + if !matches!(job.state, UploadJobState::Pending(_)) { + return Err(UploadFragmentError::BadUpload); + } + if job.owner != as_user { return Err(UploadFragmentError::NotMyUpload); } @@ -147,7 +215,10 @@ impl Uploader { job.last_activity = OffsetDateTime::now_utc(); let part_number = match part_number { Some(x) => x, - None => i32::try_from(job.uploadjob.n_completed_parts()).unwrap() + 1, + None => match &mut job.state { + UploadJobState::Pending(uj) => i32::try_from(uj.n_completed_parts()).unwrap() + 1, + UploadJobState::Done(_) => unreachable!(), + }, }; assert!( @@ -156,28 +227,44 @@ impl Uploader { ); // TODO: queue this future. CAREFUL WITH PART NUMBERS! - job.uploadjob.upload_part(data, part_number).await?; + match &mut job.state { + UploadJobState::Pending(uj) => uj.upload_part(data, part_number).await?, + UploadJobState::Done(_) => unreachable!(), + }; return Ok(()); } - pub async fn finish_job(&self, as_user: UserId, job_id: &str) -> Result<(), UploadFinishError> { + pub async fn finish_job( + &self, + as_user: UserId, + job_id: &UploadJobId, + ) -> Result<(), UploadFinishError> { self.check_jobs().await; let mut jobs = self.jobs.lock().await; - let job_idx = jobs - .iter_mut() - .enumerate() - .find(|(_, us)| us.id == job_id) - .ok_or(UploadFinishError::BadUpload)?; - if job_idx.1.owner != as_user { + let job = jobs.get_mut(job_id).ok_or(UploadFinishError::BadUpload)?; + + // Cannot finish a finished job + if matches!(job.state, UploadJobState::Done(_)) { + return Err(UploadFinishError::BadUpload); + } + + if job.owner != as_user { return Err(UploadFinishError::NotMyUpload); } - let job_idx = job_idx.0; - let job = jobs.swap_remove(job_idx); + let done_state = UploadJobState::Done(match &job.state { + UploadJobState::Pending(uj) => uj.key().into(), + UploadJobState::Done(_) => unreachable!(), + }); + + let uj = std::mem::replace(&mut job.state, done_state); - job.uploadjob.finish().await?; + match uj { + UploadJobState::Pending(uj) => uj.finish().await?, + UploadJobState::Done(_) => unreachable!(), + }; debug!( message = "Finished upload", diff --git a/copperd/lib/edged/Cargo.toml b/copperd/lib/edged/Cargo.toml index 9e53f11e..06b826aa 100644 --- a/copperd/lib/edged/Cargo.toml +++ b/copperd/lib/edged/Cargo.toml @@ -19,7 +19,6 @@ workspace = true [dependencies] copper-pipelined = { workspace = true } copper-storaged = { workspace = true } -copper-util = { workspace = true } smartstring = { workspace = true } serde = { workspace = true } diff --git a/copperd/lib/edged/src/lib.rs b/copperd/lib/edged/src/lib.rs index a9077d50..e1efc7d0 100644 --- a/copperd/lib/edged/src/lib.rs +++ b/copperd/lib/edged/src/lib.rs @@ -3,6 +3,3 @@ pub use info::*; mod id; pub use id::*; - -mod apidata; -pub use apidata::*; diff --git a/copperd/lib/util/src/s3client.rs b/copperd/lib/util/src/s3client.rs index 70928ccc..64a7605f 100644 --- a/copperd/lib/util/src/s3client.rs +++ b/copperd/lib/util/src/s3client.rs @@ -319,6 +319,10 @@ impl MultipartUpload { self.completed_parts.len() } + pub fn key(&self) -> &str { + &self.key + } 
+ /// Upload a part to a multipart upload. /// `part_number` must be consecutive, and starts at 1. pub async fn upload_part( From 29bae2e3407c782e7122f281fc141f46bf57fcd0 Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Thu, 17 Oct 2024 15:23:36 -0700 Subject: [PATCH 5/7] Clean up upload jobs (except those that have been assigned to a pipeline) --- copperd/bin/edged/src/api/pipeline/run.rs | 29 ++++- copperd/bin/edged/src/config.rs | 4 +- copperd/bin/edged/src/uploader/errors.rs | 34 +++++- copperd/bin/edged/src/uploader/mod.rs | 139 ++++++++++++++++++---- copperd/lib/util/src/s3client.rs | 56 ++++++++- 5 files changed, 232 insertions(+), 30 deletions(-) diff --git a/copperd/bin/edged/src/api/pipeline/run.rs b/copperd/bin/edged/src/api/pipeline/run.rs index 0b68ac41..6ab5125f 100644 --- a/copperd/bin/edged/src/api/pipeline/run.rs +++ b/copperd/bin/edged/src/api/pipeline/run.rs @@ -16,7 +16,7 @@ use utoipa::ToSchema; use crate::{ apidata::ApiAttrData, database::base::{client::DatabaseClient, errors::pipeline::GetPipelineError}, - uploader::GotJobKey, + uploader::{errors::UploadAssignError, GotJobKey}, RouterState, }; @@ -88,7 +88,7 @@ pub(super) async fn run_pipeline( if let Some(x) = match &v { ApiAttrData::Blob { upload_id } => { let res = state.uploader.get_job_object_key(user.id, upload_id).await; - match res { + let key = match res { GotJobKey::NoSuchJob => { return ( StatusCode::BAD_REQUEST, @@ -109,7 +109,30 @@ pub(super) async fn run_pipeline( .into_response(); } - GotJobKey::HereYouGo(key) => Some(AttrData::Blob { + GotJobKey::JobIsAssigned => { + return ( + StatusCode::BAD_REQUEST, + Json(format!( + "Invalid input: input {k} references a job that has been assigned to a pipeline" + )), + ) + .into_response(); + } + + GotJobKey::HereYouGo(key) => key, + }; + + let res = state + .uploader + .assign_job_to_pipeline(user.id, upload_id, &payload.job_id) + .await; + + match res { + // This is impossible, we already checked these cases + Err(UploadAssignError::BadUpload) => unreachable!(), + Err(UploadAssignError::NotMyUpload) => unreachable!(), + + Ok(()) => Some(AttrData::Blob { bucket: (&state.config.edged_objectstore_upload_bucket).into(), key, }), diff --git a/copperd/bin/edged/src/config.rs b/copperd/bin/edged/src/config.rs index 4a39ccc3..70411353 100644 --- a/copperd/bin/edged/src/config.rs +++ b/copperd/bin/edged/src/config.rs @@ -47,7 +47,9 @@ pub struct EdgedConfig { /// The bucket to store user uploads in pub edged_objectstore_upload_bucket: String, - /// How long a pending upload job may idle before being deleted, in seconds + /// How long an upload job may idle before being deleted, in seconds + /// - if a pending upload job does not receive a part for this many seconds, it is deleted + /// - if a finished upload job is not passed to a `run()` call within this many seconds, it is deleted pub edged_upload_job_timeout: u64, } diff --git a/copperd/bin/edged/src/uploader/errors.rs b/copperd/bin/edged/src/uploader/errors.rs index a16a3c4a..bbb7541f 100644 --- a/copperd/bin/edged/src/uploader/errors.rs +++ b/copperd/bin/edged/src/uploader/errors.rs @@ -32,7 +32,7 @@ impl Error for NewUploadError { #[derive(Debug)] pub enum UploadFragmentError { - /// We tried to push a fragment to an upload that doesn't exist + /// We tried to push a fragment to an upload that doesn't exist or isn't pending BadUpload, /// We tried to push a fragment to an upload we don't own @@ -77,7 +77,7 @@ impl Error for UploadFragmentError { #[derive(Debug)] pub enum 
UploadFinishError { - /// We tried to finish an upload that doesn't exist + /// We tried to finish an upload that doesn't exist or isn't pending BadUpload, /// We tried to finish an upload we don't own @@ -115,3 +115,33 @@ impl Error for UploadFinishError { } } } + +#[derive(Debug)] +pub enum UploadAssignError { + /// We tried to assign an upload that doesn't exist or isn't done + BadUpload, + + /// We tried to assign an upload we don't own + NotMyUpload, +} + +impl Display for UploadAssignError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::BadUpload => { + write!(f, "Tried to finish an upload that doesn't exist") + } + Self::NotMyUpload => { + write!(f, "Tried to finish an upload that we don't own") + } + } + } +} + +impl Error for UploadAssignError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + _ => None, + } + } +} diff --git a/copperd/bin/edged/src/uploader/mod.rs b/copperd/bin/edged/src/uploader/mod.rs index dcee8483..b5c0b47c 100644 --- a/copperd/bin/edged/src/uploader/mod.rs +++ b/copperd/bin/edged/src/uploader/mod.rs @@ -3,18 +3,22 @@ use copper_util::{ s3client::{MultipartUpload, S3Client}, MimeType, }; -use errors::{NewUploadError, UploadFinishError, UploadFragmentError}; +use errors::{NewUploadError, UploadAssignError, UploadFinishError, UploadFragmentError}; use rand::{distributions::Alphanumeric, Rng}; use serde::{Deserialize, Serialize}; use smartstring::{LazyCompact, SmartString}; use std::{collections::BTreeMap, fmt::Display, sync::Arc, time::Duration}; use time::OffsetDateTime; -use tracing::{debug, info}; +use tracing::{debug, error, info}; use crate::config::EdgedConfig; pub mod errors; +// +// MARK: Helpers +// + const UPLOAD_ID_LENGTH: usize = 16; #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] @@ -46,21 +50,42 @@ impl Display for UploadJobId { } } -pub enum UploadJobState { +/// The state of an upload job +#[derive(Debug)] +enum UploadJobState { /// This job is pending, value is upload target Pending(MultipartUpload), /// This job is done, value is S3 object key. Done(SmartString), + + /// This job is done and has been assigned to a pipeline job. + /// Value is the pipeline job's id. + Assigned { + key: SmartString, + pipeline_job: SmartString, + }, +} + +pub enum GotJobKey { + NoSuchJob, + JobNotDone, + HereYouGo(SmartString), + JobIsAssigned, } +// +// MARK: UploadJob +// + pub struct UploadJob { pub id: UploadJobId, - pub started_at: OffsetDateTime, - pub last_activity: OffsetDateTime, - pub owner: UserId, - pub state: UploadJobState, + started_at: OffsetDateTime, + last_activity: OffsetDateTime, + state: UploadJobState, + + pub owner: UserId, pub mime: MimeType, } @@ -70,13 +95,8 @@ pub struct Uploader { objectstore_client: Arc, } -pub enum GotJobKey { - NoSuchJob, - JobNotDone, - HereYouGo(SmartString), -} - impl Uploader { + /// Initialize a new upload manager pub fn new(config: Arc, objectstore_client: Arc) -> Self { Self { config, @@ -102,14 +122,17 @@ impl Uploader { match &job.state { UploadJobState::Pending(_) => GotJobKey::JobNotDone, UploadJobState::Done(x) => GotJobKey::HereYouGo(x.clone()), + UploadJobState::Assigned { .. } => GotJobKey::JobIsAssigned, } } /// Check all active jobs in this uploader, - /// and remove those we no longer need. + /// and remove those that that have timed out. /// - /// This cleans up jobs that have timed out, - /// and jobs bound to a pipeline that has been finished. 
+ /// This should be called before any uploader action, + /// which could result in a few old jobs laying around. + /// This is not a problem, since they will never pile up + /// and waste storage. #[inline(always)] async fn check_jobs(&self) { let mut jobs = self.jobs.lock().await; @@ -120,7 +143,10 @@ impl Uploader { for (k, j) in jobs.iter() { let should_remove = match j.state { UploadJobState::Pending(_) => j.last_activity + offset < now, - UploadJobState::Done(_) => false, + UploadJobState::Done(_) => j.last_activity + offset < now, + + // Assigned jobs never time out + UploadJobState::Assigned { .. } => false, }; if should_remove { @@ -128,7 +154,8 @@ impl Uploader { message = "Job queued for removal", reason = "timeout", job_id = ?j.id, - started_at = ?j.started_at + started_at = ?j.started_at, + state = ?j.state ); to_remove.push(k.clone()); @@ -141,13 +168,27 @@ impl Uploader { let job = jobs.remove(&k).unwrap(); match job.state { UploadJobState::Pending(uj) => uj.cancel().await, - _ => unreachable!(), + + UploadJobState::Assigned { key, .. } | UploadJobState::Done(key) => { + let res = self + .objectstore_client + .delete_object(&self.config.edged_objectstore_upload_bucket, &key) + .await; + match res { + Ok(()) => {} + Err(error) => { + error!(message = "Could not delete uploaded object", ?key, ?error); + } + } + } } } } } impl Uploader { + /// Create a new upload job owned by the given user + /// and return its id. pub async fn new_job( &self, owner: UserId, @@ -156,7 +197,14 @@ impl Uploader { self.check_jobs().await; let mut jobs = self.jobs.lock().await; - let id = UploadJobId::new(); + + // Generate a new id + let id = loop { + let id = UploadJobId::new(); + if !jobs.contains_key(&id) { + break id; + } + }; let now = OffsetDateTime::now_utc(); jobs.insert( @@ -203,7 +251,7 @@ impl Uploader { let mut jobs = self.jobs.lock().await; let job = jobs.get_mut(job_id).ok_or(UploadFragmentError::BadUpload)?; - // Cannot upload parts to a finished job + // Cannot upload parts to a job that isn't pending if !matches!(job.state, UploadJobState::Pending(_)) { return Err(UploadFragmentError::BadUpload); } @@ -218,6 +266,7 @@ impl Uploader { None => match &mut job.state { UploadJobState::Pending(uj) => i32::try_from(uj.n_completed_parts()).unwrap() + 1, UploadJobState::Done(_) => unreachable!(), + UploadJobState::Assigned { .. } => unreachable!(), }, }; @@ -230,11 +279,13 @@ impl Uploader { match &mut job.state { UploadJobState::Pending(uj) => uj.upload_part(data, part_number).await?, UploadJobState::Done(_) => unreachable!(), + UploadJobState::Assigned { .. } => unreachable!(), }; return Ok(()); } + /// Finish an upload job as the given user pub async fn finish_job( &self, as_user: UserId, @@ -245,8 +296,8 @@ impl Uploader { let mut jobs = self.jobs.lock().await; let job = jobs.get_mut(job_id).ok_or(UploadFinishError::BadUpload)?; - // Cannot finish a finished job - if matches!(job.state, UploadJobState::Done(_)) { + // Cannot finish a job that isn't pending + if !matches!(job.state, UploadJobState::Pending(_)) { return Err(UploadFinishError::BadUpload); } @@ -257,6 +308,7 @@ impl Uploader { let done_state = UploadJobState::Done(match &job.state { UploadJobState::Pending(uj) => uj.key().into(), UploadJobState::Done(_) => unreachable!(), + UploadJobState::Assigned { .. 
} => unreachable!(), }); let uj = std::mem::replace(&mut job.state, done_state); @@ -264,6 +316,7 @@ impl Uploader { match uj { UploadJobState::Pending(uj) => uj.finish().await?, UploadJobState::Done(_) => unreachable!(), + UploadJobState::Assigned { .. } => unreachable!(), }; debug!( @@ -274,4 +327,44 @@ impl Uploader { return Ok(()); } + + /// Finish an upload job as the given user + pub async fn assign_job_to_pipeline( + &self, + as_user: UserId, + job_id: &UploadJobId, + to_pipeline_job: &str, + ) -> Result<(), UploadAssignError> { + self.check_jobs().await; + + let mut jobs = self.jobs.lock().await; + let job = jobs.get_mut(job_id).ok_or(UploadAssignError::BadUpload)?; + + // Cannot assign a job that isn't done + let key = match &job.state { + UploadJobState::Done(x) => x.clone(), + _ => return Err(UploadAssignError::BadUpload), + }; + + if job.owner != as_user { + return Err(UploadAssignError::NotMyUpload); + } + + let _ = std::mem::replace( + &mut job.state, + UploadJobState::Assigned { + key, + pipeline_job: to_pipeline_job.into(), + }, + ); + + debug!( + message = "Assigned upload job", + job_id = ?job_id, + mime = ?job.mime, + to_pipeline_job + ); + + return Ok(()); + } } diff --git a/copperd/lib/util/src/s3client.rs b/copperd/lib/util/src/s3client.rs index 64a7605f..5247c2fe 100644 --- a/copperd/lib/util/src/s3client.rs +++ b/copperd/lib/util/src/s3client.rs @@ -6,7 +6,7 @@ use aws_sdk_s3::{ use smartstring::{LazyCompact, SmartString}; use std::{ error::Error, - fmt::Display, + fmt::{Debug, Display}, io::{Seek, SeekFrom, Write}, }; use tracing::error; @@ -142,6 +142,35 @@ impl Error for S3UploadFinishError { } } +#[derive(Debug)] +pub enum S3DeleteObjectError { + SdkError(Box), +} + +impl + From> for S3DeleteObjectError +{ + fn from(value: SdkError) -> Self { + Self::SdkError(Box::new(value)) + } +} + +impl Display for S3DeleteObjectError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::SdkError(_) => write!(f, "sdk error"), + } + } +} + +impl Error for S3DeleteObjectError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match self { + Self::SdkError(x) => Some(&**x), + } + } +} + // // MARK: Implementations // @@ -210,6 +239,21 @@ impl<'a> S3Client { completed_parts: Vec::new(), }); } + + pub async fn delete_object( + &'a self, + bucket: &str, + key: &str, + ) -> Result<(), S3DeleteObjectError> { + self.client + .delete_object() + .bucket(bucket) + .key(key) + .send() + .await?; + + return Ok(()); + } } pub struct S3Reader { @@ -391,3 +435,13 @@ impl MultipartUpload { return Ok(()); } } + +impl Debug for MultipartUpload { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "MultipartUpload{{bucket: {}, key: {}}}", + self.bucket, self.key + ) + } +} From 4114c3296f5f94ad0ff6defba3e8c3c6e4d8bda3 Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Thu, 17 Oct 2024 16:35:19 -0700 Subject: [PATCH 6/7] Delete uploads from finished pipeline jobs --- copperd/bin/edged/src/main.rs | 21 ++++--- copperd/bin/edged/src/uploader/mod.rs | 65 ++++++++++++++++++--- copperd/lib/pipelined/src/client/base.rs | 7 ++- copperd/lib/pipelined/src/client/reqwest.rs | 25 +++++++- 4 files changed, 99 insertions(+), 19 deletions(-) diff --git a/copperd/bin/edged/src/main.rs b/copperd/bin/edged/src/main.rs index cd690f6b..3a45259e 100644 --- a/copperd/bin/edged/src/main.rs +++ b/copperd/bin/edged/src/main.rs @@ -34,21 +34,24 @@ async fn make_app(config: Arc, s3_client_upload: Arc) -> 
} }; + let pipelined_client = Arc::new( + ReqwestPipelineClient::new(&config.edged_pipelined_addr, &config.edged_pipelined_secret) + // TODO: handle error + .unwrap(), + ); + // Create app return api::router(RouterState { config: config.clone(), db_client: Arc::new(db), auth: Arc::new(AuthHelper::new()), - uploader: Arc::new(Uploader::new(config.clone(), s3_client_upload.clone())), + uploader: Arc::new(Uploader::new( + config.clone(), + s3_client_upload.clone(), + pipelined_client.clone(), + )), - pipelined_client: Arc::new( - ReqwestPipelineClient::new( - &config.edged_pipelined_addr, - &config.edged_pipelined_secret, - ) - // TODO: handle error - .unwrap(), - ), + pipelined_client, storaged_client: Arc::new( ReqwestStoragedClient::new(&config.edged_storaged_addr, &config.edged_storaged_secret) diff --git a/copperd/bin/edged/src/uploader/mod.rs b/copperd/bin/edged/src/uploader/mod.rs index b5c0b47c..364f640b 100644 --- a/copperd/bin/edged/src/uploader/mod.rs +++ b/copperd/bin/edged/src/uploader/mod.rs @@ -1,3 +1,4 @@ +use copper_pipelined::{client::PipelinedClient, structs::JobInfoState}; use copper_storaged::UserId; use copper_util::{ s3client::{MultipartUpload, S3Client}, @@ -93,14 +94,20 @@ pub struct Uploader { config: Arc, jobs: tokio::sync::Mutex>, objectstore_client: Arc, + pipelined_client: Arc, } impl Uploader { /// Initialize a new upload manager - pub fn new(config: Arc, objectstore_client: Arc) -> Self { + pub fn new( + config: Arc, + objectstore_client: Arc, + pipelined_client: Arc, + ) -> Self { Self { config, jobs: tokio::sync::Mutex::new(BTreeMap::new()), + pipelined_client, objectstore_client, } } @@ -141,18 +148,52 @@ impl Uploader { let mut to_remove = Vec::new(); for (k, j) in jobs.iter() { - let should_remove = match j.state { - UploadJobState::Pending(_) => j.last_activity + offset < now, - UploadJobState::Done(_) => j.last_activity + offset < now, + let reason; - // Assigned jobs never time out - UploadJobState::Assigned { .. } => false, + let should_remove = match &j.state { + UploadJobState::Pending(_) => { + reason = "pending timeout"; + j.last_activity + offset < now + } + + UploadJobState::Done(_) => { + reason = "done timeout"; + j.last_activity + offset < now + } + + UploadJobState::Assigned { pipeline_job, .. } => { + // Apply a timeout even to assigned jobs, so that we + // need fewer api calls, and to prevent errors caused + // by a race condition + // (a job is assigned to a pipeline that isn't returned by pipelined) + if j.last_activity + offset > now { + reason = "UNREACHABLE"; + false + } else { + let info = self.pipelined_client.get_job(&pipeline_job).await; + if info.is_err() { + reason = "assigned job error"; + true + } else { + reason = "assigned job finished"; + + match info.unwrap().state { + JobInfoState::Failed => true, + JobInfoState::BuildError { .. 
} => true, + JobInfoState::Success => true, + + JobInfoState::Queued => false, + JobInfoState::Running => false, + } + } + } + } }; if should_remove { debug!( message = "Job queued for removal", - reason = "timeout", + reason, job_id = ?j.id, started_at = ?j.started_at, state = ?j.state @@ -164,7 +205,7 @@ impl Uploader { } for k in to_remove { - debug!(message = "Removing job", reason = "timeout", job_id = ?k); + debug!(message = "Removing job", job_id = ?k); let job = jobs.remove(&k).unwrap(); match job.state { UploadJobState::Pending(uj) => uj.cancel().await, @@ -260,7 +301,9 @@ impl Uploader { return Err(UploadFragmentError::NotMyUpload); } + // Only update last_activity if request was valid job.last_activity = OffsetDateTime::now_utc(); + let part_number = match part_number { Some(x) => x, None => match &mut job.state { @@ -305,6 +348,9 @@ impl Uploader { return Err(UploadFinishError::NotMyUpload); } + // Only update last_activity if request was valid + job.last_activity = OffsetDateTime::now_utc(); + let done_state = UploadJobState::Done(match &job.state { UploadJobState::Pending(uj) => uj.key().into(), UploadJobState::Done(_) => unreachable!(), @@ -350,6 +396,9 @@ impl Uploader { return Err(UploadAssignError::NotMyUpload); } + // Only update last_activity if request was valid + job.last_activity = OffsetDateTime::now_utc(); + let _ = std::mem::replace( &mut job.state, UploadJobState::Assigned { diff --git a/copperd/lib/pipelined/src/client/base.rs b/copperd/lib/pipelined/src/client/base.rs index 6c759915..5f4f7a38 100644 --- a/copperd/lib/pipelined/src/client/base.rs +++ b/copperd/lib/pipelined/src/client/base.rs @@ -5,7 +5,10 @@ use copper_storaged::{AttrData, UserId}; use reqwest::StatusCode; use smartstring::{LazyCompact, SmartString}; -use crate::{json::PipelineJson, structs::JobInfoList}; +use crate::{ + json::PipelineJson, + structs::{JobInfo, JobInfoList}, +}; #[derive(Debug)] pub enum PipelinedRequestError { @@ -59,4 +62,6 @@ pub trait PipelinedClient: Send + Sync { skip: usize, count: usize, ) -> Result; + + async fn get_job(&self, job_id: &str) -> Result; } diff --git a/copperd/lib/pipelined/src/client/reqwest.rs b/copperd/lib/pipelined/src/client/reqwest.rs index 04bdc992..f603882b 100644 --- a/copperd/lib/pipelined/src/client/reqwest.rs +++ b/copperd/lib/pipelined/src/client/reqwest.rs @@ -6,7 +6,10 @@ use smartstring::{LazyCompact, SmartString}; use std::collections::BTreeMap; use super::{PipelinedClient, PipelinedRequestError}; -use crate::{json::PipelineJson, structs::JobInfoList}; +use crate::{ + json::PipelineJson, + structs::{JobInfo, JobInfoList}, +}; pub struct ReqwestPipelineClient { client: Client, @@ -101,4 +104,24 @@ impl PipelinedClient for ReqwestPipelineClient { return Ok(de); } + + async fn get_job(&self, job_id: &str) -> Result { + let url = self.pipelined_url.join(&format!("/job/{job_id}")).unwrap(); + let res = self + .client + .get(url) + .header( + header::AUTHORIZATION, + format!("Bearer {}", self.pipelined_secret), + ) + .send() + .await + .map_err(convert_error)?; + + let json = res.text().await.map_err(convert_error)?; + let de: JobInfo = serde_json::from_str(&json) + .map_err(|e| PipelinedRequestError::Other { error: Box::new(e) })?; + + return Ok(de); + } } From ffcf636f5f4f7adda0f9666a60d0c5c45ed22e90 Mon Sep 17 00:00:00 2001 From: rm-dr <96270320+rm-dr@users.noreply.github.com> Date: Thu, 17 Oct 2024 16:36:31 -0700 Subject: [PATCH 7/7] Increase log & queue size --- copperd/.env | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff 
--git a/copperd/.env b/copperd/.env index f698ecca..85d56df9 100644 --- a/copperd/.env +++ b/copperd/.env @@ -21,8 +21,8 @@ EDGED_UPLOAD_JOB_TIMEOUT = 300 PIPELINED_LOGLEVEL = "Trace" PIPELINED_BLOB_FRAGMENT_SIZE = 10000000 PIPELINED_MAX_RUNNING_JOBS = 4 -PIPELINED_JOB_QUEUE_SIZE = 100 -PIPELINED_JOB_LOG_SIZE = 100 +PIPELINED_JOB_QUEUE_SIZE = 1000 +PIPELINED_JOB_LOG_SIZE = 1000 PIPELINED_REQUEST_BODY_LIMIT = 20000000 PIPELINED_SERVER_ADDR = "127.0.0.1:4000"
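
For readers following the series, the cleanup rule that `Uploader::check_jobs` converges on after patches 5 and 6 can be read as the minimal sketch below. It is illustrative only: `JobState`, `should_remove`, `idle_for`, and `timeout` are stand-ins rather than the real copperd types, and the real code additionally deletes the uploaded S3 object and polls pipelined for the assigned job's state.

// Minimal sketch of the upload-job cleanup rule (stand-in types, not copperd's).
use std::time::Duration;

// Stand-in for `UploadJobState`: Pending/Done time out, Assigned also waits on
// the pipeline job it was handed to.
enum JobState {
	Pending,
	Done,
	Assigned { pipeline_finished: bool },
}

// A job is removed once it has been idle longer than the timeout; an assigned
// job additionally waits until its pipeline job has finished or failed,
// mirroring the `JobInfoState` match added in patch 6.
fn should_remove(state: &JobState, idle_for: Duration, timeout: Duration) -> bool {
	let timed_out = idle_for > timeout;
	match state {
		JobState::Pending | JobState::Done => timed_out,
		JobState::Assigned { pipeline_finished } => timed_out && *pipeline_finished,
	}
}

fn main() {
	let timeout = Duration::from_secs(300); // cf. EDGED_UPLOAD_JOB_TIMEOUT = 300
	assert!(should_remove(&JobState::Done, Duration::from_secs(600), timeout));
	assert!(!should_remove(&JobState::Pending, Duration::from_secs(10), timeout));
	assert!(!should_remove(
		&JobState::Assigned { pipeline_finished: false },
		Duration::from_secs(600),
		timeout,
	));
}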
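
A second, smaller detail from patch 5, the id-collision retry loop in `new_job`, reads the same way as a stand-alone sketch. This uses the rand 0.8 idiom already imported by the uploader module, with a plain `BTreeSet<String>` standing in for the uploader's job map; `new_id` and `unique_id` are hypothetical helpers, not copperd functions.

use rand::{distributions::Alphanumeric, Rng};
use std::collections::BTreeSet;

const UPLOAD_ID_LENGTH: usize = 16;

// Draw one random alphanumeric id, like `UploadJobId::new()`.
fn new_id() -> String {
	rand::thread_rng()
		.sample_iter(&Alphanumeric)
		.take(UPLOAD_ID_LENGTH)
		.map(char::from)
		.collect()
}

// Retry until we draw an id that no live job is already using.
fn unique_id(existing: &BTreeSet<String>) -> String {
	loop {
		let id = new_id();
		if !existing.contains(&id) {
			return id;
		}
	}
}

fn main() {
	let mut jobs = BTreeSet::new();
	let id = unique_id(&jobs);
	jobs.insert(id);
	assert_eq!(jobs.len(), 1);
}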