Commit 1452678

Clean upload bucket (#144)

rm-dr authored Oct 17, 2024
2 parents 8bd6f91 + ffcf636 commit 1452678
Showing 30 changed files with 741 additions and 172 deletions.
4 changes: 2 additions & 2 deletions copperc/src/app/u/upload/controlpanel.tsx
@@ -9,7 +9,7 @@ import { components } from "@/lib/api/openapi";
 
 import styles from "./page.module.scss";
 import { uploadFiles } from "./uploadlogic";
-import { QueuedFileState, UploadState } from "./page";
+import { UploadState } from "./page";
 import { ppBytes } from "@/lib/ppbytes";
 
 function UploadBar({
@@ -312,7 +312,7 @@ export function ControlPanel(params: {
             input: {
                 [input_name]: {
                     type: "Blob",
-                    object_key: upload_job,
+                    upload_id: upload_job,
                 },
             },
         },
17 changes: 11 additions & 6 deletions copperc/src/lib/api/openapi.ts
@@ -358,9 +358,8 @@ export interface paths {
 export type webhooks = Record<string, never>;
 export interface components {
   schemas: {
-    /** @description A value stored inside an attribute.
-     * Each of these corresponds to an [`AttrDataStub`] */
-    AttrData:
+    /** @description Attribute data, provided by the user by api calls. */
+    ApiAttrData:
       | {
           data_type: components["schemas"]["AttrDataStub"];
           /** @enum {string} */
@@ -409,10 +408,16 @@
           type: "Hash";
         }
       | {
-          /** @description The object's key */
-          object_key: string;
           /** @enum {string} */
           type: "Blob";
+          /** @description The upload id. This must only be used once,
+           * uploaded files are deleted once their job is done.
+           *
+           * Also, note that we _never_ send the S3 key to the
+           * client---only the upload id as a proxy. This makes sure
+           * that clients can only start jobs on uploads they own,
+           * and reduces the risk of other creative abuse. */
+          upload_id: string;
         }
       | {
           /**
@@ -710,7 +715,7 @@
     };
     RunPipelineRequest: {
       input: {
-        [key: string]: components["schemas"]["AttrData"];
+        [key: string]: components["schemas"]["ApiAttrData"];
      };
      /** @description A unique id for this job */
      job_id: string;
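
To make the new shape concrete, here is a hedged sketch of a RunPipelineRequest body that feeds one upload through the new Blob variant. The input name "file" and both id strings are made up for illustration; the field names come from the schema above.

    // Sketch only: builds a RunPipelineRequest body matching the schema above.
    // The input name "file" and both id strings are hypothetical.
    use serde_json::json;

    fn main() {
        let body = json!({
            "job_id": "job-0001",
            "input": {
                "file": {
                    "type": "Blob",
                    // Clients send only the upload id they got back from the
                    // upload endpoints, never an S3 key.
                    "upload_id": "upload-abc123"
                }
            }
        });
        println!("{}", serde_json::to_string_pretty(&body).unwrap());
    }

Sending only the upload id keeps S3 keys server-side, which is exactly the ownership check run_pipeline performs in copperd/bin/edged/src/api/pipeline/run.rs below.
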
8 changes: 4 additions & 4 deletions copperd/.env
@@ -14,15 +14,15 @@ EDGED_PIPELINED_SECRET = "impipesecret"
 EDGED_OBJECTSTORE_KEY_ID = "HrmycpZai3D0X23oVQ97"
 EDGED_OBJECTSTORE_KEY_SECRET = "jiyQKKHx7KbnlqWGPIuGwp9iBEtPqRf4HuGJHEdl"
 EDGED_OBJECTSTORE_URL = "http://localhost:9000"
-EDGED_OBJECTSTORE_BUCKET = "copper-upload"
+EDGED_OBJECTSTORE_UPLOAD_BUCKET = "copper-upload"
 EDGED_UPLOAD_JOB_TIMEOUT = 300
 
 # pipelined config
 PIPELINED_LOGLEVEL = "Trace"
 PIPELINED_BLOB_FRAGMENT_SIZE = 10000000
 PIPELINED_MAX_RUNNING_JOBS = 4
-PIPELINED_JOB_QUEUE_SIZE = 100
-PIPELINED_JOB_LOG_SIZE = 100
+PIPELINED_JOB_QUEUE_SIZE = 1000
+PIPELINED_JOB_LOG_SIZE = 1000
 
 PIPELINED_REQUEST_BODY_LIMIT = 20000000
 PIPELINED_SERVER_ADDR = "127.0.0.1:4000"
@@ -34,7 +34,7 @@ PIPELINED_STORAGED_SECRET = "imstoresecret"
 PIPELINED_OBJECTSTORE_KEY_ID = "HrmycpZai3D0X23oVQ97"
 PIPELINED_OBJECTSTORE_KEY_SECRET = "jiyQKKHx7KbnlqWGPIuGwp9iBEtPqRf4HuGJHEdl"
 PIPELINED_OBJECTSTORE_URL = "http://localhost:9000"
-PIPELINED_OBJECTSTORE_BUCKET = "copper-upload"
+PIPELINED_OBJECTSTORE_BUCKET = "copper-blobs"
 
 # storaged config
 STORAGED_LOGLEVEL = "Develop"
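
Two renames happen here: edged's bucket variable becomes EDGED_OBJECTSTORE_UPLOAD_BUCKET, making explicit that copper-upload holds only in-flight uploads, while pipelined now points at a separate copper-blobs bucket. As a rough illustration of how the renamed variable could reach the edged_objectstore_upload_bucket field referenced in run.rs below, here is a hedged sketch using the envy crate; the real EdgedConfig and its loader are not part of this diff.

    // Hypothetical sketch: mapping EDGED_OBJECTSTORE_UPLOAD_BUCKET into config.
    // The envy crate and this struct shape are assumptions for illustration.
    use serde::Deserialize;

    #[derive(Debug, Deserialize)]
    struct EdgedConfig {
        /// Bucket for in-flight uploads; set by EDGED_OBJECTSTORE_UPLOAD_BUCKET.
        edged_objectstore_upload_bucket: String,
        /// Seconds before an unfinished upload times out (EDGED_UPLOAD_JOB_TIMEOUT).
        edged_upload_job_timeout: u64,
    }

    fn main() {
        // envy matches each field to the upper-cased env var of the same name.
        let config: EdgedConfig = envy::from_env().expect("missing required env vars");
        println!("{config:?}");
    }
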
4 changes: 2 additions & 2 deletions copperd/Cargo.lock

Some generated files are not rendered by default.

9 changes: 4 additions & 5 deletions copperd/bin/edged/src/api/mod.rs
@@ -2,19 +2,18 @@ use axum::routing::post;
 use axum::{extract::DefaultBodyLimit, Router};
 use copper_edged::UserInfo;
 use copper_pipelined::client::PipelinedClient;
-use copper_pipelined::helpers::S3Client;
 use copper_storaged::client::StoragedClient;
 use copper_storaged::{AttrDataStub, AttributeInfo, AttributeOptions, ClassInfo, DatasetInfo};
+use copper_util::s3client::S3Client;
 use copper_util::HashType;
 use std::sync::Arc;
 use tower_http::trace::TraceLayer;
 use utoipa::OpenApi;
 use utoipa_swagger_ui::SwaggerUi;
 
 use crate::auth::AuthHelper;
-use crate::database::base::client::DatabaseClient;
-
 use crate::config::EdgedConfig;
+use crate::database::base::client::DatabaseClient;
 use crate::uploader::Uploader;
 
 mod attribute;
@@ -36,7 +35,7 @@ pub struct RouterState<Client: DatabaseClient> {
     pub storaged_client: Arc<dyn StoragedClient>,
     pub pipelined_client: Arc<dyn PipelinedClient>,
     pub auth: Arc<AuthHelper<Client>>,
-    pub objectstore_client: Arc<S3Client>,
+    pub s3_client_upload: Arc<S3Client>,
    pub uploader: Arc<Uploader>,
 }
 
@@ -50,7 +49,7 @@ impl<Client: DatabaseClient> Clone for RouterState<Client> {
             auth: self.auth.clone(),
             storaged_client: self.storaged_client.clone(),
             pipelined_client: self.pipelined_client.clone(),
-            objectstore_client: self.objectstore_client.clone(),
+            s3_client_upload: self.s3_client_upload.clone(),
             uploader: self.uploader.clone(),
         }
     }
5 changes: 2 additions & 3 deletions copperd/bin/edged/src/api/pipeline/mod.rs
@@ -1,5 +1,5 @@
-use crate::database::base::client::DatabaseClient;
 use crate::RouterState;
+use crate::{apidata::ApiAttrData, database::base::client::DatabaseClient};
 use axum::{
     routing::{delete, get, patch, post},
     Router,
@@ -9,7 +9,6 @@ use copper_pipelined::{
     base::NodeParameterValue,
     json::{EdgeJson, InputPort, NodeJson, NodeJsonPosition, OutputPort, PipelineJson},
 };
-use copper_storaged::AttrData;
 use utoipa::OpenApi;
 
 mod add;
@@ -50,7 +49,7 @@ use update::*;
     RunPipelineRequest,
     NodeParameterValue,
     PipelineInfo,
-    AttrData
+    ApiAttrData
 ))
 )]
 pub(super) struct PipelineApi;
80 changes: 77 additions & 3 deletions copperd/bin/edged/src/api/pipeline/run.rs
@@ -14,7 +14,9 @@ use tracing::error;
 use utoipa::ToSchema;
 
 use crate::{
+    apidata::ApiAttrData,
     database::base::{client::DatabaseClient, errors::pipeline::GetPipelineError},
+    uploader::{errors::UploadAssignError, GotJobKey},
     RouterState,
 };
 
@@ -24,8 +26,8 @@ pub(super) struct RunPipelineRequest {
     #[schema(value_type = String)]
     pub job_id: SmartString<LazyCompact>,
 
-    #[schema(value_type = BTreeMap<String, AttrData>)]
-    pub input: BTreeMap<SmartString<LazyCompact>, AttrData>,
+    #[schema(value_type = BTreeMap<String, ApiAttrData>)]
+    pub input: BTreeMap<SmartString<LazyCompact>, ApiAttrData>,
 }
 
 /// Start a pipeline job
@@ -74,9 +76,81 @@ pub(super) async fn run_pipeline<Client: DatabaseClient>(
         return StatusCode::UNAUTHORIZED.into_response();
     }
 
+    let mut converted_input: BTreeMap<SmartString<LazyCompact>, AttrData> = BTreeMap::new();
+    for (k, v) in payload.input {
+        // If we can automatically convert, do so
+        if let Ok(x) = AttrData::try_from(&v) {
+            converted_input.insert(k, x);
+            continue;
+        }
+
+        // Some types need manual conversion
+        if let Some(x) = match &v {
+            ApiAttrData::Blob { upload_id } => {
+                let res = state.uploader.get_job_object_key(user.id, upload_id).await;
+                let key = match res {
+                    GotJobKey::NoSuchJob => {
+                        return (
+                            StatusCode::BAD_REQUEST,
+                            Json(format!(
+                                "Invalid input: input {k} references a job that does not exist"
+                            )),
+                        )
+                            .into_response();
+                    }
+
+                    GotJobKey::JobNotDone => {
+                        return (
+                            StatusCode::BAD_REQUEST,
+                            Json(format!(
+                                "Invalid input: input {k} references a job that is not finished"
+                            )),
+                        )
+                            .into_response();
+                    }
+
+                    GotJobKey::JobIsAssigned => {
+                        return (
+                            StatusCode::BAD_REQUEST,
+                            Json(format!(
+                                "Invalid input: input {k} references a job that has been assigned to a pipeline"
+                            )),
+                        )
+                            .into_response();
+                    }
+
+                    GotJobKey::HereYouGo(key) => key,
+                };
+
+                let res = state
+                    .uploader
+                    .assign_job_to_pipeline(user.id, upload_id, &payload.job_id)
+                    .await;
+
+                match res {
+                    // This is impossible, we already checked these cases
+                    Err(UploadAssignError::BadUpload) => unreachable!(),
+                    Err(UploadAssignError::NotMyUpload) => unreachable!(),
+
+                    Ok(()) => Some(AttrData::Blob {
+                        bucket: (&state.config.edged_objectstore_upload_bucket).into(),
+                        key,
+                    }),
+                }
+            }
+
+            _ => None,
+        } {
+            converted_input.insert(k, x);
+            continue;
+        }
+
+        unreachable!("User-provided data {v:?} could not be converted automatically, but was not caught by the manual conversion `match`.")
+    }
+
     let res = state
         .pipelined_client
-        .run_pipeline(&pipe.data, &payload.job_id, &payload.input, user.id)
+        .run_pipeline(&pipe.data, &payload.job_id, &converted_input, user.id)
         .await;
 
     return match res {
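
The fast path above calls AttrData::try_from(&v), a conversion that is not part of this diff (ApiAttrData comes from the new crate::apidata module). Below is a minimal, self-contained sketch of the shape that impl plausibly has; the Text variant and the error type are illustrative assumptions, std String stands in for SmartString, and only the Blob variants are confirmed by this commit.

    // Minimal sketch (assumption): the conversion behind run_pipeline's
    // `AttrData::try_from(&v)` fast path.

    #[derive(Debug)]
    enum ApiAttrData {
        Text { value: String },
        Blob { upload_id: String },
    }

    #[derive(Debug)]
    enum AttrData {
        Text { value: String },
        Blob { bucket: String, key: String },
    }

    /// Returned when a variant cannot be converted without extra context.
    struct NeedsManualConversion;

    impl TryFrom<&ApiAttrData> for AttrData {
        type Error = NeedsManualConversion;

        fn try_from(value: &ApiAttrData) -> Result<Self, Self::Error> {
            match value {
                // Plain values map over field-for-field.
                ApiAttrData::Text { value } => Ok(AttrData::Text {
                    value: value.clone(),
                }),

                // A Blob only carries an upload id; resolving it to a bucket
                // and key needs the uploader, so it must fall through to the
                // manual `match` in run_pipeline.
                ApiAttrData::Blob { .. } => Err(NeedsManualConversion),
            }
        }
    }
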
4 changes: 2 additions & 2 deletions copperd/bin/edged/src/api/storage/finish_upload.rs
@@ -6,8 +6,8 @@ use axum::{
 use axum_extra::extract::CookieJar;
 use tracing::error;
 
-use crate::database::base::client::DatabaseClient;
 use crate::{api::RouterState, uploader::errors::UploadFinishError};
+use crate::{database::base::client::DatabaseClient, uploader::UploadJobId};
 
 /// Rename a attribute
 #[utoipa::path(
@@ -27,7 +27,7 @@ use crate::{api::RouterState, uploader::errors::UploadFinishError};
 pub(super) async fn finish_upload<Client: DatabaseClient>(
     jar: CookieJar,
     State(state): State<RouterState<Client>>,
-    Path(job_id): Path<String>,
+    Path(job_id): Path<UploadJobId>,
 ) -> Response {
     let user = match state.auth.auth_or_logout(&state, &jar).await {
         Err(x) => return x,
5 changes: 3 additions & 2 deletions copperd/bin/edged/src/api/storage/start_upload.rs
@@ -10,8 +10,8 @@ use serde::{Deserialize, Serialize};
 use tracing::error;
 use utoipa::ToSchema;
 
-use crate::database::base::client::DatabaseClient;
 use crate::{api::RouterState, uploader::errors::NewUploadError};
+use crate::{database::base::client::DatabaseClient, uploader::UploadJobId};
 
 #[derive(Debug, Deserialize, ToSchema)]
 pub(super) struct StartUploadRequest {
@@ -21,7 +21,8 @@ pub(super) struct StartUploadRequest {
 
 #[derive(Debug, Serialize, ToSchema)]
 pub(super) struct StartUploadResponse {
-    job_id: String,
+    #[schema(value_type = String)]
+    job_id: UploadJobId,
     request_body_limit: usize,
 }
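
UploadJobId itself is not shown in this commit, but the #[schema(value_type = String)] override suggests a newtype that serializes as a bare string. A hedged sketch of such a newtype, with an illustrative random-id constructor:

    // Hypothetical sketch of the UploadJobId newtype assumed by these handlers.
    // The rand-based constructor is an illustration, not the crate's code.
    use rand::{distributions::Alphanumeric, Rng};
    use serde::{Deserialize, Serialize};

    #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
    #[serde(transparent)] // a bare JSON string, matching value_type = String
    pub struct UploadJobId(String);

    impl UploadJobId {
        pub fn new() -> Self {
            let id = rand::thread_rng()
                .sample_iter(&Alphanumeric)
                .take(16)
                .map(char::from)
                .collect();
            Self(id)
        }

        pub fn as_str(&self) -> &str {
            &self.0
        }
    }

Because it derives Deserialize, the Path<UploadJobId> extractors in finish_upload and upload_part can parse it straight from the URL, replacing the plain String they used before.
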
4 changes: 2 additions & 2 deletions copperd/bin/edged/src/api/storage/upload_part.rs
@@ -8,8 +8,8 @@ use axum::{
 use axum_extra::extract::CookieJar;
 use tracing::{error, warn};
 
-use crate::database::base::client::DatabaseClient;
 use crate::{api::RouterState, uploader::errors::UploadFragmentError};
+use crate::{database::base::client::DatabaseClient, uploader::UploadJobId};
 
 /// Upload a part of a file.
 /// TODO: enforce 5MB minimum size
@@ -30,7 +30,7 @@ use crate::{api::RouterState, uploader::errors::UploadFragmentError};
 pub(super) async fn upload_part<Client: DatabaseClient>(
     jar: CookieJar,
     State(state): State<RouterState<Client>>,
-    Path(job_id): Path<String>,
+    Path(job_id): Path<UploadJobId>,
     mut multipart: Multipart,
 ) -> Response {
     let user = match state.auth.auth_or_logout(&state, &jar).await {
(The remaining 20 file diffs are not shown here.)
