Skip to content

Commit

Permalink
feat: ws stats (#229)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramiroaisen authored Dec 24, 2023
2 parents e8b35ce + fb14df6 commit 11e0289
Show file tree
Hide file tree
Showing 16 changed files with 859 additions and 31 deletions.
178 changes: 147 additions & 31 deletions Cargo.lock

Large diffs are not rendered by default.

17 changes: 17 additions & 0 deletions defs/db/WsStatsConnection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { CountryCode } from "../CountryCode";
import type { DateTime } from "../DateTime";

export type WsStatsConnection = {
_id: string;
st: string;
dp: string;
du: number | null;
op: boolean;
cc: CountryCode | null;
ip: string;
ap: string | null;
av: number | null;
ca: DateTime;
cl: DateTime | null;
};
3 changes: 3 additions & 0 deletions defs/ws-stats/api/ws/stats/connection/WS/ClientEvent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.

export type ClientEvent = { kind: "ping" } | { kind: "pong" };
8 changes: 8 additions & 0 deletions defs/ws-stats/api/ws/stats/connection/WS/Query.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.

export type Query = {
connection_id?: string;
station_id: string;
app_kind?: string;
app_version?: number;
};
6 changes: 6 additions & 0 deletions defs/ws-stats/api/ws/stats/connection/WS/ServerEvent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.

export type ServerEvent =
| { kind: "ping" }
| { kind: "pong" }
| ({ kind: "start" } & { connection_id: string });
15 changes: 15 additions & 0 deletions rs/bin/openstream/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::process::ExitStatus;
use std::sync::Arc;

use api::storage::StorageServer;
use api::ws_stats::WsStatsServer;
use clap::{Parser, Subcommand};
use config::Config;
use db::access_token::{AccessToken, GeneratedBy};
Expand Down Expand Up @@ -438,6 +439,7 @@ async fn start_async(Start { config }: Start) -> Result<(), anyhow::Error> {
ref assets,
ref smtp,
ref payments,
ref ws_stats,
} = config.as_ref();

db::access_token::AccessToken::start_autoremove_job();
Expand Down Expand Up @@ -535,6 +537,19 @@ async fn start_async(Start { config }: Start) -> Result<(), anyhow::Error> {
}.boxed());
}

if let Some(ws_stats_config) = ws_stats {
let ws_stats = WsStatsServer::new(
deployment.id.clone(),
ws_stats_config.addrs.clone(),
shutdown.clone(),
);
let fut = ws_stats.start()?;
futs.push(async move {
fut.await.map_err(crate::error::ServerStartError::from)?;
Ok(())
}.boxed());
}

if let Some(static_config) = assets {
let assets = StaticServer::new(
static_config.addrs.clone(),
Expand Down
1 change: 1 addition & 0 deletions rs/packages/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod qs;
pub mod request_ext;
pub mod routes;
pub mod storage;
pub mod ws_stats;

use payments::client::PaymentsClient;

Expand Down
110 changes: 110 additions & 0 deletions rs/packages/api/src/ws_stats/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
pub mod routes;

use futures::stream::FuturesUnordered;
use futures::TryStreamExt;
use hyper::Server;
use log::*;
use serde::{Deserialize, Serialize};
use shutdown::Shutdown;
use socket2::{Domain, Protocol, Socket, Type};
use std::future::Future;
use std::net::SocketAddr;

#[derive(Debug)]
pub struct WsStatsServer {
deployment_id: String,
addrs: Vec<SocketAddr>,
shutdown: Shutdown,
}

#[derive(Debug, thiserror::Error)]
pub enum WsStatsServerError {
#[error("io error: {0}")]
Io(#[from] std::io::Error),
#[error("hyper error: {0}")]
Hyper(#[from] hyper::Error),
}

#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub struct Status {
status: usize,
}

impl WsStatsServer {
pub fn new(deployment_id: String, addrs: Vec<SocketAddr>, shutdown: Shutdown) -> Self {
Self {
deployment_id,
addrs,
shutdown,
}
}

pub fn start(
self,
) -> Result<impl Future<Output = Result<(), hyper::Error>> + 'static, WsStatsServerError> {
let mut app = prex::prex();

app.with(http::middleware::server);
app.get("/status", http::middleware::status);

app.at("/").nest(routes::router(
self.deployment_id.clone(),
self.shutdown.clone(),
));

let app = app.build().expect("ws stats server prex build");

let futs = FuturesUnordered::new();

for addr in self.addrs.iter().copied() {
let domain = match addr {
SocketAddr::V4(_) => Domain::IPV4,
SocketAddr::V6(_) => Domain::IPV6,
};

let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;

if addr.is_ipv6() {
socket.set_only_v6(true)?;
}

socket.set_nonblocking(true)?;
socket.set_reuse_address(true)?;
// socket.set_reuse_port(true)?;

socket.bind(&addr.into())?;
socket.listen(1024)?;

let tcp = socket.into();

let server = Server::from_tcp(tcp)?
.http1_only(true)
.http1_title_case_headers(false)
.http1_preserve_header_case(false)
.http1_keepalive(false);

{
use owo_colors::*;
info!(target: "ws-stats", "ws-stats server bound to {}", addr.yellow());
}

let fut = server
.serve(app.clone())
.with_graceful_shutdown(self.shutdown.signal());

futs.push(fut);
}

Ok(async move {
futs.try_collect().await?;
drop(self);
Ok(())
})
}
}

impl Drop for WsStatsServer {
fn drop(&mut self) {
info!(target: "ws-stats", "ws-stats server dropped");
}
}
Loading

0 comments on commit 11e0289

Please sign in to comment.