diff --git a/copperc/src/app/u/pipeline/_nodes/additem.tsx b/copperc/src/app/u/pipeline/_nodes/additem.tsx index c794673c..bed63d20 100644 --- a/copperc/src/app/u/pipeline/_nodes/additem.tsx +++ b/copperc/src/app/u/pipeline/_nodes/additem.tsx @@ -204,6 +204,11 @@ export const AddItemNode: NodeDef = { } return { + // TODO: add ui + on_unique_violation: { + parameter_type: "String", + value: "select", + }, dataset: { parameter_type: "Integer", value: node.data.dataset, diff --git a/copperd/Cargo.lock b/copperd/Cargo.lock index 7aedd9b9..f9a4b187 100644 --- a/copperd/Cargo.lock +++ b/copperd/Cargo.lock @@ -1866,6 +1866,7 @@ dependencies = [ "serde_json", "sha2", "smartstring", + "sqlx", "tokio", "tracing", ] diff --git a/copperd/lib/itemdb/src/client/client/mod.rs b/copperd/lib/itemdb/src/client/client/mod.rs index d5907851..a63db3b4 100644 --- a/copperd/lib/itemdb/src/client/client/mod.rs +++ b/copperd/lib/itemdb/src/client/client/mod.rs @@ -13,6 +13,7 @@ mod attribute; mod class; mod dataset; mod item; +pub use item::*; #[derive(Debug, Error)] /// An error we may encounter when connecting to postgres diff --git a/copperd/lib/piper/src/base/errors.rs b/copperd/lib/piper/src/base/errors.rs index c7a89b7a..9bd307ff 100644 --- a/copperd/lib/piper/src/base/errors.rs +++ b/copperd/lib/piper/src/base/errors.rs @@ -86,6 +86,12 @@ pub enum RunNodeError { }, } +impl From for RunNodeError { + fn from(value: sqlx::Error) -> Self { + Self::DbError(Arc::new(value)) + } +} + impl From for RunNodeError { fn from(value: std::io::Error) -> Self { Self::IoError(Arc::new(value)) diff --git a/copperd/nodes/basic/Cargo.toml b/copperd/nodes/basic/Cargo.toml index a271e378..accbfda8 100644 --- a/copperd/nodes/basic/Cargo.toml +++ b/copperd/nodes/basic/Cargo.toml @@ -22,6 +22,7 @@ copper-piper = { workspace = true } copper-itemdb = { workspace = true } rand = { workspace = true } +sqlx = { workspace = true } tracing = { workspace = true } tokio = { workspace = true } async-trait = { workspace = true } diff --git a/copperd/nodes/basic/src/additem.rs b/copperd/nodes/basic/src/additem.rs index ca8ec922..3cfa5976 100644 --- a/copperd/nodes/basic/src/additem.rs +++ b/copperd/nodes/basic/src/additem.rs @@ -1,6 +1,9 @@ use async_trait::async_trait; use copper_itemdb::{ - client::errors::{class::GetClassError, dataset::GetDatasetError}, + client::{ + errors::{class::GetClassError, dataset::GetDatasetError}, + AddItemError, + }, AttrData, AttributeInfo, ClassInfo, }; use copper_piper::{ @@ -10,9 +13,21 @@ use copper_piper::{ }; use rand::{distributions::Alphanumeric, Rng}; use smartstring::{LazyCompact, SmartString}; +use sqlx::Acquire; use std::{collections::BTreeMap, sync::Arc}; use tracing::{debug, trace}; +/// How we should react when we try to create a node that +/// violates a "unique" constraint +enum OnUniqueViolation { + /// Throw an error & cancel the pipeline + Fail, + + /// If there is ONE conflicting item, create nothing and return its id. + /// If more than one item conflicts, error. + Select, +} + pub struct AddItem {} impl NodeBuilder for AddItem { @@ -64,7 +79,6 @@ impl<'ctx> Node<'ctx> for AddItem { }; // This is only used by UI, but make sure it's sane. - if let Some(value) = params.remove("dataset") { match value { NodeParameterValue::Integer(x) => { @@ -105,6 +119,31 @@ impl<'ctx> Node<'ctx> for AddItem { parameter: "dataset".into(), }); }; + + let on_unique_violation = if let Some(value) = params.remove("on_unique_violation") { + match value { + NodeParameterValue::String(x) => { + match x.as_str() { + "fail" => OnUniqueViolation::Fail, + "select" => OnUniqueViolation::Select, + + // TODO: error if unknown string + _ => OnUniqueViolation::Fail, + } + } + + _ => { + return Err(RunNodeError::BadParameterType { + parameter: "on_unique_violation".into(), + }) + } + } + } else { + return Err(RunNodeError::MissingParameter { + parameter: "on_unique_violation".into(), + }); + }; + if let Some((param, _)) = params.first_key_value() { return Err(RunNodeError::UnexpectedParameter { parameter: param.clone(), @@ -196,10 +235,14 @@ impl<'ctx> Node<'ctx> for AddItem { // Set up and send transaction // + // Savepoint, in case we need to rollback `add_item`. + // See comments below. + let mut trans2 = trans.begin().await?; + let new_item = ctx .itemdb_client .add_item( - &mut trans, + &mut trans2, class.id, attributes .into_iter() @@ -207,8 +250,35 @@ impl<'ctx> Node<'ctx> for AddItem { .filter_map(|(k, v)| v.map(|v| (k, v))) .collect(), ) - .await - .map_err(|x| RunNodeError::Other(Arc::new(x)))?; + .await; + + let new_item = match new_item { + Ok(x) => { + // We added the item successfully, + // commit savepoint. + trans2.commit().await?; + x + } + Err(err) => match on_unique_violation { + OnUniqueViolation::Fail => return Err(RunNodeError::Other(Arc::new(err))), + OnUniqueViolation::Select => match err { + AddItemError::UniqueViolated { + ref conflicting_ids, + } => { + if conflicting_ids.len() == 1 { + // We're not failing the pipeline, so `trans` will be committed. + // however, we shouldn't add a new item---so we need to roll back + // trans2. + trans2.rollback().await?; + *conflicting_ids.first().unwrap() + } else { + return Err(RunNodeError::Other(Arc::new(err))); + } + } + _ => return Err(RunNodeError::Other(Arc::new(err))), + }, + }, + }; let mut output = BTreeMap::new(); output.insert(