diff --git a/defs/constants.ts b/defs/constants.ts index 4e2fde11..428c863c 100644 --- a/defs/constants.ts +++ b/defs/constants.ts @@ -55,6 +55,9 @@ export const HEADER_RELAY_SOURCE_DEPLOYMENT = "x-source-deployment"; /** Internal relay rejection code header */ export const INTERNAL_RELAY_REJECTION_CODE_HEADER = "x-openstream-rejection-code"; +/** header name to get the result of the stream is-hls-redirect query */ +export const IS_HLS_REDIRECT_HEADER = "x-is-hls-redirect"; + /** timeout to wait to obtain a lock on a media session items * if not released in this timeout, probably the item is poisoned * and the process is aborted with a panic (and restarted by the process manager) */ diff --git a/rs/config/constants/src/lib.rs b/rs/config/constants/src/lib.rs index 6adeebae..638b4453 100644 --- a/rs/config/constants/src/lib.rs +++ b/rs/config/constants/src/lib.rs @@ -170,6 +170,10 @@ pub const MEDIA_LOCK_TIMEOUT_SECS: u64 = 30; #[const_register] pub const STREAM_CONNECTION_MAX_DURATION_SECS: u64 = 60 * 60 * 6; // 6 hours +/// header name to get the result of the stream is-hls-redirect query +#[const_register] +pub const IS_HLS_REDIRECT_HEADER: &str = "x-is-hls-redirect"; + /// validation constants pub mod validate { use super::*; diff --git a/rs/packages/stream/src/lib.rs b/rs/packages/stream/src/lib.rs index b729cac1..ae4c1d29 100644 --- a/rs/packages/stream/src/lib.rs +++ b/rs/packages/stream/src/lib.rs @@ -52,6 +52,10 @@ const CACHE_CONTROL_NO_CACHE: HeaderValue = HeaderValue::from_static("no-cache") #[allow(clippy::declare_interior_mutable_const)] const TEXT_PLAIN_UTF8: HeaderValue = HeaderValue::from_static("text/plain;charset=utf-8"); +#[allow(clippy::declare_interior_mutable_const)] +const APPLICATION_JSON_UTF8: HeaderValue = HeaderValue::from_static("application/json;charset=utf-8"); + + #[derive(Debug)] pub struct StreamServer { deployment_id: String, @@ -137,6 +141,8 @@ impl StreamServer { RelayHandler::new(self.deployment_id.clone(), self.media_sessions.clone()), ); + app.get("/stream/:id/is-hls-redirect", IsHlsRedirectHandler {}); + let app = app.build().expect("prex app build stream"); let futs = FuturesUnordered::new(); @@ -1069,4 +1075,79 @@ fn find_first_frame_index(data: &[u8]) -> Option { }) } } +} + +pub struct IsHlsRedirectHandler {} + +#[derive(Debug, thiserror::Error)] +pub enum IsHlsRedirectError { + #[error("db: {0}")] + Db(#[from] mongodb::error::Error), + + #[error("station with id {0} not found")] + StationNotFound(String), +} + +impl From for Response { + fn from(e: IsHlsRedirectError) -> Response { + let (status, message) = match e { + IsHlsRedirectError::Db(_) => ( + StatusCode::INTERNAL_SERVER_ERROR, + "internal server error (db)".into(), + ), + + IsHlsRedirectError::StationNotFound(id) => ( + StatusCode::NOT_FOUND, + format!("station with id {id} not found"), + ), + }; + + let mut res = Response::new(status); + res.headers_mut().append(CONTENT_TYPE, TEXT_PLAIN_UTF8); + *res.body_mut() = Body::from(message); + res + } +} + +impl IsHlsRedirectHandler { + async fn handle(&self, req: Request) -> Result { + tokio::spawn(async move { + let station_id = req.param("id").unwrap(); + + let station = match Station::get_by_id(station_id).await? { + Some(station) => station, + None => return Err(IsHlsRedirectError::StationNotFound(station_id.to_string())), + }; + + let is_hls_redirect = { + if !station.external_relay_redirect { + false + } else { + match station.external_relay_url { + None => false, + Some(u) => match url::Url::parse(&u) { + Err(_) => false, + Ok(url) => { + let path = url.path().to_lowercase(); + path.ends_with(".m3u8") || path.ends_with(".m3u") + }, + } + } + } + }; + + let mut res = Response::new(StatusCode::OK); + res.headers_mut().insert(HeaderName::from_static(constants::IS_HLS_REDIRECT_HEADER), HeaderValue::from_str(&is_hls_redirect.to_string()).unwrap()); + res.headers_mut().insert(CONTENT_TYPE, APPLICATION_JSON_UTF8); + *res.body_mut() = Body::from(is_hls_redirect.to_string()); + Ok(res) + }).await.unwrap() + } +} + +#[async_trait] +impl Handler for IsHlsRedirectHandler { + async fn call(&self, req: Request, _: Next) -> Response { + self.handle(req).await.into() + } } \ No newline at end of file