Skip to content

Commit

Permalink
Fix some glaring publishing bugs. (#134)
Browse files Browse the repository at this point in the history
  • Loading branch information
kixelated authored Nov 30, 2024
1 parent 39b00dc commit 045e36f
Show file tree
Hide file tree
Showing 13 changed files with 118 additions and 68 deletions.
6 changes: 4 additions & 2 deletions lib/common/hex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ export function decode(str: string): Uint8Array {
return bytes
}

export function encode(_bytes: Uint8Array): string {
throw "todo"
export function encode(bytes: Uint8Array): string {
return Array.from(bytes)
.map((byte) => byte.toString(16).padStart(2, "0"))
.join("")
}
29 changes: 9 additions & 20 deletions lib/contribute/audio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,17 @@ export class Packer {
export class Encoder {
#encoder!: AudioEncoder
#encoderConfig: AudioEncoderConfig
#decoderConfig = new Deferred<AudioDecoderConfig>()
#decoderConfig: AudioDecoderConfig

frames: TransformStream<AudioData, EncodedAudioChunk>

constructor(config: AudioEncoderConfig) {
this.#encoderConfig = config
this.#decoderConfig = {
codec: config.codec,
numberOfChannels: config.numberOfChannels,
sampleRate: config.sampleRate,
}

this.frames = new TransformStream({
start: this.#start.bind(this),
Expand All @@ -76,7 +81,7 @@ export class Encoder {
#start(controller: TransformStreamDefaultController<EncodedAudioChunk>) {
this.#encoder = new AudioEncoder({
output: (frame, metadata) => {
this.#enqueue(controller, frame, metadata)
controller.enqueue(frame)
},
error: (err) => {
throw err
Expand All @@ -91,22 +96,6 @@ export class Encoder {
frame.close()
}

#enqueue(
controller: TransformStreamDefaultController<EncodedAudioChunk>,
frame: EncodedAudioChunk,
metadata?: EncodedAudioChunkMetadata,
) {
const config = metadata?.decoderConfig
if (config && !this.#decoderConfig.pending) {
const config = metadata.decoderConfig
if (!config) throw new Error("missing decoder config")

this.#decoderConfig.resolve(config)
}

controller.enqueue(frame)
}

#flush() {
this.#encoder.close()
}
Expand All @@ -124,7 +113,7 @@ export class Encoder {
return this.#encoderConfig
}

async decoderConfig(): Promise<AudioDecoderConfig> {
return await this.#decoderConfig.promise
decoderConfig(): AudioDecoderConfig {
return this.#decoderConfig
}
}
13 changes: 7 additions & 6 deletions lib/contribute/broadcast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as Transfork from "../transfork"
import * as Audio from "./audio"
import * as Video from "./video"

import * as Hex from "../common/hex"
import { isAudioTrackSettings, isVideoTrackSettings } from "../common/settings"

export interface BroadcastConfig {
Expand All @@ -24,15 +25,15 @@ export class Broadcast {
#path: string[]

constructor(config: BroadcastConfig) {
const id = config.id || new Date().getTime() / 1000
const id = config.id || (new Date().getTime() / 1000).toFixed(0)

this.#config = config
this.#path = config.path.concat(id.toString())
}

async publish(connection: Transfork.Connection) {
const broadcast: Catalog.Broadcast = {
path: this.#config.path,
path: this.#path,
audio: [],
video: [],
}
Expand All @@ -45,7 +46,7 @@ export class Broadcast {
priority: media.kind === "video" ? 1 : 2,
}

const track = new Transfork.Track(this.#config.path.concat(info.name), info.priority)
const track = new Transfork.Track(this.#path.concat(info.name), info.priority)

if (isVideoTrackSettings(settings)) {
if (!this.#config.video) {
Expand All @@ -64,7 +65,7 @@ export class Broadcast {
const video: Catalog.Video = {
track: info,
codec: decoder.codec,
description: description,
description: description ? Hex.encode(description) : undefined,
resolution: { width: settings.width, height: settings.height },
frame_rate: settings.frameRate,
bitrate: this.#config.video.bitrate,
Expand All @@ -80,7 +81,7 @@ export class Broadcast {
const packer = new Audio.Packer(media as MediaStreamAudioTrack, encoder, track)
packer.run().catch((err) => console.error("failed to run audio packer: ", err)) // TODO handle error

const decoder = await encoder.decoderConfig()
const decoder = encoder.decoderConfig()

const audio: Catalog.Audio = {
track: info,
Expand All @@ -98,7 +99,7 @@ export class Broadcast {
connection.publish(track.reader())
}

const track = new Transfork.Track(this.#config.path, 0)
const track = new Transfork.Track(this.#path, 0)
track.appendGroup().writeFrames(Catalog.encode(broadcast))

connection.publish(track.reader())
Expand Down
12 changes: 8 additions & 4 deletions lib/contribute/video.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Deferred } from "../common/async"
import type { Frame } from "../karp/frame"
import { Frame } from "../karp/frame"
import type { Group, Track } from "../transfork"
import { Closed } from "../transfork/error"

Expand Down Expand Up @@ -27,7 +27,7 @@ export class Packer {
}

async run() {
const output = new WritableStream({
const output = new WritableStream<EncodedVideoChunk>({
write: (chunk) => this.#write(chunk),
close: () => this.#close(),
abort: (e) => this.#close(e),
Expand All @@ -36,7 +36,7 @@ export class Packer {
return this.#source.readable.pipeThrough(this.#encoder.frames).pipeTo(output)
}

#write(frame: Frame) {
#write(frame: EncodedVideoChunk) {
if (!this.#current || frame.type === "key") {
if (this.#current) {
this.#current.close()
Expand All @@ -45,7 +45,11 @@ export class Packer {
this.#current = this.#data.appendGroup()
}

frame.encode(this.#current)
const buffer = new Uint8Array(frame.byteLength)
frame.copyTo(buffer)

const karp = new Frame(frame.type, frame.timestamp, buffer)
karp.encode(this.#current)
}

#close(err?: unknown) {
Expand Down
5 changes: 2 additions & 3 deletions lib/karp/catalog/video.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import * as Hex from "../../common/hex"
import { type Track, decodeTrack } from "./track"

export interface Video {
track: Track
codec: string
description?: Uint8Array
description?: string
bitrate?: number
frame_rate?: number
resolution: Dimensions
Expand All @@ -22,7 +21,7 @@ export function decodeVideo(o: unknown): o is Video {
if (!decodeTrack(obj.track)) return false
if (typeof obj.codec !== "string") return false

obj.description = obj.description && typeof obj.description === "string" ? Hex.decode(obj.description) : undefined
obj.description = obj.description && typeof obj.description === "string" ? obj.description : undefined

return true
}
14 changes: 10 additions & 4 deletions lib/playback/player.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,18 @@ export class Player {
throw new Error("expected resumable broadcast")
}

const path = announce.path.slice(0, this.#config.path.length + 1)
if (announce.path.length !== this.#config.path.length + 1) {
// Ignore subtracks
continue
}

const id = Number.parseInt(path[path.length - 1])
if (id <= activeId) continue
const id = Number.parseInt(announce.path[announce.path.length - 1])
if (id <= activeId) {
console.warn("skipping old broadcast", announce.path)
continue
}

const catalog = await Catalog.fetch(this.#config.connection, path)
const catalog = await Catalog.fetch(this.#config.connection, announce.path)

this.#active?.close()
this.#active = new Broadcast(this.#config.connection, catalog, this.#config.canvas)
Expand Down
3 changes: 2 additions & 1 deletion lib/playback/video.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import * as Hex from "../common/hex"
import type * as Catalog from "../karp/catalog"
import type { Frame } from "../karp/frame"
import type { Component } from "./timeline"
Expand Down Expand Up @@ -58,7 +59,7 @@ export class Renderer {
codec: this.#track.codec,
codedHeight: this.#track.resolution.height,
codedWidth: this.#track.resolution.width,
description: this.#track.description,
description: this.#track.description ? Hex.decode(this.#track.description) : undefined,
optimizeForLatency: true,
})
}
Expand Down
8 changes: 6 additions & 2 deletions lib/transfork/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ export class Connection {
}

const [msg, stream] = next
this.#runBidi(msg, stream).catch((err) => stream.writer.reset(Closed.extract(err)))
this.#runBidi(msg, stream)
.catch((err) => stream.writer.reset(Closed.extract(err)))
.finally(() => stream.writer.close())
}
}

Expand Down Expand Up @@ -131,7 +133,9 @@ export class Connection {
}

const [msg, stream] = next
this.#runUni(msg, stream).catch((err) => stream.stop(Closed.extract(err)))
this.#runUni(msg, stream)
.catch((err) => stream.stop(Closed.extract(err)))
.finally(() => stream.stop(0))
}
}

Expand Down
10 changes: 6 additions & 4 deletions lib/transfork/model.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { Watch } from "../common/async"
import { Closed } from "./error"
import type { Closed } from "./error"
import { Order } from "./message"

export class Track {
Expand Down Expand Up @@ -38,7 +38,7 @@ export class Track {
return group
}

close(closed = new Closed()) {
close(closed?: Closed) {
if (this.closed) return
this.closed = closed
this.latest.close()
Expand Down Expand Up @@ -124,7 +124,7 @@ export class Group {
return this.chunks.value()[0].length
}

close(closed = new Closed()) {
close(closed?: Closed) {
if (this.closed) return
this.closed = closed
this.chunks.close()
Expand All @@ -148,7 +148,9 @@ export class GroupReader {
return chunks[this.#index - 1]
}

if (this.#group.closed) throw this.#group.closed
if (this.#group.closed) {
throw this.#group.closed
}

if (!next) return
;[chunks, next] = await next
Expand Down
Loading

0 comments on commit 045e36f

Please sign in to comment.