Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add healthcheck to media sessions and stations #179

Merged
merged 1 commit into from
Sep 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 29 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@

[workspace]

resolver = "2"

members = [
"rs/bin/openstream",

Expand Down
13 changes: 13 additions & 0 deletions defs/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ export const EXTERNAL_RELAY_NO_LISTENERS_SHUTDOWN_DELAY_SECS = 60;
/** Internal forwarded ip header used when openstream servers are connecting with each other */
export const FORWARD_IP_HEADER = "x-openstream-forwarded-ip";

/** interval in which
* $stations.owner_deployment_info.health_checked_at
* and $media_session.health_checked_at
* will be set to $NOW */
export const MEDIA_SESSION_HEALTH_CHECK_INTERVAL_SECS = 6;

/** interval to check if $stations.owner_deployment_info and $media_sessions are healthy */
export const MEDIA_SESSION_HEALTH_CHECK_KILL_INTERVAL_SECS = 5;

/** time to check if a $media_session (and $station.owner_deployment_info) is healthy
* otherwise kill it in database */
export const MEDIA_SESSION_HEALTH_SHUTDOWN_TIMEOUT_SECS = 33;

/** Access token header used by payments servers implementations */
export const PAYMENTS_ACCESS_TOKEN_HEADER = "x-access-token";

Expand Down
1 change: 1 addition & 0 deletions defs/db/MediaSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export type MediaSession = {
transfer_bytes: number;
closed_at: DateTime | null;
duration_ms: number | null;
health_checked_at: DateTime | null;
created_at: DateTime;
updated_at: DateTime;
} & MediaSessionKind;
2 changes: 2 additions & 0 deletions defs/db/OwnerDeploymentInfo.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { DateTime } from "../DateTime";

export type OwnerDeploymentInfo = {
deployment_id: string;
task_id: string;
content_type: string;
health_checked_at: DateTime | null;
};
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
padding-inline: min(20%, max(12rem, 10%));
padding-block: 6.5rem;
position: relative;
background-image: url("$lib/img/login-bg.jpg");
background-image: url($lib/img/login-bg.jpg);
background-size: cover;
background-position: center;
--field-container-bg: rgba(0,0,0,0);
Expand Down
3 changes: 2 additions & 1 deletion rs/bin/openstream/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use log::*;
use anyhow::{bail, Context};
use api::ApiServer;
use defer_lite::defer;
use media_sessions::MediaSessionMap;
use media_sessions::{MediaSessionMap, healthcheck};
use mongodb::bson::doc;
use mongodb::bson::Document;
use serde_util::DateTime;
Expand Down Expand Up @@ -565,6 +565,7 @@ async fn start_async(Start { config }: Start) -> Result<(), anyhow::Error> {

tokio::spawn(db::deployment::start_health_check_job(deployment_id.clone()));
tokio::spawn(db::station_picture::upgrade_images_if_needed());
tokio::spawn(healthcheck::health_shutdown_job());

tokio::spawn({
let shutdown = shutdown.clone();
Expand Down
16 changes: 16 additions & 0 deletions rs/config/constants/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,22 @@ pub const STATION_PICTURES_VERSION: f64 = 5.0;
#[const_register]
pub const DEPLOYMENT_HEALTH_CHECK_INTERVAL_SECS: u16 = 1;

/// interval in which
/// $stations.owner_deployment_info.health_checked_at
/// and $media_session.health_checked_at
/// will be set to $NOW
#[const_register]
pub const MEDIA_SESSION_HEALTH_CHECK_INTERVAL_SECS: u16 = 6;

/// time to check if a $media_session (and $station.owner_deployment_info) is healthy
/// otherwise kill it in database
#[const_register]
pub const MEDIA_SESSION_HEALTH_SHUTDOWN_TIMEOUT_SECS: u16 = 33;

/// interval to check if $stations.owner_deployment_info and $media_sessions are healthy
#[const_register]
pub const MEDIA_SESSION_HEALTH_CHECK_KILL_INTERVAL_SECS: u16 = 5;

/// validation constants
pub mod validate {
use super::*;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use time::Duration;
fn random_os() -> Option<String> {
use rand::seq::SliceRandom; // 0.7.2

#[allow(clippy::useless_vec)]
let vs = vec![
Some("Linux"),
Some("Android"),
Expand All @@ -27,6 +28,7 @@ fn random_os() -> Option<String> {
fn random_browser() -> Option<String> {
use rand::seq::SliceRandom; // 0.7.2

#[allow(clippy::useless_vec)]
let vs = vec![
Some("Chrome"),
Some("Safari"),
Expand Down
2 changes: 1 addition & 1 deletion rs/packages/api/src/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub trait JsonHandler: Send + Sync + Sized + Clone + 'static {
false
}

fn cookies<'a, 'b>(&'a self, _output: &'b Self::Output) -> Vec<cookie::Cookie> {
fn cookies(&self, _output: &Self::Output) -> Vec<cookie::Cookie> {
vec![]
}

Expand Down
1 change: 1 addition & 0 deletions rs/packages/api/src/routes/auth/user/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ pub mod post {
.await
.map_err(HandleError::PaymentSavePaymentMethod)?;

#[allow(clippy::let_and_return)]
payment_method_response
};

Expand Down
1 change: 1 addition & 0 deletions rs/packages/api/src/routes/invitations/reject.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ pub mod post {
Some(invitation) => invitation,
};

#[allow(clippy::let_and_return)]
invitation
}
};
Expand Down
1 change: 1 addition & 0 deletions rs/packages/api/src/routes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ pub fn router(

app.at("/runtime/restart-playlist/:station").post(
runtime::restart_playlist::station_id::post::Endpoint {
deployment_id: deployment_id.clone(),
media_sessions: media_sessions.clone(),
drop_tracer: drop_tracer.clone(),
shutdown: shutdown.clone(),
Expand Down
1 change: 1 addition & 0 deletions rs/packages/api/src/routes/payment_methods/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ pub mod post {
.await
.map_err(HandleError::PaymentSavePaymentMethod)?;

#[allow(clippy::let_and_return)]
payment_method_response
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub mod post {

#[derive(Debug, Clone)]
pub struct Endpoint {
pub deployment_id: String,
pub media_sessions: MediaSessionMap,
pub drop_tracer: DropTracer,
pub shutdown: Shutdown,
Expand Down Expand Up @@ -70,7 +71,14 @@ pub mod post {
async fn perform(&self, input: Self::Input) -> Result<Self::Output, Self::HandleError> {
let Self::Input { station_id } = input;
let mut lock = self.media_sessions.write();
let _ = lock.restart(&station_id, self.shutdown.clone(), self.drop_tracer.clone());
let _ = lock
.restart(
station_id.to_string(),
self.deployment_id.to_string(),
self.shutdown.clone(),
self.drop_tracer.clone(),
)
.await;
Ok(Output(EmptyStruct(())))
}
}
Expand Down
9 changes: 8 additions & 1 deletion rs/packages/api/src/routes/stations/restart_playlist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,14 @@ pub mod post {
MediaSessionKind::Playlist { .. } => {
if media_session.deployment_id == self.deployment_id {
let mut lock = self.media_sessions.write();
let _ = lock.restart(&station.id, self.shutdown.clone(), self.drop_tracer.clone());
let _ = lock
.restart(
self.deployment_id.clone(),
station.id,
self.shutdown.clone(),
self.drop_tracer.clone(),
)
.await;
} else {
#[allow(clippy::collapsible_else_if)]
if let Some(deployment) = Deployment::get_by_id(&media_session.deployment_id).await? {
Expand Down
2 changes: 1 addition & 1 deletion rs/packages/db/src/models/deployment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ pub struct Deployment {
pub created_at: DateTime,
pub updated_at: DateTime,

// TODO: this Option is for back compat only
// TODO: this Option<> is for back compat only
// create a migration and change this to DateTime
pub health_checked_at: Option<DateTime>,

Expand Down
5 changes: 5 additions & 0 deletions rs/packages/db/src/models/media_session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ pub struct MediaSession {
#[serde(with = "serde_util::as_f64::option")]
pub duration_ms: Option<u64>,

// TODO: this Option<> is for back compat only
// create a migration and change this to DateTime
pub health_checked_at: Option<DateTime>,

pub created_at: DateTime,
pub updated_at: DateTime,
}
Expand Down Expand Up @@ -182,6 +186,7 @@ mod test {
},
now_playing: None,
state: MediaSessionState::Closed,
health_checked_at: Some(DateTime::now()),
closed_at: Some(DateTime::now()),
duration_ms: Some(100),
};
Expand Down
4 changes: 4 additions & 0 deletions rs/packages/db/src/models/station/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,10 @@ pub struct OwnerDeploymentInfo {
pub deployment_id: String,
pub task_id: String,
pub content_type: String,

// this Option<> is for backwards compatibility
// it should be removed in the future
pub health_checked_at: Option<DateTime>,
}

impl From<OwnerDeploymentInfo> for Bson {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ pub async fn get_analytics(query: AnalyticsQuery) -> Result<Analytics, mongodb::

macro_rules! add {
($acc:ident, $key:expr) => {
let mut item = $acc.entry($key).or_default();
let item = $acc.entry($key).or_default();
item.sessions += 1;
item.ips.insert(conn.ip);
item.total_duration_ms += conn_duration_ms;
Expand Down
Loading