diff --git a/Cargo.lock b/Cargo.lock index 7e222371..2d6f59a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -179,7 +179,7 @@ dependencies = [ "static_init", "test-util", "thiserror", - "time 0.3.20", + "time 0.3.28", "tokio", "tokio-stream", "ts-rs", @@ -605,7 +605,7 @@ dependencies = [ "serde", "serde_bytes", "serde_json", - "time 0.3.20", + "time 0.3.28", "uuid", ] @@ -869,9 +869,9 @@ dependencies = [ [[package]] name = "const-str" -version = "0.5.4" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b62c6d3ea43cbe0bc5a081f276fd477e4291d168aacc9f9d98073325333c0d4" +checksum = "aca749d3d3f5b87a0d6100509879f9cf486ab510803a4a4e1001da1ff61c2bd6" dependencies = [ "const-str-proc-macro", ] @@ -926,7 +926,7 @@ dependencies = [ "rand 0.8.5", "sha2", "subtle", - "time 0.3.20", + "time 0.3.28", "version_check", ] @@ -981,7 +981,7 @@ dependencies = [ "rand 0.8.5", "serde-util", "thiserror", - "time 0.3.20", + "time 0.3.28", "tokio", "user-agent", ] @@ -1349,7 +1349,7 @@ dependencies = [ "static_init", "strum", "thiserror", - "time 0.3.20", + "time 0.3.28", "tokio", "ts-rs", "uid", @@ -1426,6 +1426,15 @@ dependencies = [ "text_lines", ] +[[package]] +name = "deranged" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2696e8a945f658fd14dc3b87242e6b80cd0f36ff04ea560fa39082368847946" +dependencies = [ + "serde", +] + [[package]] name = "derivative" version = "2.2.0" @@ -2745,7 +2754,7 @@ checksum = "15d30b9b346f215bebb2683e1f0947c49411fc488a41b05521a1fc3aa825f7e9" dependencies = [ "generic-array", "serde", - "time 0.3.20", + "time 0.3.28", ] [[package]] @@ -3094,7 +3103,7 @@ dependencies = [ "log", "owo-colors 3.5.0", "static_init", - "time 0.3.20", + "time 0.3.28", ] [[package]] @@ -3240,6 +3249,7 @@ dependencies = [ "atomic_float", "burst", "bytes", + "const-str", "constants", "db", "drop-tracer", @@ -3254,6 +3264,7 @@ dependencies = [ "shutdown", "stream-util", "thiserror", + "time 0.3.28", "tokio", "tokio-stream", "url", @@ -5100,7 +5111,7 @@ dependencies = [ "serde", "serde_json", "static_init", - "time 0.3.20", + "time 0.3.28", "ts-rs", ] @@ -5201,7 +5212,7 @@ dependencies = [ "serde", "serde_json", "serde_with_macros 3.0.0", - "time 0.3.20", + "time 0.3.28", ] [[package]] @@ -6036,10 +6047,11 @@ dependencies = [ [[package]] name = "time" -version = "0.3.20" +version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd0cbfecb4d19b5ea75bb31ad904eb5b9fa13f21079c3b92017ebdf4999a5890" +checksum = "17f6bb557fd245c28e6411aa56b6403c689ad95061f50e4be16c274e70a17e48" dependencies = [ + "deranged", "itoa 1.0.5", "libc", "num_threads", @@ -6050,15 +6062,15 @@ dependencies = [ [[package]] name = "time-core" -version = "0.1.0" +version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd" +checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" [[package]] name = "time-macros" -version = "0.2.8" +version = "0.2.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd80a657e71da814b8e5d60d3374fc6d35045062245d80224748ae522dd76f36" +checksum = "1a942f44339478ef67935ab2bbaec2fb0322496cf3cbe84b261e06ac3814c572" dependencies = [ "time-core", ] diff --git a/Cargo.toml b/Cargo.toml index 252cf797..3dc28bcb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,4 +1,8 @@ + [workspace] + +resolver = "2" + members = [ "rs/bin/openstream", diff --git a/defs/constants.ts b/defs/constants.ts index cd3ce813..cd02ecfc 100644 --- a/defs/constants.ts +++ b/defs/constants.ts @@ -44,6 +44,19 @@ export const EXTERNAL_RELAY_NO_LISTENERS_SHUTDOWN_DELAY_SECS = 60; /** Internal forwarded ip header used when openstream servers are connecting with each other */ export const FORWARD_IP_HEADER = "x-openstream-forwarded-ip"; +/** interval in which + * $stations.owner_deployment_info.health_checked_at + * and $media_session.health_checked_at + * will be set to $NOW */ +export const MEDIA_SESSION_HEALTH_CHECK_INTERVAL_SECS = 6; + +/** interval to check if $stations.owner_deployment_info and $media_sessions are healthy */ +export const MEDIA_SESSION_HEALTH_CHECK_KILL_INTERVAL_SECS = 5; + +/** time to check if a $media_session (and $station.owner_deployment_info) is healthy + * otherwise kill it in database */ +export const MEDIA_SESSION_HEALTH_SHUTDOWN_TIMEOUT_SECS = 33; + /** Access token header used by payments servers implementations */ export const PAYMENTS_ACCESS_TOKEN_HEADER = "x-access-token"; diff --git a/defs/db/MediaSession.ts b/defs/db/MediaSession.ts index 23338ca0..c054be6d 100644 --- a/defs/db/MediaSession.ts +++ b/defs/db/MediaSession.ts @@ -13,6 +13,7 @@ export type MediaSession = { transfer_bytes: number; closed_at: DateTime | null; duration_ms: number | null; + health_checked_at: DateTime | null; created_at: DateTime; updated_at: DateTime; } & MediaSessionKind; diff --git a/defs/db/OwnerDeploymentInfo.ts b/defs/db/OwnerDeploymentInfo.ts index 62ab4e5a..d8521b13 100644 --- a/defs/db/OwnerDeploymentInfo.ts +++ b/defs/db/OwnerDeploymentInfo.ts @@ -1,7 +1,9 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { DateTime } from "../DateTime"; export type OwnerDeploymentInfo = { deployment_id: string; task_id: string; content_type: string; + health_checked_at: DateTime | null; }; diff --git a/front/admin/src/lib/components/LoginDashboard/LoginDashboard.svelte b/front/admin/src/lib/components/LoginDashboard/LoginDashboard.svelte index 66c0a58a..cf708c49 100644 --- a/front/admin/src/lib/components/LoginDashboard/LoginDashboard.svelte +++ b/front/admin/src/lib/components/LoginDashboard/LoginDashboard.svelte @@ -8,7 +8,7 @@ padding-inline: min(20%, max(12rem, 10%)); padding-block: 6.5rem; position: relative; - background-image: url("$lib/img/login-bg.jpg"); + background-image: url($lib/img/login-bg.jpg); background-size: cover; background-position: center; --field-container-bg: rgba(0,0,0,0); diff --git a/rs/bin/openstream/src/main.rs b/rs/bin/openstream/src/main.rs index afa6a7ae..8b0cb782 100644 --- a/rs/bin/openstream/src/main.rs +++ b/rs/bin/openstream/src/main.rs @@ -17,7 +17,7 @@ use log::*; use anyhow::{bail, Context}; use api::ApiServer; use defer_lite::defer; -use media_sessions::MediaSessionMap; +use media_sessions::{MediaSessionMap, healthcheck}; use mongodb::bson::doc; use mongodb::bson::Document; use serde_util::DateTime; @@ -565,6 +565,7 @@ async fn start_async(Start { config }: Start) -> Result<(), anyhow::Error> { tokio::spawn(db::deployment::start_health_check_job(deployment_id.clone())); tokio::spawn(db::station_picture::upgrade_images_if_needed()); + tokio::spawn(healthcheck::health_shutdown_job()); tokio::spawn({ let shutdown = shutdown.clone(); diff --git a/rs/config/constants/src/lib.rs b/rs/config/constants/src/lib.rs index 06fe6b55..ac1eef86 100644 --- a/rs/config/constants/src/lib.rs +++ b/rs/config/constants/src/lib.rs @@ -117,6 +117,22 @@ pub const STATION_PICTURES_VERSION: f64 = 5.0; #[const_register] pub const DEPLOYMENT_HEALTH_CHECK_INTERVAL_SECS: u16 = 1; +/// interval in which +/// $stations.owner_deployment_info.health_checked_at +/// and $media_session.health_checked_at +/// will be set to $NOW +#[const_register] +pub const MEDIA_SESSION_HEALTH_CHECK_INTERVAL_SECS: u16 = 6; + +/// time to check if a $media_session (and $station.owner_deployment_info) is healthy +/// otherwise kill it in database +#[const_register] +pub const MEDIA_SESSION_HEALTH_SHUTDOWN_TIMEOUT_SECS: u16 = 33; + +/// interval to check if $stations.owner_deployment_info and $media_sessions are healthy +#[const_register] +pub const MEDIA_SESSION_HEALTH_CHECK_KILL_INTERVAL_SECS: u16 = 5; + /// validation constants pub mod validate { use super::*; diff --git a/rs/internal-scripts/create-random-stream-connections/src/main.rs b/rs/internal-scripts/create-random-stream-connections/src/main.rs index 99b461a9..fa05bab7 100644 --- a/rs/internal-scripts/create-random-stream-connections/src/main.rs +++ b/rs/internal-scripts/create-random-stream-connections/src/main.rs @@ -11,6 +11,7 @@ use time::Duration; fn random_os() -> Option { use rand::seq::SliceRandom; // 0.7.2 + #[allow(clippy::useless_vec)] let vs = vec![ Some("Linux"), Some("Android"), @@ -27,6 +28,7 @@ fn random_os() -> Option { fn random_browser() -> Option { use rand::seq::SliceRandom; // 0.7.2 + #[allow(clippy::useless_vec)] let vs = vec![ Some("Chrome"), Some("Safari"), diff --git a/rs/packages/api/src/json/mod.rs b/rs/packages/api/src/json/mod.rs index 155e6be4..5557ade9 100644 --- a/rs/packages/api/src/json/mod.rs +++ b/rs/packages/api/src/json/mod.rs @@ -21,7 +21,7 @@ pub trait JsonHandler: Send + Sync + Sized + Clone + 'static { false } - fn cookies<'a, 'b>(&'a self, _output: &'b Self::Output) -> Vec { + fn cookies(&self, _output: &Self::Output) -> Vec { vec![] } diff --git a/rs/packages/api/src/routes/auth/user/register.rs b/rs/packages/api/src/routes/auth/user/register.rs index 73eb1b70..7daf5cbb 100644 --- a/rs/packages/api/src/routes/auth/user/register.rs +++ b/rs/packages/api/src/routes/auth/user/register.rs @@ -416,6 +416,7 @@ pub mod post { .await .map_err(HandleError::PaymentSavePaymentMethod)?; + #[allow(clippy::let_and_return)] payment_method_response }; diff --git a/rs/packages/api/src/routes/invitations/reject.rs b/rs/packages/api/src/routes/invitations/reject.rs index b3a6c4b5..b247533e 100644 --- a/rs/packages/api/src/routes/invitations/reject.rs +++ b/rs/packages/api/src/routes/invitations/reject.rs @@ -144,6 +144,7 @@ pub mod post { Some(invitation) => invitation, }; + #[allow(clippy::let_and_return)] invitation } }; diff --git a/rs/packages/api/src/routes/mod.rs b/rs/packages/api/src/routes/mod.rs index 61b24e71..ec34c2ef 100644 --- a/rs/packages/api/src/routes/mod.rs +++ b/rs/packages/api/src/routes/mod.rs @@ -128,6 +128,7 @@ pub fn router( app.at("/runtime/restart-playlist/:station").post( runtime::restart_playlist::station_id::post::Endpoint { + deployment_id: deployment_id.clone(), media_sessions: media_sessions.clone(), drop_tracer: drop_tracer.clone(), shutdown: shutdown.clone(), diff --git a/rs/packages/api/src/routes/payment_methods/mod.rs b/rs/packages/api/src/routes/payment_methods/mod.rs index 0360a688..dee04d59 100644 --- a/rs/packages/api/src/routes/payment_methods/mod.rs +++ b/rs/packages/api/src/routes/payment_methods/mod.rs @@ -278,6 +278,7 @@ pub mod post { .await .map_err(HandleError::PaymentSavePaymentMethod)?; + #[allow(clippy::let_and_return)] payment_method_response }; diff --git a/rs/packages/api/src/routes/runtime/restart_playlist/station_id.rs b/rs/packages/api/src/routes/runtime/restart_playlist/station_id.rs index 733019e2..6ecd5d82 100644 --- a/rs/packages/api/src/routes/runtime/restart_playlist/station_id.rs +++ b/rs/packages/api/src/routes/runtime/restart_playlist/station_id.rs @@ -22,6 +22,7 @@ pub mod post { #[derive(Debug, Clone)] pub struct Endpoint { + pub deployment_id: String, pub media_sessions: MediaSessionMap, pub drop_tracer: DropTracer, pub shutdown: Shutdown, @@ -70,7 +71,14 @@ pub mod post { async fn perform(&self, input: Self::Input) -> Result { let Self::Input { station_id } = input; let mut lock = self.media_sessions.write(); - let _ = lock.restart(&station_id, self.shutdown.clone(), self.drop_tracer.clone()); + let _ = lock + .restart( + station_id.to_string(), + self.deployment_id.to_string(), + self.shutdown.clone(), + self.drop_tracer.clone(), + ) + .await; Ok(Output(EmptyStruct(()))) } } diff --git a/rs/packages/api/src/routes/stations/restart_playlist.rs b/rs/packages/api/src/routes/stations/restart_playlist.rs index ed780ddc..6eb1daab 100644 --- a/rs/packages/api/src/routes/stations/restart_playlist.rs +++ b/rs/packages/api/src/routes/stations/restart_playlist.rs @@ -107,7 +107,14 @@ pub mod post { MediaSessionKind::Playlist { .. } => { if media_session.deployment_id == self.deployment_id { let mut lock = self.media_sessions.write(); - let _ = lock.restart(&station.id, self.shutdown.clone(), self.drop_tracer.clone()); + let _ = lock + .restart( + self.deployment_id.clone(), + station.id, + self.shutdown.clone(), + self.drop_tracer.clone(), + ) + .await; } else { #[allow(clippy::collapsible_else_if)] if let Some(deployment) = Deployment::get_by_id(&media_session.deployment_id).await? { diff --git a/rs/packages/db/src/models/deployment/mod.rs b/rs/packages/db/src/models/deployment/mod.rs index 696eba5b..92f0450c 100644 --- a/rs/packages/db/src/models/deployment/mod.rs +++ b/rs/packages/db/src/models/deployment/mod.rs @@ -38,7 +38,7 @@ pub struct Deployment { pub created_at: DateTime, pub updated_at: DateTime, - // TODO: this Option is for back compat only + // TODO: this Option<> is for back compat only // create a migration and change this to DateTime pub health_checked_at: Option, diff --git a/rs/packages/db/src/models/media_session/mod.rs b/rs/packages/db/src/models/media_session/mod.rs index 7e198d0e..6b6199fe 100644 --- a/rs/packages/db/src/models/media_session/mod.rs +++ b/rs/packages/db/src/models/media_session/mod.rs @@ -30,6 +30,10 @@ pub struct MediaSession { #[serde(with = "serde_util::as_f64::option")] pub duration_ms: Option, + // TODO: this Option<> is for back compat only + // create a migration and change this to DateTime + pub health_checked_at: Option, + pub created_at: DateTime, pub updated_at: DateTime, } @@ -182,6 +186,7 @@ mod test { }, now_playing: None, state: MediaSessionState::Closed, + health_checked_at: Some(DateTime::now()), closed_at: Some(DateTime::now()), duration_ms: Some(100), }; diff --git a/rs/packages/db/src/models/station/mod.rs b/rs/packages/db/src/models/station/mod.rs index 4ff7a620..9dd6bdb5 100644 --- a/rs/packages/db/src/models/station/mod.rs +++ b/rs/packages/db/src/models/station/mod.rs @@ -321,6 +321,10 @@ pub struct OwnerDeploymentInfo { pub deployment_id: String, pub task_id: String, pub content_type: String, + + // this Option<> is for backwards compatibility + // it should be removed in the future + pub health_checked_at: Option, } impl From for Bson { diff --git a/rs/packages/db/src/models/stream_connection/analytics/mod.rs b/rs/packages/db/src/models/stream_connection/analytics/mod.rs index ee49d1f4..85c77184 100644 --- a/rs/packages/db/src/models/stream_connection/analytics/mod.rs +++ b/rs/packages/db/src/models/stream_connection/analytics/mod.rs @@ -400,7 +400,7 @@ pub async fn get_analytics(query: AnalyticsQuery) -> Result { - let mut item = $acc.entry($key).or_default(); + let item = $acc.entry($key).or_default(); item.sessions += 1; item.ips.insert(conn.ip); item.total_duration_ms += conn_duration_ms; diff --git a/rs/packages/db/src/models/stream_connection/index.rs b/rs/packages/db/src/models/stream_connection/index.rs index 85680bb2..e441c744 100644 --- a/rs/packages/db/src/models/stream_connection/index.rs +++ b/rs/packages/db/src/models/stream_connection/index.rs @@ -612,6 +612,7 @@ impl MemIndex { .await .unwrap(); + #[allow(clippy::let_and_return)] total } diff --git a/rs/packages/env_logger/src/filter/mod.rs b/rs/packages/env_logger/src/filter/mod.rs index 829e40cf..573fab6c 100644 --- a/rs/packages/env_logger/src/filter/mod.rs +++ b/rs/packages/env_logger/src/filter/mod.rs @@ -253,9 +253,11 @@ impl Builder { }); } + let filter = self.filter.take(); + Filter { directives: mem::take(&mut directives), - filter: mem::replace(&mut self.filter, None), + filter, } } } diff --git a/rs/packages/env_logger/src/lib.rs b/rs/packages/env_logger/src/lib.rs index 551ed3d9..0c6316dc 100644 --- a/rs/packages/env_logger/src/lib.rs +++ b/rs/packages/env_logger/src/lib.rs @@ -1,4 +1,4 @@ -#![allow(clippy::forget_ref)] +#![allow(clippy::all)] //! error!("this is printed by default"); //! diff --git a/rs/packages/ffmpeg/src/metadata.rs b/rs/packages/ffmpeg/src/metadata.rs index 76bbf021..277c355a 100644 --- a/rs/packages/ffmpeg/src/metadata.rs +++ b/rs/packages/ffmpeg/src/metadata.rs @@ -106,9 +106,9 @@ pub async fn get(mut data: Receiver) -> Result tup, + _ = health_handle => unreachable!() + }; let exit = match status { Ok(exit) => exit, diff --git a/rs/packages/media-sessions/src/healthcheck.rs b/rs/packages/media-sessions/src/healthcheck.rs new file mode 100644 index 00000000..22b7a90c --- /dev/null +++ b/rs/packages/media-sessions/src/healthcheck.rs @@ -0,0 +1,121 @@ +use db::{ + media_session::MediaSession, + station::{OwnerDeploymentInfo, Station}, + Model, +}; +use mongodb::bson::doc; +use serde_util::DateTime; + +pub async fn check_now() -> Result { + const KEY_STATION_HEALTH_CHECKED_AT: &str = db::key!( + Station::KEY_OWNER_DEPLOYMENT_INFO, + OwnerDeploymentInfo::KEY_HEALTH_CHECKED_AT + ); + + let limit: DateTime = (time::OffsetDateTime::now_utc() + - time::Duration::seconds(constants::MEDIA_SESSION_HEALTH_SHUTDOWN_TIMEOUT_SECS as i64)) + .into(); + + let filter = doc! { + "$and": [ + { KEY_STATION_HEALTH_CHECKED_AT: { "$ne": null } }, + { KEY_STATION_HEALTH_CHECKED_AT: { "$lt": limit } }, + ], + }; + + let update = doc! { "$set": { Station::KEY_OWNER_DEPLOYMENT_INFO: null } }; + + let r = Station::cl().update_many(filter, update, None).await?; + + Ok(r) +} + +pub async fn health_shutdown_job() { + let duration = tokio::time::Duration::from_secs( + constants::MEDIA_SESSION_HEALTH_CHECK_KILL_INTERVAL_SECS as u64, + ); + + loop { + match check_now().await { + Ok(r) => { + if r.modified_count > 0 { + log::info!( + target: "media-session-health", + "closed owner_deployment_info of {} stations", + r.modified_count + ) + } + } + + Err(e) => { + log::error!( + target: "media-session-health", + "error checking media session health: {} => {}", + e, + e, + ) + } + } + + tokio::time::sleep(duration).await; + } +} + +pub async fn run_health_check_interval_for_station_and_media_session( + station_id: &str, + media_session_id: &str, + task_id: &str, +) { + let mut interval = tokio::time::interval(tokio::time::Duration::from_secs( + constants::MEDIA_SESSION_HEALTH_CHECK_INTERVAL_SECS as u64, + )); + + loop { + interval.tick().await; + let now = DateTime::now(); + + { + const KEY_TASK_ID: &str = db::key!( + Station::KEY_OWNER_DEPLOYMENT_INFO, + OwnerDeploymentInfo::KEY_TASK_ID + ); + + const KEY_HEALTH_CHECKED_AT: &str = db::key!( + Station::KEY_OWNER_DEPLOYMENT_INFO, + OwnerDeploymentInfo::KEY_HEALTH_CHECKED_AT + ); + + let filter = doc! { Station::KEY_ID: station_id, KEY_TASK_ID: task_id }; + let update = doc! { "$set": { KEY_HEALTH_CHECKED_AT: now } }; + + match Station::cl().update_one(filter, update, None).await { + Ok(_) => continue, + Err(e) => { + log::error!( + target: "media-session-health", + "error updating station {}: {} => {}", + station_id, + e, + e, + ) + } + }; + } + + { + let update = doc! { "$set": { MediaSession::KEY_HEALTH_CHECKED_AT: now } }; + match MediaSession::update_by_id(media_session_id, update).await { + Ok(_) => continue, + Err(e) => { + log::error!( + target: "media-session-health", + "error updating media session {}: {} => {}", + media_session_id, + e, + e, + ) + } + }; + } + } +} diff --git a/rs/packages/media-sessions/src/lib.rs b/rs/packages/media-sessions/src/lib.rs index 0eac86b9..e13c8cd7 100644 --- a/rs/packages/media-sessions/src/lib.rs +++ b/rs/packages/media-sessions/src/lib.rs @@ -1,6 +1,10 @@ use burst::Burst; use constants::STREAM_CHANNEL_CAPACITY; +use db::station::{OwnerDeploymentInfo, Station}; +use db::Model; +use mongodb::bson::doc; use parking_lot::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard}; +use serde_util::DateTime; use shutdown::Shutdown; use std::collections::btree_map::Entry; use std::collections::BTreeMap; @@ -8,6 +12,7 @@ use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; pub mod external_relay; +pub mod healthcheck; pub mod live; pub mod playlist; pub mod relay; @@ -60,6 +65,8 @@ impl Map { #[derive(Debug, thiserror::Error)] pub enum RestartError { + #[error("internal error (db)")] + Db(#[from] mongodb::error::Error), #[error("cannot restart, station is live streaming")] LiveStreaming, #[error("cannot restart, station is streaming from external relay")] @@ -79,13 +86,14 @@ impl<'a> WriteLock<'a> { self.lock.inner.entry(station_id.to_string()) } - pub fn restart( + pub async fn restart( &mut self, - station_id: &str, + deployment_id: String, + station_id: String, shutdown: Shutdown, drop_tracer: DropTracer, ) -> Result<(), RestartError> { - if let Some(session) = self.get(station_id) { + if let Some(session) = self.get(&station_id) { match session.kind() { MediaSessionKind::Live { .. } => return Err(RestartError::LiveStreaming), MediaSessionKind::ExternalRelay => { @@ -96,7 +104,20 @@ impl<'a> WriteLock<'a> { } } - let tx = self.transmit(station_id, MediaSessionKind::Playlist {}); + let task_id = Station::random_owner_task_id(); + + let owner_deployment_info = OwnerDeploymentInfo { + deployment_id: deployment_id.to_string(), + task_id: task_id.clone(), + content_type: String::from("audio/mpeg"), + health_checked_at: Some(DateTime::now()), + }; + + let update = doc! { "$set": { Station::KEY_OWNER_DEPLOYMENT_INFO: owner_deployment_info } }; + + Station::update_by_id(&station_id, update).await?; + + let tx = self.transmit(&station_id, &task_id, MediaSessionKind::Playlist {}); run_playlist_session( tx, @@ -109,10 +130,16 @@ impl<'a> WriteLock<'a> { Ok(()) } - pub fn transmit(&mut self, station_id: &str, kind: MediaSessionKind) -> Transmitter { + pub fn transmit( + &mut self, + station_id: &str, + task_id: &str, + kind: MediaSessionKind, + ) -> Transmitter { let info = Arc::new(MediaSessionInfo { uid: uid(), station_id: station_id.to_string(), + task_id: task_id.to_string(), kind, }); @@ -295,6 +322,7 @@ impl Drop for MediaSession { pub struct MediaSessionInfo { pub(crate) uid: u64, pub(crate) station_id: String, + pub(crate) task_id: String, pub(crate) kind: MediaSessionKind, } @@ -304,6 +332,11 @@ impl MediaSessionInfo { &self.station_id } + #[inline] + pub fn task_id(&self) -> &str { + &self.task_id + } + #[inline] pub fn kind(&self) -> &MediaSessionKind { &self.kind diff --git a/rs/packages/media-sessions/src/live.rs b/rs/packages/media-sessions/src/live.rs index b4b06dd2..7b778124 100644 --- a/rs/packages/media-sessions/src/live.rs +++ b/rs/packages/media-sessions/src/live.rs @@ -3,7 +3,7 @@ use std::sync::{ Arc, }; -use crate::{SendError, Transmitter}; +use crate::{healthcheck, SendError, Transmitter}; use bytes::Bytes; use constants::STREAM_CHUNK_SIZE; use db::{media_session::MediaSessionState, Model}; @@ -35,22 +35,25 @@ pub async fn run_live_session( drop_tracer: DropTracer, ) -> Result<(), LiveError> { let station_id = tx.info.station_id().to_string(); + let task_id = tx.info.task_id().to_string(); + let media_session_id = db::media_session::MediaSession::uid(); let document = { use db::media_session::*; let now = DateTime::now(); let document = MediaSession { - id: MediaSession::uid(), + id: media_session_id.clone(), deployment_id, station_id: station_id.clone(), - created_at: now, - updated_at: now, transfer_bytes: 0, kind: MediaSessionKind::Live { request }, now_playing: None, state: MediaSessionState::Open, closed_at: None, duration_ms: None, + health_checked_at: Some(now), + created_at: now, + updated_at: now, }; match MediaSession::insert(&document).await { @@ -85,43 +88,58 @@ pub async fn run_live_session( //let output = mp3::readrate(reader).chunked(STREAM_CHUNK_SIZE); //tokio::pin!(output); - use stream_util::IntoTryBytesStreamRated; - let output = data.rated(400_000 / 8).chunked(STREAM_CHUNK_SIZE); - tokio::pin!(output); + let handle = async move { + use stream_util::IntoTryBytesStreamRated; + let output = data.rated(400_000 / 8).chunked(STREAM_CHUNK_SIZE); + tokio::pin!(output); - let signal = shutdown.signal(); - let fut = async move { - let mut transfer = 0u64; + let signal = shutdown.signal(); + let fut = async move { + let mut transfer = 0u64; - loop { - if shutdown.is_closed() || tx.is_terminated() { - break; - } - - match output.next().await { - None => break, - Some(Err(e)) => { - warn!("live session error: {e} => {e:?}"); - return Err(LiveError::Data(e)); + loop { + if shutdown.is_closed() || tx.is_terminated() { + break; } - Some(Ok(bytes)) => { - transfer += bytes.len() as u64; - transfer_bytes.store(transfer, Ordering::Release); - match tx.send(bytes) { - Ok(_) => continue, - Err(SendError::NoListeners(_)) => continue, - Err(SendError::Terminated(_)) => break, + + match output.next().await { + None => break, + Some(Err(e)) => { + warn!("live session error: {e} => {e:?}"); + return Err(LiveError::Data(e)); + } + Some(Ok(bytes)) => { + transfer += bytes.len() as u64; + transfer_bytes.store(transfer, Ordering::Release); + match tx.send(bytes) { + Ok(_) => continue, + Err(SendError::NoListeners(_)) => continue, + Err(SendError::Terminated(_)) => break, + } } } } - } - Ok(()) + Ok(()) + }; + + let result = tokio::select! { + result = fut => result, + _ = signal => Ok(()), + }; + + result }; + let health_handle = healthcheck::run_health_check_interval_for_station_and_media_session( + &station_id, + &media_session_id, + &task_id, + ); + let result = tokio::select! { - result = fut => result, - _ = signal => Ok(()), + result = handle => result, + () = health_handle => unreachable!() }; drop(dropper); diff --git a/rs/packages/media-sessions/src/playlist.rs b/rs/packages/media-sessions/src/playlist.rs index fbac5374..19b358a2 100644 --- a/rs/packages/media-sessions/src/playlist.rs +++ b/rs/packages/media-sessions/src/playlist.rs @@ -21,7 +21,7 @@ use shutdown::Shutdown; use std::sync::Arc; use stream_util::{IntoTryBytesStreamChunked, IntoTryBytesStreamRated}; -use crate::{SendError, Transmitter}; +use crate::{SendError, Transmitter, healthcheck}; pub fn run_playlist_session( tx: Transmitter, @@ -31,7 +31,8 @@ pub fn run_playlist_session( resume: bool, ) -> tokio::task::JoinHandle> { tokio::spawn(async move { - let station_id = tx.info.station_id.as_str(); + let station_id = tx.info.station_id(); + let task_id = tx.info.task_id(); let result = async { let station_id = tx.info.station_id.as_str(); @@ -74,12 +75,13 @@ pub fn run_playlist_session( }) }; - let media_session_doc_id = db::media_session::MediaSession::uid(); + let media_session_id = db::media_session::MediaSession::uid(); let media_session_doc = { use db::media_session::*; + let now = DateTime::now(); let media_session_doc = MediaSession { - id: media_session_doc_id.clone(), + id: media_session_id.clone(), station_id: station_id.to_string(), deployment_id: deployment_id.clone(), transfer_bytes: 0, @@ -95,8 +97,9 @@ pub fn run_playlist_session( state: MediaSessionState::Open, closed_at: None, duration_ms: None, - created_at: DateTime::now(), - updated_at: DateTime::now(), + health_checked_at: Some(now), + created_at: now, + updated_at: now, }; MediaSession::insert(&media_session_doc).await?; @@ -111,123 +114,166 @@ pub fn run_playlist_session( start: Instant::now(), }; - let mut first = true; + let handle = async { - // we fill the burst on start - let mut burst_len: usize = 0; + let mut first = true; - let mut no_listeners_since: Option = None; + // we fill the burst on start + let mut burst_len: usize = 0; - let mut current_file = start_file; + let mut no_listeners_since: Option = None; - 'files: loop { - let (i, part) = if first { - first = false; - (i, part) - } else { - let next_file = - AudioFile::playlist_next(station_id, ¤t_file.id, current_file.order).await?; + let mut current_file = start_file; - match next_file { - None => { - info!( - "stopping playlist for station {} (no files found in account)", - station_id - ); + 'files: loop { + let (i, part) = if first { + first = false; + (i, part) + } else { + let next_file = + AudioFile::playlist_next(station_id, ¤t_file.id, current_file.order).await?; - break 'files; - } + match next_file { + None => { + info!( + "stopping playlist for station {} (no files found in account)", + station_id + ); + + break 'files; + } - Some(next_file) => { - current_file = next_file; + Some(next_file) => { + current_file = next_file; + } } + + (0.0, 0) + }; + + if shutdown.is_closed() || tx.is_terminated() { + return Ok(()); } - (0.0, 0) - }; + info!( + "start playback of audio file {}: '{}' for station {}", + current_file.id, + current_file + .metadata + .title + .as_ref() + .unwrap_or(¤t_file.filename), + station_id, + ); - if shutdown.is_closed() || tx.is_terminated() { - return Ok(()); - } + { + let title = current_file.metadata.title.clone().unwrap_or_else(|| current_file.filename.clone()); + + let now = DateTime::now(); + let play_history_item = PlayHistoryItem { + id: PlayHistoryItem::uid(), + deployment_id: deployment_id.clone(), + title: title.clone(), + artist: current_file.metadata.artist.clone(), + kind: play_history_item::Kind::Playlist { file_id: current_file.id.clone() }, + station_id: station_id.to_string(), + created_at: now, + }; + + let now_playing = MediaSessionNowPlaying { + title, + artist: current_file.metadata.artist.clone(), + }; + + let update = doc! { + "$set": { + db::media_session::MediaSession::KEY_NOW_PLAYING: now_playing, + db::media_session::MediaSession::KEY_UPDATED_AT: now, + } + }; - info!( - "start playback of audio file {}: '{}' for station {}", - current_file.id, - current_file - .metadata - .title - .as_ref() - .unwrap_or(¤t_file.filename), - station_id, - ); - - { - let title = current_file.metadata.title.clone().unwrap_or_else(|| current_file.filename.clone()); - - let now = DateTime::now(); - let play_history_item = PlayHistoryItem { - id: PlayHistoryItem::uid(), - deployment_id: deployment_id.clone(), - title: title.clone(), - artist: current_file.metadata.artist.clone(), - kind: play_history_item::Kind::Playlist { file_id: current_file.id.clone() }, - station_id: station_id.to_string(), - created_at: now, - }; + db::media_session::MediaSession::update_by_id(&media_session_id, update).await?; + PlayHistoryItem::insert(play_history_item).await?; - let now_playing = MediaSessionNowPlaying { - title, - artist: current_file.metadata.artist.clone(), - }; + use db::media_session::MediaSession; + MediaSession::set_file_chunk_part( + &media_session_id, + ¤t_file.id, + i, + part as f64, + ) + .await?; + } + + out.set_file_id(current_file.id.clone()); + out.set_file_order(current_file.order); + out.set_i(i); + out.set_part(part); - let update = doc! { - "$set": { - db::media_session::MediaSession::KEY_NOW_PLAYING: now_playing, - db::media_session::MediaSession::KEY_UPDATED_AT: now, + let mut first_item = true; + + let stream = AudioChunk::stream_from(¤t_file.id, i).inspect(|_| { + if first_item { + first_item = false; + } else { + out.increment_i(); + out.set_part(0); } - }; + }); - db::media_session::MediaSession::update_by_id(&media_session_doc_id, update).await?; - PlayHistoryItem::insert(play_history_item).await?; - - use db::media_session::MediaSession; - MediaSession::set_file_chunk_part( - &media_session_doc_id, - ¤t_file.id, - i, - part as f64, - ) - .await?; - } + let stream = stream.chunked(STREAM_CHUNK_SIZE).skip(part).inspect(|_| { + out.increment_part(); + }); + //.rated(file.bytes_sec) - out.set_file_id(current_file.id.clone()); - out.set_file_order(current_file.order); - out.set_i(i); - out.set_part(part); + let mut transfer = 0u64; - let mut first_item = true; + // fill the burst without delay between chunk parts + tokio::pin!(stream); + if burst_len < STREAM_BURST_LENGTH { + 'chunks: loop { + if shutdown.is_closed() || tx.is_terminated() { + break 'files; + } - let stream = AudioChunk::stream_from(¤t_file.id, i).inspect(|_| { - if first_item { - first_item = false; - } else { - out.increment_i(); - out.set_part(0); - } - }); + match stream.try_next().await? { + None => continue 'files, + + Some(bytes) => { + burst_len += 1; + + transfer += bytes.len() as u64; + out.set_transfer(transfer); + + if shutdown.is_closed() { + return Ok(()); + } + + let burst_filled = burst_len >= STREAM_BURST_LENGTH; + + match tx.send(bytes) { + Ok(_) | Err(SendError::NoListeners(_)) => { + if burst_filled { + break 'chunks; + } else { + continue 'chunks; + } + } - let stream = stream.chunked(STREAM_CHUNK_SIZE).skip(part).inspect(|_| { - out.increment_part(); - }); - //.rated(file.bytes_sec) + // here the stream has been terminated (maybe replaced with a newer transmitter) + Err(SendError::Terminated(_)) => break 'files, + } + } + } + } + } - let mut transfer = 0u64; + // add byte rate to the stream + let stream = stream.rated(current_file.bytes_sec); + tokio::pin!(stream); - // fill the burst without delay between chunk parts - tokio::pin!(stream); - if burst_len < STREAM_BURST_LENGTH { 'chunks: loop { - if shutdown.is_closed() || tx.is_terminated() { + if shutdown.is_closed() { break 'files; } @@ -235,90 +281,59 @@ pub fn run_playlist_session( None => continue 'files, Some(bytes) => { - burst_len += 1; - transfer += bytes.len() as u64; out.set_transfer(transfer); - - if shutdown.is_closed() { + if shutdown.is_closed() || tx.is_terminated() { return Ok(()); } - let burst_filled = burst_len >= STREAM_BURST_LENGTH; - match tx.send(bytes) { - Ok(_) | Err(SendError::NoListeners(_)) => { - if burst_filled { - break 'chunks; - } else { + // n is the number of listeners that received the chunk + Ok(_) => { + no_listeners_since = None; + continue 'chunks; + } + + // check if shutdown delay is elapsed + Err(SendError::NoListeners(_)) => match no_listeners_since { + Some(instant) => { + if instant.elapsed().as_secs() > PLAYLIST_NO_LISTENERS_SHUTDOWN_DELAY_SECS { + info!( + "shutting down playlist for station {} (no listeners shutdown delay elapsed)", + station_id + ); + break 'files; + } else { + continue 'chunks; + } + } + + None => { + no_listeners_since = Some(Instant::now()); continue 'chunks; } - } - + } // here the stream has been terminated (maybe replaced with a newer transmitter) Err(SendError::Terminated(_)) => break 'files, } } } } - } - - // add byte rate to the stream - let stream = stream.rated(current_file.bytes_sec); - tokio::pin!(stream); - - 'chunks: loop { - if shutdown.is_closed() { - break 'files; - } - - match stream.try_next().await? { - None => continue 'files, + }; - Some(bytes) => { - transfer += bytes.len() as u64; - out.set_transfer(transfer); - if shutdown.is_closed() || tx.is_terminated() { - return Ok(()); - } + Ok(()) + }; + + let health_handle = healthcheck::run_health_check_interval_for_station_and_media_session(station_id, &media_session_id, task_id); - match tx.send(bytes) { - // n is the number of listeners that received the chunk - Ok(_) => { - no_listeners_since = None; - continue 'chunks; - } - - // check if shutdown delay is elapsed - Err(SendError::NoListeners(_)) => match no_listeners_since { - Some(instant) => { - if instant.elapsed().as_secs() > PLAYLIST_NO_LISTENERS_SHUTDOWN_DELAY_SECS { - info!( - "shutting down playlist for station {} (no listeners shutdown delay elapsed)", - station_id - ); - break 'files; - } else { - continue 'chunks; - } - } - - None => { - no_listeners_since = Some(Instant::now()); - continue 'chunks; - } - } - // here the stream has been terminated (maybe replaced with a newer transmitter) - Err(SendError::Terminated(_)) => break 'files, - } - } - } - } - } + let r = tokio::select! { + r = handle => r, + _ = health_handle => unreachable!() + }; drop(dropper); - Ok(()) + r } .await; diff --git a/rs/packages/metre/src/lib.rs b/rs/packages/metre/src/lib.rs index e59f13d0..ee9976a7 100644 --- a/rs/packages/metre/src/lib.rs +++ b/rs/packages/metre/src/lib.rs @@ -434,6 +434,7 @@ impl ConfigLoader { } /// Add a partial configuration from a file + #[allow(clippy::result_large_err)] pub fn file(&mut self, path: &str, format: Format) -> Result<&mut Self, Error> { let code = std::fs::read_to_string(path).map_err(|e| Error::Io { path: path.into(), @@ -444,6 +445,7 @@ impl ConfigLoader { } /// Add a partial configuration from a file, if it exists + #[allow(clippy::result_large_err)] pub fn file_optional(&mut self, path: &str, format: Format) -> Result<&mut Self, Error> { let exists = Path::new(path).try_exists().map_err(|e| Error::Io { path: path.into(), @@ -459,12 +461,14 @@ impl ConfigLoader { /// Add a partial configuration from enviroment varialbes #[inline(always)] + #[allow(clippy::result_large_err)] pub fn env(&mut self) -> Result<&mut Self, Error> { self._env(&StdEnv, None) } /// Add a partial configuration from enviroment variables with a prefix #[inline(always)] + #[allow(clippy::result_large_err)] pub fn env_with_prefix(&mut self, prefix: &str) -> Result<&mut Self, Error> { self._env(&StdEnv, Some(prefix)) } @@ -475,12 +479,14 @@ impl ConfigLoader { /// /// The [`EnvProvider`] trait is already implemented for several kinds of Maps from the standard library #[inline(always)] + #[allow(clippy::result_large_err)] pub fn env_with_provider(&mut self, env: &E) -> Result<&mut Self, Error> { self._env(env, None) } /// See [`Self::env_with_provider`] and [`Self::env_with_prefix`] #[inline(always)] + #[allow(clippy::result_large_err)] pub fn env_with_provider_and_prefix( &mut self, env: &E, @@ -491,6 +497,7 @@ impl ConfigLoader { /// Add a partial configuration from in-memory code #[inline(always)] + #[allow(clippy::result_large_err)] pub fn code>(&mut self, code: S, format: Format) -> Result<&mut Self, Error> { self._code(code.as_ref(), format, LoadLocation::Memory) } @@ -499,6 +506,7 @@ impl ConfigLoader { /// /// Specifying the [`LoadLocation`] of the in-memory code is useful for error reporting #[inline(always)] + #[allow(clippy::result_large_err)] pub fn code_with_location>( &mut self, code: S, @@ -509,6 +517,7 @@ impl ConfigLoader { } /// Add a partial configuration from a url + #[allow(clippy::result_large_err)] pub fn url(&mut self, url: &str, format: Format) -> Result<&mut Self, Error> { let map_err = |e| Error::Network { url: url.to_string(), @@ -541,11 +550,13 @@ impl ConfigLoader { } #[inline(always)] + #[allow(clippy::result_large_err)] fn _env(&mut self, env: &E, prefix: Option<&str>) -> Result<&mut Self, Error> { let partial = T::Partial::from_env_with_provider_and_optional_prefix(env, prefix)?; self._add(partial) } + #[allow(clippy::result_large_err)] fn _code( &mut self, code: &str, @@ -582,17 +593,20 @@ impl ConfigLoader { /// Add a partial configuration from the `#[config(default = value)]` attributes #[inline(always)] + #[allow(clippy::result_large_err)] pub fn defaults(&mut self) -> Result<&mut Self, Error> { self._add(T::Partial::defaults()) } /// Add a pre generated partial configuration #[inline(always)] + #[allow(clippy::result_large_err)] pub fn partial(&mut self, partial: T::Partial) -> Result<&mut Self, Error> { self._add(partial) } #[inline(always)] + #[allow(clippy::result_large_err)] fn _add(&mut self, partial: T::Partial) -> Result<&mut Self, Error> { self.partial.merge(partial)?; Ok(self) @@ -600,12 +614,14 @@ impl ConfigLoader { /// Get a reference to the partial configuration #[inline(always)] + #[allow(clippy::result_large_err)] pub fn partial_state(&self) -> &T::Partial { &self.partial } /// Get a mutable reference to the partial configuration #[inline(always)] + #[allow(clippy::result_large_err)] pub fn partial_state_mut(&mut self) -> &mut T::Partial { &mut self.partial } @@ -614,6 +630,7 @@ impl ConfigLoader { /// /// this function will error if there are missing required properties #[inline(always)] + #[allow(clippy::result_large_err)] pub fn finish(self) -> Result { let v = T::from_partial(self.partial)?; Ok(v) diff --git a/rs/packages/owo-colors/src/lib.rs b/rs/packages/owo-colors/src/lib.rs index 580ebf3a..08c7a3b6 100644 --- a/rs/packages/owo-colors/src/lib.rs +++ b/rs/packages/owo-colors/src/lib.rs @@ -77,6 +77,7 @@ #![cfg_attr(doc_cfg, feature(doc_cfg))] #![doc(html_logo_url = "https://jam1.re/img/rust_owo.svg")] #![warn(missing_docs)] +#![allow(clippy::needless_lifetimes)] pub mod colors; mod combo; @@ -177,7 +178,7 @@ macro_rules! style_methods { #[$meta] #[must_use] #[inline(always)] - fn $name<'a>(&'a self) -> styles::$ty<'a, Self> { + fn $name(&self) -> styles::$ty { styles::$ty(self) } )* diff --git a/rs/packages/prex/src/matcher.rs b/rs/packages/prex/src/matcher.rs index 275a519b..5ebdf389 100644 --- a/rs/packages/prex/src/matcher.rs +++ b/rs/packages/prex/src/matcher.rs @@ -213,7 +213,7 @@ mod tests { #[test] fn test_pattern_compiler() { - let urls = vec![ + let urls = [ ("/", "/"), ("/:p1", "/(?P[^/]+)"), ("/:p1(.+)", "/(?P.+)"), diff --git a/rs/packages/source-alt/src/handler/source.rs b/rs/packages/source-alt/src/handler/source.rs index 709e5ad5..e594247c 100644 --- a/rs/packages/source-alt/src/handler/source.rs +++ b/rs/packages/source-alt/src/handler/source.rs @@ -15,6 +15,7 @@ use hyper::{ }; use log::*; use media_sessions::{live::LiveError, MediaSessionKind, MediaSessionMap}; +use serde_util::DateTime; use shutdown::Shutdown; use stream_util::IntoTryBytesStream; use tokio::io::AsyncWriteExt; @@ -87,9 +88,12 @@ pub async fn source( Some(t) => t.to_string(), }; + let task_id = Station::random_owner_task_id(); + let info = OwnerDeploymentInfo { deployment_id: deployment_id.clone(), - task_id: Station::random_owner_task_id(), + task_id: task_id.clone(), + health_checked_at: Some(DateTime::now()), content_type: content_type.clone(), }; @@ -233,16 +237,22 @@ pub async fn source( let tx = { let mut map = media_sessions.write(); match map.entry(&station.id) { - Entry::Vacant(_) => Some(map.transmit(&station.id, MediaSessionKind::Live { content_type })), + Entry::Vacant(_) => Some(map.transmit( + &station.id, + &task_id, + MediaSessionKind::Live { content_type }, + )), Entry::Occupied(entry) => { let session = entry.get(); match session.kind() { MediaSessionKind::Live { .. } => None, MediaSessionKind::Playlist { .. } | MediaSessionKind::Relay { .. } - | MediaSessionKind::ExternalRelay => { - Some(map.transmit(&station.id, MediaSessionKind::Live { content_type })) - } + | MediaSessionKind::ExternalRelay => Some(map.transmit( + &station.id, + &task_id, + MediaSessionKind::Live { content_type }, + )), } } } diff --git a/rs/packages/stream/src/lib.rs b/rs/packages/stream/src/lib.rs index 86f05c0b..e0a0c667 100644 --- a/rs/packages/stream/src/lib.rs +++ b/rs/packages/stream/src/lib.rs @@ -415,11 +415,12 @@ impl StreamHandler { drop_tracer: &DropTracer, shutdown: &Shutdown, ) -> Result<(Listener, Station), StreamError> { + let task_id = Station::random_owner_task_id(); let info = OwnerDeploymentInfo { deployment_id: deployment_id.to_string(), - task_id: Station::random_owner_task_id(), - // audio/mpeg is because we'll start a playlist media session on demand + task_id: task_id.clone(), content_type: String::from("audio/mpeg"), + health_checked_at: Some(DateTime::now()), }; let (rx, station) = 'rx: { @@ -445,6 +446,7 @@ impl StreamHandler { None => { break 'relay_tx lock.upgrade().transmit( station_id, + &task_id, media_sessions::MediaSessionKind::Relay { content_type: owner_info.content_type.clone(), }, @@ -456,6 +458,7 @@ impl StreamHandler { } else { break 'relay_tx lock.upgrade().transmit( station_id, + &task_id, media_sessions::MediaSessionKind::Relay { content_type: owner_info.content_type.clone(), }, @@ -583,8 +586,11 @@ impl StreamHandler { let mut lock = lock.upgrade(); match &station.external_relay_url { None => { - let tx = - lock.transmit(station_id, media_sessions::MediaSessionKind::Playlist {}); + let tx = lock.transmit( + station_id, + &task_id, + media_sessions::MediaSessionKind::Playlist {}, + ); let rx = tx.subscribe(); let shutdown = shutdown.clone(); let deployment_id = deployment_id.to_string(); @@ -599,8 +605,11 @@ impl StreamHandler { } Some(url) => { - let tx = - lock.transmit(station_id, media_sessions::MediaSessionKind::ExternalRelay); + let tx = lock.transmit( + station_id, + &task_id, + media_sessions::MediaSessionKind::ExternalRelay, + ); let rx = tx.subscribe(); let shutdown = shutdown.clone(); let deployment_id = deployment_id.to_string(); diff --git a/rs/packages/validate/src/url.rs b/rs/packages/validate/src/url.rs index 346ccc66..9b527929 100644 --- a/rs/packages/validate/src/url.rs +++ b/rs/packages/validate/src/url.rs @@ -2,16 +2,16 @@ pub mod patterns { use once_cell::sync::Lazy; use regex_static::{lazy_regex, Regex}; - pub static WEBSITE: Lazy = lazy_regex!(r#"^https?://.+"#); - pub static TWITTER: Lazy = lazy_regex!(r#"^https://twitter\.com/.+"#); - pub static FACEBOOK: Lazy = lazy_regex!(r#"^https://www\.facebook\.com/.+"#); - pub static INSTAGRAM: Lazy = lazy_regex!(r#"^https://www\.instagram\.com/.+"#); - pub static THREADS: Lazy = lazy_regex!(r#"^https://www\.threads\.net/.+"#); - pub static YOUTUBE: Lazy = lazy_regex!(r#"^https://www\.youtube\.com/.+"#); - pub static TWITCH: Lazy = lazy_regex!(r#"^https://www\.twitch\.tv/.+"#); - pub static TIKTOK: Lazy = lazy_regex!(r#"^https://www\.tiktok\.com/.+"#); - pub static SPOTIFY: Lazy = lazy_regex!(r#"^https://open\.spotify\.com/.+"#); - pub static RADIOCUT: Lazy = lazy_regex!(r#"^https://radiocut\.fm/.+"#); - pub static GOOGLE_PLAY: Lazy = lazy_regex!(r#"^https://play\.google\.com/.+"#); - pub static APP_STORE: Lazy = lazy_regex!(r#"^https://apps\.apple\.com/.+"#); + pub static WEBSITE: Lazy = lazy_regex!(r"^https?://.+"); + pub static TWITTER: Lazy = lazy_regex!(r"^https://twitter\.com/.+"); + pub static FACEBOOK: Lazy = lazy_regex!(r"^https://www\.facebook\.com/.+"); + pub static INSTAGRAM: Lazy = lazy_regex!(r"^https://www\.instagram\.com/.+"); + pub static THREADS: Lazy = lazy_regex!(r"^https://www\.threads\.net/.+"); + pub static YOUTUBE: Lazy = lazy_regex!(r"^https://www\.youtube\.com/.+"); + pub static TWITCH: Lazy = lazy_regex!(r"^https://www\.twitch\.tv/.+"); + pub static TIKTOK: Lazy = lazy_regex!(r"^https://www\.tiktok\.com/.+"); + pub static SPOTIFY: Lazy = lazy_regex!(r"^https://open\.spotify\.com/.+"); + pub static RADIOCUT: Lazy = lazy_regex!(r"^https://radiocut\.fm/.+"); + pub static GOOGLE_PLAY: Lazy = lazy_regex!(r"^https://play\.google\.com/.+"); + pub static APP_STORE: Lazy = lazy_regex!(r"^https://apps\.apple\.com/.+"); } diff --git a/rs/patches/bson/src/de/raw.rs b/rs/patches/bson/src/de/raw.rs index 51842782..4dd383e3 100644 --- a/rs/patches/bson/src/de/raw.rs +++ b/rs/patches/bson/src/de/raw.rs @@ -7,8 +7,7 @@ use std::{ use serde::{ de::{EnumAccess, Error as SerdeError, IntoDeserializer, MapAccess, VariantAccess}, - forward_to_deserialize_any, - Deserializer as SerdeDeserializer, + forward_to_deserialize_any, Deserializer as SerdeDeserializer, }; use crate::{ @@ -16,27 +15,12 @@ use crate::{ raw::{RawBinaryRef, RAW_ARRAY_NEWTYPE, RAW_BSON_NEWTYPE, RAW_DOCUMENT_NEWTYPE}, spec::{BinarySubtype, ElementType}, uuid::UUID_NEWTYPE_NAME, - Bson, - DateTime, - Decimal128, - DeserializerOptions, - RawDocument, - Timestamp, + Bson, DateTime, Decimal128, DeserializerOptions, RawDocument, Timestamp, }; use super::{ - read_bool, - read_f128, - read_f64, - read_i32, - read_i64, - read_string, - read_u8, - DeserializerHint, - Error, - Result, - MAX_BSON_SIZE, - MIN_CODE_WITH_SCOPE_SIZE, + read_bool, read_f128, read_f64, read_i32, read_i64, read_string, read_u8, DeserializerHint, + Error, Result, MAX_BSON_SIZE, MIN_CODE_WITH_SCOPE_SIZE, }; use crate::de::serde::MapDeserializer; @@ -993,7 +977,7 @@ impl TimestampDeserializer { impl<'de, 'a> serde::de::Deserializer<'de> for &'a mut TimestampDeserializer { type Error = Error; - fn deserialize_any(mut self, visitor: V) -> Result + fn deserialize_any(self, visitor: V) -> Result where V: serde::de::Visitor<'de>, { @@ -1093,7 +1077,7 @@ impl DateTimeDeserializer { impl<'de, 'a> serde::de::Deserializer<'de> for &'a mut DateTimeDeserializer { type Error = Error; - fn deserialize_any(mut self, visitor: V) -> Result + fn deserialize_any(self, visitor: V) -> Result where V: serde::de::Visitor<'de>, { @@ -1194,7 +1178,7 @@ impl<'a> BinaryDeserializer<'a> { impl<'de, 'a> serde::de::Deserializer<'de> for &'a mut BinaryDeserializer<'de> { type Error = Error; - fn deserialize_any(mut self, visitor: V) -> Result + fn deserialize_any(self, visitor: V) -> Result where V: serde::de::Visitor<'de>, { @@ -1328,7 +1312,7 @@ impl<'de, 'a> CodeWithScopeDeserializer<'de, 'a> { impl<'de, 'a, 'b> serde::de::Deserializer<'de> for &'b mut CodeWithScopeDeserializer<'de, 'a> { type Error = Error; - fn deserialize_any(mut self, visitor: V) -> Result + fn deserialize_any(self, visitor: V) -> Result where V: serde::de::Visitor<'de>, { @@ -1445,7 +1429,7 @@ impl<'de, 'a> DbPointerDeserializer<'de, 'a> { impl<'de, 'a, 'b> serde::de::Deserializer<'de> for &'b mut DbPointerDeserializer<'de, 'a> { type Error = Error; - fn deserialize_any(mut self, visitor: V) -> Result + fn deserialize_any(self, visitor: V) -> Result where V: serde::de::Visitor<'de>, { diff --git a/rs/patches/bson/src/ser/raw/mod.rs b/rs/patches/bson/src/ser/raw/mod.rs index 64829d4a..980c7ee6 100644 --- a/rs/patches/bson/src/ser/raw/mod.rs +++ b/rs/patches/bson/src/ser/raw/mod.rs @@ -259,11 +259,7 @@ impl<'a> serde::Serializer for &'a mut Serializer { } #[inline] - fn serialize_newtype_struct( - mut self, - name: &'static str, - value: &T, - ) -> Result + fn serialize_newtype_struct(self, name: &'static str, value: &T) -> Result where T: serde::Serialize, { diff --git a/rs/patches/ts-rs/macros/src/types/enum.rs b/rs/patches/ts-rs/macros/src/types/enum.rs index 7cb321a3..edf3972b 100644 --- a/rs/patches/ts-rs/macros/src/types/enum.rs +++ b/rs/patches/ts-rs/macros/src/types/enum.rs @@ -51,7 +51,7 @@ pub(crate) fn r#enum_def(s: &ItemEnum) -> syn::Result { inline: if formatted_variants.is_empty() { quote!("") } else { - quote!(vec![#(#formatted_variants),*].join(" | ")) + quote!([#(#formatted_variants),*].join(" | ")) }, decl: quote!(format!("type {}{} = {};", #name, #generic_args, Self::inline())), diff --git a/rs/patches/ts-rs/macros/src/types/generics.rs b/rs/patches/ts-rs/macros/src/types/generics.rs index 3f90370c..ae692ac3 100644 --- a/rs/patches/ts-rs/macros/src/types/generics.rs +++ b/rs/patches/ts-rs/macros/src/types/generics.rs @@ -29,7 +29,8 @@ pub fn format_generics(deps: &mut Dependencies, generics: &Generics) -> TokenStr _ => None, }); - let comma_separated = quote!(vec![#(#expanded_params),*].join(", ")); + let comma_separated = quote!([#(#expanded_params),*].join(", ")); + quote!(format!("<{}>", #comma_separated)) } diff --git a/rs/patches/ts-rs/macros/src/types/named.rs b/rs/patches/ts-rs/macros/src/types/named.rs index 4c5531d0..9554eda4 100644 --- a/rs/patches/ts-rs/macros/src/types/named.rs +++ b/rs/patches/ts-rs/macros/src/types/named.rs @@ -40,13 +40,13 @@ pub(crate) fn named( let fields = if formatted_fields.is_empty() { quote!(String::new()) } else { - quote!(vec![#(#formatted_fields),*].join(" ")) + quote!([#(#formatted_fields),*].join(" ")) }; let generic_args = format_generics(&mut dependencies, generics); let inline = quote! { - vec![ + [ format!("{{ {} }}", #fields), #(#flattened_fields),* ].join("&") diff --git a/rs/patches/ts-rs/macros/src/types/tuple.rs b/rs/patches/ts-rs/macros/src/types/tuple.rs index 034217c2..58d7c353 100644 --- a/rs/patches/ts-rs/macros/src/types/tuple.rs +++ b/rs/patches/ts-rs/macros/src/types/tuple.rs @@ -33,7 +33,7 @@ pub(crate) fn tuple( inline: quote! { format!( "[{}]", - vec![#(#formatted_fields),*].join(", ") + [#(#formatted_fields),*].join(", ") ) }, decl: quote! {