Skip to content

Commit

Permalink
Initial moq-transport-01 support (#115)
Browse files Browse the repository at this point in the history
Co-authored-by: Mike English <[email protected]>
  • Loading branch information
kixelated and englishm authored Nov 3, 2023
1 parent d55c4a8 commit ddfe796
Show file tree
Hide file tree
Showing 40 changed files with 1,006 additions and 307 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 35 additions & 15 deletions moq-pub/src/media.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::cli::Config;
use anyhow::{self, Context};
use moq_transport::cache::{broadcast, segment, track};
use moq_transport::cache::{broadcast, fragment, segment, track};
use moq_transport::VarInt;
use mp4::{self, ReadBox};
use serde_json::json;
Expand Down Expand Up @@ -44,11 +44,17 @@ impl Media {
let mut init_track = broadcast.create_track("0.mp4")?;
let mut init_segment = init_track.create_segment(segment::Info {
sequence: VarInt::ZERO,
priority: i32::MAX,
priority: 0,
expires: None,
})?;

init_segment.write_chunk(init.into())?;
// Create a single fragment, optionally setting the size
let mut init_fragment = init_segment.create_fragment(fragment::Info {
sequence: VarInt::ZERO,
size: None, // size is only needed when we have multiple fragments.
})?;

init_fragment.write_chunk(init.into())?;

let mut tracks = HashMap::new();

Expand Down Expand Up @@ -128,7 +134,7 @@ impl Media {
) -> Result<(), anyhow::Error> {
let mut segment = track.create_segment(segment::Info {
sequence: VarInt::ZERO,
priority: i32::MAX,
priority: 0,
expires: None,
})?;

Expand Down Expand Up @@ -211,8 +217,14 @@ impl Media {
let catalog_str = serde_json::to_string_pretty(&catalog)?;
log::info!("catalog: {}", catalog_str);

// Create a single fragment for the segment.
let mut fragment = segment.create_fragment(fragment::Info {
sequence: VarInt::ZERO,
size: None, // Size is only needed when we have multiple fragments.
})?;

// Add the segment and add the fragment.
segment.write_chunk(catalog_str.into())?;
fragment.write_chunk(catalog_str.into())?;

Ok(())
}
Expand Down Expand Up @@ -260,7 +272,7 @@ struct Track {
track: track::Publisher,

// The current segment
segment: Option<segment::Publisher>,
current: Option<fragment::Publisher>,

// The number of units per second.
timescale: u64,
Expand All @@ -274,16 +286,16 @@ impl Track {
Self {
track,
sequence: 0,
segment: None,
current: None,
timescale,
}
}

pub fn header(&mut self, raw: Vec<u8>, fragment: Fragment) -> anyhow::Result<()> {
if let Some(segment) = self.segment.as_mut() {
if let Some(current) = self.current.as_mut() {
if !fragment.keyframe {
// Use the existing segment
segment.write_chunk(raw.into())?;
current.write_chunk(raw.into())?;
return Ok(());
}
}
Expand All @@ -292,7 +304,7 @@ impl Track {

// Compute the timestamp in milliseconds.
// Overflows after 583 million years, so we're fine.
let timestamp: i32 = fragment
let timestamp: u32 = fragment
.timestamp(self.timescale)
.as_millis()
.try_into()
Expand All @@ -301,26 +313,34 @@ impl Track {
// Create a new segment.
let mut segment = self.track.create_segment(segment::Info {
sequence: VarInt::try_from(self.sequence).context("sequence too large")?,
priority: timestamp, // newer segments are higher priority

// Newer segments are higher priority
priority: u32::MAX.checked_sub(timestamp).context("priority too large")?,

// Delete segments after 10s.
expires: Some(time::Duration::from_secs(10)),
})?;

// Create a single fragment for the segment that we will keep appending.
let mut fragment = segment.create_fragment(fragment::Info {
sequence: VarInt::ZERO,
size: None,
})?;

self.sequence += 1;

// Insert the raw atom into the segment.
segment.write_chunk(raw.into())?;
fragment.write_chunk(raw.into())?;

// Save for the next iteration
self.segment = Some(segment);
self.current = Some(fragment);

Ok(())
}

pub fn data(&mut self, raw: Vec<u8>) -> anyhow::Result<()> {
let segment = self.segment.as_mut().context("missing segment")?;
segment.write_chunk(raw.into())?;
let fragment = self.current.as_mut().context("missing current fragment")?;
fragment.write_chunk(raw.into())?;

Ok(())
}
Expand Down
16 changes: 8 additions & 8 deletions moq-relay/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ impl moq_transport::MoqError for RelayError {
}
}

fn reason(&self) -> &str {
fn reason(&self) -> String {
match self {
Self::Transport(err) => err.reason(),
Self::Cache(err) => err.reason(),
Self::MoqApi(_err) => "api error",
Self::Url(_) => "url error",
Self::MissingNode => "missing node",
Self::WebTransportServer(_) => "server error",
Self::WebTransportClient(_) => "upstream error",
Self::Transport(err) => format!("transport error: {}", err.reason()),
Self::Cache(err) => format!("cache error: {}", err.reason()),
Self::MoqApi(err) => format!("api error: {}", err),
Self::Url(err) => format!("url error: {}", err),
Self::MissingNode => "missing node".to_owned(),
Self::WebTransportServer(err) => format!("upstream server error: {}", err),
Self::WebTransportClient(err) => format!("upstream client error: {}", err),
}
}
}
2 changes: 2 additions & 0 deletions moq-transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ indexmap = "2"
quinn = "0.10"
webtransport-quinn = "0.6"
#webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" }

async-trait = "0.1"
4 changes: 2 additions & 2 deletions moq-transport/src/cache/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,12 @@ impl Publisher {
}

/// Block until the next track requested by a subscriber.
pub async fn next_track(&mut self) -> Result<Option<track::Publisher>, CacheError> {
pub async fn next_track(&mut self) -> Result<track::Publisher, CacheError> {
loop {
let notify = {
let state = self.state.lock();
if state.has_next()? {
return Ok(Some(state.into_mut().next()));
return Ok(state.into_mut().next());
}

state.changed()
Expand Down
12 changes: 6 additions & 6 deletions moq-transport/src/cache/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ impl MoqError for CacheError {
}

/// A reason that is sent over the wire.
fn reason(&self) -> &str {
fn reason(&self) -> String {
match self {
Self::Closed => "closed",
Self::Reset(_) => "reset",
Self::Stop => "stop",
Self::NotFound => "not found",
Self::Duplicate => "duplicate",
Self::Closed => "closed".to_owned(),
Self::Reset(code) => format!("reset code: {}", code),
Self::Stop => "stop".to_owned(),
Self::NotFound => "not found".to_owned(),
Self::Duplicate => "duplicate".to_owned(),
}
}
}
Loading

0 comments on commit ddfe796

Please sign in to comment.