Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat source alt #215

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ members = [
"rs/packages/ip",
"rs/packages/config",
"rs/packages/stream",
"rs/packages/stream-alt",
"rs/packages/source-alt",
"rs/packages/stream",
"rs/packages/assets",
Expand Down
3 changes: 2 additions & 1 deletion rs/bin/openstream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,5 @@ mailer = { version = "0.1.0", path = "../../packages/mailer" }
payments = { version = "0.1.0", path = "../../packages/payments" }
assets = { version = "0.1.0", path = "../../packages/assets" }
console-subscriber = { version = "0.1.9", features = ["parking_lot"] }
media = { version = "0.1.0", path = "../../packages/media" }
media = { version = "0.1.0", path = "../../packages/media" }
stream-alt = { version = "0.1.0", path = "../../packages/stream-alt" }
4 changes: 3 additions & 1 deletion rs/bin/openstream/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use mongodb::bson::doc;
use mongodb::bson::Document;
use serde_util::DateTime;
use shutdown::Shutdown;
use stream::StreamServer;
// use stream::StreamServer;
use stream_alt::StreamServer;
use assets::StaticServer;
use tokio::runtime::Runtime;

Expand Down Expand Up @@ -493,6 +494,7 @@ async fn start_async(Start { config }: Start) -> Result<(), anyhow::Error> {
drop_tracer.clone(),
media_sessions.clone(),
);

let fut = stream.start()?;
futs.push(async move {
fut.await.map_err(crate::error::ServerStartError::from)?;
Expand Down
4 changes: 3 additions & 1 deletion rs/packages/media/src/handle/external_relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::time::{Duration, Instant};
use db::media_session::MediaSession;
use db::{media_session::MediaSessionState, Model};
use drop_tracer::{DropTracer, Token};
use ffmpeg::{Ffmpeg, FfmpegConfig, FfmpegSpawn};
use ffmpeg::{Ffmpeg, FfmpegConfig, FfmpegSpawn, Format};
use futures_util::StreamExt;
use log::*;
use mongodb::bson::doc;
Expand Down Expand Up @@ -84,6 +84,8 @@ pub fn run_external_relay_source(
kbitrate: STREAM_KBITRATE,
readrate: true,
readrate_initial_burst: STREAM_BURST_LENGTH as f64,
// audio/mpeg
format: Format::MP3,
..FfmpegConfig::default()
};

Expand Down
1 change: 1 addition & 0 deletions rs/packages/media/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ impl MediaSessionMap {
task_id.clone(),
"audio/mpeg".to_string(),
);

sender = Sender::new(station_id.to_string(), info);

{
Expand Down
12 changes: 12 additions & 0 deletions rs/packages/prex/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub fn is_trusted_ip(ip: IpAddr) -> bool {
pub struct Parts {
pub local_addr: SocketAddr,
pub remote_addr: SocketAddr,
pub proxy_protocol_ip: Option<IpAddr>,
pub method: Method,
pub uri: Uri,
pub version: Version,
Expand All @@ -36,6 +37,7 @@ pub struct Parts {
pub struct Request {
pub(crate) local_addr: SocketAddr,
pub(crate) remote_addr: SocketAddr,
pub(crate) proxy_protocol_ip: Option<IpAddr>,
pub(crate) method: Method,
pub(crate) uri: Uri,
pub(crate) version: Version,
Expand Down Expand Up @@ -71,6 +73,7 @@ impl Request {
Self {
local_addr: parts.local_addr,
remote_addr: parts.remote_addr,
proxy_protocol_ip: parts.proxy_protocol_ip,
method: parts.method,
uri: parts.uri,
headers: parts.headers,
Expand All @@ -86,6 +89,7 @@ impl Request {
Parts {
local_addr: self.local_addr,
remote_addr: self.remote_addr,
proxy_protocol_ip: self.proxy_protocol_ip,
method: self.method,
uri: self.uri,
headers: self.headers,
Expand Down Expand Up @@ -202,6 +206,12 @@ impl Request {
pub fn isomorphic_ip(&self) -> IpAddr {
let mut ip = self.remote_addr().ip();

if is_trusted_ip(ip) {
if let Some(proxy_ip) = self.proxy_protocol_ip {
ip = proxy_ip;
}
}

if is_trusted_ip(ip) {
if let Some(v) = self.headers().get(X_REAL_IP) {
if let Ok(v) = v.to_str() {
Expand Down Expand Up @@ -312,6 +322,7 @@ mod tests {
Parts {
local_addr: SocketAddr::from_str("127.0.0.1:8080").unwrap(),
remote_addr: SocketAddr::from_str("127.0.0.1:12345").unwrap(),
proxy_protocol_ip: None,
method: Method::GET,
uri: Uri::from_static("http://localhost"),
version: Version::HTTP_11,
Expand Down Expand Up @@ -419,6 +430,7 @@ mod tests {
let parts = Parts {
local_addr: request.local_addr,
remote_addr: request.remote_addr,
proxy_protocol_ip: None,
method: request.method,
uri: request.uri,
version: request.version,
Expand Down
1 change: 1 addition & 0 deletions rs/packages/prex/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ impl Service<hyper::Request<Body>> for RouterService {
let request = Request::from_parts(RequestParts {
local_addr,
remote_addr,
proxy_protocol_ip: None,
method: parts.method,
uri: parts.uri,
headers: parts.headers,
Expand Down
40 changes: 40 additions & 0 deletions rs/packages/stream-alt/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
[package]
name = "stream-alt"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
# channels = { path = "../channels" }
bytes = "1.2.1"
hyper = { version = "0.14.27", features = ["full"] }
prex = { version = "0.1.0", path = "../prex" }
tokio = { version = "1.29.0", features = ["full"] }
log = "0.4.17"
owo-colors = { version = "3.5.0", path = "../owo-colors" }
async-trait = "0.1.58"
drop-tracer = { version = "0.1.0", path = "../drop-tracer" }
shutdown = { version = "0.1.0", path = "../shutdown" }
serde_json = { version = "1.0", features = ["preserve_order"] }
serde = "1.0.147"
futures = "0.3.25"
http = { version = "0.1.0", path = "../http" }
db = { version = "0.1.0", path = "../db" }
serde-util = { version = "0.1.0", path = "../serde-util" }
mongodb = "2.7.0"
parking_lot = "0.12.1"
constants = { version = "0.1.0", path = "../../config/constants" }
thiserror = "1.0.38"
socket2 = "0.4.7"
ip-counter = { version = "0.1.0", path = "../ip-counter" }
ip_rfc = "0.1.0"
defer = "0.1.0"
url = "2.3.1"
rand = "0.8.5"
media = { version = "0.1.0", path = "../media" }
mp3 = { version = "0.3.4", path = "../mp3" }
proxy-protocol = { version = "0.1.0", path = "../proxy-protocol" }

[features]

99 changes: 99 additions & 0 deletions rs/packages/stream-alt/src/http/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::fmt::Display;

use hyper::http::uri::InvalidUri;

#[derive(Debug)]
pub enum ReadHeadError {
Io(std::io::Error),
Hyper(hyper::Error),
InvalidUri(InvalidUri),
SizeExceeded,
NoHeadLine,
NoMethod,
InvalidMethod,
NoUri,
// NoVersion,
InvalidVersion(String),
VersionMethodMismatch,
}

impl Display for ReadHeadError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Io(inner) => inner.fmt(f),
Self::Hyper(inner) => inner.fmt(f),
Self::InvalidUri(_) => write!(f, "invalid uri"),
Self::SizeExceeded => write!(f, "request head size exceeded"),
Self::NoHeadLine => write!(f, "request head doesn't have a head line"),
Self::NoMethod => write!(f, "request method not found"),
Self::InvalidMethod => write!(f, "request method is invalid"),
Self::NoUri => write!(f, "request uri not found"),
// Self::NoVersion => write!(f, "request version not found"),
Self::InvalidVersion(ver) => write!(f, "request version is invalid, version = '{ver}'"),
Self::VersionMethodMismatch => write!(
f,
"request version and method mismatch, HTTP/0.9 only allows GET requests"
),
}
}
}

impl std::error::Error for ReadHeadError {
fn cause(&self) -> Option<&dyn std::error::Error> {
match self {
Self::Io(inner) => Some(inner),
Self::Hyper(inner) => Some(inner),
_ => None,
}
}
}

impl From<std::io::Error> for ReadHeadError {
fn from(inner: std::io::Error) -> Self {
Self::Io(inner)
}
}

impl From<InvalidUri> for ReadHeadError {
fn from(inner: InvalidUri) -> Self {
Self::InvalidUri(inner)
}
}

impl From<hyper::Error> for ReadHeadError {
fn from(inner: hyper::Error) -> Self {
Self::Hyper(inner)
}
}

// #[derive(Debug)]
// pub enum WriteHeadError {
// Io(std::io::Error),
// SizeExceeded,
// UnsupportedVersion,
// }

// impl Display for WriteHeadError {
// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
// match self {
// Self::Io(inner) => inner.fmt(f),
// Self::SizeExceeded => write!(f, "Response head size excedded"),
// Self::UnsupportedVersion => write!(f, "Response write, unsopported (non 1.0) version"),
// }
// }
// }

// impl std::error::Error for WriteHeadError {
// fn cause(&self) -> Option<&dyn std::error::Error> {
// match self {
// Self::Io(inner) => Some(inner),
// _ => None,
// }
// }
// }

// impl From<std::io::Error> for WriteHeadError {
// fn from(inner: std::io::Error) -> Self {
// Self::Io(inner)
// }
// }
Loading
Loading