Skip to content

Commit

Permalink
Add explicit support for KIXEL_00. (#34)
Browse files Browse the repository at this point in the history
Javascript changes for the BREAKING changes.
  • Loading branch information
kixelated authored Sep 15, 2023
1 parent e89b01a commit 67b6e36
Show file tree
Hide file tree
Showing 16 changed files with 252 additions and 262 deletions.
51 changes: 16 additions & 35 deletions lib/contribute/broadcast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { isAudioTrackSettings, isVideoTrackSettings } from "../common/settings"
export interface BroadcastConfig {
connection: Connection
media: MediaStream
name: string // name of the broadcast
name: string

audio: Audio.EncoderConfig
video: Video.EncoderConfig
Expand Down Expand Up @@ -46,7 +46,6 @@ export class Broadcast {

const mp4Catalog: Mp4Track = {
container: "mp4",
namespace: config.name,
kind: media.kind,
init_track: `${track.name}.mp4`,
data_track: `${track.name}.m4s`,
Expand Down Expand Up @@ -87,34 +86,20 @@ export class Broadcast {
}

async #run() {
// Announce the namespace and wait for an explicit OK.
const announce = await this.connection.announce(this.config.name)
await announce.ok()
for (;;) {
const subscriber = await this.connection.subscribed()
if (!subscriber) break

try {
for (;;) {
const subscriber = await this.connection.subscribed()
if (!subscriber) break

// Run an async task to serve each subscription.
this.#serveSubscribe(subscriber).catch((e) => {
const err = asError(e)
console.warn("failed to serve subscribe", err)
})
}
} catch (e) {
const err = asError(e)
await announce.close(1n, `error serving broadcast: ${err.message}`)
// Run an async task to serve each subscription.
this.#serveSubscribe(subscriber).catch((e) => {
const err = asError(e)
console.warn("failed to serve subscribe", err)
})
}
}

