Skip to content

Commit

Permalink
feat: add healthcheck to media sessions and stations (#179)
Browse files Browse the repository at this point in the history
  • Loading branch information
ramiroaisen authored Sep 23, 2023
2 parents 1451651 + 6f24b86 commit 6352767
Show file tree
Hide file tree
Showing 42 changed files with 600 additions and 297 deletions.
46 changes: 29 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@

[workspace]

resolver = "2"

members = [
"rs/bin/openstream",

Expand Down
13 changes: 13 additions & 0 deletions defs/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ export const EXTERNAL_RELAY_NO_LISTENERS_SHUTDOWN_DELAY_SECS = 60;
/** Internal forwarded ip header used when openstream servers are connecting with each other */
export const FORWARD_IP_HEADER = "x-openstream-forwarded-ip";

/** interval in which
* $stations.owner_deployment_info.health_checked_at
* and $media_session.health_checked_at
* will be set to $NOW */
export const MEDIA_SESSION_HEALTH_CHECK_INTERVAL_SECS = 6;

/** interval to check if $stations.owner_deployment_info and $media_sessions are healthy */
export const MEDIA_SESSION_HEALTH_CHECK_KILL_INTERVAL_SECS = 5;

/** time to check if a $media_session (and $station.owner_deployment_info) is healthy
* otherwise kill it in database */
export const MEDIA_SESSION_HEALTH_SHUTDOWN_TIMEOUT_SECS = 33;

/** Access token header used by payments servers implementations */
export const PAYMENTS_ACCESS_TOKEN_HEADER = "x-access-token";

Expand Down
1 change: 1 addition & 0 deletions defs/db/MediaSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export type MediaSession = {
transfer_bytes: number;
closed_at: DateTime | null;
duration_ms: number | null;
health_checked_at: DateTime | null;
created_at: DateTime;
updated_at: DateTime;
} & MediaSessionKind;
2 changes: 2 additions & 0 deletions defs/db/OwnerDeploymentInfo.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DateTime } from "../DateTime";

export type OwnerDeploymentInfo = {
deployment_id: string;
task_id: string;
content_type: string;
health_checked_at: DateTime | null;
};
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
padding-inline: min(20%, max(12rem, 10%));
padding-block: 6.5rem;
position: relative;
background-image: url("$lib/img/login-bg.jpg");
background-image: url($lib/img/login-bg.jpg);
background-size: cover;
background-position: center;
--field-container-bg: rgba(0,0,0,0);
Expand Down
3 changes: 2 additions & 1 deletion rs/bin/openstream/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use log::*;
use anyhow::{bail, Context};
use api::ApiServer;
use defer_lite::defer;
use media_sessions::MediaSessionMap;
use media_sessions::{MediaSessionMap, healthcheck};
use mongodb::bson::doc;
use mongodb::bson::Document;
use serde_util::DateTime;
Expand Down Expand Up @@ -565,6 +565,7 @@ async fn start_async(Start { config }: Start) -> Result<(), anyhow::Error> {

tokio::spawn(db::deployment::start_health_check_job(deployment_id.clone()));
tokio::spawn(db::station_picture::upgrade_images_if_needed());
tokio::spawn(healthcheck::health_shutdown_job());

tokio::spawn({
let shutdown = shutdown.clone();
Expand Down
16 changes: 16 additions & 0 deletions rs/config/constants/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,22 @@ pub const STATION_PICTURES_VERSION: f64 = 5.0;
#[const_register]
pub const DEPLOYMENT_HEALTH_CHECK_INTERVAL_SECS: u16 = 1;

/// interval in which
/// $stations.owner_deployment_info.health_checked_at
/// and $media_session.health_checked_at
/// will be set to $NOW
#[const_register]
pub const MEDIA_SESSION_HEALTH_CHECK_INTERVAL_SECS: u16 = 6;

/// time to check if a $media_session (and $station.owner_deployment_info) is healthy
/// otherwise kill it in database
#[const_register]
pub const MEDIA_SESSION_HEALTH_SHUTDOWN_TIMEOUT_SECS: u16 = 33;

/// interval to check if $stations.owner_deployment_info and $media_sessions are healthy
#[const_register]
pub const MEDIA_SESSION_HEALTH_CHECK_KILL_INTERVAL_SECS: u16 = 5;

/// validation constants
pub mod validate {
use super::*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use time::Duration;
fn random_os() -> Option<String> {
use rand::seq::SliceRandom; // 0.7.2

#[allow(clippy::useless_vec)]
let vs = vec![
Some("Linux"),
Some("Android"),
Expand All @@ -27,6 +28,7 @@ fn random_os() -> Option<String> {
fn random_browser() -> Option<String> {
use rand::seq::SliceRandom; // 0.7.2

#[allow(clippy::useless_vec)]
let vs = vec![
Some("Chrome"),
Some("Safari"),
Expand Down
2 changes: 1 addition & 1 deletion rs/packages/api/src/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub trait JsonHandler: Send + Sync + Sized + Clone + 'static {
false
}

fn cookies<'a, 'b>(&'a self, _output: &'b Self::Output) -> Vec<cookie::Cookie> {
fn cookies(&self, _output: &Self::Output) -> Vec<cookie::Cookie> {
vec![]
}

Expand Down
1 change: 1 addition & 0 deletions rs/packages/api/src/routes/auth/user/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ pub mod post {
.await
.map_err(HandleError::PaymentSavePaymentMethod)?;

#[allow(clippy::let_and_return)]
payment_method_response
};

Expand Down
1 change: 1 addition & 0 deletions rs/packages/api/src/routes/invitations/reject.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ pub mod post {
Some(invitation) => invitation,
};

#[allow(clippy::let_and_return)]
invitation
}
};
Expand Down
1 change: 1 addition & 0 deletions rs/packages/api/src/routes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ pub fn router(

app.at("/runtime/restart-playlist/:station").post(
runtime::restart_playlist::station_id::post::Endpoint {
deployment_id: deployment_id.clone(),
media_sessions: media_sessions.clone(),
drop_tracer: drop_tracer.clone(),
shutdown: shutdown.clone(),
Expand Down
1 change: 1 addition & 0 deletions rs/packages/api/src/routes/payment_methods/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ pub mod post {
.await
.map_err(HandleError::PaymentSavePaymentMethod)?;

#[allow(clippy::let_and_return)]
payment_method_response
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod post {

#[derive(Debug, Clone)]
pub struct Endpoint {
pub deployment_id: String,
pub media_sessions: MediaSessionMap,
pub drop_tracer: DropTracer,
pub shutdown: Shutdown,
Expand Down Expand Up @@ -70,7 +71,14 @@ pub mod post {
async fn perform(&self, input: Self::Input) -> Result<Self::Output, Self::HandleError> {
let Self::Input { station_id } = input;
let mut lock = self.media_sessions.write();
let _ = lock.restart(&station_id, self.shutdown.clone(), self.drop_tracer.clone());
let _ = lock
.restart(
station_id.to_string(),
self.deployment_id.to_string(),
self.shutdown.clone(),
self.drop_tracer.clone(),
)
.await;
Ok(Output(EmptyStruct(())))
}
}
Expand Down
9 changes: 8 additions & 1 deletion rs/packages/api/src/routes/stations/restart_playlist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,14 @@ pub mod post {
MediaSessionKind::Playlist { .. } => {
if media_session.deployment_id == self.deployment_id {
let mut lock = self.media_sessions.write();
let _ = lock.restart(&station.id, self.shutdown.clone(), self.drop_tracer.clone());
let _ = lock
.restart(
self.deployment_id.clone(),
station.id,
self.shutdown.clone(),
self.drop_tracer.clone(),
)
.await;
} else {
#[allow(clippy::collapsible_else_if)]
if let Some(deployment) = Deployment::get_by_id(&media_session.deployment_id).await? {
Expand Down
2 changes: 1 addition & 1 deletion rs/packages/db/src/models/deployment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct Deployment {
pub created_at: DateTime,
pub updated_at: DateTime,

// TODO: this Option is for back compat only
// TODO: this Option<> is for back compat only
// create a migration and change this to DateTime
pub health_checked_at: Option<DateTime>,

Expand Down
5 changes: 5 additions & 0 deletions rs/packages/db/src/models/media_session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ pub struct MediaSession {
#[serde(with = "serde_util::as_f64::option")]
pub duration_ms: Option<u64>,

// TODO: this Option<> is for back compat only
// create a migration and change this to DateTime
pub health_checked_at: Option<DateTime>,

pub created_at: DateTime,
pub updated_at: DateTime,
}
Expand Down Expand Up @@ -182,6 +186,7 @@ mod test {
},
now_playing: None,
state: MediaSessionState::Closed,
health_checked_at: Some(DateTime::now()),
closed_at: Some(DateTime::now()),
duration_ms: Some(100),
};
Expand Down
4 changes: 4 additions & 0 deletions rs/packages/db/src/models/station/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,10 @@ pub struct OwnerDeploymentInfo {
pub deployment_id: String,
pub task_id: String,
pub content_type: String,

// this Option<> is for backwards compatibility
// it should be removed in the future
pub health_checked_at: Option<DateTime>,
}

impl From<OwnerDeploymentInfo> for Bson {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ pub async fn get_analytics(query: AnalyticsQuery) -> Result<Analytics, mongodb::

macro_rules! add {
($acc:ident, $key:expr) => {
let mut item = $acc.entry($key).or_default();
let item = $acc.entry($key).or_default();
item.sessions += 1;
item.ips.insert(conn.ip);
item.total_duration_ms += conn_duration_ms;
Expand Down
Loading

0 comments on commit 6352767

Please sign in to comment.