Skip to content

Commit

Permalink
fix/db-duplicate-opp (#221)
Browse files Browse the repository at this point in the history
* check duplicate opp before adding to db

* handle count error

* add .sqlx files

* updates

* address comments 2

* unncessary derefmut trait

* fix the creation time setting

* correct removal time heuristic
  • Loading branch information
anihamde authored Nov 11, 2024
1 parent 195556b commit cc612f4
Show file tree
Hide file tree
Showing 14 changed files with 230 additions and 84 deletions.

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE opportunity DROP COLUMN last_creation_time;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ALTER TABLE opportunity ADD COLUMN last_creation_time TIMESTAMP NOT NULL DEFAULT NOW();
ALTER TABLE opportunity ALTER COLUMN last_creation_time DROP DEFAULT;
UPDATE opportunity SET last_creation_time = creation_time;
6 changes: 2 additions & 4 deletions auction-server/src/opportunity/entities/opportunity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use {
self,
},
},
state::UnixTimestampMicros,
},
ethers::types::Bytes,
std::ops::Deref,
Expand All @@ -32,19 +31,18 @@ pub struct OpportunityCoreFields<T: TokenAmount> {
pub chain_id: ChainId,
pub sell_tokens: Vec<T>,
pub buy_tokens: Vec<T>,
pub creation_time: UnixTimestampMicros,
pub creation_time: OffsetDateTime,
}