async #serveSubscribe(subscriber: SubscribeRecv) {
try {
if (subscriber.namespace != this.config.name) {
// Don't reuse connections if you get this error; we don't demultiplex them.
throw new Error(`unknown namespace: ${subscriber.namespace}`)
}

const [base, ext] = splitExt(subscriber.track)
if (ext === "catalog") {
await this.#serveCatalog(subscriber, base)
Expand Down Expand Up @@ -144,9 +129,9 @@ export class Broadcast {
await subscriber.ack()

const stream = await subscriber.data({
group: 0n,
sequence: 0n,
send_order: 0, // TODO Highest priority
priority: 0, // TODO Highest priority
expires: 0, // never expires
})

const writer = stream.getWriter()
Expand Down Expand Up @@ -174,9 +159,9 @@ export class Broadcast {

// Create a new stream for each segment.
const stream = await subscriber.data({
group: 0n,
sequence: 0n,
send_order: 0, // TODO
priority: 0, // TODO
expires: 0, // Never expires
})

const writer = stream.getWriter()
Expand Down Expand Up @@ -219,9 +204,9 @@ export class Broadcast {
async #serveSegment(subscriber: SubscribeRecv, segment: Segment) {
// Create a new stream for each segment.
const stream = await subscriber.data({
group: BigInt(segment.id),
sequence: 0n,
send_order: 0, // TODO
sequence: BigInt(segment.id),
priority: 0, // TODO
expires: 30, // TODO configurable
})

// Pipe the segment to the stream.
Expand All @@ -233,10 +218,6 @@ export class Broadcast {
video.srcObject = this.config.media
}

get name() {
return this.config.name
}

close() {
// TODO implement publish close
}
Expand Down
8 changes: 4 additions & 4 deletions lib/media/catalog.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ export class Catalog {
return catalog
}

static async fetch(connection: Connection, namespace: string): Promise<Catalog> {
static async fetch(connection: Connection): Promise<Catalog> {
let raw: Uint8Array

const subscribe = await connection.subscribe(namespace, ".catalog")
const subscribe = await connection.subscribe("", ".catalog")
try {
const segment = await subscribe.data()
if (!segment) throw new Error("no catalog data")
Expand Down Expand Up @@ -61,7 +61,6 @@ export function isCatalog(catalog: any): catalog is Catalog {
}

export interface Track {
namespace: string
kind: string
container: string
}
Expand Down Expand Up @@ -91,7 +90,8 @@ export interface VideoTrack extends Track {
}

export function isTrack(track: any): track is Track {
if (typeof track.namespace !== "string") return false
if (typeof track.kind !== "string") return false
if (typeof track.container !== "string") return false
return true
}

Expand Down
6 changes: 0 additions & 6 deletions lib/playback/broadcast.ts

This file was deleted.

1 change: 0 additions & 1 deletion lib/playback/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
export { Player } from "./player"
export type { Broadcast } from "./broadcast"
18 changes: 3 additions & 15 deletions lib/playback/player.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ export type Timeline = Message.Timeline

export interface PlayerConfig {
connection: Connection
namespace: string

canvas: HTMLCanvasElement
}

Expand All @@ -35,16 +33,14 @@ export class Player {
#running: Promise<void>

readonly connection: Connection
readonly namespace: string

constructor(config: PlayerConfig) {
this.#port = new Port(this.#onMessage.bind(this)) // TODO await an async method instead

this.#canvas = config.canvas.transferControlToOffscreen()
this.connection = config.connection
this.namespace = config.namespace

this.#catalog = Catalog.fetch(this.connection, this.namespace)
this.#catalog = Catalog.fetch(this.connection)

// Async work
this.#running = this.#run()
Expand Down Expand Up @@ -106,15 +102,11 @@ export class Player {
}

async #runInit(name: string) {
const sub = await this.connection.subscribe(this.namespace, name)
const sub = await this.connection.subscribe("", name)
try {
const init = await sub.data()
if (!init) throw new Error("no init data")

if (init.header.sequence !== 0n) {
throw new Error("TODO multiple objects per init not supported")
}

this.#port.sendInit({
name: name,
stream: init.stream,
Expand All @@ -129,16 +121,12 @@ export class Player {
throw new Error(`unknown track kind: ${track.kind}`)
}

const sub = await this.connection.subscribe(this.namespace, track.data_track)
const sub = await this.connection.subscribe("", track.data_track)
try {
for (;;) {
const segment = await sub.data()
if (!segment) break

if (segment.header.sequence !== 0n) {
throw new Error("TODO multiple objects per segment not supported")
}

this.#port.sendSegment({
init: track.init_track,
kind: track.kind,
Expand Down
2 changes: 1 addition & 1 deletion lib/transport/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ export class Client {
const setup = new Setup.Stream(reader, writer)

// Send the setup message.
await setup.send.client({ versions: [Setup.Version.DRAFT_00], role: this.config.role })
await setup.send.client({ versions: [Setup.Version.KIXEL_00], role: this.config.role })

// Receive the setup message.
// TODO verify the SETUP response.
Expand Down
18 changes: 9 additions & 9 deletions lib/transport/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,19 @@ export class Connection {
}

async #recv(msg: Control.Message) {
switch (msg.type) {
case Control.Type.Announce:
switch (msg.kind) {
case Control.Msg.Announce:
return this.#subscriber.recvAnnounce(msg)
case Control.Type.AnnounceOk:
case Control.Msg.AnnounceOk:
return this.#publisher.recvAnnounceOk(msg)
case Control.Type.AnnounceError:
return this.#publisher.recvAnnounceError(msg)
case Control.Type.Subscribe:
case Control.Msg.AnnounceReset:
return this.#publisher.recvAnnounceReset(msg)
case Control.Msg.Subscribe:
return this.#publisher.recvSubscribe(msg)
case Control.Type.SubscribeOk:
case Control.Msg.SubscribeOk:
return this.#subscriber.recvSubscribeOk(msg)
case Control.Type.SubscribeError:
return this.#subscriber.recvSubscribeError(msg)
case Control.Msg.SubscribeReset:
return this.#subscriber.recvSubscribeReset(msg)
}
}

Expand Down
Loading

0 comments on commit 67b6e36

Please sign in to comment.