From 3c02c7013e943f64472ef46f99b99ede2d5a66d1 Mon Sep 17 00:00:00 2001 From: ramiroaisen <52116153+ramiroaisen@users.noreply.github.com> Date: Fri, 23 Jun 2023 17:58:21 -0300 Subject: [PATCH 1/3] feat: external relay --- Cargo.lock | 1 + defs/api/stations/POST/Payload.ts | 1 + .../[station]/now-playing/GET/Output.ts | 6 +- defs/constants.ts | 2 + defs/db/MediaSessionKind.ts | 3 +- defs/db/Station.ts | 1 + defs/error/PublicErrorCode.ts | 1 + defs/ops/StationPatch.ts | 1 + .../lib/components/Dashboard/Drawer.svelte | 4 +- .../src/lib/components/Player/Player.svelte | 197 ----------- .../admin/src/lib/components/Player/player.ts | 303 ----------------- .../src/routes/(online)/(app)/+layout.svelte | 7 - front/app/src/lib/components/Player/player.ts | 11 +- .../[account]/account-station-item.svelte | 12 +- .../[account]/stations/[station]/+page.svelte | 12 +- .../stations/create-station/+page.svelte | 1 + rs/config/constants/src/lib.rs | 3 + rs/packages/api/src/error/mod.rs | 6 + rs/packages/api/src/error/public.rs | 1 + rs/packages/api/src/routes/stations/mod.rs | 14 + .../api/src/routes/stations/now_playing.rs | 30 +- .../src/routes/stations/restart_playlist.rs | 6 + .../db/src/models/media_session/mod.rs | 4 + rs/packages/db/src/models/station/mod.rs | 18 + rs/packages/ffmpeg/src/lib.rs | 7 +- rs/packages/media-sessions/Cargo.toml | 1 + .../media-sessions/src/external_relay.rs | 317 ++++++++++++++++++ rs/packages/media-sessions/src/lib.rs | 16 +- rs/packages/media-sessions/src/playlist.rs | 2 +- .../source-alt/src/handler/metadata.rs | 1 + rs/packages/source-alt/src/handler/source.rs | 4 +- rs/packages/stream/src/lib.rs | 47 ++- 32 files changed, 499 insertions(+), 541 deletions(-) delete mode 100644 front/admin/src/lib/components/Player/Player.svelte delete mode 100644 front/admin/src/lib/components/Player/player.ts create mode 100644 rs/packages/media-sessions/src/external_relay.rs diff --git a/Cargo.lock b/Cargo.lock index 6cbd9d98..13373062 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2731,6 +2731,7 @@ dependencies = [ "constants", "db", "drop-tracer", + "ffmpeg", "futures-util", "hyper", "log", diff --git a/defs/api/stations/POST/Payload.ts b/defs/api/stations/POST/Payload.ts index 45e0b8e3..0c87e0ad 100644 --- a/defs/api/stations/POST/Payload.ts +++ b/defs/api/stations/POST/Payload.ts @@ -24,6 +24,7 @@ export type Payload = { google_play_url: string | null; app_store_url: string | null; frequencies: Array | null; + external_relay_url: string | null; user_metadata?: Metadata; system_metadata?: Metadata; }; diff --git a/defs/api/stations/[station]/now-playing/GET/Output.ts b/defs/api/stations/[station]/now-playing/GET/Output.ts index 645b7a9a..4ab0687a 100644 --- a/defs/api/stations/[station]/now-playing/GET/Output.ts +++ b/defs/api/stations/[station]/now-playing/GET/Output.ts @@ -1,8 +1,12 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. export type Output = - | ({ kind: "none" } & { start_on_connect: boolean }) + | ({ kind: "none" } & { + start_on_connect: boolean; + external_relay_url: string | null; + }) | ({ kind: "live" } & { title: string | null; artist: string | null }) + | ({ kind: "external-relay" } & { url: string }) | ({ kind: "playlist" } & { file_id: string; filename: string; diff --git a/defs/constants.ts b/defs/constants.ts index 9ea16918..cc399e8a 100644 --- a/defs/constants.ts +++ b/defs/constants.ts @@ -19,6 +19,8 @@ export const EMAIL_VERIFICATION_CODE_LEN = 6; export const EMAIL_VERIFICATION_VALIDITY_SECS = 3600; +export const EXTERNAL_RELAY_NO_LISTENERS_SHUTDOWN_DELAY_SECS = 30; + export const FORWARD_IP_HEADER = "x-openstream-forwarded-ip"; export const PAYMENTS_ACCESS_TOKEN_HEADER = "x-access-token"; diff --git a/defs/db/MediaSessionKind.ts b/defs/db/MediaSessionKind.ts index 154583c6..f73e5661 100644 --- a/defs/db/MediaSessionKind.ts +++ b/defs/db/MediaSessionKind.ts @@ -11,4 +11,5 @@ export type MediaSessionKind = last_audio_chunk_skip_parts: number; last_audio_chunk_date: DateTime; }) - | ({ kind: "live" } & { request: Request }); + | ({ kind: "live" } & { request: Request }) + | ({ kind: "external-relay" } & { url: string }); diff --git a/defs/db/Station.ts b/defs/db/Station.ts index be98a0f6..03a8eba9 100644 --- a/defs/db/Station.ts +++ b/defs/db/Station.ts @@ -32,6 +32,7 @@ export type Station = { system_metadata: Metadata; owner_deployment_info: OwnerDeploymentInfo | null; playlist_is_randomly_shuffled: boolean; + external_relay_url: string | null; source_password: string; created_at: DateTime; updated_at: DateTime; diff --git a/defs/error/PublicErrorCode.ts b/defs/error/PublicErrorCode.ts index d5ff7cd5..d8217f9a 100644 --- a/defs/error/PublicErrorCode.ts +++ b/defs/error/PublicErrorCode.ts @@ -48,6 +48,7 @@ export type PublicErrorCode = | "UNRESOLVABLE_ADMIN_ME" | "UNRESOLVABLE_USER_ME" | "PLAYLIST_START_IS_LIVE" + | "PLAYLIST_START_IS_EXTERNAL_RELAY" | "PLAYLIST_START_NO_FILES" | "RENDER_MAIL" | "SEND_MAIL" diff --git a/defs/ops/StationPatch.ts b/defs/ops/StationPatch.ts index c87f868b..d882883e 100644 --- a/defs/ops/StationPatch.ts +++ b/defs/ops/StationPatch.ts @@ -23,6 +23,7 @@ export type StationPatch = { twitch_url?: string | null; google_play_url?: string | null; app_store_url?: string | null; + external_relay_url?: string | null; user_metadata?: Metadata; system_metadata?: Metadata; }; diff --git a/front/admin/src/lib/components/Dashboard/Drawer.svelte b/front/admin/src/lib/components/Dashboard/Drawer.svelte index b5c977c4..5a92f5a8 100644 --- a/front/admin/src/lib/components/Dashboard/Drawer.svelte +++ b/front/admin/src/lib/components/Dashboard/Drawer.svelte @@ -30,8 +30,6 @@ // @ts-ignore import logo from "$share/img/logo-trans-128.png?w=64&format=webp"; - import { player_state } from "../Player/player"; - $: if(browser) { document.documentElement.classList[fixed_open ? "add" : "remove"](HTML_OPEN_CLASSNAME); } @@ -171,7 +169,7 @@
{/if} -
+
-
- -
-
-
-
- - {title} - -
- {#if subtitle} -
- {subtitle} -
- {/if} -
-
- -
- -
-
-
-{/if} \ No newline at end of file diff --git a/front/admin/src/lib/components/Player/player.ts b/front/admin/src/lib/components/Player/player.ts deleted file mode 100644 index 74a8eb2d..00000000 --- a/front/admin/src/lib/components/Player/player.ts +++ /dev/null @@ -1,303 +0,0 @@ -import { browser } from "$app/environment"; -import { default_logger } from "$share/logger"; -import { get_now_playing_store, type NowPlaying } from "$lib/now-playing"; -import { _get } from "$share/net.client"; -import { derived, get, writable } from "svelte/store"; -import { page } from "$app/stores"; - -export type PlayerState = PlayerState.Closed | PlayerState.Station | PlayerState.AudioFile; - -export type AudioState = "playing" | "loading" | "paused"; - -const logger = default_logger.scoped("player"); - -export namespace PlayerState { - export interface Base { - type: string - } - - export interface Closed extends Base { - type: "closed" - } - - export interface Station extends Base { - type: "station" - audio_state: AudioState, - station: { - _id: string, - picture_id: string, - name: string - } - } - - export interface AudioFile extends Base { - type: "track" - audio_state: AudioState, - file: import("$server/defs/db/AudioFile").AudioFile - picture_id: string, - } -} - -let audio: HTMLAudioElement | null = null; - -const now_playing = writable(null); -const readonly = { subscribe: now_playing.subscribe }; -export { readonly as player_now_playing } - -export const storage_audio_url = (station_id: string, file_id: string) => { - const base: string = get(page).data.config.storage_public_url; - return `${base}/stations/${station_id}/files/${file_id}/stream?token=${media_token()}` -} - -export const station_stream_url = (station_id: string) => { - const base = get(page).data.config.stream_public_url; - return `${base}/stream/${station_id}` -} - - -export const media_token = () => { - return get(page).data.user?.media_key ?? ""; -} - -let current_now_playing_unsub: (() => void) | null = null; -const now_playing_start = (station_id: string) => { - now_playing_stop(); - logger.info("now playing subscriber start"); - const store = get_now_playing_store(station_id); - current_now_playing_unsub = store.subscribe(v => now_playing.set(v?.info ?? null)); -} - -const now_playing_stop = () => { - now_playing.set(null); - if(current_now_playing_unsub) { - logger.info("now playing subscriber stop") - current_now_playing_unsub(); - current_now_playing_unsub = null; - } -} - - -export const pause = () => { - logger.info("pause()"); - audio?.pause(); - set_audio_state("paused"); - const $state = get(player_state); - if($state.type === "closed") {} - else if($state.type === "track") {} - else if($state.type === "station") { - logger.info("destroy tag"); - destroy_audio_tag(); - } - else assert_never($state); -} - -export const resume = () => { - const $state = get(player_state); - if($state.type === "closed") logger.warn("resume called with player_state.type === 'closed'"); - else if($state.type === "track") audio?.play(); - else if($state.type === "station") { - if($state.audio_state === "paused") { - const audio = get_audio_tag(station_stream_url($state.station._id)); - audio.play(); - } - } else assert_never($state); -} - -const player_state = writable({ type: "closed" }); -const readable_player_state = { subscribe: player_state.subscribe }; -export { readable_player_state as player_state }; - -export const player_title = derived(player_state, (state): string => { - if(state.type === "closed") return ""; - else if(state.type === "track") return state.file.metadata.title || state.file.filename; - else if(state.type === "station") return state.station.name; - else return assert_never(state); -}) - -export const player_subtitle = derived([player_state, now_playing], ([state, now_playing]): string | null => { - if(state.type === "closed") return null; - else if(state.type === "track") return state.file.metadata.artist; - else if(state.type === "station") { - if(now_playing == null) return null; - else if(now_playing.kind === "none") return null; - else if(now_playing.kind === "live") return "Live streaming"; - else if(now_playing.kind === "playlist") { - const artist = now_playing.artist; - const title = now_playing.title || now_playing.filename; - if(artist) { - return `${title} - ${artist}` - } else { - return title; - } - } - else return assert_never(now_playing) - } - else return assert_never(state) -}) - -export const player_playing_audio_file_id = derived(player_state, (state): string | null => { - if(state.type === "track") return state.file._id; - else return null; -}) - -export const player_playing_station_id = derived(player_state, (state): string | null => { - if(state.type === "station") return state.station._id; - else return null; -}) - -export const player_audio_state = derived(player_state, (state): AudioState => { - if(state.type === "closed") return "paused"; - else if(state.type === "station") return state.audio_state; - else if(state.type === "track") return state.audio_state; - else return assert_never(state); -}) - -export const play_station = (station: { _id: string, picture_id: string, name: string }) => { - if(!browser) throw new Error("player.play_station called in ssr context"); - const $state = get(player_state); - if($state.type === "station" && $state.station._id === station._id) { - resume(); - } else { - destroy_audio_tag(); - player_state.set({ - type: "station", - audio_state: "loading", - station, - }) - const audio = get_audio_tag(station_stream_url(station._id)) - audio.play().catch(e => { - logger.warn(`error playing station ${station._id} => ${e}`) - }) - - now_playing_start(station._id); - } -} - - -export const player_picture_id = derived(player_state, $player_state => { - if($player_state.type === "closed") return null; - else if($player_state.type === "station") return $player_state.station.picture_id; - else if($player_state.type === "track") return $player_state.picture_id; - else assert_never($player_state); -}) - -// we use derived to subscribe to two store at once -// we need to subscribe to the store, derived only runs if it has subscribers -derived([player_state, now_playing], ([$player_state, $now_playing]) => { - if( - $player_state.type === "station" && - //$player_state.audio_state === "paused" && - $now_playing?.kind === "none" && - $now_playing.start_on_connect === false - ) { - close(); - } -}).subscribe(() => {}) - - -export const play_track = (file: import("$server/defs/db/AudioFile").AudioFile, picture_id: string) => { - if(!browser) throw new Error("player.play_track called in ssr context"); - destroy_audio_tag(); - now_playing_stop(); - player_state.set({ - type: "track", - file, - audio_state: "loading", - picture_id, - }) - - const audio = get_audio_tag(storage_audio_url(file.station_id, file._id)); - - audio.play().catch(e => { - logger.warn(`error playing audio track ${file._id} => ${e}`); - }) -} - -export const close = () => { - destroy_audio_tag() - now_playing_stop(); - player_state.set({ type: "closed" }); -} - -const destroy_audio_tag = () => { - if(audio != null) { - audio.pause(); - audio.src = "data:audio/wav;base64,UklGRjIAAABXQVZFZm10IBIAAAABAAEAQB8AAEAfAAABAAgAAABmYWN0BAAAAAAAAABkYXRhAAAAAA=="; - } -} - -const set_audio_state = (audio_state: AudioState) => { - logger.info("set audio state", audio_state); - const $state = get(player_state); - if($state.type === "closed") return; - else if($state.type === "station") player_state.set({ ...$state, audio_state }); - else if($state.type === "track") player_state.set({ ...$state, audio_state }); - else assert_never($state); -} - -const get_audio_tag = (src: string): HTMLAudioElement => { - if(audio == null) { - - audio = new Audio(src); - - let start = Date.now(); - - const _play = audio.play; - audio.play = () => { - start = Date.now(); - return _play.call(audio) - } - - set_audio_state("loading"); - - audio.onpause = () => { - logger.info("onpause"); - set_audio_state("paused"); - } - - audio.onerror = () => { - logger.info("onerror") - set_audio_state("paused"); - } - - audio.onseeking = () => { - logger.info("onseeking") - set_audio_state("loading"); - } - - audio.onplay = () => { - logger.info("onplay") - set_audio_state("loading"); - } - - audio.onplaying = () => { - logger.info("onplaying") - set_audio_state("playing"); - } - - audio.onended = () => { - const $player_state = get(player_state); - if($player_state.type === "station") { - const src = audio?.src; - if(src != null) { - if(Date.now() - start > 5000) { - destroy_audio_tag(); - const audio = get_audio_tag(src); - audio.play(); - } - } - } - } - - return audio - - } else { - - audio.src = src; - - return audio; - - } -} - -const assert_never = (v: never): never => { throw new Error("assert never called with value:", v) } diff --git a/front/admin/src/routes/(online)/(app)/+layout.svelte b/front/admin/src/routes/(online)/(app)/+layout.svelte index cd0ff4a5..27a1f5a1 100644 --- a/front/admin/src/routes/(online)/(app)/+layout.svelte +++ b/front/admin/src/routes/(online)/(app)/+layout.svelte @@ -1,15 +1,9 @@ diff --git a/front/app/src/lib/components/Form/BooleanField.svelte b/front/app/src/lib/components/Form/BooleanField.svelte new file mode 100644 index 00000000..464b86d8 --- /dev/null +++ b/front/app/src/lib/components/Form/BooleanField.svelte @@ -0,0 +1,81 @@ + + + + +
+ +
\ No newline at end of file diff --git a/front/app/src/routes/(root)/(online)/(app)/accounts/[account]/stations/[station]/+layout.ts b/front/app/src/routes/(root)/(online)/(app)/accounts/[account]/stations/[station]/+layout.ts index 5718e499..3c3a9610 100644 --- a/front/app/src/routes/(root)/(online)/(app)/accounts/[account]/stations/[station]/+layout.ts +++ b/front/app/src/routes/(root)/(online)/(app)/accounts/[account]/stations/[station]/+layout.ts @@ -5,36 +5,27 @@ export const load = (async ({ fetch, url, parent, depends, params }) => { depends("api:stations/:id"); - let current_page: null | "dashboard" | "profile" | "playlist" | "broadcast" | "settings" = null; + let current_page: null | "dashboard" | "profile" | "playlist" | "broadcast" | "relay" | "settings" = null; const { stations, account } = await parent(); const station = stations.items.find(item => item._id === params.station); + if(station == null) { + throw error(404, { + status: 404, + code: "CLIENT_STATION_NOT_FOUND", + message: `Station with id ${params.station} does not exists or has been deleted`, + }) + } - if(station != null) { - if(station.account_id !== account._id) { - throw error(404, { - status: 404, - code: "CLIENT_STATION_ACCOUNT_MISMATCH", - message: `Station with id ${station._id} doesn't belong to this account`, - }) - } - return { station, current_page } - } - - const helper: import("$api/stations/[station]/GET/Output").Output = await load_get(`/api/stations/${params.station}`, { fetch, url }); - - if(helper.station.account_id !== account._id) { + if(station.account_id !== account._id) { throw error(404, { status: 404, code: "CLIENT_STATION_ACCOUNT_MISMATCH", - message: `Station with id ${helper.station._id} doesn't belong to this account`, + message: `Station with id ${station._id} doesn't belong to this account`, }) } - return { - station: helper.station, - current_page, - } + return { station, current_page } }) satisfies import("./$types").LayoutLoad; \ No newline at end of file diff --git a/front/app/src/routes/(root)/(online)/(app)/accounts/[account]/stations/[station]/settings/+page.svelte b/front/app/src/routes/(root)/(online)/(app)/accounts/[account]/stations/[station]/settings/+page.svelte index 2054feec..80d6ed1e 100644 --- a/front/app/src/routes/(root)/(online)/(app)/accounts/[account]/stations/[station]/settings/+page.svelte +++ b/front/app/src/routes/(root)/(online)/(app)/accounts/[account]/stations/[station]/settings/+page.svelte @@ -14,6 +14,10 @@ import Icon from "$share/Icon.svelte"; import { locale } from "$lib/locale"; import { invalidate_siblings } from "$lib/invalidate"; + import { _url } from "$share/formy/validate"; + import CircularProgress from "$share/CircularProgress.svelte"; + import { scale } from "svelte/transition"; + import BooleanField from "$lib/components/Form/BooleanField.svelte"; let delete_name_input_value = ""; @@ -44,7 +48,59 @@ throw e; } }); + + const KEY = "front_previous_external_relay_url"; + const _prev = data.station.user_metadata[KEY]; + let prev = typeof _prev === "string" ? _prev : ""; + let external_relay_enabled = data.station.external_relay_url != null; + let external_relay_url = data.station.external_relay_url || prev; + let saving_relay = false; + const save_external_relay = action(async () => { + + if(saving_relay) return; + saving_relay = true; + + prev = external_relay_url; + + try { + let payload: import("$api/stations/[station]/PATCH/Payload").Payload; + if(external_relay_enabled) { + payload = { + external_relay_url, + user_metadata: { + [KEY]: external_relay_url, + }, + } + } else { + payload = { + external_relay_url: null, + user_metadata: { + [KEY]: external_relay_url, + } + } + } + + await _patch(`/api/stations/${data.station._id}`, payload); + // TODO: locale + _message("Settings updated"); + + saving_relay = false; + } catch(e) { + saving_relay = false; + throw e; + } + }) + + + const _parent = _url({ required: true }); + const _validate_external_relay_url = (value: string | null | undefined): string | null => { + if(external_relay_enabled) { + return _parent(value); + } else { + return null; + } + } @@ -157,15 +264,51 @@
{$locale.pages["station/settings"].title}
-

{$locale.pages["station/settings"].actions.title}

+
+ +

Master relay

+ + +
+
+ + +
+
+ + + +
-
- + + + +
+ +
+

{$locale.pages["station/settings"].actions.title}

+ +
+ +
@@ -176,39 +319,60 @@ on_close={() => delete_open = false} title={$locale.pages["station/settings"].dialogs.delete_station.title.replace("@name", data.station.name)} > - -
-
- {@html $locale.pages["station/settings"].dialogs.delete_station.message_html.replaceAll("@name", data.station.name)} -
- -
- - -
+ {#if data.is_account_owner} + + +
+ {@html $locale.pages["station/settings"].dialogs.delete_station.message_html.replaceAll("@name", data.station.name)} +
+ +
+ + +
+ +
+ + + +
+ +
+ {:else} +
+ + Only account administrators can delete stations.

+ Contact the account administrators if you want to delete this station. +
- -
- - + +
+ {/if} {/if} diff --git a/rs/packages/api/src/routes/mod.rs b/rs/packages/api/src/routes/mod.rs index 07557708..be1295ba 100644 --- a/rs/packages/api/src/routes/mod.rs +++ b/rs/packages/api/src/routes/mod.rs @@ -104,6 +104,13 @@ pub fn router( .at("/auth/admin/delegate/:user") .post(auth::admin::delegate::user::post::Endpoint {}.into_handler()); + app.at("/runtime/station-deleted/:station").post( + runtime::station_deleted::station_id::post::Endpoint { + media_sessions: media_sessions.clone(), + } + .into_handler(), + ); + app.at("/runtime/source-password-updated/:station").post( runtime::source_password_updated::station_id::post::Endpoint { media_sessions: media_sessions.clone(), @@ -111,6 +118,13 @@ pub fn router( .into_handler(), ); + app.at("/runtime/external-relay-updated/:station").post( + runtime::external_relay_updated::station_id::post::Endpoint { + media_sessions: media_sessions.clone(), + } + .into_handler(), + ); + app.at("/runtime/restart-playlist/:station").post( runtime::restart_playlist::station_id::post::Endpoint { media_sessions: media_sessions.clone(), @@ -257,8 +271,20 @@ pub fn router( app .at("/stations/:station") .get(stations::id::get::Endpoint {}.into_handler()) - .delete(stations::id::delete::Endpoint {}.into_handler()) - .patch(stations::id::patch::Endpoint {}.into_handler()); + .delete( + stations::id::delete::Endpoint { + deployment_id: deployment_id.clone(), + media_sessions: media_sessions.clone(), + } + .into_handler(), + ) + .patch( + stations::id::patch::Endpoint { + deployment_id: deployment_id.clone(), + media_sessions: media_sessions.clone(), + } + .into_handler(), + ); app.at("/stations/:station/stream-stats").get( stations::stream_stats::get::Endpoint { diff --git a/rs/packages/api/src/routes/runtime/external_relay_updated/mod.rs b/rs/packages/api/src/routes/runtime/external_relay_updated/mod.rs new file mode 100644 index 00000000..d24b5d07 --- /dev/null +++ b/rs/packages/api/src/routes/runtime/external_relay_updated/mod.rs @@ -0,0 +1 @@ +pub mod station_id; diff --git a/rs/packages/api/src/routes/runtime/external_relay_updated/station_id.rs b/rs/packages/api/src/routes/runtime/external_relay_updated/station_id.rs new file mode 100644 index 00000000..4afcb3af --- /dev/null +++ b/rs/packages/api/src/routes/runtime/external_relay_updated/station_id.rs @@ -0,0 +1,77 @@ +use std::collections::btree_map::Entry; + +use crate::json::JsonHandler; +use crate::request_ext::{self, GetAccessTokenScopeError}; + +use async_trait::async_trait; +use db::station::Station; +use media_sessions::MediaSessionMap; +use mongodb::bson::doc; +use prex::Request; +use serde::{Deserialize, Serialize}; +use serde_util::empty_struct::EmptyStruct; +use std::convert::Infallible; +use ts_rs::TS; + +pub mod post { + + use super::*; + + #[derive(Debug, Clone)] + pub struct Endpoint { + pub media_sessions: MediaSessionMap, + } + + #[derive(Debug, Clone)] + pub struct Input { + station: Station, + } + + #[derive(Debug, Clone, Serialize, Deserialize, TS)] + #[ts(export)] + #[ts(export_to = "../../../defs/api/runtime/external-relay-updated/[station]/POST/")] + pub struct Output(EmptyStruct); + + #[async_trait] + impl JsonHandler for Endpoint { + type Input = Input; + type Output = Output; + type ParseError = GetAccessTokenScopeError; + type HandleError = Infallible; + + async fn parse(&self, req: Request) -> Result { + let station_id = req.param("station").unwrap(); + let access_token_scope = request_ext::get_access_token_scope(&req).await?; + let station = access_token_scope.grant_station_scope(station_id).await?; + Ok(Self::Input { station }) + } + + async fn perform(&self, input: Self::Input) -> Result { + let Self::Input { station } = input; + perform(&self.media_sessions, &station); + Ok(Output(EmptyStruct(()))) + } + } +} + +pub fn perform(media_sessions: &MediaSessionMap, station: &Station) { + let mut lock = media_sessions.write(); + match lock.entry(&station.id) { + Entry::Vacant(..) => {} + Entry::Occupied(entry) => { + let session = entry.get(); + match &station.external_relay_url { + Some(_) => { + if session.is_external_relay() || session.is_playlist() { + entry.remove(); + } + } + None => { + if session.is_external_relay() { + entry.remove(); + } + } + } + } + } +} diff --git a/rs/packages/api/src/routes/runtime/mod.rs b/rs/packages/api/src/routes/runtime/mod.rs index 2b71772f..2880202d 100644 --- a/rs/packages/api/src/routes/runtime/mod.rs +++ b/rs/packages/api/src/routes/runtime/mod.rs @@ -1,2 +1,4 @@ +pub mod external_relay_updated; pub mod restart_playlist; pub mod source_password_updated; +pub mod station_deleted; diff --git a/rs/packages/api/src/routes/runtime/station_deleted/mod.rs b/rs/packages/api/src/routes/runtime/station_deleted/mod.rs new file mode 100644 index 00000000..d24b5d07 --- /dev/null +++ b/rs/packages/api/src/routes/runtime/station_deleted/mod.rs @@ -0,0 +1 @@ +pub mod station_id; diff --git a/rs/packages/api/src/routes/runtime/station_deleted/station_id.rs b/rs/packages/api/src/routes/runtime/station_deleted/station_id.rs new file mode 100644 index 00000000..e18038bb --- /dev/null +++ b/rs/packages/api/src/routes/runtime/station_deleted/station_id.rs @@ -0,0 +1,61 @@ +use crate::json::JsonHandler; +use crate::request_ext::{self, GetAccessTokenScopeError}; + +use async_trait::async_trait; +use media_sessions::MediaSessionMap; +use mongodb::bson::doc; +use prex::Request; +use serde::{Deserialize, Serialize}; +use serde_util::empty_struct::EmptyStruct; +use std::convert::Infallible; +use ts_rs::TS; + +pub mod post { + + use super::*; + + #[derive(Debug, Clone)] + pub struct Endpoint { + pub media_sessions: MediaSessionMap, + } + + #[derive(Debug, Clone)] + pub struct Input { + station_id: String, + } + + #[derive(Debug, Clone, Serialize, Deserialize, TS)] + #[ts(export)] + #[ts(export_to = "../../../defs/api/runtime/station-deleted/[station]/POST/")] + pub struct Output(EmptyStruct); + + #[async_trait] + impl JsonHandler for Endpoint { + type Input = Input; + type Output = Output; + type ParseError = GetAccessTokenScopeError; + type HandleError = Infallible; + + async fn parse(&self, req: Request) -> Result { + let station_id = req.param("station").unwrap(); + let access_token_scope = request_ext::get_access_token_scope(&req).await?; + let station = access_token_scope + .grant_station_owner_scope(station_id) + .await?; + Ok(Self::Input { + station_id: station.id, + }) + } + + async fn perform(&self, input: Self::Input) -> Result { + let Self::Input { station_id } = input; + perform(&self.media_sessions, &station_id); + Ok(Output(EmptyStruct(()))) + } + } +} + +pub fn perform(media_sessions: &MediaSessionMap, station_id: &str) { + let mut lock = media_sessions.write(); + lock.terminate(station_id); +} diff --git a/rs/packages/api/src/routes/stations/id.rs b/rs/packages/api/src/routes/stations/id.rs index 85642588..af45453a 100644 --- a/rs/packages/api/src/routes/stations/id.rs +++ b/rs/packages/api/src/routes/stations/id.rs @@ -68,9 +68,14 @@ pub mod get { pub mod delete { + use constants::ACCESS_TOKEN_HEADER; use db::account::{Account, Limit, Limits}; use db::audio_file::AudioFile; + use db::deployment::Deployment; use db::{run_transaction, Model}; + use hyper::http::HeaderValue; + use hyper::Body; + use media_sessions::MediaSessionMap; use serde_util::DateTime; // use futures_util::TryStreamExt; use serde_util::empty_struct::EmptyStruct; @@ -80,10 +85,14 @@ pub mod delete { use super::*; #[derive(Debug, Clone)] - pub struct Endpoint {} + pub struct Endpoint { + pub deployment_id: String, + pub media_sessions: MediaSessionMap, + } #[derive(Debug, Clone)] pub struct Input { + access_token_header: Option, station_id: String, } @@ -126,16 +135,22 @@ pub mod delete { async fn parse(&self, req: Request) -> Result { let station_id = req.param("station").unwrap(); let access_token = request_ext::get_access_token_scope(&req).await?; + let access_token_header = req.headers().get(ACCESS_TOKEN_HEADER).cloned(); let station = access_token.grant_station_owner_scope(station_id).await?; + Ok(Input { + access_token_header, station_id: station.id, }) } async fn perform(&self, input: Self::Input) -> Result { - let Input { station_id } = input; + let Input { + access_token_header, + station_id, + } = input; - run_transaction!(session => { + let station = run_transaction!(session => { let station = match tx_try!(Station::get_by_id_with_session(&station_id, &mut session).await) { None => return Err(HandleError::NotFound(station_id)), @@ -181,6 +196,52 @@ pub mod delete { if result.matched_count != 1 { return Err(HandleError::AccountNotFound(station.account_id)); } + + station + }); + + let deployment_id = self.deployment_id.clone(); + let media_sessions = self.media_sessions.clone(); + tokio::spawn(async move { + match station.owner_deployment_info { + None => {} + Some(info) => { + if info.deployment_id == deployment_id { + crate::routes::runtime::station_deleted::station_id::perform( + &media_sessions, + &station.id, + ); + } else { + #[allow(clippy-clippy::collapsible_else_if)] + if let Ok(Some(deployment)) = Deployment::get_by_id(&info.deployment_id).await { + use rand::seq::SliceRandom; + let addr = deployment.local_ip; + let port = deployment.api_ports.choose(&mut rand::thread_rng()); + if let Some(port) = port { + let uri = format!( + "http://{}:{}/runtime/station-deleted/{}", + addr, port, station.id + ); + + let client = hyper::Client::default(); + let mut req = hyper::Request::builder() + .method(hyper::http::Method::POST) + .uri(uri); + + if let Some(v) = access_token_header { + if let Ok(v) = v.to_str() { + req = req.header(ACCESS_TOKEN_HEADER, v); + } + }; + + if let Ok(req) = req.body(Body::empty()) { + let _ = client.request(req).await; + } + } + } + } + } + } }); Ok(Output(EmptyStruct(()))) @@ -193,14 +254,21 @@ pub mod patch { use crate::error::ApiError; use super::*; + use constants::ACCESS_TOKEN_HEADER; use db::{ - error::ApplyPatchError, fetch_and_patch, run_transaction, station::StationPatch, Model, + deployment::Deployment, error::ApplyPatchError, fetch_and_patch, run_transaction, + station::StationPatch, Model, }; + use hyper::{http::HeaderValue, Body}; + use media_sessions::MediaSessionMap; use prex::request::ReadBodyJsonError; use validify::{ValidationErrors, Validify}; #[derive(Debug, Clone)] - pub struct Endpoint {} + pub struct Endpoint { + pub deployment_id: String, + pub media_sessions: MediaSessionMap, + } #[derive(Debug, Clone, Serialize, Deserialize, TS)] #[ts(export, export_to = "../../../defs/api/stations/[station]/PATCH/")] @@ -210,6 +278,7 @@ pub mod patch { pub struct Input { payload: Payload, access_token_scope: AccessTokenScope, + access_token_header: Option, station: Station, } @@ -278,9 +347,12 @@ pub mod patch { let payload: Payload = req.read_body_json(100_000).await?; + let access_token_header = req.headers().get(ACCESS_TOKEN_HEADER).cloned(); + Ok(Self::Input { payload, access_token_scope, + access_token_header, station, }) } @@ -289,6 +361,7 @@ pub mod patch { let Self::Input { payload: Payload(patch), access_token_scope, + access_token_header, station, } = input; @@ -296,6 +369,8 @@ pub mod patch { let patch: StationPatch = Validify::validify(patch.into())?; + let prev_external_relay_url = patch.external_relay_url.clone(); + let station = run_transaction!(session => { fetch_and_patch!(Station, station, &id, Err(HandleError::StationNotFound(id)), session, { if let Some(picture_id) = &patch.picture_id { @@ -311,6 +386,55 @@ pub mod patch { }) }); + if let Some(prev_url) = prev_external_relay_url { + if prev_url != station.external_relay_url { + let deployment_id = self.deployment_id.clone(); + let media_sessions = self.media_sessions.clone(); + let station = station.clone(); + tokio::spawn(async move { + match &station.owner_deployment_info { + None => {} + Some(info) => { + if info.deployment_id == deployment_id { + crate::routes::runtime::external_relay_updated::station_id::perform( + &media_sessions, + &station, + ); + } else { + #[allow(clippy-clippy::collapsible_else_if)] + if let Ok(Some(deployment)) = Deployment::get_by_id(&info.deployment_id).await { + use rand::seq::SliceRandom; + let addr = deployment.local_ip; + let port = deployment.api_ports.choose(&mut rand::thread_rng()); + if let Some(port) = port { + let uri = format!( + "http://{}:{}/runtime/external-relay-updated/{}", + addr, port, station.id + ); + + let client = hyper::Client::default(); + let mut req = hyper::Request::builder() + .method(hyper::http::Method::POST) + .uri(uri); + + if let Some(v) = access_token_header { + if let Ok(v) = v.to_str() { + req = req.header(ACCESS_TOKEN_HEADER, v); + } + }; + + if let Ok(req) = req.body(Body::empty()) { + let _ = client.request(req).await; + } + } + } + } + } + } + }); + } + } + let out = station.into_public(access_token_scope.as_public_scope()); Ok(Output(out)) diff --git a/rs/packages/api/src/routes/stations/mod.rs b/rs/packages/api/src/routes/stations/mod.rs index 3951f3db..5a6366b8 100644 --- a/rs/packages/api/src/routes/stations/mod.rs +++ b/rs/packages/api/src/routes/stations/mod.rs @@ -136,10 +136,11 @@ pub mod get { Some(account_id) => doc! { Station::KEY_ACCOUNT_ID: account_id }, }; - let query_active_filter = match active.unwrap_or(QueryActive::Active) { - QueryActive::Active => current_filter_doc! {}, - QueryActive::Deleted => deleted_filter_doc! {}, - QueryActive::All => doc! {}, + let query_active_filter = match active { + None => current_filter_doc! {}, + Some(QueryActive::Active) => current_filter_doc! {}, + Some(QueryActive::Deleted) => deleted_filter_doc! {}, + Some(QueryActive::All) => doc! {}, }; let sort = doc! { Station::KEY_CREATED_AT: 1 }; diff --git a/rs/packages/db/src/models/station/mod.rs b/rs/packages/db/src/models/station/mod.rs index 18440327..c9382c04 100644 --- a/rs/packages/db/src/models/station/mod.rs +++ b/rs/packages/db/src/models/station/mod.rs @@ -1,6 +1,6 @@ use self::validation::*; use crate::error::ApplyPatchError; -use crate::Model; +use crate::{current_filter_doc, Model}; use crate::{metadata::Metadata, PublicScope}; use drop_tracer::Token; use geoip::CountryCode; @@ -284,6 +284,7 @@ pub struct UserPublicStation { // misc pub playlist_is_randomly_shuffled: bool, + pub external_relay_url: Option, // auth pub source_password: String, @@ -359,7 +360,10 @@ pub struct StationPatch { #[validate(length(min = "DESC_MIN", max = "DESC_MAX"))] pub description: Option>, + #[ts(optional)] pub type_of_content: Option, + + #[ts(optional)] pub country_code: Option, // location and language @@ -562,7 +566,7 @@ impl Station { Result<(Station, DeploymentTakeDropper), Option<(Station, OwnerDeploymentInfo)>>, mongodb::error::Error, > { - let filter = doc! { + let filter = current_filter_doc! { Station::KEY_ID: station_id, }; @@ -704,6 +708,7 @@ impl From for UserPublicStation { google_play_url: station.google_play_url, playlist_is_randomly_shuffled: station.playlist_is_randomly_shuffled, + external_relay_url: station.external_relay_url, source_password: station.source_password, user_metadata: station.user_metadata, diff --git a/rs/packages/media-sessions/src/external_relay.rs b/rs/packages/media-sessions/src/external_relay.rs index 34c3bde9..7021eb2f 100644 --- a/rs/packages/media-sessions/src/external_relay.rs +++ b/rs/packages/media-sessions/src/external_relay.rs @@ -3,7 +3,8 @@ use std::time::Instant; use crate::{SendError, Transmitter}; use constants::{ - EXTERNAL_RELAY_NO_LISTENERS_SHUTDOWN_DELAY_SECS, STREAM_CHUNK_SIZE, STREAM_KBITRATE, + EXTERNAL_RELAY_NO_LISTENERS_SHUTDOWN_DELAY_SECS, STREAM_BURST_LENGTH, STREAM_CHUNK_SIZE, + STREAM_KBITRATE, }; use db::media_session::MediaSession; use db::{media_session::MediaSessionState, Model}; @@ -126,29 +127,26 @@ pub fn run_external_releay_session( async move { use stream_util::*; - let chunks = stdout - .into_bytes_stream(STREAM_CHUNK_SIZE) - .rated(STREAM_KBITRATE * 1000); - - tokio::pin!(chunks); + let mut chunks = stdout.into_bytes_stream(STREAM_CHUNK_SIZE); let mut no_listeners_since: Option = None; - loop { + // fill the burst + let mut filled_burst_len: usize = 0; + let go_on = loop { + if filled_burst_len >= STREAM_BURST_LENGTH { + break true; + } + match chunks.next().await { - None => { - // trace!("channel {id}: ffmpeg stdout end"); - break; - } - Some(Err(_e)) => { - // trace!("channel {id}: ffmpeg stdout error: {e}"); - break; - } + None => break false, + Some(Err(_e)) => break false, Some(Ok(bytes)) => { if shutdown.is_closed() { - break; + break false; } + filled_burst_len += 1; match tx.send(bytes) { Ok(_) => continue, @@ -162,7 +160,7 @@ pub fn run_external_releay_session( "shutting down external-relay for station {} (no listeners shutdown delay elapsed)", station_id ); - break; + break false; } else { continue; } @@ -173,10 +171,55 @@ pub fn run_external_releay_session( continue; } }, - Err(SendError::Terminated(_)) => break, + Err(SendError::Terminated(_)) => break false, }; } } + }; + + if go_on { + // we continue but now the stream is byte rated + let chunks = chunks.rated(STREAM_KBITRATE * 1000); + tokio::pin!(chunks); + + loop { + match chunks.next().await { + None => break, + Some(Err(_e)) => break, + Some(Ok(bytes)) => { + if shutdown.is_closed() { + break; + } + + match tx.send(bytes) { + Ok(_) => continue, + + // check if shutdown delay is elapsed + Err(SendError::NoListeners(_)) => match no_listeners_since { + Some(instant) => { + if instant.elapsed().as_secs() + > EXTERNAL_RELAY_NO_LISTENERS_SHUTDOWN_DELAY_SECS + { + info!( + "shutting down external-relay for station {} (no listeners shutdown delay elapsed)", + station_id + ); + break; + } else { + continue; + } + } + + None => { + no_listeners_since = Some(Instant::now()); + continue; + } + }, + Err(SendError::Terminated(_)) => break, + }; + } + } + } } } }; @@ -190,7 +233,7 @@ pub fn run_external_releay_session( Err(e) => { warn!( - "releay session for station {station_id}: ffmpeg child error: {} => {:?}", + "external-relay session for station {station_id}: ffmpeg child error: {} => {:?}", e, e ); return Err(LiveError::ExitIo(e)); @@ -207,9 +250,9 @@ pub fn run_external_releay_session( } }; - // trace!("channel {id}: ffmpeg child end: {exit}"); drop(dropper); + // 224 is stdout broken pipe (that happens normally when the session is cancelled) if exit.success() { Ok(()) @@ -223,18 +266,22 @@ pub fn run_external_releay_session( } else { match stderr { Err(e) => { - warn!("master-releay session {station_id}: ffmpeg exit non-zero: exit={exit} stderr_error={e}"); + warn!("external-releay session {station_id}: ffmpeg exit non-zero: exit={exit} stderr_error={e}"); Err(LiveError::StderrError(e)) // format!("internal error allocating stream converter (stderr 1)") } Ok(v) => { let stderr = String::from_utf8_lossy(v.as_ref()).to_string(); - warn!( - "master-releay session for station {station_id}: ffmpeg exit non-zero: exit={exit} stderr={stderr}" - ); - Err(LiveError::ExitNotOk { stderr }) - // format!("error converting the audio stream (exit), possibly the audio is corrupted or is using a not supported format: {out}") + // 224 happens when stdout is terminated (broken pipe) that happens normally when the session is cancelled + if exit.code() == Some(224) && stderr.contains("Broken pipe") { + Ok(()) + } else { + warn!( + "external-releay session for station {station_id}: ffmpeg exit non-zero: exit={exit} stderr={stderr}" + ); + Err(LiveError::ExitNotOk { stderr }) + } } } } @@ -305,7 +352,7 @@ impl Drop for MediaSessionDropper { if let Err(e) = db::media_session::MediaSession::update_by_id(&id, update).await { error!( - "error closing master-relay session into db, session {}, {} {:?}", + "error closing external-relay session into db, session {}, {} {:?}", id, e, e ) } diff --git a/rs/packages/media-sessions/src/lib.rs b/rs/packages/media-sessions/src/lib.rs index a59b5e19..0eac86b9 100644 --- a/rs/packages/media-sessions/src/lib.rs +++ b/rs/packages/media-sessions/src/lib.rs @@ -274,6 +274,11 @@ impl MediaSession { self.info.is_live() } + #[inline] + pub fn is_external_relay(&self) -> bool { + self.info.is_external_relay() + } + #[inline] pub fn is_playlist(&self) -> bool { self.info.is_playlist() @@ -309,6 +314,11 @@ impl MediaSessionInfo { self.kind.is_live() } + #[inline] + pub fn is_external_relay(&self) -> bool { + self.kind.is_external_relay() + } + #[inline] pub fn is_playlist(&self) -> bool { self.kind.is_playlist() @@ -344,6 +354,11 @@ impl MediaSessionKind { matches!(self, MediaSessionKind::Live { .. }) } + #[inline] + pub fn is_external_relay(&self) -> bool { + matches!(self, MediaSessionKind::ExternalRelay { .. }) + } + #[inline] pub fn is_playlist(&self) -> bool { matches!(self, MediaSessionKind::Playlist { .. }) diff --git a/rs/packages/stream/src/lib.rs b/rs/packages/stream/src/lib.rs index 15fa11c2..25685c86 100644 --- a/rs/packages/stream/src/lib.rs +++ b/rs/packages/stream/src/lib.rs @@ -15,7 +15,7 @@ use ip_counter::IpCounter; use log::*; use media_sessions::external_relay::run_external_releay_session; use media_sessions::playlist::run_playlist_session; -use media_sessions::MediaSessionMap; +use media_sessions::{MediaSessionMap, Listener}; use media_sessions::RecvError; use media_sessions::relay::run_relay_session; use mongodb::bson::doc; @@ -29,7 +29,7 @@ use std::future::Future; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::Arc; -use std::time::SystemTime; +use std::time::{SystemTime, Instant}; use transfer_map::TransferTracer; // use url::Url; @@ -381,166 +381,175 @@ impl StreamHandler { tokio::spawn(async move { - let info = OwnerDeploymentInfo { - deployment_id: deployment_id.clone(), - task_id: Station::random_owner_task_id(), - // audio/mpeg is because we'll start a playlist media session on demand - content_type: String::from("audio/mpeg"), - }; - - let (mut rx, station) = 'rx: { - let (station, dropper) = match Station::try_set_owner_deployment_info(&station_id, info, drop_tracer.token()).await? { - Err(None) => { - return Err(StreamError::StationNotFound(station_id.to_string())); - }, - - Err(Some((station, owner_info))) => { - if owner_info.deployment_id == deployment_id { - (station, None) - } else { - let account = match Account::get_by_id(&station.account_id).await? { - Some(account) => account, - None => return Err(StreamError::AccountNotFound(station.account_id)) - }; - - if account.limits.transfer.avail() == 0 { - return Err(StreamError::TransferLimit); - } - - if account.limits.listeners.avail() == 0 { - return Err(StreamError::ListenersLimit); - } - - let deployment = match Deployment::get_by_id(&owner_info.deployment_id).await? { - None => return Err(StreamError::DeploymentNotFound), - Some(doc) => doc, - }; - - use rand::seq::SliceRandom; - let stream_port = deployment.stream_ports.choose(&mut rand::thread_rng()); + async fn get_rx(deployment_id: &str, station_id: &str, req: &Request, media_sessions: &MediaSessionMap, drop_tracer: &DropTracer, shutdown: &Shutdown) -> Result<(Listener, Station), StreamError> { + let info = OwnerDeploymentInfo { + deployment_id: deployment_id.to_string(), + task_id: Station::random_owner_task_id(), + // audio/mpeg is because we'll start a playlist media session on demand + content_type: String::from("audio/mpeg"), + }; + + let (rx, station) = 'rx: { + let (station, dropper) = match Station::try_set_owner_deployment_info(station_id, info, drop_tracer.token()).await? { + Err(None) => { + return Err(StreamError::StationNotFound(station_id.to_string())); + }, + + Err(Some((station, owner_info))) => { + if owner_info.deployment_id == deployment_id { + (station, None) + } else { + let account = match Account::get_by_id(&station.account_id).await? { + Some(account) => account, + None => return Err(StreamError::AccountNotFound(station.account_id)) + }; + + if account.limits.transfer.avail() == 0 { + return Err(StreamError::TransferLimit); + } - let port = match stream_port { - None => return Err(StreamError::DeploymentNoPort), - Some(port) => *port, - }; - - let destination = SocketAddr::from((deployment.local_ip, port)); - - let client = hyper::Client::default(); - - let mut hyper_req = hyper::Request::builder() - .uri(format!("http://{}:{}/relay/{}", destination.ip(), destination.port(), station_id)); - - for (key, value) in req.headers().clone().into_iter() { - if let Some(key) = key { - hyper_req = hyper_req.header(key, value); + if account.limits.listeners.avail() == 0 { + return Err(StreamError::ListenersLimit); } - } - - hyper_req = hyper_req - .header(X_OPENSTREAM_RELAY_CODE, &owner_info.deployment_id) - .header("x-openstream-relay-remote-addr", format!("{}", req.remote_addr())) - .header("x-openstream-relay-local-addr", format!("{}", req.local_addr())) - .header("x-openstream-relay-deployment-id", &deployment_id) - .header("x-openstream-relay-target-deployment-id", &deployment_id) - .header("connection", "close"); - - let hyper_req = match hyper_req.body(Body::empty()) { - Ok(req) => req, - Err(e) => return Err(StreamError::InternalHyperCreateRequest(e)) - }; - - match client.request(hyper_req).await { - Err(e) => return Err(StreamError::InternalHyperClientRequest(e)), - Ok(hyper_res) => { - // if error return the same error to the client - if !hyper_res.status().is_success() { - return Err(StreamError::RelayStatus(hyper_res.status())); + + let deployment = match Deployment::get_by_id(&owner_info.deployment_id).await? { + None => return Err(StreamError::DeploymentNotFound), + Some(doc) => doc, + }; + + use rand::seq::SliceRandom; + let stream_port = deployment.stream_ports.choose(&mut rand::thread_rng()); + + let port = match stream_port { + None => return Err(StreamError::DeploymentNoPort), + Some(port) => *port, + }; + + let destination = SocketAddr::from((deployment.local_ip, port)); + + let client = hyper::Client::default(); + + let mut hyper_req = hyper::Request::builder() + .uri(format!("http://{}:{}/relay/{}", destination.ip(), destination.port(), station_id)); + + for (key, value) in req.headers().clone().into_iter() { + if let Some(key) = key { + hyper_req = hyper_req.header(key, value); + } + } + + hyper_req = hyper_req + .header(X_OPENSTREAM_RELAY_CODE, &owner_info.deployment_id) + .header("x-openstream-relay-remote-addr", format!("{}", req.remote_addr())) + .header("x-openstream-relay-local-addr", format!("{}", req.local_addr())) + .header("x-openstream-relay-deployment-id", deployment_id) + .header("x-openstream-relay-target-deployment-id", deployment_id) + .header("connection", "close"); + + let hyper_req = match hyper_req.body(Body::empty()) { + Ok(req) => req, + Err(e) => return Err(StreamError::InternalHyperCreateRequest(e)) + }; + + match client.request(hyper_req).await { + Err(e) => return Err(StreamError::InternalHyperClientRequest(e)), + Ok(hyper_res) => { + // if error return the same error to the client + if !hyper_res.status().is_success() { + return Err(StreamError::RelayStatus(hyper_res.status())); + } + + let tx = media_sessions.write().transmit(station_id, media_sessions::MediaSessionKind::Relay { content_type: owner_info.content_type }); + let rx = tx.subscribe(); + run_relay_session(tx, deployment_id.to_string(), owner_info.deployment_id, hyper_res, shutdown.clone(), drop_tracer.clone()); + + break 'rx (rx, station); } - - let tx = media_sessions.write().transmit(&station_id, media_sessions::MediaSessionKind::Relay { content_type: owner_info.content_type }); - let rx = tx.subscribe(); - run_relay_session(tx, deployment_id.clone(), owner_info.deployment_id, hyper_res, shutdown.clone(), drop_tracer.clone()); - - break 'rx (rx, station); } } + }, + + Ok((station, dropper)) => { + (station, Some(dropper)) } - }, - - Ok((station, dropper)) => { - (station, Some(dropper)) + }; + + let account = match Account::get_by_id(&station.account_id).await? { + Some(account) => account, + None => return Err(StreamError::AccountNotFound(station.account_id)) + }; + + if account.limits.transfer.avail() == 0 { + return Err(StreamError::TransferLimit); } - }; - let account = match Account::get_by_id(&station.account_id).await? { - Some(account) => account, - None => return Err(StreamError::AccountNotFound(station.account_id)) - }; - - if account.limits.transfer.avail() == 0 { - return Err(StreamError::TransferLimit); - } - - if account.limits.listeners.avail() == 0 { - return Err(StreamError::ListenersLimit); - } - - #[allow(clippy::collapsible_if)] - if media_sessions.read().get(&station_id).is_none() { - match &station.external_relay_url { - None => { - if !AudioFile::exists(doc! { AudioFile::KEY_STATION_ID: &station.id }).await? { - return Err(StreamError::NotStreaming(station.id)); - } - } - - Some(_) => { } + if account.limits.listeners.avail() == 0 { + return Err(StreamError::ListenersLimit); } - - }; - - let rx = { - let lock = media_sessions.upgradable_read(); - - match lock.get(&station_id) { - Some(session) => session.subscribe(), - - None => { - let mut lock = lock.upgrade(); - match &station.external_relay_url { - None => { - let tx = lock.transmit(&station_id, media_sessions::MediaSessionKind::Playlist {}); - let rx = tx.subscribe(); - let shutdown = shutdown.clone(); - let deployment_id = deployment_id.clone(); - tokio::spawn(async move { - let _ = run_playlist_session(tx, deployment_id, shutdown, drop_tracer, true).await.unwrap(); - drop(dropper); - }); - rx + + #[allow(clippy::collapsible_if)] + if media_sessions.read().get(station_id).is_none() { + match &station.external_relay_url { + None => { + if !AudioFile::exists(doc! { AudioFile::KEY_STATION_ID: &station.id }).await? { + return Err(StreamError::NotStreaming(station.id)); } - - Some(url) => { - let tx = lock.transmit(&station_id, media_sessions::MediaSessionKind::ExternalRelay); - let rx = tx.subscribe(); - let shutdown = shutdown.clone(); - let deployment_id = deployment_id.clone(); - let url = url.clone(); - tokio::spawn(async move { - let _ = run_external_releay_session(tx, deployment_id, url, shutdown, drop_tracer).await.unwrap(); - drop(dropper); - }); - rx + } + + Some(_) => { } + } + + }; + + let rx = { + let lock = media_sessions.upgradable_read(); + + match lock.get(station_id) { + Some(session) => session.subscribe(), + + None => { + let mut lock = lock.upgrade(); + match &station.external_relay_url { + None => { + let tx = lock.transmit(station_id, media_sessions::MediaSessionKind::Playlist {}); + let rx = tx.subscribe(); + let shutdown = shutdown.clone(); + let deployment_id = deployment_id.to_string(); + let drop_tracer = drop_tracer.clone(); + tokio::spawn(async move { + let _ = run_playlist_session(tx, deployment_id, shutdown, drop_tracer, true).await.unwrap(); + drop(dropper); + }); + rx + } + + Some(url) => { + let tx = lock.transmit(station_id, media_sessions::MediaSessionKind::ExternalRelay); + let rx = tx.subscribe(); + let shutdown = shutdown.clone(); + let deployment_id = deployment_id.to_string(); + let drop_tracer = drop_tracer.clone(); + let url = url.clone(); + + tokio::spawn(async move { + let _ = run_external_releay_session(tx, deployment_id, url, shutdown, drop_tracer).await.unwrap(); + drop(dropper); + }); + rx + } } } } - } + }; + + (rx, station) }; - (rx, station) - }; + Ok((rx, station)) + } + let (rx, station) = get_rx(&deployment_id, &station_id, &req, &media_sessions, &drop_tracer, &shutdown).await?; + let content_type = rx.info().kind().content_type().to_string(); let conn_doc = { @@ -550,7 +559,7 @@ impl StreamHandler { StreamConnection { id: StreamConnection::uid(), station_id: station.id, - deployment_id, + deployment_id: deployment_id.clone(), is_open: true, ip: request.real_ip, country_code: request.country_code, @@ -590,44 +599,77 @@ impl StreamHandler { tokio::spawn({ let shutdown = shutdown.clone(); let transfer_map = transfer_map.clone(); + let mut rx = rx; - async move { - loop { - let r = rx.recv().await; + let mut start = Instant::now(); + let mut rx_had_data = false; - if shutdown.is_closed() { - break; - } + async move { + 'root: loop { + 'recv: loop { + let r = rx.recv().await; - match r { - // if lagged we ignore the error and continue with the oldest message buffered in the channel - // TODO: maybe we should advance to the newest message with stream.resubscribe() - Err(RecvError::Lagged(_)) => continue, + if shutdown.is_closed() { + break 'root; + } - // Here the channel has been dropped - Err(RecvError::Closed) => break, + match r { + // if lagged we ignore the error and continue with the oldest message buffered in the channel + // TODO: maybe we should advance to the newest message with stream.resubscribe() + Err(RecvError::Lagged(_)) => continue 'recv, - // Receive bytes and pass it to response body - Ok(bytes) => { - if shutdown.is_closed() { - break; - } + // Here the channel has been dropped + Err(RecvError::Closed) => break 'recv, - let len = bytes.len(); - match body_sender.send_data(bytes).await { - Err(_) => break, - Ok(()) => { - transfer_map.increment(&station.account_id, len); - transfer_bytes.fetch_add(len as u64, Ordering::SeqCst); + // Receive bytes and pass it to response body + Ok(bytes) => { + if shutdown.is_closed() { + break 'root; } - }; + + rx_had_data = true; + + let len = bytes.len(); + match body_sender.send_data(bytes).await { + Err(_) => break 'root, + Ok(()) => { + transfer_map.increment(&station.account_id, len); + transfer_bytes.fetch_add(len as u64, Ordering::SeqCst); + } + }; + } } } + + if shutdown.is_closed() { + break 'root; + } + + // if the connection had last < 5 secs + // or had no data we abort to + // avoid creating infinite loops here + if start.elapsed().as_secs() > 5 && rx_had_data { + start = Instant::now(); + rx_had_data = false; + let (new_rx, _) = get_rx( + &deployment_id, + &station_id, + &req, + &media_sessions, + &drop_tracer, + &shutdown, + ).await?; + rx = new_rx; + + continue 'root; + } } closed.store(true, Ordering::SeqCst); drop(connection_dropper); drop(ip_decrementer); + + Ok::<(), StreamError>(()) } }); From 488444abd52feed389f7b8987edafb1eb9fa6cfb Mon Sep 17 00:00:00 2001 From: ramiroaisen <52116153+ramiroaisen@users.noreply.github.com> Date: Sat, 24 Jun 2023 16:50:40 -0300 Subject: [PATCH 3/3] fix: station patch payload --- .../[account]/stations/[station]/profile/+page.svelte | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/front/app/src/routes/(root)/(online)/(app)/accounts/[account]/stations/[station]/profile/+page.svelte b/front/app/src/routes/(root)/(online)/(app)/accounts/[account]/stations/[station]/profile/+page.svelte index c9a71204..19f2fa84 100644 --- a/front/app/src/routes/(root)/(online)/(app)/accounts/[account]/stations/[station]/profile/+page.svelte +++ b/front/app/src/routes/(root)/(online)/(app)/accounts/[account]/stations/[station]/profile/+page.svelte @@ -65,8 +65,8 @@ const payload: import("$api/stations/[station]/PATCH/Payload").Payload = { ...dif, name, - type_of_content: type_of_content ?? null, - country_code: country_code ?? null, + type_of_content: type_of_content , + country_code: country_code, picture_id, }