From 045e36f1c078bdd911a0d4ed78c8772539b3b0ce Mon Sep 17 00:00:00 2001 From: kixelated Date: Fri, 29 Nov 2024 21:41:29 -0800 Subject: [PATCH] Fix some glaring publishing bugs. (#134) --- lib/common/hex.ts | 6 ++- lib/contribute/audio.ts | 29 ++++--------- lib/contribute/broadcast.ts | 13 +++--- lib/contribute/video.ts | 12 ++++-- lib/karp/catalog/video.ts | 5 +-- lib/playback/player.ts | 14 ++++-- lib/playback/video.ts | 3 +- lib/transfork/connection.ts | 8 +++- lib/transfork/model.ts | 10 +++-- lib/transfork/publisher.ts | 79 +++++++++++++++++++++++++--------- lib/transfork/stream.ts | 2 + lib/transfork/subscriber.ts | 4 +- web/src/components/publish.tsx | 1 - 13 files changed, 118 insertions(+), 68 deletions(-) diff --git a/lib/common/hex.ts b/lib/common/hex.ts index ca833db..6236c58 100644 --- a/lib/common/hex.ts +++ b/lib/common/hex.ts @@ -6,6 +6,8 @@ export function decode(str: string): Uint8Array { return bytes } -export function encode(_bytes: Uint8Array): string { - throw "todo" +export function encode(bytes: Uint8Array): string { + return Array.from(bytes) + .map((byte) => byte.toString(16).padStart(2, "0")) + .join("") } diff --git a/lib/contribute/audio.ts b/lib/contribute/audio.ts index e170b59..f7a5c9a 100644 --- a/lib/contribute/audio.ts +++ b/lib/contribute/audio.ts @@ -59,12 +59,17 @@ export class Packer { export class Encoder { #encoder!: AudioEncoder #encoderConfig: AudioEncoderConfig - #decoderConfig = new Deferred() + #decoderConfig: AudioDecoderConfig frames: TransformStream constructor(config: AudioEncoderConfig) { this.#encoderConfig = config + this.#decoderConfig = { + codec: config.codec, + numberOfChannels: config.numberOfChannels, + sampleRate: config.sampleRate, + } this.frames = new TransformStream({ start: this.#start.bind(this), @@ -76,7 +81,7 @@ export class Encoder { #start(controller: TransformStreamDefaultController) { this.#encoder = new AudioEncoder({ output: (frame, metadata) => { - this.#enqueue(controller, frame, metadata) + controller.enqueue(frame) }, error: (err) => { throw err @@ -91,22 +96,6 @@ export class Encoder { frame.close() } - #enqueue( - controller: TransformStreamDefaultController, - frame: EncodedAudioChunk, - metadata?: EncodedAudioChunkMetadata, - ) { - const config = metadata?.decoderConfig - if (config && !this.#decoderConfig.pending) { - const config = metadata.decoderConfig - if (!config) throw new Error("missing decoder config") - - this.#decoderConfig.resolve(config) - } - - controller.enqueue(frame) - } - #flush() { this.#encoder.close() } @@ -124,7 +113,7 @@ export class Encoder { return this.#encoderConfig } - async decoderConfig(): Promise { - return await this.#decoderConfig.promise + decoderConfig(): AudioDecoderConfig { + return this.#decoderConfig } } diff --git a/lib/contribute/broadcast.ts b/lib/contribute/broadcast.ts index 5159ae9..76a4327 100644 --- a/lib/contribute/broadcast.ts +++ b/lib/contribute/broadcast.ts @@ -3,6 +3,7 @@ import * as Transfork from "../transfork" import * as Audio from "./audio" import * as Video from "./video" +import * as Hex from "../common/hex" import { isAudioTrackSettings, isVideoTrackSettings } from "../common/settings" export interface BroadcastConfig { @@ -24,7 +25,7 @@ export class Broadcast { #path: string[] constructor(config: BroadcastConfig) { - const id = config.id || new Date().getTime() / 1000 + const id = config.id || (new Date().getTime() / 1000).toFixed(0) this.#config = config this.#path = config.path.concat(id.toString()) @@ -32,7 +33,7 @@ export class Broadcast { async publish(connection: Transfork.Connection) { const broadcast: Catalog.Broadcast = { - path: this.#config.path, + path: this.#path, audio: [], video: [], } @@ -45,7 +46,7 @@ export class Broadcast { priority: media.kind === "video" ? 1 : 2, } - const track = new Transfork.Track(this.#config.path.concat(info.name), info.priority) + const track = new Transfork.Track(this.#path.concat(info.name), info.priority) if (isVideoTrackSettings(settings)) { if (!this.#config.video) { @@ -64,7 +65,7 @@ export class Broadcast { const video: Catalog.Video = { track: info, codec: decoder.codec, - description: description, + description: description ? Hex.encode(description) : undefined, resolution: { width: settings.width, height: settings.height }, frame_rate: settings.frameRate, bitrate: this.#config.video.bitrate, @@ -80,7 +81,7 @@ export class Broadcast { const packer = new Audio.Packer(media as MediaStreamAudioTrack, encoder, track) packer.run().catch((err) => console.error("failed to run audio packer: ", err)) // TODO handle error - const decoder = await encoder.decoderConfig() + const decoder = encoder.decoderConfig() const audio: Catalog.Audio = { track: info, @@ -98,7 +99,7 @@ export class Broadcast { connection.publish(track.reader()) } - const track = new Transfork.Track(this.#config.path, 0) + const track = new Transfork.Track(this.#path, 0) track.appendGroup().writeFrames(Catalog.encode(broadcast)) connection.publish(track.reader()) diff --git a/lib/contribute/video.ts b/lib/contribute/video.ts index fa25876..fefeb95 100644 --- a/lib/contribute/video.ts +++ b/lib/contribute/video.ts @@ -1,5 +1,5 @@ import { Deferred } from "../common/async" -import type { Frame } from "../karp/frame" +import { Frame } from "../karp/frame" import type { Group, Track } from "../transfork" import { Closed } from "../transfork/error" @@ -27,7 +27,7 @@ export class Packer { } async run() { - const output = new WritableStream({ + const output = new WritableStream({ write: (chunk) => this.#write(chunk), close: () => this.#close(), abort: (e) => this.#close(e), @@ -36,7 +36,7 @@ export class Packer { return this.#source.readable.pipeThrough(this.#encoder.frames).pipeTo(output) } - #write(frame: Frame) { + #write(frame: EncodedVideoChunk) { if (!this.#current || frame.type === "key") { if (this.#current) { this.#current.close() @@ -45,7 +45,11 @@ export class Packer { this.#current = this.#data.appendGroup() } - frame.encode(this.#current) + const buffer = new Uint8Array(frame.byteLength) + frame.copyTo(buffer) + + const karp = new Frame(frame.type, frame.timestamp, buffer) + karp.encode(this.#current) } #close(err?: unknown) { diff --git a/lib/karp/catalog/video.ts b/lib/karp/catalog/video.ts index d5f1a7f..9e888a9 100644 --- a/lib/karp/catalog/video.ts +++ b/lib/karp/catalog/video.ts @@ -1,10 +1,9 @@ -import * as Hex from "../../common/hex" import { type Track, decodeTrack } from "./track" export interface Video { track: Track codec: string - description?: Uint8Array + description?: string bitrate?: number frame_rate?: number resolution: Dimensions @@ -22,7 +21,7 @@ export function decodeVideo(o: unknown): o is Video { if (!decodeTrack(obj.track)) return false if (typeof obj.codec !== "string") return false - obj.description = obj.description && typeof obj.description === "string" ? Hex.decode(obj.description) : undefined + obj.description = obj.description && typeof obj.description === "string" ? obj.description : undefined return true } diff --git a/lib/playback/player.ts b/lib/playback/player.ts index b7ba1c6..71ad1da 100644 --- a/lib/playback/player.ts +++ b/lib/playback/player.ts @@ -32,12 +32,18 @@ export class Player { throw new Error("expected resumable broadcast") } - const path = announce.path.slice(0, this.#config.path.length + 1) + if (announce.path.length !== this.#config.path.length + 1) { + // Ignore subtracks + continue + } - const id = Number.parseInt(path[path.length - 1]) - if (id <= activeId) continue + const id = Number.parseInt(announce.path[announce.path.length - 1]) + if (id <= activeId) { + console.warn("skipping old broadcast", announce.path) + continue + } - const catalog = await Catalog.fetch(this.#config.connection, path) + const catalog = await Catalog.fetch(this.#config.connection, announce.path) this.#active?.close() this.#active = new Broadcast(this.#config.connection, catalog, this.#config.canvas) diff --git a/lib/playback/video.ts b/lib/playback/video.ts index 7721803..2556c2d 100644 --- a/lib/playback/video.ts +++ b/lib/playback/video.ts @@ -1,3 +1,4 @@ +import * as Hex from "../common/hex" import type * as Catalog from "../karp/catalog" import type { Frame } from "../karp/frame" import type { Component } from "./timeline" @@ -58,7 +59,7 @@ export class Renderer { codec: this.#track.codec, codedHeight: this.#track.resolution.height, codedWidth: this.#track.resolution.width, - description: this.#track.description, + description: this.#track.description ? Hex.decode(this.#track.description) : undefined, optimizeForLatency: true, }) } diff --git a/lib/transfork/connection.ts b/lib/transfork/connection.ts index 8c0b51b..74647e8 100644 --- a/lib/transfork/connection.ts +++ b/lib/transfork/connection.ts @@ -75,7 +75,9 @@ export class Connection { } const [msg, stream] = next - this.#runBidi(msg, stream).catch((err) => stream.writer.reset(Closed.extract(err))) + this.#runBidi(msg, stream) + .catch((err) => stream.writer.reset(Closed.extract(err))) + .finally(() => stream.writer.close()) } } @@ -131,7 +133,9 @@ export class Connection { } const [msg, stream] = next - this.#runUni(msg, stream).catch((err) => stream.stop(Closed.extract(err))) + this.#runUni(msg, stream) + .catch((err) => stream.stop(Closed.extract(err))) + .finally(() => stream.stop(0)) } } diff --git a/lib/transfork/model.ts b/lib/transfork/model.ts index 0e54193..ac0504e 100644 --- a/lib/transfork/model.ts +++ b/lib/transfork/model.ts @@ -1,5 +1,5 @@ import { Watch } from "../common/async" -import { Closed } from "./error" +import type { Closed } from "./error" import { Order } from "./message" export class Track { @@ -38,7 +38,7 @@ export class Track { return group } - close(closed = new Closed()) { + close(closed?: Closed) { if (this.closed) return this.closed = closed this.latest.close() @@ -124,7 +124,7 @@ export class Group { return this.chunks.value()[0].length } - close(closed = new Closed()) { + close(closed?: Closed) { if (this.closed) return this.closed = closed this.chunks.close() @@ -148,7 +148,9 @@ export class GroupReader { return chunks[this.#index - 1] } - if (this.#group.closed) throw this.#group.closed + if (this.#group.closed) { + throw this.#group.closed + } if (!next) return ;[chunks, next] = await next diff --git a/lib/transfork/publisher.ts b/lib/transfork/publisher.ts index 2e9f986..a16e929 100644 --- a/lib/transfork/publisher.ts +++ b/lib/transfork/publisher.ts @@ -1,4 +1,4 @@ -import { Watch } from "../common/async" +import { Watch, type WatchNext } from "../common/async" import { Closed } from "./error" import * as Message from "./message" import type { GroupReader, TrackReader } from "./model" @@ -8,7 +8,8 @@ export class Publisher { #quic: WebTransport // Our announced broadcasts. - #announce = new Map() + // NOTE: Because Javascript is dumb, we have to JSON.stringify the path. + #announce = new Watch(new Map()) // Their subscribed tracks. #subscribe = new Map() @@ -19,36 +20,68 @@ export class Publisher { // Publish a track publish(track: TrackReader) { - if (this.#announce.has(track.path)) { - throw new Error(`already announced: ${track.path.toString()}`) - } + this.#announce.update((current) => { + const key = JSON.stringify(track.path) + if (current.has(key)) { + throw new Error(`already announced: ${track.path.toString()}`) + } - this.#announce.set(track.path, track) + current.set(key, track) + return current + }) // TODO: clean up announcements // track.closed().then(() => this.#announce.delete(track.path)) } #get(path: string[]): TrackReader | undefined { - return this.#announce.get(path) + const key = JSON.stringify(path) + return this.#announce.value()[0].get(key) } async runAnnounce(msg: Message.AnnounceInterest, stream: Stream) { - for (const announce of this.#announce.values()) { - if (announce.path.length < msg.prefix.length) continue + let current = this.#announce.value() + let seen = new Map() - const prefix = announce.path.slice(0, msg.prefix.length) - if (prefix !== msg.prefix) continue + while (current) { + const newSeen = new Map() - const suffix = announce.path.slice(msg.prefix.length) + for (const [key, announce] of current[0]) { + if (announce.path.length < msg.prefix.length) { + continue + } - const active = new Message.Announce(suffix, "active") - await active.encode(stream.writer) - } + const prefix = announce.path.slice(0, msg.prefix.length) + if (!prefix.every((v, i) => v === msg.prefix[i])) { + continue + } - // TODO support updates. - // Until then, just keep the stream open. - await stream.reader.closed() + newSeen.set(key, announce) + + if (seen.delete(key)) { + // Already exists + continue + } + + // Add a new track + const suffix = announce.path.slice(msg.prefix.length) + const active = new Message.Announce(suffix, "active") + await active.encode(stream.writer) + } + + // Remove any closed tracks + for (const announce of seen.values()) { + const suffix = announce.path.slice(msg.prefix.length) + const ended = new Message.Announce(suffix, "closed") + await ended.encode(stream.writer) + } + + seen = newSeen + + const next = await current[1] + if (!next) return + current = next + } } async runSubscribe(msg: Message.Subscribe, stream: Stream) { @@ -132,8 +165,12 @@ class Subscribed { const closed = this.closed() for (;;) { - const [group, done] = await Promise.all([this.#track.nextGroup(), closed]) - if (done) return + const group = await Promise.any([this.#track.nextGroup(), closed]) + if (group instanceof Closed) { + this.close(group) + return + } + if (!group) break this.#runGroup(group).catch((err) => console.warn("failed to run group: ", err)) @@ -154,6 +191,8 @@ class Subscribed { await stream.u53(frame.byteLength) await stream.write(frame) } + + await stream.close() } close(err = new Closed()) { diff --git a/lib/transfork/stream.ts b/lib/transfork/stream.ts index 2a2a190..85b9cb3 100644 --- a/lib/transfork/stream.ts +++ b/lib/transfork/stream.ts @@ -347,6 +347,8 @@ export class Writer { throw new Error("invalid message type") } + await msg.encode(stream) + return stream } } diff --git a/lib/transfork/subscriber.ts b/lib/transfork/subscriber.ts index fd5f664..ef73f14 100644 --- a/lib/transfork/subscriber.ts +++ b/lib/transfork/subscriber.ts @@ -39,6 +39,8 @@ export class Subscriber { break } + const path = prefix.concat(announce.suffix) + const existing = toggle.get(announce.suffix) if (existing) { if (announce.status === "active") { @@ -52,7 +54,6 @@ export class Subscriber { throw new Error("unknown announce") } - const path = prefix.concat(announce.suffix) const item = new Announced(path) await announced.push(item) toggle.set(announce.suffix, item) @@ -72,6 +73,7 @@ export class Subscriber { const stream = await Stream.open(this.#quic, msg) const subscribe = new Subscribe(id, stream, track) + subscribe.run().catch((err) => console.warn("failed to run subscribe: ", err)) this.#subscribe.set(subscribe.id, subscribe) diff --git a/web/src/components/publish.tsx b/web/src/components/publish.tsx index 755f33e..e1bb854 100644 --- a/web/src/components/publish.tsx +++ b/web/src/components/publish.tsx @@ -326,7 +326,6 @@ function Device(props: { gamma: typedEvent.gamma, } setOrientation(orient) - //console.log(orient) } onMount(() => {