impl<T: TokenAmount> OpportunityCoreFields<T> {
pub fn new_with_current_time(val: OpportunityCoreFieldsCreate<T>) -> Self {
let odt = OffsetDateTime::now_utc();
Self {
id: Uuid::new_v4(),
permission_key: val.permission_key,
chain_id: val.chain_id,
sell_tokens: val.sell_tokens,
buy_tokens: val.buy_tokens,
creation_time: odt.unix_timestamp_nanos() / 1000 as UnixTimestampMicros,
creation_time: OffsetDateTime::now_utc(),
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions auction-server/src/opportunity/entities/opportunity_evm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl From<OpportunityEvm> for api::OpportunityEvm {
fn from(val: OpportunityEvm) -> Self {
api::OpportunityEvm {
opportunity_id: val.id,
creation_time: val.creation_time,
creation_time: val.creation_time.unix_timestamp_nanos() / 1000,
params: api::OpportunityParamsEvm::V1(api::OpportunityParamsV1Evm(
api::OpportunityCreateV1Evm {
permission_key: val.permission_key.clone(),
Expand Down Expand Up @@ -170,7 +170,7 @@ impl TryFrom<repository::Opportunity<repository::OpportunityMetadataEvm>> for Op
Ok(OpportunityEvm {
core_fields: OpportunityCoreFields {
id: val.id,
creation_time: val.creation_time.assume_utc().unix_timestamp_nanos() / 1000,
creation_time: val.last_creation_time.assume_utc(),
permission_key: PermissionKey::from(val.permission_key),
chain_id: val.chain_id,
sell_tokens,
Expand Down
4 changes: 2 additions & 2 deletions auction-server/src/opportunity/entities/opportunity_svm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl From<OpportunitySvm> for api::OpportunitySvm {
};
api::OpportunitySvm {
opportunity_id: val.id,
creation_time: val.creation_time,
creation_time: val.creation_time.unix_timestamp_nanos() / 1000,
slot: val.slot,
params: api::OpportunityParamsSvm::V1(api::OpportunityParamsV1Svm {
program,
Expand Down Expand Up @@ -226,7 +226,7 @@ impl TryFrom<repository::Opportunity<repository::OpportunityMetadataSvm>> for Op
Ok(OpportunitySvm {
core_fields: OpportunityCoreFields {
id: val.id,
creation_time: val.creation_time.assume_utc().unix_timestamp_nanos() / 1000,
creation_time: val.last_creation_time.assume_utc(),
permission_key: PermissionKey::from(val.permission_key),
chain_id: val.chain_id,
sell_tokens,
Expand Down
134 changes: 115 additions & 19 deletions auction-server/src/opportunity/repository/add_opportunity.rs
Original file line number Diff line number Diff line change
@@ -1,66 +1,162 @@
use {
super::{
models::OpportunityMetadata,
models::{
self,
OpportunityMetadata,
OpportunityRemovalReason,
},
InMemoryStore,
Repository,
},
crate::{
api::RestError,
opportunity::{
entities,
entities::Opportunity,
opportunity::entities::{
self,
Opportunity,
},
},
sqlx::Postgres,
sqlx::{
Postgres,
QueryBuilder,
},
time::{
OffsetDateTime,
PrimitiveDateTime,
},
uuid::Uuid,
};

impl<T: InMemoryStore> Repository<T> {
/// Add an opportunity to the system, in memory and in the database.
///
/// The provided opportunity will be added to the in-memory store. If the opportunity already exists
/// in the database, then it will be refreshed. Otherwise it will be added to the database.
pub async fn add_opportunity(
&self,
db: &sqlx::Pool<Postgres>,
opportunity: <T::Opportunity as entities::Opportunity>::OpportunityCreate,
) -> Result<T::Opportunity, RestError> {
let opportunity = self.get_or_add_db_opportunity(db, opportunity).await?;

self.in_memory_store
.opportunities
.write()
.await
.entry(opportunity.get_key())
.or_insert_with(Vec::new)
.push(opportunity.clone());

Ok(opportunity)
}

/// Gets the opportunity from the database if it exists, otherwise adds it.
///
/// If the opportunity already exists in the database and hasn't become invalid in simulation, then
/// it will be refreshed. Otherwise, the opportunity will be added to the database.
async fn get_or_add_db_opportunity(
&self,
db: &sqlx::Pool<Postgres>,
opportunity: <T::Opportunity as entities::Opportunity>::OpportunityCreate,
) -> Result<T::Opportunity, RestError> {
let chain_type = <T::Opportunity as entities::Opportunity>::ModelMetadata::get_chain_type();
let opportunity: T::Opportunity =
<T::Opportunity as entities::Opportunity>::new_with_current_time(opportunity);
let odt = OffsetDateTime::from_unix_timestamp_nanos(opportunity.creation_time * 1000)
.expect("creation_time is valid");
let metadata = opportunity.get_models_metadata();

let result = sqlx::query!("SELECT id FROM opportunity WHERE permission_key = $1 AND chain_id = $2 AND chain_type = $3 AND sell_tokens = $4 AND buy_tokens = $5 AND metadata #- '{slot}' = $6 AND (removal_reason IS NULL OR removal_reason = $7) LIMIT 1",
opportunity.permission_key.to_vec(),
opportunity.chain_id,
chain_type as _,
serde_json::to_value(&opportunity.sell_tokens).expect("Failed to serialize sell_tokens"),
serde_json::to_value(&opportunity.buy_tokens).expect("Failed to serialize buy_tokens"),
self.remove_slot_field(serde_json::to_value(metadata).expect("Failed to serialize metadata")),
OpportunityRemovalReason::Expired as _)
.fetch_optional(db)
.await
.map_err(|e| {
tracing::error!("DB: Failed to check duplicate opportunity: {}", e);
RestError::TemporarilyUnavailable
})?;

match result {
Some(record) => self.refresh_db_opportunity(db, record.id).await,
None => self.add_db_opportunity(db, opportunity).await,
}
}

fn remove_slot_field(&self, mut metadata: serde_json::Value) -> serde_json::Value {
if let Some(obj) = metadata.as_object_mut() {
obj.remove("slot");
}
metadata
}

/// Refresh an opportunity that already exists in the database.
///
/// This will update the last creation time of the opportunity and remove the removal reason and time.
async fn refresh_db_opportunity(
&self,
db: &sqlx::Pool<Postgres>,
id: Uuid,
) -> Result<T::Opportunity, RestError> {
let odt_creation = OffsetDateTime::now_utc();

let mut query = QueryBuilder::new("UPDATE opportunity SET removal_reason = NULL, removal_time = NULL, last_creation_time = ");
query.push_bind(PrimitiveDateTime::new(
odt_creation.date(),
odt_creation.time(),
));
query.push(" WHERE id = ");
query.push_bind(id);
query.push(" RETURNING *");
let opportunity: models::Opportunity<
<T::Opportunity as entities::Opportunity>::ModelMetadata,
> = query.build_query_as().fetch_one(db).await.map_err(|e| {
tracing::error!("DB: Failed to refresh opportunity {}: {}", id, e);
RestError::TemporarilyUnavailable
})?;

opportunity.clone().try_into().map_err(|_| {
tracing::error!(
"Failed to convert database opportunity to entity opportunity: {:?}",
opportunity
);
RestError::TemporarilyUnavailable
})
}

async fn add_db_opportunity(
&self,
db: &sqlx::Pool<Postgres>,
opportunity: <T as InMemoryStore>::Opportunity,
) -> Result<T::Opportunity, RestError> {
let chain_type = <T::Opportunity as entities::Opportunity>::ModelMetadata::get_chain_type();
let metadata = opportunity.get_models_metadata();
sqlx::query!("INSERT INTO opportunity (id,
creation_time,
last_creation_time,
permission_key,
chain_id,
chain_type,
metadata,
sell_tokens,
buy_tokens) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)",
buy_tokens) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
opportunity.id,
PrimitiveDateTime::new(odt.date(), odt.time()),
PrimitiveDateTime::new(opportunity.creation_time.date(), opportunity.creation_time.time()),
PrimitiveDateTime::new(opportunity.creation_time.date(), opportunity.creation_time.time()),
opportunity.permission_key.to_vec(),
opportunity.chain_id,
chain_type as _,
serde_json::to_value(metadata).expect("Failed to serialize metadata"),
serde_json::to_value(&opportunity.sell_tokens).unwrap(),
serde_json::to_value(&opportunity.buy_tokens).unwrap())
serde_json::to_value(&opportunity.sell_tokens).expect("Failed to serialize sell_tokens"),
serde_json::to_value(&opportunity.buy_tokens).expect("Failed to serialize buy_tokens"))
.execute(db)
.await
.map_err(|e| {
tracing::error!("DB: Failed to insert opportunity: {}", e);
RestError::TemporarilyUnavailable
})?;

self.in_memory_store
.opportunities
.write()
.await
.entry(opportunity.get_key())
.or_insert_with(Vec::new)
.push(opportunity.clone());

Ok(opportunity)
}
}
21 changes: 11 additions & 10 deletions auction-server/src/opportunity/repository/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,15 @@ impl OpportunityMetadata for OpportunityMetadataSvm {
#[derive(Clone, FromRow, Debug)]
#[allow(dead_code)]
pub struct Opportunity<T: OpportunityMetadata> {
pub id: Uuid,
pub creation_time: PrimitiveDateTime,
pub permission_key: Vec<u8>,
pub chain_id: String,
pub chain_type: ChainType,
pub removal_time: Option<PrimitiveDateTime>,
pub sell_tokens: JsonValue,
pub buy_tokens: JsonValue,
pub removal_reason: Option<OpportunityRemovalReason>,
pub metadata: Json<T>,
pub id: Uuid,
pub creation_time: PrimitiveDateTime,
pub last_creation_time: PrimitiveDateTime,
pub permission_key: Vec<u8>,
pub chain_id: String,
pub chain_type: ChainType,
pub removal_time: Option<PrimitiveDateTime>,
pub sell_tokens: JsonValue,
pub buy_tokens: JsonValue,
pub removal_reason: Option<OpportunityRemovalReason>,
pub metadata: Json<T>,
}
Loading

0 comments on commit cc612f4

Please sign in to comment.