Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add explicit support for KIXEL_00. #34

Merged
merged 7 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 16 additions & 35 deletions lib/contribute/broadcast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { isAudioTrackSettings, isVideoTrackSettings } from "../common/settings"
export interface BroadcastConfig {
connection: Connection
media: MediaStream
name: string // name of the broadcast
name: string

audio: Audio.EncoderConfig
video: Video.EncoderConfig
Expand Down Expand Up @@ -46,7 +46,6 @@ export class Broadcast {

const mp4Catalog: Mp4Track = {
container: "mp4",
namespace: config.name,
kind: media.kind,
init_track: `${track.name}.mp4`,
data_track: `${track.name}.m4s`,
Expand Down Expand Up @@ -87,34 +86,20 @@ export class Broadcast {
}

async #run() {
// Announce the namespace and wait for an explicit OK.
const announce = await this.connection.announce(this.config.name)
await announce.ok()
for (;;) {
const subscriber = await this.connection.subscribed()
if (!subscriber) break

try {
for (;;) {
const subscriber = await this.connection.subscribed()
if (!subscriber) break

// Run an async task to serve each subscription.
this.#serveSubscribe(subscriber).catch((e) => {
const err = asError(e)
console.warn("failed to serve subscribe", err)
})
}
} catch (e) {
const err = asError(e)
await announce.close(1n, `error serving broadcast: ${err.message}`)
// Run an async task to serve each subscription.
this.#serveSubscribe(subscriber).catch((e) => {
const err = asError(e)
console.warn("failed to serve subscribe", err)
})
}
}

