From 67b6e36a1ecff5a59f8970602e7726115f7f1bef Mon Sep 17 00:00:00 2001 From: kixelated Date: Fri, 15 Sep 2023 12:06:52 -0700 Subject: [PATCH] Add explicit support for KIXEL_00. (#34) Javascript changes for the BREAKING changes. --- lib/contribute/broadcast.ts | 51 +++------- lib/media/catalog.ts | 8 +- lib/playback/broadcast.ts | 6 -- lib/playback/index.ts | 1 - lib/playback/player.ts | 18 +--- lib/transport/client.ts | 2 +- lib/transport/connection.ts | 18 ++-- lib/transport/control.ts | 191 ++++++++++++++++++++++++++---------- lib/transport/object.ts | 18 ++-- lib/transport/publisher.ts | 12 +-- lib/transport/setup.ts | 5 +- lib/transport/subscriber.ts | 10 +- web/connect.tsx | 34 ------- web/listing.tsx | 54 ++-------- web/publish.tsx | 48 ++++----- web/watch.tsx | 38 ++++--- 16 files changed, 252 insertions(+), 262 deletions(-) delete mode 100644 lib/playback/broadcast.ts delete mode 100644 web/connect.tsx diff --git a/lib/contribute/broadcast.ts b/lib/contribute/broadcast.ts index 659fcbb..4de20cf 100644 --- a/lib/contribute/broadcast.ts +++ b/lib/contribute/broadcast.ts @@ -11,7 +11,7 @@ import { isAudioTrackSettings, isVideoTrackSettings } from "../common/settings" export interface BroadcastConfig { connection: Connection media: MediaStream - name: string // name of the broadcast + name: string audio: Audio.EncoderConfig video: Video.EncoderConfig @@ -46,7 +46,6 @@ export class Broadcast { const mp4Catalog: Mp4Track = { container: "mp4", - namespace: config.name, kind: media.kind, init_track: `${track.name}.mp4`, data_track: `${track.name}.m4s`, @@ -87,34 +86,20 @@ export class Broadcast { } async #run() { - // Announce the namespace and wait for an explicit OK. - const announce = await this.connection.announce(this.config.name) - await announce.ok() + for (;;) { + const subscriber = await this.connection.subscribed() + if (!subscriber) break - try { - for (;;) { - const subscriber = await this.connection.subscribed() - if (!subscriber) break - - // Run an async task to serve each subscription. - this.#serveSubscribe(subscriber).catch((e) => { - const err = asError(e) - console.warn("failed to serve subscribe", err) - }) - } - } catch (e) { - const err = asError(e) - await announce.close(1n, `error serving broadcast: ${err.message}`) + // Run an async task to serve each subscription. + this.#serveSubscribe(subscriber).catch((e) => { + const err = asError(e) + console.warn("failed to serve subscribe", err) + }) } } async #serveSubscribe(subscriber: SubscribeRecv) { try { - if (subscriber.namespace != this.config.name) { - // Don't reuse connections if you get this error; we don't demultiplex them. - throw new Error(`unknown namespace: ${subscriber.namespace}`) - } - const [base, ext] = splitExt(subscriber.track) if (ext === "catalog") { await this.#serveCatalog(subscriber, base) @@ -144,9 +129,9 @@ export class Broadcast { await subscriber.ack() const stream = await subscriber.data({ - group: 0n, sequence: 0n, - send_order: 0, // TODO Highest priority + priority: 0, // TODO Highest priority + expires: 0, // never expires }) const writer = stream.getWriter() @@ -174,9 +159,9 @@ export class Broadcast { // Create a new stream for each segment. const stream = await subscriber.data({ - group: 0n, sequence: 0n, - send_order: 0, // TODO + priority: 0, // TODO + expires: 0, // Never expires }) const writer = stream.getWriter() @@ -219,9 +204,9 @@ export class Broadcast { async #serveSegment(subscriber: SubscribeRecv, segment: Segment) { // Create a new stream for each segment. const stream = await subscriber.data({ - group: BigInt(segment.id), - sequence: 0n, - send_order: 0, // TODO + sequence: BigInt(segment.id), + priority: 0, // TODO + expires: 30, // TODO configurable }) // Pipe the segment to the stream. @@ -233,10 +218,6 @@ export class Broadcast { video.srcObject = this.config.media } - get name() { - return this.config.name - } - close() { // TODO implement publish close } diff --git a/lib/media/catalog.ts b/lib/media/catalog.ts index bdc3927..de9bb9d 100644 --- a/lib/media/catalog.ts +++ b/lib/media/catalog.ts @@ -24,10 +24,10 @@ export class Catalog { return catalog } - static async fetch(connection: Connection, namespace: string): Promise { + static async fetch(connection: Connection): Promise { let raw: Uint8Array - const subscribe = await connection.subscribe(namespace, ".catalog") + const subscribe = await connection.subscribe("", ".catalog") try { const segment = await subscribe.data() if (!segment) throw new Error("no catalog data") @@ -61,7 +61,6 @@ export function isCatalog(catalog: any): catalog is Catalog { } export interface Track { - namespace: string kind: string container: string } @@ -91,7 +90,8 @@ export interface VideoTrack extends Track { } export function isTrack(track: any): track is Track { - if (typeof track.namespace !== "string") return false + if (typeof track.kind !== "string") return false + if (typeof track.container !== "string") return false return true } diff --git a/lib/playback/broadcast.ts b/lib/playback/broadcast.ts deleted file mode 100644 index ec443fe..0000000 --- a/lib/playback/broadcast.ts +++ /dev/null @@ -1,6 +0,0 @@ -import { Catalog } from "../media/catalog" - -export interface Broadcast { - namespace: string - catalog: Catalog -} diff --git a/lib/playback/index.ts b/lib/playback/index.ts index 083a4a1..caeccae 100644 --- a/lib/playback/index.ts +++ b/lib/playback/index.ts @@ -1,2 +1 @@ export { Player } from "./player" -export type { Broadcast } from "./broadcast" diff --git a/lib/playback/player.ts b/lib/playback/player.ts index 68404ec..5f87397 100644 --- a/lib/playback/player.ts +++ b/lib/playback/player.ts @@ -13,8 +13,6 @@ export type Timeline = Message.Timeline export interface PlayerConfig { connection: Connection - namespace: string - canvas: HTMLCanvasElement } @@ -35,16 +33,14 @@ export class Player { #running: Promise readonly connection: Connection - readonly namespace: string constructor(config: PlayerConfig) { this.#port = new Port(this.#onMessage.bind(this)) // TODO await an async method instead this.#canvas = config.canvas.transferControlToOffscreen() this.connection = config.connection - this.namespace = config.namespace - this.#catalog = Catalog.fetch(this.connection, this.namespace) + this.#catalog = Catalog.fetch(this.connection) // Async work this.#running = this.#run() @@ -106,15 +102,11 @@ export class Player { } async #runInit(name: string) { - const sub = await this.connection.subscribe(this.namespace, name) + const sub = await this.connection.subscribe("", name) try { const init = await sub.data() if (!init) throw new Error("no init data") - if (init.header.sequence !== 0n) { - throw new Error("TODO multiple objects per init not supported") - } - this.#port.sendInit({ name: name, stream: init.stream, @@ -129,16 +121,12 @@ export class Player { throw new Error(`unknown track kind: ${track.kind}`) } - const sub = await this.connection.subscribe(this.namespace, track.data_track) + const sub = await this.connection.subscribe("", track.data_track) try { for (;;) { const segment = await sub.data() if (!segment) break - if (segment.header.sequence !== 0n) { - throw new Error("TODO multiple objects per segment not supported") - } - this.#port.sendSegment({ init: track.init_track, kind: track.kind, diff --git a/lib/transport/client.ts b/lib/transport/client.ts index 0bcf85d..99f878a 100644 --- a/lib/transport/client.ts +++ b/lib/transport/client.ts @@ -47,7 +47,7 @@ export class Client { const setup = new Setup.Stream(reader, writer) // Send the setup message. - await setup.send.client({ versions: [Setup.Version.DRAFT_00], role: this.config.role }) + await setup.send.client({ versions: [Setup.Version.KIXEL_00], role: this.config.role }) // Receive the setup message. // TODO verify the SETUP response. diff --git a/lib/transport/connection.ts b/lib/transport/connection.ts index 803b37c..cf850a0 100644 --- a/lib/transport/connection.ts +++ b/lib/transport/connection.ts @@ -77,19 +77,19 @@ export class Connection { } async #recv(msg: Control.Message) { - switch (msg.type) { - case Control.Type.Announce: + switch (msg.kind) { + case Control.Msg.Announce: return this.#subscriber.recvAnnounce(msg) - case Control.Type.AnnounceOk: + case Control.Msg.AnnounceOk: return this.#publisher.recvAnnounceOk(msg) - case Control.Type.AnnounceError: - return this.#publisher.recvAnnounceError(msg) - case Control.Type.Subscribe: + case Control.Msg.AnnounceReset: + return this.#publisher.recvAnnounceReset(msg) + case Control.Msg.Subscribe: return this.#publisher.recvSubscribe(msg) - case Control.Type.SubscribeOk: + case Control.Msg.SubscribeOk: return this.#subscriber.recvSubscribeOk(msg) - case Control.Type.SubscribeError: - return this.#subscriber.recvSubscribeError(msg) + case Control.Msg.SubscribeReset: + return this.#subscriber.recvSubscribeReset(msg) } } diff --git a/lib/transport/control.ts b/lib/transport/control.ts index 2974aea..cc13f48 100644 --- a/lib/transport/control.ts +++ b/lib/transport/control.ts @@ -1,20 +1,42 @@ import { Reader, Writer } from "./stream" export type Message = Subscriber | Publisher -export type Subscriber = Subscribe | AnnounceOk | AnnounceError -export type Publisher = SubscribeOk | SubscribeError | Announce +export type Subscriber = Subscribe | SubscribeEnd | AnnounceOk | AnnounceReset +export type Publisher = SubscribeOk | SubscribeReset | Announce | AnnounceEnd -export enum Type { +// I wish we didn't have to split Msg and Id into separate enums. +// However using the string in the message makes it easier to debug. +// We'll take the tiny performance hit until I'm better at Typescript. +export enum Msg { // NOTE: object and setup are in other modules // Object = 0, // Setup = 1, - Subscribe = 3, - SubscribeOk = 4, - SubscribeError = 5, - Announce = 6, - AnnounceOk = 7, - AnnounceError = 8, + Subscribe = "subscribe", + SubscribeOk = "subscribe_ok", + SubscribeReset = "subscribe_reset", // error termination by the publisher + SubscribeEnd = "subscribe_end", // clean termination by the subscriber + Announce = "announce", + AnnounceOk = "announce_ok", + AnnounceReset = "announce_reset", // error termination by the subscriber + AnnounceEnd = "announce_end", // clean termination by the publisher + GoAway = "go_away", +} + +enum Id { + // NOTE: object and setup are in other modules + // Object = 0, + // Setup = 1, + + Subscribe = 0x3, + SubscribeOk = 0x4, + SubscribeReset = 0x5, // error termination by the publisher + SubscribeEnd = 0x15, // clean termination by the subscriber + Announce = 0x6, + AnnounceOk = 0x7, + AnnounceReset = 0x8, // error termination by the subscriber + AnnounceEnd = 0x18, // clean termination by the publisher + GoAway = 0x10, } // NOTE: These are forked from moq-transport-00. @@ -24,7 +46,7 @@ export enum Type { // 4. not allowed on undirectional streams; only after SETUP on the bidirectional stream export interface Subscribe { - type: Type.Subscribe + kind: Msg.Subscribe id: bigint namespace: string @@ -32,35 +54,44 @@ export interface Subscribe { } export interface SubscribeOk { - type: Type.SubscribeOk + kind: Msg.SubscribeOk id: bigint - expires?: bigint // ms } -export interface SubscribeError { - type: Type.SubscribeError +export interface SubscribeReset { + kind: Msg.SubscribeReset id: bigint code: bigint reason: string } +export interface SubscribeEnd { + kind: Msg.SubscribeEnd + id: bigint +} + export interface Announce { - type: Type.Announce + kind: Msg.Announce namespace: string } export interface AnnounceOk { - type: Type.AnnounceOk + kind: Msg.AnnounceOk namespace: string } -export interface AnnounceError { - type: Type.AnnounceError +export interface AnnounceReset { + kind: Msg.AnnounceReset namespace: string code: bigint reason: string } +export interface AnnounceEnd { + kind: Msg.AnnounceEnd + namespace: string +} + export class Stream { private decoder: Decoder private encoder: Encoder @@ -114,27 +145,53 @@ export class Decoder { this.r = r } - private async type(): Promise { + private async msg(): Promise { const t = await this.r.u53() - if (t in Type) return t + switch (t) { + case Id.Subscribe: + return Msg.Subscribe + case Id.SubscribeOk: + return Msg.SubscribeOk + case Id.SubscribeReset: + return Msg.SubscribeReset + case Id.SubscribeEnd: + return Msg.SubscribeEnd + case Id.Announce: + return Msg.Announce + case Id.AnnounceOk: + return Msg.AnnounceOk + case Id.AnnounceReset: + return Msg.AnnounceReset + case Id.AnnounceEnd: + return Msg.AnnounceEnd + case Id.GoAway: + return Msg.GoAway + } + throw new Error(`unknown control message type: ${t}`) } async message(): Promise { - const t = await this.type() + const t = await this.msg() switch (t) { - case Type.Subscribe: + case Msg.Subscribe: return this.subscribe() - case Type.SubscribeOk: + case Msg.SubscribeOk: return this.subscribe_ok() - case Type.SubscribeError: - return this.subscribe_error() - case Type.Announce: + case Msg.SubscribeReset: + return this.subscribe_reset() + case Msg.SubscribeEnd: + return this.subscribe_end() + case Msg.Announce: return this.announce() - case Type.AnnounceOk: + case Msg.AnnounceOk: return this.announce_ok() - case Type.AnnounceError: - return this.announce_error() + case Msg.AnnounceReset: + return this.announce_reset() + case Msg.AnnounceEnd: + return this.announce_end() + case Msg.GoAway: + throw new Error("TODO: implement go away") } } @@ -144,7 +201,7 @@ export class Decoder { const name = await this.r.string() return { - type: Type.Subscribe, + kind: Msg.Subscribe, id, namespace, name, @@ -153,45 +210,58 @@ export class Decoder { private async subscribe_ok(): Promise { return { - type: Type.SubscribeOk, + kind: Msg.SubscribeOk, id: await this.r.u62(), - expires: await this.r.u62(), } } - private async subscribe_error(): Promise { + private async subscribe_reset(): Promise { return { - type: Type.SubscribeError, + kind: Msg.SubscribeReset, id: await this.r.u62(), code: await this.r.u62(), reason: await this.r.string(), } } + private async subscribe_end(): Promise { + return { + kind: Msg.SubscribeEnd, + id: await this.r.u62(), + } + } + private async announce(): Promise { const namespace = await this.r.string() return { - type: Type.Announce, + kind: Msg.Announce, namespace, } } private async announce_ok(): Promise { return { - type: Type.AnnounceOk, + kind: Msg.AnnounceOk, namespace: await this.r.string(), } } - private async announce_error(): Promise { + private async announce_reset(): Promise { return { - type: Type.AnnounceError, + kind: Msg.AnnounceReset, namespace: await this.r.string(), code: await this.r.u62(), reason: await this.r.string(), } } + + private async announce_end(): Promise { + return { + kind: Msg.AnnounceEnd, + namespace: await this.r.string(), + } + } } export class Encoder { @@ -202,52 +272,69 @@ export class Encoder { } async message(m: Message) { - await this.w.u53(m.type) - - switch (m.type) { - case Type.Subscribe: + switch (m.kind) { + case Msg.Subscribe: return this.subscribe(m) - case Type.SubscribeOk: + case Msg.SubscribeOk: return this.subscribe_ok(m) - case Type.SubscribeError: - return this.subscribe_error(m) - case Type.Announce: + case Msg.SubscribeReset: + return this.subscribe_reset(m) + case Msg.SubscribeEnd: + return this.subscribe_end(m) + case Msg.Announce: return this.announce(m) - case Type.AnnounceOk: + case Msg.AnnounceOk: return this.announce_ok(m) - case Type.AnnounceError: - return this.announce_error(m) + case Msg.AnnounceReset: + return this.announce_reset(m) + case Msg.AnnounceEnd: + return this.announce_end(m) } } async subscribe(s: Subscribe) { + await this.w.u53(Id.Subscribe) await this.w.u62(s.id) await this.w.string(s.namespace) await this.w.string(s.name) } async subscribe_ok(s: SubscribeOk) { + await this.w.u53(Id.SubscribeOk) await this.w.u62(s.id) - await this.w.u62(s.expires ?? 0n) } - async subscribe_error(s: SubscribeError) { + async subscribe_reset(s: SubscribeReset) { + await this.w.u53(Id.SubscribeReset) await this.w.u62(s.id) await this.w.u62(s.code) await this.w.string(s.reason) } + async subscribe_end(s: SubscribeEnd) { + await this.w.u53(Id.SubscribeEnd) + await this.w.u62(s.id) + } + async announce(a: Announce) { + await this.w.u53(Id.Announce) await this.w.string(a.namespace) } async announce_ok(a: AnnounceOk) { + await this.w.u53(Id.AnnounceOk) await this.w.string(a.namespace) } - async announce_error(a: AnnounceError) { + async announce_reset(a: AnnounceReset) { + await this.w.u53(Id.AnnounceReset) await this.w.string(a.namespace) await this.w.u62(a.code) await this.w.string(a.reason) } + + async announce_end(a: AnnounceEnd) { + await this.w.u53(Id.AnnounceEnd) + await this.w.string(a.namespace) + } } diff --git a/lib/transport/object.ts b/lib/transport/object.ts index 139b6c0..4b9980c 100644 --- a/lib/transport/object.ts +++ b/lib/transport/object.ts @@ -9,9 +9,9 @@ export { Reader, Writer } export interface Header { track: bigint - group: bigint sequence: bigint - send_order: number // i32 + priority: number // i32 + expires: number // 0 means never // followed by payload } @@ -23,6 +23,7 @@ export class Objects { } async send(header: Header): Promise> { + //console.debug("sending object: ", header) const stream = await this.quic.createUnidirectionalStream() await this.#encode(stream, header) return stream @@ -38,6 +39,7 @@ export class Objects { const stream = value const header = await this.#decode(stream) + //console.debug("received object: ", header) return { header, stream } } @@ -48,15 +50,15 @@ export class Objects { if (type !== 0) throw new Error(`OBJECT type must be 0, got ${type}`) const track = await r.u62() - const group = await r.u62() const sequence = await r.u62() - const send_order = await r.i32() + const priority = await r.i32() + const expires = await r.u53() return { track, - group, sequence, - send_order, + priority, + expires, } } @@ -64,8 +66,8 @@ export class Objects { const w = new Writer(s) await w.u8(0) await w.u62(h.track) - await w.u62(h.group) await w.u62(h.sequence) - await w.i32(h.send_order) + await w.i32(h.priority) + await w.u53(h.expires) } } diff --git a/lib/transport/publisher.ts b/lib/transport/publisher.ts index fccb634..8eb27c4 100644 --- a/lib/transport/publisher.ts +++ b/lib/transport/publisher.ts @@ -31,7 +31,7 @@ export class Publisher { this.#announce.set(namespace, announce) await this.#control.send({ - type: Control.Type.Announce, + kind: Control.Msg.Announce, namespace, }) @@ -52,7 +52,7 @@ export class Publisher { announce.onOk() } - recvAnnounceError(msg: Control.AnnounceError) { + recvAnnounceReset(msg: Control.AnnounceReset) { const announce = this.#announce.get(msg.namespace) if (!announce) { // TODO debug this @@ -72,7 +72,7 @@ export class Publisher { this.#subscribe.set(msg.id, subscribe) await this.#subscribeQueue.push(subscribe) - await this.#control.send({ type: Control.Type.SubscribeOk, id: msg.id }) + await this.#control.send({ kind: Control.Msg.SubscribeOk, id: msg.id }) } } @@ -158,7 +158,7 @@ export class SubscribeRecv { this.#state = "ack" // Send the control message. - return this.#control.send({ type: Control.Type.SubscribeOk, id: this.#id }) + return this.#control.send({ kind: Control.Msg.SubscribeOk, id: this.#id }) } // Close the subscription with an error. @@ -166,11 +166,11 @@ export class SubscribeRecv { if (this.#state === "closed") return this.#state = "closed" - return this.#control.send({ type: Control.Type.SubscribeError, id: this.#id, code, reason }) + return this.#control.send({ kind: Control.Msg.SubscribeReset, id: this.#id, code, reason }) } // Create a writable data stream - async data(header: { group: bigint; sequence: bigint; send_order: number }) { + async data(header: { sequence: bigint; priority: number; expires: number }) { return this.#objects.send({ track: this.#id, ...header }) } } diff --git a/lib/transport/setup.ts b/lib/transport/setup.ts index 2337835..ea85152 100644 --- a/lib/transport/setup.ts +++ b/lib/transport/setup.ts @@ -5,6 +5,7 @@ export type Role = "publisher" | "subscriber" | "both" export enum Version { DRAFT_00 = 0xff00, + KIXEL_00 = 0xbad00, } // NOTE: These are forked from moq-transport-00. @@ -15,7 +16,6 @@ export enum Version { export interface Client { versions: Version[] role: Role - path?: string // not used with WebTransport } export interface Server { @@ -53,12 +53,10 @@ export class Decoder { } const role = await this.role() - const path = (await this.r.string()) || undefined return { versions, role, - path, } } @@ -104,7 +102,6 @@ export class Encoder { } await this.role(c.role) - await this.w.string(c.path ?? "") } async server(s: Server) { diff --git a/lib/transport/subscriber.ts b/lib/transport/subscriber.ts index f82776c..9534c0c 100644 --- a/lib/transport/subscriber.ts +++ b/lib/transport/subscriber.ts @@ -31,7 +31,7 @@ export class Subscriber { throw new Error(`duplicate announce for namespace: ${msg.namespace}`) } - await this.#control.send({ type: Control.Type.AnnounceOk, namespace: msg.namespace }) + await this.#control.send({ kind: Control.Msg.AnnounceOk, namespace: msg.namespace }) const announce = new AnnounceRecv(this.#control, msg.namespace) this.#announce.set(msg.namespace, announce) @@ -46,7 +46,7 @@ export class Subscriber { this.#subscribe.set(id, subscribe) await this.#control.send({ - type: Control.Type.Subscribe, + kind: Control.Msg.Subscribe, id, namespace, name: track, @@ -64,7 +64,7 @@ export class Subscriber { subscribe.onOk() } - async recvSubscribeError(msg: Control.SubscribeError) { + async recvSubscribeReset(msg: Control.SubscribeReset) { const subscribe = this.#subscribe.get(msg.id) if (!subscribe) { throw new Error(`subscribe error for unknown id: ${msg.id}`) @@ -102,14 +102,14 @@ export class AnnounceRecv { this.#state = "ack" // Send the control message. - return this.#control.send({ type: Control.Type.AnnounceOk, namespace: this.namespace }) + return this.#control.send({ kind: Control.Msg.AnnounceOk, namespace: this.namespace }) } async close(code = 0n, reason = "") { if (this.#state === "closed") return this.#state = "closed" - return this.#control.send({ type: Control.Type.AnnounceError, namespace: this.namespace, code, reason }) + return this.#control.send({ kind: Control.Msg.AnnounceReset, namespace: this.namespace, code, reason }) } } diff --git a/web/connect.tsx b/web/connect.tsx deleted file mode 100644 index d1867c9..0000000 --- a/web/connect.tsx +++ /dev/null @@ -1,34 +0,0 @@ -import { Client, Connection } from "@kixelated/moq/transport" -import { createFetch } from "./common" -import { Accessor, createEffect, createMemo, onCleanup } from "solid-js" - -export function connect( - server: string | Accessor | undefined, - role: "subscriber" | "publisher" | "both", -): [Accessor, Accessor] { - const connection = createFetch((server: string) => { - const url = `https://${server}` - - // Special case localhost to fetch the TLS fingerprint from the server. - // TODO remove this when WebTransport correctly supports self-signed certificates - const fingerprint = server.startsWith("localhost") ? url + "/fingerprint" : undefined - - const client = new Client({ - url, - fingerprint, - role: role, - }) - - return client.connect() - }, server) - - createEffect(() => { - // Close the connection when the component is unmounted. - onCleanup(() => connection()?.close()) - }) - - const closed = createFetch((connection: Connection) => connection.closed(), connection) - const error = createMemo(() => connection.error() ?? closed()) - - return [connection, error] -} diff --git a/web/listing.tsx b/web/listing.tsx index 5a7d486..ca0ea52 100644 --- a/web/listing.tsx +++ b/web/listing.tsx @@ -1,4 +1,4 @@ -import { For, Show, Switch, Match, createMemo, createSignal } from "solid-js" +import { For, Show, Switch, Match } from "solid-js" import { AudioCatalogTrack, Catalog, @@ -6,60 +6,22 @@ import { isAudioCatalogTrack, isVideoCatalogTrack, } from "@kixelated/moq/media" -import { A, useSearchParams } from "@solidjs/router" -import { createFetch } from "./common" -import { connect } from "./connect" +import { A } from "@solidjs/router" export function Listings() { - const [params] = useSearchParams<{ server?: string }>() - const server = params.server || process.env.RELAY_HOST - - const [connection, connectionError] = connect(server, "subscriber") - const [announced, setAnnounced] = createSignal() - - const fetch = createFetch(async (connection) => { - setAnnounced([]) - - let [announced, next] = connection.announced().value() - setAnnounced(announced.map((a) => a.namespace)) - - while (next) { - ;[announced, next] = await next - setAnnounced(announced.map((a) => a.namespace)) - } - }, connection) - - const error = createMemo(() => connectionError() || fetch.error()) - return ( <> - -
- {error()!.name}: {error()!.message} -
-
-

Watch a PUBLIC broadcast. Report any abuse pls.

Public
- - No live broadcasts. Somebody should PUBLISH. -

- } - > - {(broadcast) => { - const catalog = createFetch(async (connection) => { - return await Catalog.fetch(connection, broadcast) - }, connection) - - return - }} - +

