Skip to content

Commit

Permalink
Decouple opportunity updates from api and organize long running proce…
Browse files Browse the repository at this point in the history
…sses
  • Loading branch information
m30m committed Feb 15, 2024
1 parent 6181113 commit fc6b871
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 141 deletions.
105 changes: 60 additions & 45 deletions auction-server/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use {
OpportunityBid,
OpportunityParamsWithMetadata,
},
ws::run_subscription_loop,
},
auction::run_submission_loop,
config::{
Expand Down Expand Up @@ -87,6 +88,7 @@ use {
// shutdown signal to all running tasks. However, this is a bit more complicated to implement and
// we don't rely on global state for anything else.
pub(crate) static SHOULD_EXIT: AtomicBool = AtomicBool::new(false);
pub const EXIT_CHECK_INTERVAL: Duration = Duration::from_secs(1);

async fn root() -> String {
format!("PER Auction Server API {}", crate_version!())
Expand All @@ -107,8 +109,7 @@ pub enum RestError {
SimulationError { result: Bytes, reason: String },
/// The order was not found
OpportunityNotFound,
/// The server cannot currently communicate with the blockchain, so is not able to verify
/// which random values have been requested.
/// Internal error occurred during processing the request
TemporarilyUnavailable,
/// A catch-all error for all other types of errors that could occur during processing.
Unknown,
Expand Down Expand Up @@ -159,14 +160,8 @@ pub async fn live() -> Response {
(StatusCode::OK, "OK").into_response()
}

pub async fn start_server(run_options: RunOptions) -> Result<()> {
tokio::spawn(async move {
tracing::info!("Registered shutdown signal handler...");
tokio::signal::ctrl_c().await.unwrap();
tracing::info!("Shut down signal received, waiting for tasks...");
SHOULD_EXIT.store(true, Ordering::Release);
});

pub async fn start_api(run_options: RunOptions, store: Arc<Store>) -> Result<()> {
#[derive(OpenApi)]
#[openapi(
paths(
Expand Down Expand Up @@ -194,6 +189,48 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> {
)]
struct ApiDoc;

let app: Router<()> = Router::new()
.merge(SwaggerUi::new("/docs").url("/docs/openapi.json", ApiDoc::openapi()))
.route("/", get(root))
.route("/v1/bids", post(bid::bid))
.route(
"/v1/liquidation/opportunities",
post(liquidation::post_opportunity),
)
.route(
"/v1/liquidation/opportunities",
get(liquidation::get_opportunities),
)
.route(
"/v1/liquidation/opportunities/:opportunity_id/bids",
post(liquidation::post_bid),
)
.route("/v1/ws", get(ws::ws_route_handler))
.route("/live", get(live))
.layer(CorsLayer::permissive())
.with_state(store);

axum::Server::bind(&run_options.server.listen_addr)
.serve(app.into_make_service())
.with_graceful_shutdown(async {
while !SHOULD_EXIT.load(Ordering::Acquire) {
tokio::time::sleep(EXIT_CHECK_INTERVAL).await;
}
tracing::info!("Shutting down RPC server...");
})
.await?;
Ok(())
}

pub async fn start_server(run_options: RunOptions) -> Result<()> {
tokio::spawn(async move {
tracing::info!("Registered shutdown signal handler...");
tokio::signal::ctrl_c().await.unwrap();
tracing::info!("Shut down signal received, waiting for tasks...");
SHOULD_EXIT.store(true, Ordering::Release);
});


let config = Config::load(&run_options.config.config).map_err(|err| {
anyhow!(
"Failed to load config from file({path}): {:?}",
Expand Down Expand Up @@ -234,51 +271,29 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> {
.into_iter()
.collect();

let (update_tx, update_rx) = tokio::sync::mpsc::channel(1000);
let store = Arc::new(Store {
chains: chain_store?,
liquidation_store: LiquidationStore::default(),
per_operator: wallet,
ws: ws::WsState {
subscriber_counter: AtomicUsize::new(0),
subscribers: DashMap::new(),
subscribers: DashMap::new(),
update_tx,
},
});

let server_store = store.clone();

tokio::spawn(run_submission_loop(store.clone()));
tokio::spawn(run_verification_loop(store.clone()));

let app: Router<()> = Router::new()
.merge(SwaggerUi::new("/docs").url("/docs/openapi.json", ApiDoc::openapi()))
.route("/", get(root))
.route("/v1/bids", post(bid::bid))
.route(
"/v1/liquidation/opportunities",
post(liquidation::post_opportunity),
)
.route(
"/v1/liquidation/opportunities",
get(liquidation::get_opportunities),
)
.route(
"/v1/liquidation/opportunities/:opportunity_id/bids",
post(liquidation::post_bid),
)
.route("/v1/ws", get(ws::ws_route_handler))
.route("/live", get(live))
.layer(CorsLayer::permissive())
.with_state(server_store);

axum::Server::bind(&run_options.server.listen_addr)
.serve(app.into_make_service())
.with_graceful_shutdown(async {
while !SHOULD_EXIT.load(Ordering::Acquire) {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
tracing::info!("Shutting down RPC server...");
})
.await?;

let submission_loop = tokio::spawn(run_submission_loop(store.clone()));
let verification_loop = tokio::spawn(run_verification_loop(store.clone()));
let subscription_loop = tokio::spawn(run_subscription_loop(store.clone(), update_rx));
let server_loop = tokio::spawn(start_api(run_options, store.clone()));
join_all(vec![
submission_loop,
verification_loop,
subscription_loop,
server_loop,
])
.await;
Ok(())
}
16 changes: 10 additions & 6 deletions auction-server/src/api/liquidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ use {
handle_bid,
BidResult,
},
ws::{
notify_updates,
UpdateEvent::NewOpportunity,
},
ws::UpdateEvent::NewOpportunity,
ErrorBodyResponse,
RestError,
},
Expand Down Expand Up @@ -61,7 +58,6 @@ use {
uuid::Uuid,
};


/// Similar to OpportunityParams, but with the opportunity id included.
#[derive(Serialize, Deserialize, ToSchema, Clone, ToResponse)]
pub struct OpportunityParamsWithMetadata {
Expand Down Expand Up @@ -148,7 +144,15 @@ pub async fn post_opportunity(
write_lock.insert(params.permission_key.clone(), vec![opportunity.clone()]);
}

notify_updates(&store.ws, NewOpportunity(opportunity.clone().into())).await;
store
.ws
.update_tx
.send(NewOpportunity(opportunity.clone().into()))
.await
.map_err(|e| {
tracing::error!("Failed to send update: {}", e);
RestError::TemporarilyUnavailable
})?;

tracing::debug!("number of permission keys: {}", write_lock.len());
tracing::debug!(
Expand Down
41 changes: 35 additions & 6 deletions auction-server/src/api/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,11 @@ use {
crate::{
api::{
liquidation::OpportunityParamsWithMetadata,
EXIT_CHECK_INTERVAL,
SHOULD_EXIT,
},
config::ChainId,
state::{
LiquidationOpportunity,
Store,
},
state::Store,
},
anyhow::{
anyhow,
Expand All @@ -23,11 +21,9 @@ use {
State,
WebSocketUpgrade,
},
http::HeaderMap,
response::IntoResponse,
},
dashmap::DashMap,
ethers::types::Chain,
futures::{
future::join_all,
stream::{
Expand Down Expand Up @@ -58,6 +54,7 @@ use {
pub struct WsState {
pub subscriber_counter: AtomicUsize,
pub subscribers: DashMap<SubscriberId, mpsc::Sender<UpdateEvent>>,
pub update_tx: mpsc::Sender<UpdateEvent>,
}

#[derive(Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -329,3 +326,35 @@ pub async fn notify_updates(ws_state: &WsState, event: UpdateEvent) {
}
});
}


pub async fn run_subscription_loop(
store: Arc<Store>,
mut receiver: mpsc::Receiver<UpdateEvent>,
) -> Result<()> {
let mut interval = tokio::time::interval(EXIT_CHECK_INTERVAL);

while !SHOULD_EXIT.load(Ordering::Acquire) {
tokio::select! {
update = receiver.recv() => {
match update {
None => {
// When the received message is None it means the channel has been closed. This
// should never happen as the channel is never closed. As we can't recover from
// this we shut down the application.
tracing::error!("Failed to receive update from store.");
SHOULD_EXIT.store(true, Ordering::Release);
break;
}
Some(event) => {
notify_updates(&store.ws, event).await;
},
}
},
_ = interval.tick() => {}
}
}

tracing::info!("Shutting down Websocket notifier...");
Ok(())
}
Loading

0 comments on commit fc6b871

Please sign in to comment.