Skip to content

Commit

Permalink
Switch broadcasts from a name to a path. (#120)
Browse files Browse the repository at this point in the history
  • Loading branch information
kixelated authored Oct 23, 2024
1 parent bfcb883 commit 1bd58dc
Show file tree
Hide file tree
Showing 12 changed files with 123 additions and 75 deletions.
6 changes: 3 additions & 3 deletions lib/contribute/broadcast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import * as Video from "./video"
import { isAudioTrackSettings, isVideoTrackSettings } from "../common/settings"

export interface BroadcastConfig {
name: string
path: string[]
media: MediaStream

audio?: AudioEncoderConfig
Expand All @@ -24,11 +24,11 @@ export class Broadcast {

constructor(config: BroadcastConfig) {
this.#config = config
this.#broadcast = new Transfork.Broadcast(config.name)
this.#broadcast = new Transfork.Broadcast(config.path)
}

async publish(connection: Transfork.Connection) {
const broadcast: Catalog.Broadcast = { name: this.#config.name, audio: [], video: [] }
const broadcast: Catalog.Broadcast = { path: this.#config.path, audio: [], video: [] }

for (const media of this.#config.media.getTracks()) {
const settings = media.getSettings()
Expand Down
12 changes: 6 additions & 6 deletions lib/karp/catalog/broadcast.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { decodeAudio, Audio } from "./audio"
import { decodeVideo, Video } from "./video"

export interface Broadcast {
name: string
path: string[]
video: Video[]
audio: Audio[]
}
Expand All @@ -15,7 +15,7 @@ export function encode(catalog: Broadcast): Uint8Array {
return encoder.encode(str)
}

export function decode(broadcast: string, raw: Uint8Array): Broadcast {
export function decode(path: string[], raw: Uint8Array): Broadcast {
const decoder = new TextDecoder()
const str = decoder.decode(raw)

Expand All @@ -24,12 +24,12 @@ export function decode(broadcast: string, raw: Uint8Array): Broadcast {
throw new Error("invalid catalog")
}

catalog.name = broadcast
catalog.path = path
return catalog
}

export async function fetch(connection: Transfork.Connection, broadcast: string): Promise<Broadcast> {
const track = new Transfork.Track(broadcast, "catalog.json", 0)
export async function fetch(connection: Transfork.Connection, path: string[]): Promise<Broadcast> {
const track = new Transfork.Track(path, "catalog.json", 0)
const sub = await connection.subscribe(track)
try {
const segment = await sub.nextGroup()
Expand All @@ -39,7 +39,7 @@ export async function fetch(connection: Transfork.Connection, broadcast: string)
if (!frame) throw new Error("no catalog frame")

segment.close()
return decode(broadcast, frame)
return decode(path, frame)
} finally {
sub.close()
}
Expand Down
8 changes: 4 additions & 4 deletions lib/playback/player.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ export class Player {
}

async #runAudio(audio: Catalog.Audio) {
const track = new Track(this.#broadcast.name, audio.track.name, audio.track.priority)
const sub = await this.#connection.subscribe(new Track(this.#broadcast.name, track.name, track.priority))
const track = new Track(this.#broadcast.path, audio.track.name, audio.track.priority)
const sub = await this.#connection.subscribe(new Track(this.#broadcast.path, track.name, track.priority))

try {
for (;;) {
Expand All @@ -80,8 +80,8 @@ export class Player {
}

async #runVideo(video: Catalog.Video) {
const track = new Track(this.#broadcast.name, video.track.name, video.track.priority)
const sub = await this.#connection.subscribe(new Track(this.#broadcast.name, track.name, track.priority))
const track = new Track(this.#broadcast.path, video.track.name, video.track.priority)
const sub = await this.#connection.subscribe(new Track(this.#broadcast.path, track.name, track.priority))

try {
for (;;) {
Expand Down
2 changes: 1 addition & 1 deletion lib/transfork/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export class Connection {
this.#publisher.announce(broadcast)
}

async announced(prefix = ""): Promise<Queue<Announced>> {
async announced(prefix = []): Promise<Queue<Announced>> {
return this.#subscriber.announced(prefix)
}

Expand Down
42 changes: 26 additions & 16 deletions lib/transfork/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,19 +141,26 @@ export class SessionInfo {
}
}

export type AnnounceStatus = "active" | "closed"

export class Announce {
broadcast: string
suffix: string[]
status: AnnounceStatus

constructor(broadcast: string) {
this.broadcast = broadcast
constructor(suffix: string[], status: AnnounceStatus) {
this.suffix = suffix
this.status = status
}

async encode(w: Writer) {
await w.string(this.broadcast)
await w.u53(this.status == "active" ? 1 : 0)
await w.path(this.suffix)
}

static async decode(r: Reader): Promise<Announce> {
return new Announce(await r.string())
const status = (await r.u53()) == 1 ? "active" : "closed"
const suffix = await r.path()
return new Announce(suffix, status)
}

static async decode_maybe(r: Reader): Promise<Announce | undefined> {
Expand All @@ -165,14 +172,15 @@ export class Announce {
export class AnnounceInterest {
static StreamID = 0x1

constructor(public prefix: string) {}
constructor(public prefix: string[]) {}

async encode(w: Writer) {
await w.string(this.prefix)
await w.path(this.prefix)
}

static async decode(r: Reader): Promise<AnnounceInterest> {
return new AnnounceInterest(await r.string())
const prefix = await r.path()
return new AnnounceInterest(prefix)
}
}

Expand Down Expand Up @@ -224,12 +232,12 @@ export class SubscribeUpdate {

export class Subscribe extends SubscribeUpdate {
id: bigint
broadcast: string
broadcast: string[]
track: string

static StreamID = 0x2

constructor(id: bigint, broadcast: string, track: string, priority: number) {
constructor(id: bigint, broadcast: string[], track: string, priority: number) {
super(priority)

this.id = id
Expand All @@ -239,14 +247,14 @@ export class Subscribe extends SubscribeUpdate {

async encode(w: Writer) {
await w.u62(this.id)
await w.string(this.broadcast)
await w.path(this.broadcast)
await w.string(this.track)
await super.encode(w)
}

static async decode(r: Reader): Promise<Subscribe> {
const id = await r.u62()
const broadcast = await r.string()
const broadcast = await r.path()
const track = await r.string()
const update = await super.decode(r)

Expand Down Expand Up @@ -295,23 +303,25 @@ export class Info {
}

export class InfoRequest {
broadcast: string
broadcast: string[]
track: string

static StreamID = 0x5

constructor(broadcast: string, track: string) {
constructor(broadcast: string[], track: string) {
this.broadcast = broadcast
this.track = track
}

async encode(w: Writer) {
await w.string(this.broadcast)
await w.path(this.broadcast)
await w.string(this.track)
}

static async decode(r: Reader): Promise<InfoRequest> {
return new InfoRequest(await r.string(), await r.string())
const broadcast = await r.path()
const track = await r.string()
return new InfoRequest(broadcast, track)
}
}

Expand Down
12 changes: 6 additions & 6 deletions lib/transfork/model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ export class Broadcast {
readers = 0
closed?: Closed

constructor(public name: string) {}
constructor(public path: string[]) {}

createTrack(name: string, priority: number): Track {
if (this.closed) throw this.closed
const track = new Track(this.name, name, priority)
const track = new Track(this.path, name, priority)
track.readers += 1 // Avoid closing the track when all readers are closed
this.tracks.set(track.name, track)
return track
Expand Down Expand Up @@ -41,8 +41,8 @@ export class BroadcastReader {
}
}

get name(): string {
return this.#broadcast.name
get path(): string[] {
return this.#broadcast.path
}

close() {
Expand All @@ -52,7 +52,7 @@ export class BroadcastReader {
}

export class Track {
readonly broadcast: string
readonly broadcast: string[]
readonly name: string
readonly priority: number
order = Order.Any
Expand All @@ -63,7 +63,7 @@ export class Track {
readers = 0
closed?: Closed

constructor(broadcast: string, name: string, priority: number) {
constructor(broadcast: string[], name: string, priority: number) {
this.broadcast = broadcast
this.name = name
this.priority = priority
Expand Down
23 changes: 14 additions & 9 deletions lib/transfork/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export class Publisher {
#quic: WebTransport

// Our announced broadcasts.
#announce = new Map<string, Broadcast>()
#announce = new Map<string[], Broadcast>()

// Their subscribed tracks.
#subscribe = new Map<bigint, Subscribed>()
Expand All @@ -19,23 +19,28 @@ export class Publisher {

// Announce a track broadcast.
announce(broadcast: Broadcast) {
if (this.#announce.has(broadcast.name)) {
throw new Error(`already announced: ${broadcast.name}`)
if (this.#announce.has(broadcast.path)) {
throw new Error(`already announced: ${broadcast.path.toString()}`)
}

this.#announce.set(broadcast.name, broadcast)
this.#announce.set(broadcast.path, broadcast)
}

#get(msg: { broadcast: string; track: string }): TrackReader | undefined {
#get(msg: { broadcast: string[]; track: string }): TrackReader | undefined {
return this.#announce.get(msg.broadcast)?.reader().getTrack(msg.track)
}

async runAnnounce(msg: Message.AnnounceInterest, stream: Stream) {
for (const announce of this.#announce.values()) {
if (announce.name.startsWith(msg.prefix)) {
const msg = new Message.Announce(announce.name)
await msg.encode(stream.writer)
}
if (announce.path.length < msg.prefix.length) continue

const prefix = announce.path.slice(0, msg.prefix.length)
if (prefix != msg.prefix) continue

const suffix = announce.path.slice(msg.prefix.length)

const active = new Message.Announce(suffix, "active")
await active.encode(stream.writer)
}

// TODO support updates.
Expand Down
18 changes: 18 additions & 0 deletions lib/transfork/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,17 @@ export class Reader {
return new TextDecoder().decode(buffer)
}

async path(): Promise<string[]> {
const parts = await this.u53()
const path = []

for (let i = 0; i < parts; i++) {
path.push(await this.string())
}

return path
}

async u8(): Promise<number> {
await this.#fillTo(1)
return this.#slice(1)[0]
Expand Down Expand Up @@ -300,6 +311,13 @@ export class Writer {
await this.write(data)
}

async path(path: string[]) {
await this.u53(path.length)
for (const part of path) {
await this.string(part)
}
}

async close() {
this.#writer.releaseLock()
await this.#stream.close()
Expand Down
Loading

0 comments on commit 1bd58dc

Please sign in to comment.