Skip to content

Commit

Permalink
First version of websocket support
Browse files Browse the repository at this point in the history
  • Loading branch information
m30m committed Feb 15, 2024
1 parent 8b7abb8 commit 6181113
Show file tree
Hide file tree
Showing 6 changed files with 395 additions and 21 deletions.
14 changes: 14 additions & 0 deletions auction-server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions auction-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ utoipa-swagger-ui = { version = "3.1.4", features = ["axum"] }
serde_yaml = "0.9.25"
ethers = "2.0.10"
axum-macros = "0.4.0"
dashmap = { version = "5.4.0" }
14 changes: 11 additions & 3 deletions auction-server/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use {
},
liquidation::{
OpportunityBid,
OpportunityParamsWithId,
OpportunityParamsWithMetadata,
},
},
auction::run_submission_loop,
Expand Down Expand Up @@ -44,6 +44,7 @@ use {
Router,
},
clap::crate_version,
dashmap::DashMap,
ethers::{
providers::{
Http,
Expand All @@ -63,6 +64,7 @@ use {
sync::{
atomic::{
AtomicBool,
AtomicUsize,
Ordering,
},
Arc,
Expand Down Expand Up @@ -92,6 +94,7 @@ async fn root() -> String {

mod bid;
pub(crate) mod liquidation;
pub(crate) mod ws;

pub enum RestError {
/// The request contained invalid parameters
Expand Down Expand Up @@ -177,12 +180,12 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> {
schemas(OpportunityParamsV1),
schemas(OpportunityBid),
schemas(OpportunityParams),
schemas(OpportunityParamsWithId),
schemas(OpportunityParamsWithMetadata),
schemas(TokenQty),
schemas(BidResult),
schemas(ErrorBodyResponse),
responses(ErrorBodyResponse),
responses(OpportunityParamsWithId),
responses(OpportunityParamsWithMetadata),
responses(BidResult)
),
tags(
Expand Down Expand Up @@ -235,6 +238,10 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> {
chains: chain_store?,
liquidation_store: LiquidationStore::default(),
per_operator: wallet,
ws: ws::WsState {
subscriber_counter: AtomicUsize::new(0),
subscribers: DashMap::new(),
},
});

let server_store = store.clone();
Expand All @@ -258,6 +265,7 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> {
"/v1/liquidation/opportunities/:opportunity_id/bids",
post(liquidation::post_bid),
)
.route("/v1/ws", get(ws::ws_route_handler))
.route("/live", get(live))
.layer(CorsLayer::permissive())
.with_state(server_store);
Expand Down
46 changes: 31 additions & 15 deletions auction-server/src/api/liquidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ use {
handle_bid,
BidResult,
},
ws::{
notify_updates,
UpdateEvent::NewOpportunity,
},
ErrorBodyResponse,
RestError,
},
Expand Down Expand Up @@ -60,19 +64,31 @@ use {

/// Similar to OpportunityParams, but with the opportunity id included.
#[derive(Serialize, Deserialize, ToSchema, Clone, ToResponse)]
pub struct OpportunityParamsWithId {
pub struct OpportunityParamsWithMetadata {
/// The opportunity unique id
#[schema(example = "f47ac10b-58cc-4372-a567-0e02b2c3d479", value_type=String)]
opportunity_id: Uuid,
/// Creation time of the opportunity
#[schema(example = "1700000000")]
creation_time: UnixTimestamp,
/// opportunity data
#[serde(flatten)]
params: OpportunityParams,
}

impl Into<OpportunityParamsWithId> for LiquidationOpportunity {
fn into(self) -> OpportunityParamsWithId {
OpportunityParamsWithId {
impl OpportunityParamsWithMetadata {
pub fn get_chain_id(&self) -> &ChainId {
match &self.params {
OpportunityParams::V1(params) => &params.chain_id,
}
}
}

impl Into<OpportunityParamsWithMetadata> for LiquidationOpportunity {
fn into(self) -> OpportunityParamsWithMetadata {
OpportunityParamsWithMetadata {
opportunity_id: self.id,
creation_time: self.creation_time,
params: self.params,
}
}
Expand All @@ -90,7 +106,7 @@ impl Into<OpportunityParamsWithId> for LiquidationOpportunity {
pub async fn post_opportunity(
State(store): State<Arc<Store>>,
Json(versioned_params): Json<OpportunityParams>,
) -> Result<Json<OpportunityParamsWithId>, RestError> {
) -> Result<Json<OpportunityParamsWithMetadata>, RestError> {
let params = match versioned_params.clone() {
OpportunityParams::V1(params) => params,
};
Expand Down Expand Up @@ -127,22 +143,22 @@ pub async fn post_opportunity(
}
}

opportunities_existing.push(opportunity);
opportunities_existing.push(opportunity.clone());
} else {
write_lock.insert(params.permission_key.clone(), vec![opportunity]);
write_lock.insert(params.permission_key.clone(), vec![opportunity.clone()]);
}

notify_updates(&store.ws, NewOpportunity(opportunity.clone().into())).await;

tracing::debug!("number of permission keys: {}", write_lock.len());
tracing::debug!(
"number of opportunities for key: {}",
write_lock[&params.permission_key].len()
);

Ok(OpportunityParamsWithId {
opportunity_id: id,
params: versioned_params,
}
.into())
let opportunity_with_metadata: OpportunityParamsWithMetadata = opportunity.into();

Ok(opportunity_with_metadata.into())
}


Expand All @@ -162,8 +178,8 @@ params(ChainIdQueryParams))]
pub async fn get_opportunities(
State(store): State<Arc<Store>>,
query_params: Query<ChainIdQueryParams>,
) -> Result<axum::Json<Vec<OpportunityParamsWithId>>, RestError> {
let opportunities: Vec<OpportunityParamsWithId> = store
) -> Result<axum::Json<Vec<OpportunityParamsWithMetadata>>, RestError> {
let opportunities: Vec<OpportunityParamsWithMetadata> = store
.liquidation_store
.opportunities
.read()
Expand All @@ -177,7 +193,7 @@ pub async fn get_opportunities(
.clone()
.into()
})
.filter(|params_with_id: &OpportunityParamsWithId| {
.filter(|params_with_id: &OpportunityParamsWithMetadata| {
let params = match &params_with_id.params {
OpportunityParams::V1(params) => params,
};
Expand Down
Loading

0 comments on commit 6181113

Please sign in to comment.