Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial moq-transport-01 support #115

Merged
merged 11 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

50 changes: 35 additions & 15 deletions moq-pub/src/media.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::cli::Config;
use anyhow::{self, Context};
use moq_transport::cache::{broadcast, segment, track};
use moq_transport::cache::{broadcast, fragment, segment, track};
use moq_transport::VarInt;
use mp4::{self, ReadBox};
use serde_json::json;
Expand Down Expand Up @@ -44,11 +44,17 @@ impl Media {
let mut init_track = broadcast.create_track("0.mp4")?;
let mut init_segment = init_track.create_segment(segment::Info {
sequence: VarInt::ZERO,
priority: i32::MAX,
priority: 0,
expires: None,
})?;

init_segment.write_chunk(init.into())?;
// Create a single fragment, optionally setting the size
let mut init_fragment = init_segment.create_fragment(fragment::Info {
sequence: VarInt::ZERO,
size: None, // size is only needed when we have multiple fragments.
})?;

init_fragment.write_chunk(init.into())?;

let mut tracks = HashMap::new();

Expand Down Expand Up @@ -128,7 +134,7 @@ impl Media {
) -> Result<(), anyhow::Error> {
let mut segment = track.create_segment(segment::Info {
sequence: VarInt::ZERO,
priority: i32::MAX,
priority: 0,
expires: None,
})?;

Expand Down Expand Up @@ -211,8 +217,14 @@ impl Media {
let catalog_str = serde_json::to_string_pretty(&catalog)?;
log::info!("catalog: {}", catalog_str);

// Create a single fragment for the segment.
let mut fragment = segment.create_fragment(fragment::Info {
sequence: VarInt::ZERO,
size: None, // Size is only needed when we have multiple fragments.
})?;

// Add the segment and add the fragment.
segment.write_chunk(catalog_str.into())?;
fragment.write_chunk(catalog_str.into())?;

Ok(())
}
Expand Down Expand Up @@ -260,7 +272,7 @@ struct Track {
track: track::Publisher,

// The current segment
segment: Option<segment::Publisher>,
current: Option<fragment::Publisher>,

// The number of units per second.
timescale: u64,
Expand All @@ -274,16 +286,16 @@ impl Track {
Self {
track,
sequence: 0,
segment: None,
current: None,
timescale,
}
}

pub fn header(&mut self, raw: Vec<u8>, fragment: Fragment) -> anyhow::Result<()> {
if let Some(segment) = self.segment.as_mut() {
if let Some(current) = self.current.as_mut() {
if !fragment.keyframe {
// Use the existing segment
segment.write_chunk(raw.into())?;
current.write_chunk(raw.into())?;
return Ok(());
}
}
Expand All @@ -292,7 +304,7 @@ impl Track {

// Compute the timestamp in milliseconds.
// Overflows after 583 million years, so we're fine.
let timestamp: i32 = fragment
let timestamp: u32 = fragment
.timestamp(self.timescale)
.as_millis()
.try_into()
Expand All @@ -301,26 +313,34 @@ impl Track {
// Create a new segment.
let mut segment = self.track.create_segment(segment::Info {
sequence: VarInt::try_from(self.sequence).context("sequence too large")?,
priority: timestamp, // newer segments are higher priority

// Newer segments are higher priority
priority: u32::MAX.checked_sub(timestamp).context("priority too large")?,

// Delete segments after 10s.
expires: Some(time::Duration::from_secs(10)),
})?;

// Create a single fragment for the segment that we will keep appending.
let mut fragment = segment.create_fragment(fragment::Info {
sequence: VarInt::ZERO,
size: None,
})?;

self.sequence += 1;

// Insert the raw atom into the segment.
segment.write_chunk(raw.into())?;
fragment.write_chunk(raw.into())?;

// Save for the next iteration
self.segment = Some(segment);
self.current = Some(fragment);

Ok(())
}

pub fn data(&mut self, raw: Vec<u8>) -> anyhow::Result<()> {
let segment = self.segment.as_mut().context("missing segment")?;
segment.write_chunk(raw.into())?;
let fragment = self.current.as_mut().context("missing current fragment")?;
fragment.write_chunk(raw.into())?;

Ok(())
}
Expand Down
16 changes: 8 additions & 8 deletions moq-relay/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ impl moq_transport::MoqError for RelayError {
}
}

fn reason(&self) -> &str {
fn reason(&self) -> String {
match self {
Self::Transport(err) => err.reason(),
Self::Cache(err) => err.reason(),
Self::MoqApi(_err) => "api error",
Self::Url(_) => "url error",
Self::MissingNode => "missing node",
Self::WebTransportServer(_) => "server error",
Self::WebTransportClient(_) => "upstream error",
Self::Transport(err) => format!("transport error: {}", err.reason()),
Self::Cache(err) => format!("cache error: {}", err.reason()),
Self::MoqApi(err) => format!("api error: {}", err),
Self::Url(err) => format!("url error: {}", err),
Self::MissingNode => "missing node".to_owned(),
Self::WebTransportServer(err) => format!("upstream server error: {}", err),
Self::WebTransportClient(err) => format!("upstream client error: {}", err),
}
}
}
2 changes: 2 additions & 0 deletions moq-transport/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ indexmap = "2"
quinn = "0.10"
webtransport-quinn = "0.6"
#webtransport-quinn = { path = "../../webtransport-rs/webtransport-quinn" }

async-trait = "0.1"
4 changes: 2 additions & 2 deletions moq-transport/src/cache/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,12 @@ impl Publisher {
}

/// Block until the next track requested by a subscriber.
pub async fn next_track(&mut self) -> Result<Option<track::Publisher>, CacheError> {
pub async fn next_track(&mut self) -> Result<track::Publisher, CacheError> {
loop {
let notify = {
let state = self.state.lock();
if state.has_next()? {
return Ok(Some(state.into_mut().next()));
return Ok(state.into_mut().next());
}

state.changed()
Expand Down
12 changes: 6 additions & 6 deletions moq-transport/src/cache/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@ impl MoqError for CacheError {
}

/// A reason that is sent over the wire.
fn reason(&self) -> &str {
fn reason(&self) -> String {
match self {
Self::Closed => "closed",
Self::Reset(_) => "reset",
Self::Stop => "stop",
Self::NotFound => "not found",
Self::Duplicate => "duplicate",
Self::Closed => "closed".to_owned(),
Self::Reset(code) => format!("reset code: {}", code),
Self::Stop => "stop".to_owned(),
Self::NotFound => "not found".to_owned(),
Self::Duplicate => "duplicate".to_owned(),
}
}
}
Loading
Loading