Skip to content

Commit

Permalink
Add crude auto-recovery support. (#123)
Browse files Browse the repository at this point in the history
  • Loading branch information
kixelated authored Oct 28, 2024
1 parent a50ec3e commit 262c42c
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 161 deletions.
5 changes: 4 additions & 1 deletion lib/contribute/broadcast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { isAudioTrackSettings, isVideoTrackSettings } from "../common/settings"
export interface BroadcastConfig {
path: string[]
media: MediaStream
id?: number

audio?: AudioEncoderConfig
video?: VideoEncoderConfig
Expand All @@ -23,8 +24,10 @@ export class Broadcast {
#path: string[]

constructor(config: BroadcastConfig) {
const id = config.id || new Date().getTime() / 1000

this.#config = config
this.#path = config.path
this.#path = config.path.concat(id.toString())
}

async publish(connection: Transfork.Connection) {
Expand Down
143 changes: 143 additions & 0 deletions lib/playback/broadcast.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import { Connection } from "../transfork/connection"
import * as Catalog from "../karp/catalog"

import { Track } from "../transfork"

import * as Audio from "./audio"
import * as Video from "./video"
import { Timeline } from "./timeline"
import { GroupReader } from "../transfork/model"
import { Frame } from "../karp/frame"

// This class must be created on the main thread due to AudioContext.
export class Broadcast {
#connection: Connection
#catalog: Catalog.Broadcast

// Running is a promise that resolves when the player is closed.
// #close is called with no error, while #abort is called with an error.
#running: Promise<void>

// Timeline receives samples, buffering them and choosing the timestamp to render.
#timeline = new Timeline()

#audio?: Audio.Renderer
#video?: Video.Renderer

constructor(connection: Connection, catalog: Catalog.Broadcast, canvas: HTMLCanvasElement) {
this.#connection = connection
this.#catalog = catalog

const running = []

// Only configure audio is we have an audio track
const audio = (catalog.audio || []).at(0)
if (audio) {
this.#audio = new Audio.Renderer(audio, this.#timeline.audio)
running.push(this.#runAudio(audio))
}

const video = (catalog.video || []).at(0)
if (video) {
this.#video = new Video.Renderer(video, canvas, this.#timeline.video)
running.push(this.#runVideo(video))
}

// Async work
this.#running = Promise.race([...running])
}

async #runAudio(audio: Catalog.Audio) {
const track = new Track(this.#catalog.path.concat(audio.track.name), audio.track.priority)
const sub = await this.#connection.subscribe(track)

try {
for (;;) {
const group = await Promise.race([sub.nextGroup(), this.#running])
if (!group) break

this.#runAudioGroup(audio, group)
.catch(() => {})
.finally(() => group.close())
}
} finally {
sub.close()
}
}

async #runVideo(video: Catalog.Video) {
const track = new Track(this.#catalog.path.concat(video.track.name), video.track.priority)
const sub = await this.#connection.subscribe(track)

try {
for (;;) {
const group = await Promise.race([sub.nextGroup(), this.#running])
if (!group) break

this.#runVideoGroup(video, group)
.catch(() => {})
.finally(() => group.close())
}
} finally {
sub.close()
}
}

async #runAudioGroup(audio: Catalog.Audio, group: GroupReader) {
const timeline = this.#timeline.audio

// Create a queue that will contain each frame
const queue = new TransformStream<Frame>({})
const segment = queue.writable.getWriter()

// Add the segment to the timeline
const segments = timeline.segments.getWriter()
await segments.write({
sequence: group.id,
frames: queue.readable,
})
segments.releaseLock()

// Read each chunk, decoding the MP4 frames and adding them to the queue.
for (;;) {
const frame = await Frame.decode(group)
if (!frame) break

await segment.write(frame)
}

// We done.
await segment.close()
}

async #runVideoGroup(video: Catalog.Video, group: GroupReader) {
const timeline = this.#timeline.video

// Create a queue that will contain each MP4 frame.
const queue = new TransformStream<Frame>({})
const segment = queue.writable.getWriter()

// Add the segment to the timeline
const segments = timeline.segments.getWriter()
await segments.write({
sequence: group.id,
frames: queue.readable,
})
segments.releaseLock()

for (;;) {
const frame = await Frame.decode(group)
if (!frame) break

await segment.write(frame)
}

// We done.
await segment.close()
}

close() {
this.#audio?.close()
this.#video?.close()
}
}
164 changes: 26 additions & 138 deletions lib/playback/player.ts
Original file line number Diff line number Diff line change
@@ -1,169 +1,57 @@
import { Connection } from "../transfork/connection"
import * as Catalog from "../karp/catalog"

import { Track } from "../transfork"

import * as Audio from "./audio"
import * as Video from "./video"
import { Timeline } from "./timeline"
import { GroupReader } from "../transfork/model"
import { Frame } from "../karp/frame"
import { Broadcast } from "./broadcast"

export interface PlayerConfig {
connection: Connection
catalog: Catalog.Broadcast
fingerprint?: string // URL to fetch TLS certificate fingerprint
path: string[]
canvas: HTMLCanvasElement
}

// This class must be created on the main thread due to AudioContext.
export class Player {
#connection: Connection
#broadcast: Catalog.Broadcast

// Running is a promise that resolves when the player is closed.
// #close is called with no error, while #abort is called with an error.
#config: PlayerConfig
#running: Promise<void>
#close!: () => void
#abort!: (err: Error) => void

// Timeline receives samples, buffering them and choosing the timestamp to render.
#timeline = new Timeline()

#audio?: Audio.Renderer
#video?: Video.Renderer

constructor(config: PlayerConfig) {
this.#connection = config.connection
this.#broadcast = config.catalog

const abort = new Promise<void>((resolve, reject) => {
this.#close = resolve
this.#abort = reject
})

const running = []

// Only configure audio is we have an audio track
const audio = (config.catalog.audio || []).at(0)
if (audio) {
this.#audio = new Audio.Renderer(audio, this.#timeline.audio)
running.push(this.#runAudio(audio))
}

const video = (config.catalog.video || []).at(0)
if (video) {
this.#video = new Video.Renderer(video, config.canvas, this.#timeline.video)
running.push(this.#runVideo(video))
}

// Async work
this.#running = Promise.race([abort, ...running]).catch(this.#close)
}

async #runAudio(audio: Catalog.Audio) {
const track = new Track(this.#broadcast.path.concat(audio.track.name), audio.track.priority)
const sub = await this.#connection.subscribe(track)

try {
for (;;) {
const group = await Promise.race([sub.nextGroup(), this.#running])
if (!group) break

this.#runAudioGroup(audio, group)
.catch((err) => console.warn("failed to run group: ", err))
.finally(() => group.close())
}
} finally {
sub.close()
}
this.#config = config
this.#running = this.#run()
}

async #runVideo(video: Catalog.Video) {
const track = new Track(this.#broadcast.path.concat(video.track.name), video.track.priority)
const sub = await this.#connection.subscribe(track)

try {
for (;;) {
const group = await Promise.race([sub.nextGroup(), this.#running])
if (!group) break
async #run() {
const announced = await this.#config.connection.announced(this.#config.path)

this.#runVideoGroup(video, group)
.catch((err) => console.warn("failed to run group: ", err))
.finally(() => group.close())
}
} finally {
sub.close()
}
}
let active = undefined
let activeId = -1

async #runAudioGroup(audio: Catalog.Audio, group: GroupReader) {
const timeline = this.#timeline.audio

// Create a queue that will contain each frame
const queue = new TransformStream<Frame>({})
const segment = queue.writable.getWriter()

// Add the segment to the timeline
const segments = timeline.segments.getWriter()
await segments.write({
sequence: group.id,
frames: queue.readable,
})
segments.releaseLock()

// Read each chunk, decoding the MP4 frames and adding them to the queue.
for (;;) {
const frame = await Frame.decode(group)
if (!frame) break
const announce = await announced.next()
if (!announce) break

await segment.write(frame)
}

// We done.
await segment.close()
}

async #runVideoGroup(video: Catalog.Video, group: GroupReader) {
const timeline = this.#timeline.video
if (announce.path.length == this.#config.path.length) {
throw new Error("expected resumable broadcast")
}

// Create a queue that will contain each MP4 frame.
const queue = new TransformStream<Frame>({})
const segment = queue.writable.getWriter()
const path = announce.path.slice(0, this.#config.path.length + 1)

// Add the segment to the timeline
const segments = timeline.segments.getWriter()
await segments.write({
sequence: group.id,
frames: queue.readable,
})
segments.releaseLock()
const id = parseInt(path[path.length - 1])
if (id <= activeId) continue

for (;;) {
const frame = await Frame.decode(group)
if (!frame) break
const catalog = await Catalog.fetch(this.#config.connection, path)

await segment.write(frame)
active?.close()
active = new Broadcast(this.#config.connection, catalog, this.#config.canvas)
activeId = id
}

// We done.
await segment.close()
}

close(err?: Error) {
if (err) this.#abort(err)
else this.#close()

if (this.#connection) this.#connection.close()
this.#audio?.close()
this.#video?.close()
active?.close()
}

async closed(): Promise<void> {
await this.#running
close() {
this.#config.connection.close()
}

play() {
this.#audio?.play()
async closed() {
await Promise.any([this.#running, this.#config.connection.closed()])
}
}
2 changes: 1 addition & 1 deletion lib/transfork/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export class Connection {
this.#publisher.publish(track)
}

async announced(prefix = []): Promise<Queue<Announced>> {
async announced(prefix: string[] = []): Promise<Queue<Announced>> {
return this.#subscriber.announced(prefix)
}

Expand Down
1 change: 0 additions & 1 deletion lib/transfork/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ export class Subscriber {
writer.writeFrame(frame)
}

console.trace("end of group", group)
writer.close()
}
}
Expand Down
Loading

0 comments on commit 262c42c

Please sign in to comment.