Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket support #18

Merged
merged 6 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion auction-server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion auction-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "auction-server"
version = "0.1.0"
version = "0.1.1"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand All @@ -25,3 +25,4 @@ utoipa-swagger-ui = { version = "3.1.4", features = ["axum"] }
serde_yaml = "0.9.25"
ethers = "2.0.10"
axum-macros = "0.4.0"
dashmap = { version = "5.4.0" }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump the auction-server version as well?

139 changes: 30 additions & 109 deletions auction-server/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,29 @@ use {
},
liquidation::{
OpportunityBid,
OpportunityParamsWithId,
OpportunityParamsWithMetadata,
},
ws::{
ClientMessage,
ClientRequest,
ServerResultMessage,
ServerResultResponse,
ServerUpdateResponse,
},
},
auction::run_submission_loop,
config::{
ChainId,
Config,
RunOptions,
config::RunOptions,
server::{
EXIT_CHECK_INTERVAL,
SHOULD_EXIT,
},
liquidation_adapter::run_verification_loop,
state::{
ChainStore,
LiquidationStore,
OpportunityParams,
OpportunityParamsV1,
Store,
TokenQty,
},
},
anyhow::{
anyhow,
Result,
},
anyhow::Result,
axum::{
http::StatusCode,
response::{
Expand All @@ -44,30 +44,11 @@ use {
Router,
},
clap::crate_version,
ethers::{
providers::{
Http,
Middleware,
Provider,
},
signers::{
LocalWallet,
Signer,
},
types::Bytes,
},
futures::future::join_all,
ethers::types::Bytes,
serde::Serialize,
std::{
collections::HashMap,
sync::{
atomic::{
AtomicBool,
Ordering,
},
Arc,
},
time::Duration,
std::sync::{
atomic::Ordering,
Arc,
},
tower_http::cors::CorsLayer,
utoipa::{
Expand All @@ -78,20 +59,13 @@ use {
utoipa_swagger_ui::SwaggerUi,
};

// A static exit flag to indicate to running threads that we're shutting down. This is used to
// gracefully shutdown the application.
//
// NOTE: A more idiomatic approach would be to use a tokio::sync::broadcast channel, and to send a
// shutdown signal to all running tasks. However, this is a bit more complicated to implement and
// we don't rely on global state for anything else.
pub(crate) static SHOULD_EXIT: AtomicBool = AtomicBool::new(false);

async fn root() -> String {
format!("PER Auction Server API {}", crate_version!())
}

mod bid;
pub(crate) mod liquidation;
pub(crate) mod ws;

pub enum RestError {
/// The request contained invalid parameters
Expand All @@ -104,8 +78,7 @@ pub enum RestError {
SimulationError { result: Bytes, reason: String },
/// The order was not found
OpportunityNotFound,
/// The server cannot currently communicate with the blockchain, so is not able to verify
/// which random values have been requested.
/// Internal error occurred during processing the request
TemporarilyUnavailable,
/// A catch-all error for all other types of errors that could occur during processing.
Unknown,
Expand Down Expand Up @@ -156,14 +129,8 @@ pub async fn live() -> Response {
(StatusCode::OK, "OK").into_response()
}

pub async fn start_server(run_options: RunOptions) -> Result<()> {
tokio::spawn(async move {
tracing::info!("Registered shutdown signal handler...");
tokio::signal::ctrl_c().await.unwrap();
tracing::info!("Shut down signal received, waiting for tasks...");
SHOULD_EXIT.store(true, Ordering::Release);
});

pub async fn start_api(run_options: RunOptions, store: Arc<Store>) -> Result<()> {
#[derive(OpenApi)]
#[openapi(
paths(
Expand All @@ -177,12 +144,17 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> {
schemas(OpportunityParamsV1),
schemas(OpportunityBid),
schemas(OpportunityParams),
schemas(OpportunityParamsWithId),
schemas(OpportunityParamsWithMetadata),
schemas(TokenQty),
schemas(BidResult),
schemas(ErrorBodyResponse),
schemas(ClientRequest),
schemas(ClientMessage),
schemas(ServerResultMessage),
schemas(ServerUpdateResponse),
schemas(ServerResultResponse),
responses(ErrorBodyResponse),
responses(OpportunityParamsWithId),
responses(OpportunityParamsWithMetadata),
responses(BidResult)
),
tags(
Expand All @@ -191,57 +163,6 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> {
)]
struct ApiDoc;

let config = Config::load(&run_options.config.config).map_err(|err| {
anyhow!(
"Failed to load config from file({path}): {:?}",
err,
path = run_options.config.config
)
})?;

let wallet = run_options.per_private_key.parse::<LocalWallet>()?;
tracing::info!("Using wallet address: {}", wallet.address().to_string());

let chain_store: Result<HashMap<ChainId, ChainStore>> = join_all(config.chains.iter().map(
|(chain_id, chain_config)| async move {
let mut provider = Provider::<Http>::try_from(chain_config.geth_rpc_addr.clone())
.map_err(|err| {
anyhow!(
"Failed to connect to chain({chain_id}) at {rpc_addr}: {:?}",
err,
chain_id = chain_id,
rpc_addr = chain_config.geth_rpc_addr
)
})?;
provider.set_interval(Duration::from_secs(chain_config.poll_interval));
let id = provider.get_chainid().await?.as_u64();
Ok((
chain_id.clone(),
ChainStore {
provider,
network_id: id,
bids: Default::default(),
token_spoof_info: Default::default(),
config: chain_config.clone(),
},
))
},
))
.await
.into_iter()
.collect();

let store = Arc::new(Store {
chains: chain_store?,
liquidation_store: LiquidationStore::default(),
per_operator: wallet,
});

let server_store = store.clone();

tokio::spawn(run_submission_loop(store.clone()));
tokio::spawn(run_verification_loop(store.clone()));

let app: Router<()> = Router::new()
.merge(SwaggerUi::new("/docs").url("/docs/openapi.json", ApiDoc::openapi()))
.route("/", get(root))
Expand All @@ -258,19 +179,19 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> {
"/v1/liquidation/opportunities/:opportunity_id/bids",
post(liquidation::post_bid),
)
.route("/v1/ws", get(ws::ws_route_handler))
.route("/live", get(live))
.layer(CorsLayer::permissive())
.with_state(server_store);
.with_state(store);

axum::Server::bind(&run_options.server.listen_addr)
.serve(app.into_make_service())
.with_graceful_shutdown(async {
while !SHOULD_EXIT.load(Ordering::Acquire) {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
tokio::time::sleep(EXIT_CHECK_INTERVAL).await;
}
tracing::info!("Shutting down RPC server...");
})
.await?;

Ok(())
}
55 changes: 37 additions & 18 deletions auction-server/src/api/liquidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use {
handle_bid,
BidResult,
},
ws::UpdateEvent::NewOpportunity,
ErrorBodyResponse,
RestError,
},
Expand Down Expand Up @@ -57,22 +58,33 @@ use {
uuid::Uuid,
};


/// Similar to OpportunityParams, but with the opportunity id included.
#[derive(Serialize, Deserialize, ToSchema, Clone, ToResponse)]
pub struct OpportunityParamsWithId {
pub struct OpportunityParamsWithMetadata {
/// The opportunity unique id
#[schema(example = "f47ac10b-58cc-4372-a567-0e02b2c3d479", value_type=String)]
opportunity_id: Uuid,
/// Creation time of the opportunity
#[schema(example = "1700000000")]
creation_time: UnixTimestamp,
/// opportunity data
#[serde(flatten)]
params: OpportunityParams,
}

impl Into<OpportunityParamsWithId> for LiquidationOpportunity {
fn into(self) -> OpportunityParamsWithId {
OpportunityParamsWithId {
impl OpportunityParamsWithMetadata {
pub fn get_chain_id(&self) -> &ChainId {
match &self.params {
OpportunityParams::V1(params) => &params.chain_id,
}
}
}

impl Into<OpportunityParamsWithMetadata> for LiquidationOpportunity {
fn into(self) -> OpportunityParamsWithMetadata {
OpportunityParamsWithMetadata {
opportunity_id: self.id,
creation_time: self.creation_time,
params: self.params,
}
}
Expand All @@ -90,7 +102,7 @@ impl Into<OpportunityParamsWithId> for LiquidationOpportunity {
pub async fn post_opportunity(
State(store): State<Arc<Store>>,
Json(versioned_params): Json<OpportunityParams>,
) -> Result<Json<OpportunityParamsWithId>, RestError> {
) -> Result<Json<OpportunityParamsWithMetadata>, RestError> {
let params = match versioned_params.clone() {
OpportunityParams::V1(params) => params,
};
Expand Down Expand Up @@ -119,30 +131,37 @@ pub async fn post_opportunity(

if let Some(opportunities_existing) = write_lock.get_mut(&params.permission_key) {
// check if same opportunity exists in the vector
for opportunity_existing in opportunities_existing.clone() {
if opportunity_existing == opportunity {
for opportunity_existing in opportunities_existing.iter() {
if opportunity_existing == &opportunity {
return Err(RestError::BadParameters(
"Duplicate opportunity submission".to_string(),
));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The clone in iteration above seems a bit off. I don't know whether it's unnecessary but if you use a map then try_insert can give you this shorter. I see some searchs with the order id.s too.

}

opportunities_existing.push(opportunity);
opportunities_existing.push(opportunity.clone());
} else {
write_lock.insert(params.permission_key.clone(), vec![opportunity]);
write_lock.insert(params.permission_key.clone(), vec![opportunity.clone()]);
}

store
.ws
.broadcast_sender
.send(NewOpportunity(opportunity.clone().into()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of cloning multiple times, maybe you could push opportunity into opportunities_existing, and then call

store.ws.update_tx.send(NewOpportunity(opportunities_existing.last().into()))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You also don't need to call opportunity.into() below, you could instead just create a new OpportunityParamsWithMetadata object and fill in.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the first comment, we technically need opportunity in both places and both structs need to own them in the end so we need to clone.

I didn't understand the second comment, we implement the Into trait so we don't have to do this here.

.map_err(|e| {
tracing::error!("Failed to send update: {}", e);
RestError::TemporarilyUnavailable
})?;

tracing::debug!("number of permission keys: {}", write_lock.len());
tracing::debug!(
"number of opportunities for key: {}",
write_lock[&params.permission_key].len()
);

Ok(OpportunityParamsWithId {
opportunity_id: id,
params: versioned_params,
}
.into())
let opportunity_with_metadata: OpportunityParamsWithMetadata = opportunity.into();

Ok(opportunity_with_metadata.into())
}


Expand All @@ -162,8 +181,8 @@ params(ChainIdQueryParams))]
pub async fn get_opportunities(
State(store): State<Arc<Store>>,
query_params: Query<ChainIdQueryParams>,
) -> Result<axum::Json<Vec<OpportunityParamsWithId>>, RestError> {
let opportunities: Vec<OpportunityParamsWithId> = store
) -> Result<axum::Json<Vec<OpportunityParamsWithMetadata>>, RestError> {
let opportunities: Vec<OpportunityParamsWithMetadata> = store
.liquidation_store
.opportunities
.read()
Expand All @@ -177,7 +196,7 @@ pub async fn get_opportunities(
.clone()
.into()
})
.filter(|params_with_id: &OpportunityParamsWithId| {
.filter(|params_with_id: &OpportunityParamsWithMetadata| {
let params = match &params_with_id.params {
OpportunityParams::V1(params) => params,
};
Expand Down
Loading
Loading