Skip to content

Commit

Permalink
Proper state management (#143)
Browse files Browse the repository at this point in the history
  • Loading branch information
kixelated authored Apr 1, 2024
1 parent 1409fcf commit a70f645
Show file tree
Hide file tree
Showing 42 changed files with 3,805 additions and 2,028 deletions.
17 changes: 8 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions moq-api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ impl Client {
Ok(Some(origin))
}

pub async fn set_origin(&mut self, id: &str, origin: &Origin) -> Result<(), ApiError> {
pub async fn set_origin(&self, id: &str, origin: Origin) -> Result<(), ApiError> {
let url = self.url.join("origin/")?.join(id)?;

let resp = self.client.post(url).json(origin).send().await?;
let resp = self.client.post(url).json(&origin).send().await?;
resp.error_for_status()?;

Ok(())
}

pub async fn delete_origin(&mut self, id: &str) -> Result<(), ApiError> {
pub async fn delete_origin(&self, id: &str) -> Result<(), ApiError> {
let url = self.url.join("origin/")?.join(id)?;

let resp = self.client.delete(url).send().await?;
Expand All @@ -45,10 +45,10 @@ impl Client {
Ok(())
}

pub async fn patch_origin(&mut self, id: &str, origin: &Origin) -> Result<(), ApiError> {
pub async fn patch_origin(&self, id: &str, origin: Origin) -> Result<(), ApiError> {
let url = self.url.join("origin/")?.join(id)?;

let resp = self.client.patch(url).json(origin).send().await?;
let resp = self.client.patch(url).json(&origin).send().await?;
resp.error_for_status()?;

Ok(())
Expand Down
12 changes: 12 additions & 0 deletions moq-api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ async fn set_origin(
// Convert the input back to JSON after validating it add adding any fields (TODO)
let payload = serde_json::to_string(&origin)?;

// Attempt to get the current value for the key
let current: Option<String> = redis::cmd("GET").arg(&key).query_async(&mut redis).await?;

if let Some(current) = &current {
if current.eq(&payload) {
// The value is the same, so we're done.
return Ok(());
} else {
return Err(AppError::Duplicate);
}
}

let res: Option<String> = redis::cmd("SET")
.arg(key)
.arg(payload)
Expand Down
6 changes: 3 additions & 3 deletions moq-clock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ moq-transport = { path = "../moq-transport", version = "0.3" }

# QUIC
quinn = "0.10"
webtransport-quinn = "0.8"
webtransport-generic = "0.8"
quictransport-quinn = "0.8"
webtransport-quinn = { version = "0.9" }
webtransport-generic = { version = "0.9" }
quictransport-quinn = { version = "0.9" }
url = "2"

# Crypto
Expand Down
115 changes: 50 additions & 65 deletions moq-clock/src/clock.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use anyhow::Context;
use moq_transport::serve;
use moq_transport::serve::{
DatagramsReader, Group, GroupWriter, GroupsReader, GroupsWriter, ObjectsReader, StreamReader, TrackReader,
TrackReaderMode,
};

use chrono::prelude::*;

pub struct Publisher {
track: serve::TrackPublisher,
track: GroupsWriter,
}

impl Publisher {
pub fn new(track: serve::TrackPublisher) -> Self {
pub fn new(track: GroupsWriter) -> Self {
Self { track }
}

Expand All @@ -22,9 +25,9 @@ impl Publisher {
loop {
let segment = self
.track
.create_group(serve::Group {
id: sequence as u64,
send_order: 0,
.create(Group {
group_id: sequence as u64,
priority: 0,
})
.context("failed to create minute segment")?;

Expand All @@ -46,19 +49,15 @@ impl Publisher {
}
}

async fn send_segment(mut segment: serve::GroupPublisher, mut now: DateTime<Utc>) -> anyhow::Result<()> {
async fn send_segment(mut segment: GroupWriter, mut now: DateTime<Utc>) -> anyhow::Result<()> {
// Everything but the second.
let base = now.format("%Y-%m-%d %H:%M:").to_string();

segment
.write_object(base.clone().into())
.context("failed to write base")?;
segment.write(base.clone().into()).context("failed to write base")?;

loop {
let delta = now.format("%S").to_string();
segment
.write_object(delta.clone().into())
.context("failed to write delta")?;
segment.write(delta.clone().into()).context("failed to write delta")?;

println!("{}{}", base, delta);

Expand All @@ -79,83 +78,69 @@ impl Publisher {
}
}
pub struct Subscriber {
track: serve::TrackSubscriber,
track: TrackReader,
}

impl Subscriber {
pub fn new(track: serve::TrackSubscriber) -> Self {
pub fn new(track: TrackReader) -> Self {
Self { track }
}

pub async fn run(mut self) -> anyhow::Result<()> {
while let Some(stream) = self.track.next().await.context("failed to get stream")? {
match stream {
serve::TrackMode::Group(group) => tokio::spawn(async move {
if let Err(err) = Self::recv_group(group).await {
log::warn!("failed to receive group: {:?}", err);
}
}),
serve::TrackMode::Object(object) => tokio::spawn(async move {
if let Err(err) = Self::recv_object(object).await {
log::warn!("failed to receive group: {:?}", err);
}
}),
serve::TrackMode::Stream(stream) => tokio::spawn(async move {
if let Err(err) = Self::recv_track(stream).await {
log::warn!("failed to receive stream: {:?}", err);
}
}),
serve::TrackMode::Datagram(datagram) => tokio::spawn(async move {
if let Err(err) = Self::recv_datagram(datagram) {
log::warn!("failed to receive datagram: {:?}", err);
}
}),
};
pub async fn run(self) -> anyhow::Result<()> {
match self.track.mode().await.context("failed to get mode")? {
TrackReaderMode::Stream(stream) => Self::recv_stream(stream).await,
TrackReaderMode::Groups(groups) => Self::recv_groups(groups).await,
TrackReaderMode::Objects(objects) => Self::recv_objects(objects).await,
TrackReaderMode::Datagrams(datagrams) => Self::recv_datagrams(datagrams).await,
}

Ok(())
}

async fn recv_track(mut track: serve::StreamSubscriber) -> anyhow::Result<()> {
while let Some(fragment) = track.next().await? {
let str = String::from_utf8_lossy(&fragment.payload);
println!("{}", str);
async fn recv_stream(mut track: StreamReader) -> anyhow::Result<()> {
while let Some(mut group) = track.next().await? {
while let Some(object) = group.read_next().await? {
let str = String::from_utf8_lossy(&object);
println!("{}", str);
}
}

Ok(())
}

async fn recv_group(mut segment: serve::GroupSubscriber) -> anyhow::Result<()> {
let mut first = segment
.next()
.await
.context("failed to get first fragment")?
.context("no fragments in segment")?;

let base = first.read_all().await?;
let base = String::from_utf8_lossy(&base);
async fn recv_groups(mut groups: GroupsReader) -> anyhow::Result<()> {
while let Some(mut group) = groups.next().await? {
let base = group
.read_next()
.await
.context("failed to get first object")?
.context("empty group")?;

while let Some(mut fragment) = segment.next().await? {
let value = fragment.read_all().await.context("failed to read fragment")?;
let str = String::from_utf8_lossy(&value);
let base = String::from_utf8_lossy(&base);

println!("{}{}", base, str);
while let Some(object) = group.read_next().await? {
let str = String::from_utf8_lossy(&object);
println!("{}{}", base, str);
}
}

Ok(())
}

async fn recv_object(mut object: serve::ObjectSubscriber) -> anyhow::Result<()> {
let value = object.read_all().await.context("failed to read object")?;
let str = String::from_utf8_lossy(&value);
async fn recv_objects(mut objects: ObjectsReader) -> anyhow::Result<()> {
while let Some(mut object) = objects.next().await? {
let payload = object.read_all().await?;
let str = String::from_utf8_lossy(&payload);
println!("{}", str);
}

println!("{}", str);
Ok(())
}

fn recv_datagram(datagram: serve::Datagram) -> anyhow::Result<()> {
let str = String::from_utf8_lossy(&datagram.payload);
println!("{}", str);
async fn recv_datagrams(mut datagrams: DatagramsReader) -> anyhow::Result<()> {
while let Some(datagram) = datagrams.read().await? {
let str = String::from_utf8_lossy(&datagram.payload);
println!("{}", str);
}

Ok(())
}
}
4 changes: 2 additions & 2 deletions moq-clock/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async fn main() -> anyhow::Result<()> {

async fn run<S: webtransport_generic::Session>(session: S, config: cli::Config) -> anyhow::Result<()> {
if config.publish {
let (session, publisher) = moq_transport::Publisher::connect(session)
let (session, mut publisher) = moq_transport::Publisher::connect(session)
.await
.context("failed to create MoQ Transport session")?;

Expand All @@ -102,7 +102,7 @@ async fn run<S: webtransport_generic::Session>(session: S, config: cli::Config)
.produce();

let track = broadcast.create_track(&config.track)?;
let clock = clock::Publisher::new(track);
let clock = clock::Publisher::new(track.groups()?);

tokio::select! {
res = session.run() => res.context("session error")?,
Expand Down
6 changes: 3 additions & 3 deletions moq-pub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ moq-transport = { path = "../moq-transport", version = "0.3" }

# QUIC
quinn = "0.10"
webtransport-quinn = "0.8"
quictransport-quinn = "0.8"
webtransport-generic = "0.8"
webtransport-quinn = { version = "0.9" }
quictransport-quinn = { version = "0.9" }
webtransport-generic = { version = "0.9" }
url = "2"

# Crypto
Expand Down
6 changes: 3 additions & 3 deletions moq-pub/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::io::AsyncRead;

// TODO: clap complete

#[tokio::main]
#[tokio::main(flavor = "current_thread")]
async fn main() -> anyhow::Result<()> {
env_logger::init();

Expand Down Expand Up @@ -99,9 +99,9 @@ async fn main() -> anyhow::Result<()> {
async fn run<T: webtransport_generic::Session, I: AsyncRead + Send + Unpin>(
session: T,
mut media: Media<I>,
broadcast: serve::BroadcastSubscriber,
broadcast: serve::BroadcastReader,
) -> anyhow::Result<()> {
let (session, publisher) = moq_transport::Publisher::connect(session)
let (session, mut publisher) = moq_transport::Publisher::connect(session)
.await
.context("failed to create MoQ Transport publisher")?;

Expand Down
Loading

0 comments on commit a70f645

Please sign in to comment.