Skip to content

Commit

Permalink
Added on_unique_violation
Browse files Browse the repository at this point in the history
  • Loading branch information
rm-dr committed Nov 13, 2024
1 parent f822a0b commit 231426c
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 5 deletions.
5 changes: 5 additions & 0 deletions copperc/src/app/u/pipeline/_nodes/additem.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ export const AddItemNode: NodeDef<AddItemNodeType> = {
}

return {
// TODO: add ui
on_unique_violation: {
parameter_type: "String",
value: "select",
},
dataset: {
parameter_type: "Integer",
value: node.data.dataset,
Expand Down
1 change: 1 addition & 0 deletions copperd/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions copperd/lib/itemdb/src/client/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod attribute;
mod class;
mod dataset;
mod item;
pub use item::*;

#[derive(Debug, Error)]
/// An error we may encounter when connecting to postgres
Expand Down
6 changes: 6 additions & 0 deletions copperd/lib/piper/src/base/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ pub enum RunNodeError {
},
}

impl From<sqlx::Error> for RunNodeError {
fn from(value: sqlx::Error) -> Self {
Self::DbError(Arc::new(value))
}
}

impl From<std::io::Error> for RunNodeError {
fn from(value: std::io::Error) -> Self {
Self::IoError(Arc::new(value))
Expand Down
1 change: 1 addition & 0 deletions copperd/nodes/basic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ copper-piper = { workspace = true }
copper-itemdb = { workspace = true }

rand = { workspace = true }
sqlx = { workspace = true }
tracing = { workspace = true }
tokio = { workspace = true }
async-trait = { workspace = true }
Expand Down
80 changes: 75 additions & 5 deletions copperd/nodes/basic/src/additem.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use async_trait::async_trait;
use copper_itemdb::{
client::errors::{class::GetClassError, dataset::GetDatasetError},
client::{
errors::{class::GetClassError, dataset::GetDatasetError},
AddItemError,
},
AttrData, AttributeInfo, ClassInfo,
};
use copper_piper::{
Expand All @@ -10,9 +13,21 @@ use copper_piper::{
};
use rand::{distributions::Alphanumeric, Rng};
use smartstring::{LazyCompact, SmartString};
use sqlx::Acquire;
use std::{collections::BTreeMap, sync::Arc};
use tracing::{debug, trace};

/// How we should react when we try to create a node that
/// violates a "unique" constraint
enum OnUniqueViolation {
/// Throw an error & cancel the pipeline
Fail,

/// If there is ONE conflicting item, create nothing and return its id.
/// If more than one item conflicts, error.
Select,
}

pub struct AddItem {}

impl NodeBuilder for AddItem {
Expand Down Expand Up @@ -64,7 +79,6 @@ impl<'ctx> Node<'ctx> for AddItem {
};

// This is only used by UI, but make sure it's sane.

if let Some(value) = params.remove("dataset") {
match value {
NodeParameterValue::Integer(x) => {
Expand Down Expand Up @@ -105,6 +119,31 @@ impl<'ctx> Node<'ctx> for AddItem {
parameter: "dataset".into(),
});
};

let on_unique_violation = if let Some(value) = params.remove("on_unique_violation") {
match value {
NodeParameterValue::String(x) => {
match x.as_str() {
"fail" => OnUniqueViolation::Fail,
"select" => OnUniqueViolation::Select,

// TODO: error if unknown string
_ => OnUniqueViolation::Fail,
}
}

_ => {
return Err(RunNodeError::BadParameterType {
parameter: "on_unique_violation".into(),
})
}
}
} else {
return Err(RunNodeError::MissingParameter {
parameter: "on_unique_violation".into(),
});
};

if let Some((param, _)) = params.first_key_value() {
return Err(RunNodeError::UnexpectedParameter {
parameter: param.clone(),
Expand Down Expand Up @@ -196,19 +235,50 @@ impl<'ctx> Node<'ctx> for AddItem {
// Set up and send transaction
//

// Savepoint, in case we need to rollback `add_item`.
// See comments below.
let mut trans2 = trans.begin().await?;

let new_item = ctx
.itemdb_client
.add_item(
&mut trans,
&mut trans2,
class.id,
attributes
.into_iter()
.map(|(_, (k, d))| (k.id, d))
.filter_map(|(k, v)| v.map(|v| (k, v)))
.collect(),
)
.await
.map_err(|x| RunNodeError::Other(Arc::new(x)))?;
.await;

let new_item = match new_item {
Ok(x) => {
// We added the item successfully,
// commit savepoint.
trans2.commit().await?;
x
}
Err(err) => match on_unique_violation {
OnUniqueViolation::Fail => return Err(RunNodeError::Other(Arc::new(err))),
OnUniqueViolation::Select => match err {
AddItemError::UniqueViolated {
ref conflicting_ids,
} => {
if conflicting_ids.len() == 1 {
// We're not failing the pipeline, so `trans` will be committed.
// however, we shouldn't add a new item---so we need to roll back
// trans2.
trans2.rollback().await?;
*conflicting_ids.first().unwrap()
} else {
return Err(RunNodeError::Other(Arc::new(err)));
}
}
_ => return Err(RunNodeError::Other(Arc::new(err))),
},
},
};

let mut output = BTreeMap::new();
output.insert(
Expand Down

0 comments on commit 231426c

Please sign in to comment.