Skip to content

Commit

Permalink
change static to rust-embed based solution (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramiroaisen authored Jun 21, 2023
2 parents dd9804b + a09684c commit 169c348
Show file tree
Hide file tree
Showing 16 changed files with 462 additions and 115 deletions.
249 changes: 220 additions & 29 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ members = [
"rs/packages/stream",
"rs/packages/source-alt",
"rs/packages/stream",
"rs/packages/assets",
"rs/packages/router",
"rs/packages/upload",
"rs/packages/api",
Expand Down
67 changes: 0 additions & 67 deletions mailer-static/config.toml

This file was deleted.

16 changes: 0 additions & 16 deletions mailer-static/pm2.config.cjs

This file was deleted.

7 changes: 7 additions & 0 deletions openstream.sample.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@
"addrs": [ "0.0.0.0:10900" ]
},

// the [static] server is the interface to the static files server
"static": {
// string(SocketAddr)[] (required)
// socket addrs to bind the assets server to
"addrs": [ "0.0.0.0:11000" ]
},

// payments server configuration
"payments": {

Expand Down
8 changes: 8 additions & 0 deletions openstream.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ addrs = [ "0.0.0.0:10700" ]
# socket addrs to bind the api server to
addrs = [ "0.0.0.0:10900" ]

# the [assets] server is the interface to the static files server
[static]

# string(SocketAddr)[] (required)
# socket addrs to bind the assets server to
addrs = [ "0.0.0.0:11000" ]


# payments server configuration
[payments]

Expand Down
1 change: 1 addition & 0 deletions rs/bin/openstream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ hyper = "0.14.26"
validate = { version = "0.1.0", path = "../../packages/validate" }
mailer = { version = "0.1.0", path = "../../packages/mailer" }
payments = { version = "0.1.0", path = "../../packages/payments" }
assets = { version = "0.1.0", path = "../../packages/assets" }
15 changes: 15 additions & 0 deletions rs/bin/openstream/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use serde_util::DateTime;
use shutdown::Shutdown;
// use source::SourceServer;
use stream::StreamServer;
use assets::StaticServer;
use tokio::runtime::Runtime;

use jemallocator::Jemalloc;
Expand Down Expand Up @@ -371,6 +372,7 @@ async fn start_async(Start { config }: Start) -> Result<(), anyhow::Error> {
ref source,
ref api,
ref storage,
ref assets,
ref smtp,
ref payments,
} = config.as_ref();
Expand Down Expand Up @@ -468,6 +470,19 @@ async fn start_async(Start { config }: Start) -> Result<(), anyhow::Error> {
}.boxed());
}

if let Some(static_config) = assets {
let assets = StaticServer::new(
static_config.addrs.clone(),
shutdown.clone(),
);
let fut = assets.start()?;
futs.push(async move {
fut.await.map_err(crate::error::ServerStartError::from)?;
Ok(())
}.boxed());
}


// if let Some(router_config) = router {
// let router = RouterServer::new(router_config.addrs.clone(), shutdown.clone());
// let fut = router.start()?;
Expand Down
23 changes: 23 additions & 0 deletions rs/packages/assets/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[package]
name = "assets"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1.68"
base64-compat = "1.0.0"
futures-util = "0.3.28"
http = { version = "0.1.0", path = "../http" }
hyper = "0.14.26"
log = "0.4.19"
mime_guess = "2.0.4"
owo-colors = "3.5.0"
prex = { version = "0.1.0", path = "../prex" }
rust-embed = { version = "6.7.0", features = ["debug-embed"] }
rust-embed-for-web = "11.1.1"
shutdown = { version = "0.1.0", path = "../shutdown" }
socket2 = "0.5.3"
thiserror = "1.0.40"
tokio = { version = "1.28.2", features = ["full"] }
171 changes: 171 additions & 0 deletions rs/packages/assets/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
#![allow(clippy::useless_format)]

use async_trait::async_trait;
use futures_util::stream::{FuturesUnordered, TryStreamExt};
use hyper::header::{HeaderValue, CONTENT_TYPE, ETAG, IF_NONE_MATCH};
use hyper::{Body, Server, StatusCode};
use log::*;
use prex::{handler::Handler, Next, Request, Response};
use rust_embed::RustEmbed;
use shutdown::Shutdown;
use socket2::{Domain, Protocol, Socket, Type};
use std::future::Future;
use std::net::SocketAddr;

#[derive(RustEmbed)]
#[folder = "../../../static/static/"]
struct Assets;