async #serveSubscribe(subscriber: SubscribeRecv) {
try {
if (subscriber.namespace != this.config.name) {
// Don't reuse connections if you get this error; we don't demultiplex them.
throw new Error(`unknown namespace: ${subscriber.namespace}`)
}

const [base, ext] = splitExt(subscriber.track)
if (ext === "catalog") {
await this.#serveCatalog(subscriber, base)
Expand Down Expand Up @@ -144,9 +129,9 @@ export class Broadcast {
await subscriber.ack()

const stream = await subscriber.data({
group: 0n,
sequence: 0n,
send_order: 0, // TODO Highest priority
priority: 0, // TODO Highest priority
expires: 0, // never expires
})

const writer = stream.getWriter()
Expand Down Expand Up @@ -174,9 +159,9 @@ export class Broadcast {

// Create a new stream for each segment.
const stream = await subscriber.data({
group: 0n,
sequence: 0n,
send_order: 0, // TODO
priority: 0, // TODO
expires: 0, // Never expires
})

const writer = stream.getWriter()
Expand Down Expand Up @@ -219,9 +204,9 @@ export class Broadcast {
async #serveSegment(subscriber: SubscribeRecv, segment: Segment) {
// Create a new stream for each segment.
const stream = await subscriber.data({
group: BigInt(segment.id),
sequence: 0n,
send_order: 0, // TODO
sequence: BigInt(segment.id),
priority: 0, // TODO
expires: 30, // TODO configurable
})

// Pipe the segment to the stream.
Expand All @@ -233,10 +218,6 @@ export class Broadcast {
video.srcObject = this.config.media
}

get name() {
return this.config.name
}

close() {
// TODO implement publish close
}
Expand Down
8 changes: 4 additions & 4 deletions lib/media/catalog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ export class Catalog {
return catalog
}

static async fetch(connection: Connection, namespace: string): Promise<Catalog> {
static async fetch(connection: Connection): Promise<Catalog> {
let raw: Uint8Array

const subscribe = await connection.subscribe(namespace, ".catalog")
const subscribe = await connection.subscribe("", ".catalog")
try {
const segment = await subscribe.data()
if (!segment) throw new Error("no catalog data")
Expand Down Expand Up @@ -61,7 +61,6 @@ export function isCatalog(catalog: any): catalog is Catalog {
}

export interface Track {
namespace: string
kind: string
container: string
}
Expand Down Expand Up @@ -91,7 +90,8 @@ export interface VideoTrack extends Track {
}

export function isTrack(track: any): track is Track {
if (typeof track.namespace !== "string") return false
if (typeof track.kind !== "string") return false
if (typeof track.container !== "string") return false
return true
}

Expand Down
6 changes: 0 additions & 6 deletions lib/playback/broadcast.ts

This file was deleted.

1 change: 0 additions & 1 deletion lib/playback/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
export { Player } from "./player"
export type { Broadcast } from "./broadcast"
18 changes: 3 additions & 15 deletions lib/playback/player.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ export type Timeline = Message.Timeline

export interface PlayerConfig {
connection: Connection
namespace: string

canvas: HTMLCanvasElement
}

Expand All @@ -35,16 +33,14 @@ export class Player {
#running: Promise<void>

readonly connection: Connection
readonly namespace: string

constructor(config: PlayerConfig) {
this.#port = new Port(this.#onMessage.bind(this)) // TODO await an async method instead

this.#canvas = config.canvas.transferControlToOffscreen()
this.connection = config.connection
this.namespace = config.namespace

this.#catalog = Catalog.fetch(this.connection, this.namespace)
this.#catalog = Catalog.fetch(this.connection)

// Async work
this.#running = this.#run()
Expand Down Expand Up @@ -106,15 +102,11 @@ export class Player {
}

async #runInit(name: string) {
const sub = await this.connection.subscribe(this.namespace, name)
const sub = await this.connection.subscribe("", name)
try {
const init = await sub.data()
if (!init) throw new Error("no init data")

if (init.header.sequence !== 0n) {
throw new Error("TODO multiple objects per init not supported")
}

this.#port.sendInit({
name: name,
stream: init.stream,
Expand All @@ -129,16 +121,12 @@ export class Player {
throw new Error(`unknown track kind: ${track.kind}`)
}

const sub = await this.connection.subscribe(this.namespace, track.data_track)
const sub = await this.connection.subscribe("", track.data_track)
try {
for (;;) {
const segment = await sub.data()
if (!segment) break

if (segment.header.sequence !== 0n) {
throw new Error("TODO multiple objects per segment not supported")
}

this.#port.sendSegment({
init: track.init_track,
kind: track.kind,
Expand Down
2 changes: 1 addition & 1 deletion lib/transport/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class Client {
const setup = new Setup.Stream(reader, writer)

// Send the setup message.
await setup.send.client({ versions: [Setup.Version.DRAFT_00], role: this.config.role })
await setup.send.client({ versions: [Setup.Version.KIXEL_00], role: this.config.role })

// Receive the setup message.
// TODO verify the SETUP response.
Expand Down
18 changes: 9 additions & 9 deletions lib/transport/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,19 @@ export class Connection {
}

async #recv(msg: Control.Message) {
switch (msg.type) {
case Control.Type.Announce:
switch (msg.kind) {
case Control.Msg.Announce:
return this.#subscriber.recvAnnounce(msg)
case Control.Type.AnnounceOk:
case Control.Msg.AnnounceOk:
return this.#publisher.recvAnnounceOk(msg)
case Control.Type.AnnounceError:
return this.#publisher.recvAnnounceError(msg)
case Control.Type.Subscribe:
case Control.Msg.AnnounceReset:
return this.#publisher.recvAnnounceReset(msg)
case Control.Msg.Subscribe:
return this.#publisher.recvSubscribe(msg)
case Control.Type.SubscribeOk:
case Control.Msg.SubscribeOk:
return this.#subscriber.recvSubscribeOk(msg)
case Control.Type.SubscribeError:
return this.#subscriber.recvSubscribeError(msg)
case Control.Msg.SubscribeReset:
return this.#subscriber.recvSubscribeReset(msg)
}
}

Expand Down
Loading