+ Public broadcasts will be listed here; we're busy setting up the CDN. +

+

+ In the meantime, PUBLISH and share the resulting link. +

) } diff --git a/web/publish.tsx b/web/publish.tsx index 0c7190b..7809841 100644 --- a/web/publish.tsx +++ b/web/publish.tsx @@ -6,7 +6,7 @@ import { useSearchParams } from "@solidjs/router" import { Listing } from "./listing" import { createFetch } from "./common" -import { connect } from "./connect" +import { Client } from "@kixelated/moq/transport" import { Notice } from "./issues" interface GeneralConfig { @@ -122,17 +122,11 @@ export function Publish() { const [audio, setAudio] = createStore(AUDIO_DEFAULT) const [video, setVideo] = createStore(VIDEO_DEFAULT) - // TODO make a replacement for store that uses accessors instead of magic. - const server = createMemo(() => general.server || process.env.RELAY_HOST) - - // eslint-disable-next-line solid/reactivity - const [connection, connectionError] = connect(server, "publisher") - // Start loading the selected media device. - const media = createFetch(async () => { + const broadcast = createFetch(async () => { const width = Math.ceil((video.height * 16) / 9) - return await window.navigator.mediaDevices.getUserMedia({ + const mediaPromise = window.navigator.mediaDevices.getUserMedia({ audio: { sampleRate: { ideal: audio.sampleRate }, channelCount: { max: 2, ideal: 2 }, @@ -146,25 +140,35 @@ export function Publish() { deviceId: video.deviceId, }, }) - }) - - const broadcast = createMemo(() => { - const m = media() - const c = connection() - if (!m || !c) return + const server = general.server || process.env.RELAY_HOST const name = general.name != "" ? general.name : crypto.randomUUID() + const url = `https://${server}/${name}` + + // Special case localhost to fetch the TLS fingerprint from the server. + // TODO remove this when WebTransport correctly supports self-signed certificates + const fingerprint = server.startsWith("localhost") ? `https://${server}/fingerprint` : undefined + + const client = new Client({ + url, + fingerprint, + role: "publisher", + }) + + const connection = await client.connect() + const media = await mediaPromise + return new Broadcast({ - connection: c, - media: m, + connection, + media, name, audio: { codec: "opus", bitrate: 128_000 }, video: { codec: video.codec, bitrate: video.bitrate }, }) }) - const broadcastClosed = createFetch((b) => b?.closed(), broadcast) + const broadcastClosed = createFetch((b) => b.closed(), broadcast) createEffect(() => { // Close the broadcast when the component is unmounted. @@ -180,12 +184,12 @@ export function Publish() { const start = (e: Event) => { e.preventDefault() - media.fetch(true) + broadcast.fetch(true) } // Return a single error when something fails, in order of importance const error = createMemo(() => { - return connectionError() ?? media.error() ?? broadcastClosed() + return broadcast.error() ?? broadcastClosed() }) // Report errors to terminal too so we'll get stack traces @@ -213,7 +217,7 @@ export function Publish() {
Preview
- +