#[derive(Debug, thiserror::Error)]
pub enum StaticServerError {
#[error("io error: {0}")]
Io(#[from] std::io::Error),
#[error("hyper error: {0}")]
Hyper(#[from] hyper::Error),
}

#[derive(Debug)]
pub struct StaticServer {
addrs: Vec<SocketAddr>,
shutdown: Shutdown,
}

#[derive(Debug)]
struct SourceServerInner {}

impl StaticServer {
pub fn new(addrs: Vec<SocketAddr>, shutdown: Shutdown) -> Self {
Self { addrs, shutdown }
}

pub fn start(
self,
) -> Result<impl Future<Output = Result<(), hyper::Error>> + 'static, StaticServerError> {
let mut app = prex::prex();

app.with(http::middleware::server);
app.get("/status", http::middleware::status);
app.get("/:path(.+)", StaticHandler::new());

let app = app.build().expect("prex app build source");

let futs = FuturesUnordered::new();

for addr in self.addrs.iter().cloned() {
let domain = match addr {
SocketAddr::V4(_) => Domain::IPV4,
SocketAddr::V6(_) => Domain::IPV6,
};

let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;

if addr.is_ipv6() {
socket.set_only_v6(true)?;
}

socket.set_nonblocking(true)?;
socket.set_reuse_address(true)?;
// socket.set_reuse_port(true)?;

match socket.bind(&addr.into()) {
Ok(()) => {}
Err(e) => {
error!("error binding to addr {} => {}", addr, e);
return Err(e.into());
}
};

socket.listen(1024)?;

let tcp = socket.into();

let server = Server::from_tcp(tcp)?
.http1_only(true)
.http1_title_case_headers(false)
.http1_preserve_header_case(false)
.http1_keepalive(false);

{
use owo_colors::*;
info!("static server bound to {}", addr.yellow());
}

futs.push({
let signal = self.shutdown.signal();
let server = server.serve(app.clone());

async move {
tokio::select! {
_ = signal => Ok(()),
r = server => r
}
}
});

// server
// .serve(app.clone())
// .with_graceful_shutdown(self.shutdown.signal()),
// )
}

Ok(async move {
futs.try_collect().await?;
drop(self);
Ok(())
})
}
}

impl Drop for StaticServer {
fn drop(&mut self) {
info!("static server dropped");
}
}

pub struct StaticHandler {}

impl StaticHandler {
fn new() -> Self {
Self {}
}

async fn handle(&self, req: Request) -> prex::Response {
let path = req.param("path").unwrap();

let entry = match Assets::get(path) {
None => return prex::Response::new(StatusCode::NOT_FOUND),
Some(entry) => entry,
};

let etag = format!(r#""{}""#, base64::encode(&entry.metadata.sha256_hash()));

if let Some(req_etag) = req.headers().get(IF_NONE_MATCH) {
if req_etag.as_bytes() == etag.as_bytes() {
return prex::Response::new(StatusCode::NOT_MODIFIED);
}
}

let mut res = Response::new(StatusCode::OK);

let content_type = match mime_guess::from_path(path).first() {
Some(mime) => HeaderValue::from_str(mime.as_ref()).unwrap(),
None => HeaderValue::from_static("application/octet-stream"),
};

let etag = HeaderValue::from_str(&etag).unwrap();

res.headers_mut().insert(CONTENT_TYPE, content_type);
res.headers_mut().insert(ETAG, etag);

*res.body_mut() = Body::from(entry.data);

res
}
}

#[async_trait]
impl Handler for StaticHandler {
async fn call(&self, req: Request, _: Next) -> prex::Response {
self.handle(req).await
}
}
16 changes: 15 additions & 1 deletion rs/packages/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,21 @@ pub struct Config {
// pub router: Option<Router>,
pub api: Option<Api>,
pub storage: Option<Storage>,

#[serde(rename = "static")]
pub assets: Option<Static>,

pub smtp: Smtp,
pub payments: Payments,
}

impl Config {
pub fn has_interfaces(&self) -> bool {
self.stream.is_some() || self.source.is_some() || self.api.is_some() || self.storage.is_some()
self.stream.is_some()
|| self.source.is_some()
|| self.api.is_some()
|| self.storage.is_some()
|| self.assets.is_some()
}
}

Expand All @@ -43,6 +51,12 @@ pub struct Stream {
pub addrs: Vec<SocketAddr>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct Static {
pub addrs: Vec<SocketAddr>,
}

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct Source {
Expand Down
Loading

0 comments on commit 169c348

Please sign in to comment.