diff --git a/lib/contribute/broadcast.ts b/lib/contribute/broadcast.ts
index 659fcbb..4de20cf 100644
--- a/lib/contribute/broadcast.ts
+++ b/lib/contribute/broadcast.ts
@@ -11,7 +11,7 @@ import { isAudioTrackSettings, isVideoTrackSettings } from "../common/settings"
export interface BroadcastConfig {
connection: Connection
media: MediaStream
- name: string // name of the broadcast
+ name: string
audio: Audio.EncoderConfig
video: Video.EncoderConfig
@@ -46,7 +46,6 @@ export class Broadcast {
const mp4Catalog: Mp4Track = {
container: "mp4",
- namespace: config.name,
kind: media.kind,
init_track: `${track.name}.mp4`,
data_track: `${track.name}.m4s`,
@@ -87,34 +86,20 @@ export class Broadcast {
}
async #run() {
- // Announce the namespace and wait for an explicit OK.
- const announce = await this.connection.announce(this.config.name)
- await announce.ok()
+ for (;;) {
+ const subscriber = await this.connection.subscribed()
+ if (!subscriber) break
- try {
- for (;;) {
- const subscriber = await this.connection.subscribed()
- if (!subscriber) break
-
- // Run an async task to serve each subscription.
- this.#serveSubscribe(subscriber).catch((e) => {
- const err = asError(e)
- console.warn("failed to serve subscribe", err)
- })
- }
- } catch (e) {
- const err = asError(e)
- await announce.close(1n, `error serving broadcast: ${err.message}`)
+ // Run an async task to serve each subscription.
+ this.#serveSubscribe(subscriber).catch((e) => {
+ const err = asError(e)
+ console.warn("failed to serve subscribe", err)
+ })
}
}
async #serveSubscribe(subscriber: SubscribeRecv) {
try {
- if (subscriber.namespace != this.config.name) {
- // Don't reuse connections if you get this error; we don't demultiplex them.
- throw new Error(`unknown namespace: ${subscriber.namespace}`)
- }
-
const [base, ext] = splitExt(subscriber.track)
if (ext === "catalog") {
await this.#serveCatalog(subscriber, base)
@@ -144,9 +129,9 @@ export class Broadcast {
await subscriber.ack()
const stream = await subscriber.data({
- group: 0n,
sequence: 0n,
- send_order: 0, // TODO Highest priority
+ priority: 0, // TODO Highest priority
+ expires: 0, // never expires
})
const writer = stream.getWriter()
@@ -174,9 +159,9 @@ export class Broadcast {
// Create a new stream for each segment.
const stream = await subscriber.data({
- group: 0n,
sequence: 0n,
- send_order: 0, // TODO
+ priority: 0, // TODO
+ expires: 0, // Never expires
})
const writer = stream.getWriter()
@@ -219,9 +204,9 @@ export class Broadcast {
async #serveSegment(subscriber: SubscribeRecv, segment: Segment) {
// Create a new stream for each segment.
const stream = await subscriber.data({
- group: BigInt(segment.id),
- sequence: 0n,
- send_order: 0, // TODO
+ sequence: BigInt(segment.id),
+ priority: 0, // TODO
+ expires: 30, // TODO configurable
})
// Pipe the segment to the stream.
@@ -233,10 +218,6 @@ export class Broadcast {
video.srcObject = this.config.media
}
- get name() {
- return this.config.name
- }
-
close() {
// TODO implement publish close
}
diff --git a/lib/media/catalog.ts b/lib/media/catalog.ts
index bdc3927..de9bb9d 100644
--- a/lib/media/catalog.ts
+++ b/lib/media/catalog.ts
@@ -24,10 +24,10 @@ export class Catalog {
return catalog
}
- static async fetch(connection: Connection, namespace: string): Promise {
+ static async fetch(connection: Connection): Promise {
let raw: Uint8Array
- const subscribe = await connection.subscribe(namespace, ".catalog")
+ const subscribe = await connection.subscribe("", ".catalog")
try {
const segment = await subscribe.data()
if (!segment) throw new Error("no catalog data")
@@ -61,7 +61,6 @@ export function isCatalog(catalog: any): catalog is Catalog {
}
export interface Track {
- namespace: string
kind: string
container: string
}
@@ -91,7 +90,8 @@ export interface VideoTrack extends Track {
}
export function isTrack(track: any): track is Track {
- if (typeof track.namespace !== "string") return false
+ if (typeof track.kind !== "string") return false
+ if (typeof track.container !== "string") return false
return true
}
diff --git a/lib/playback/broadcast.ts b/lib/playback/broadcast.ts
deleted file mode 100644
index ec443fe..0000000
--- a/lib/playback/broadcast.ts
+++ /dev/null
@@ -1,6 +0,0 @@
-import { Catalog } from "../media/catalog"
-
-export interface Broadcast {
- namespace: string
- catalog: Catalog
-}
diff --git a/lib/playback/index.ts b/lib/playback/index.ts
index 083a4a1..caeccae 100644
--- a/lib/playback/index.ts
+++ b/lib/playback/index.ts
@@ -1,2 +1 @@
export { Player } from "./player"
-export type { Broadcast } from "./broadcast"
diff --git a/lib/playback/player.ts b/lib/playback/player.ts
index 68404ec..5f87397 100644
--- a/lib/playback/player.ts
+++ b/lib/playback/player.ts
@@ -13,8 +13,6 @@ export type Timeline = Message.Timeline
export interface PlayerConfig {
connection: Connection
- namespace: string
-
canvas: HTMLCanvasElement
}
@@ -35,16 +33,14 @@ export class Player {
#running: Promise
readonly connection: Connection
- readonly namespace: string
constructor(config: PlayerConfig) {
this.#port = new Port(this.#onMessage.bind(this)) // TODO await an async method instead
this.#canvas = config.canvas.transferControlToOffscreen()
this.connection = config.connection
- this.namespace = config.namespace
- this.#catalog = Catalog.fetch(this.connection, this.namespace)
+ this.#catalog = Catalog.fetch(this.connection)
// Async work
this.#running = this.#run()
@@ -106,15 +102,11 @@ export class Player {
}
async #runInit(name: string) {
- const sub = await this.connection.subscribe(this.namespace, name)
+ const sub = await this.connection.subscribe("", name)
try {
const init = await sub.data()
if (!init) throw new Error("no init data")
- if (init.header.sequence !== 0n) {
- throw new Error("TODO multiple objects per init not supported")
- }
-
this.#port.sendInit({
name: name,
stream: init.stream,
@@ -129,16 +121,12 @@ export class Player {
throw new Error(`unknown track kind: ${track.kind}`)
}
- const sub = await this.connection.subscribe(this.namespace, track.data_track)
+ const sub = await this.connection.subscribe("", track.data_track)
try {
for (;;) {
const segment = await sub.data()
if (!segment) break
- if (segment.header.sequence !== 0n) {
- throw new Error("TODO multiple objects per segment not supported")
- }
-
this.#port.sendSegment({
init: track.init_track,
kind: track.kind,
diff --git a/lib/transport/client.ts b/lib/transport/client.ts
index 0bcf85d..99f878a 100644
--- a/lib/transport/client.ts
+++ b/lib/transport/client.ts
@@ -47,7 +47,7 @@ export class Client {
const setup = new Setup.Stream(reader, writer)
// Send the setup message.
- await setup.send.client({ versions: [Setup.Version.DRAFT_00], role: this.config.role })
+ await setup.send.client({ versions: [Setup.Version.KIXEL_00], role: this.config.role })
// Receive the setup message.
// TODO verify the SETUP response.
diff --git a/lib/transport/connection.ts b/lib/transport/connection.ts
index 803b37c..cf850a0 100644
--- a/lib/transport/connection.ts
+++ b/lib/transport/connection.ts
@@ -77,19 +77,19 @@ export class Connection {
}
async #recv(msg: Control.Message) {
- switch (msg.type) {
- case Control.Type.Announce:
+ switch (msg.kind) {
+ case Control.Msg.Announce:
return this.#subscriber.recvAnnounce(msg)
- case Control.Type.AnnounceOk:
+ case Control.Msg.AnnounceOk:
return this.#publisher.recvAnnounceOk(msg)
- case Control.Type.AnnounceError:
- return this.#publisher.recvAnnounceError(msg)
- case Control.Type.Subscribe:
+ case Control.Msg.AnnounceReset:
+ return this.#publisher.recvAnnounceReset(msg)
+ case Control.Msg.Subscribe:
return this.#publisher.recvSubscribe(msg)
- case Control.Type.SubscribeOk:
+ case Control.Msg.SubscribeOk:
return this.#subscriber.recvSubscribeOk(msg)
- case Control.Type.SubscribeError:
- return this.#subscriber.recvSubscribeError(msg)
+ case Control.Msg.SubscribeReset:
+ return this.#subscriber.recvSubscribeReset(msg)
}
}
diff --git a/lib/transport/control.ts b/lib/transport/control.ts
index 2974aea..cc13f48 100644
--- a/lib/transport/control.ts
+++ b/lib/transport/control.ts
@@ -1,20 +1,42 @@
import { Reader, Writer } from "./stream"
export type Message = Subscriber | Publisher
-export type Subscriber = Subscribe | AnnounceOk | AnnounceError
-export type Publisher = SubscribeOk | SubscribeError | Announce
+export type Subscriber = Subscribe | SubscribeEnd | AnnounceOk | AnnounceReset
+export type Publisher = SubscribeOk | SubscribeReset | Announce | AnnounceEnd
-export enum Type {
+// I wish we didn't have to split Msg and Id into separate enums.
+// However using the string in the message makes it easier to debug.
+// We'll take the tiny performance hit until I'm better at Typescript.
+export enum Msg {
// NOTE: object and setup are in other modules
// Object = 0,
// Setup = 1,
- Subscribe = 3,
- SubscribeOk = 4,
- SubscribeError = 5,
- Announce = 6,
- AnnounceOk = 7,
- AnnounceError = 8,
+ Subscribe = "subscribe",
+ SubscribeOk = "subscribe_ok",
+ SubscribeReset = "subscribe_reset", // error termination by the publisher
+ SubscribeEnd = "subscribe_end", // clean termination by the subscriber
+ Announce = "announce",
+ AnnounceOk = "announce_ok",
+ AnnounceReset = "announce_reset", // error termination by the subscriber
+ AnnounceEnd = "announce_end", // clean termination by the publisher
+ GoAway = "go_away",
+}
+
+enum Id {
+ // NOTE: object and setup are in other modules
+ // Object = 0,
+ // Setup = 1,
+
+ Subscribe = 0x3,
+ SubscribeOk = 0x4,
+ SubscribeReset = 0x5, // error termination by the publisher
+ SubscribeEnd = 0x15, // clean termination by the subscriber
+ Announce = 0x6,
+ AnnounceOk = 0x7,
+ AnnounceReset = 0x8, // error termination by the subscriber
+ AnnounceEnd = 0x18, // clean termination by the publisher
+ GoAway = 0x10,
}
// NOTE: These are forked from moq-transport-00.
@@ -24,7 +46,7 @@ export enum Type {
// 4. not allowed on undirectional streams; only after SETUP on the bidirectional stream
export interface Subscribe {
- type: Type.Subscribe
+ kind: Msg.Subscribe
id: bigint
namespace: string
@@ -32,35 +54,44 @@ export interface Subscribe {
}
export interface SubscribeOk {
- type: Type.SubscribeOk
+ kind: Msg.SubscribeOk
id: bigint
- expires?: bigint // ms
}
-export interface SubscribeError {
- type: Type.SubscribeError
+export interface SubscribeReset {
+ kind: Msg.SubscribeReset
id: bigint
code: bigint
reason: string
}
+export interface SubscribeEnd {
+ kind: Msg.SubscribeEnd
+ id: bigint
+}
+
export interface Announce {
- type: Type.Announce
+ kind: Msg.Announce
namespace: string
}
export interface AnnounceOk {
- type: Type.AnnounceOk
+ kind: Msg.AnnounceOk
namespace: string
}
-export interface AnnounceError {
- type: Type.AnnounceError
+export interface AnnounceReset {
+ kind: Msg.AnnounceReset
namespace: string
code: bigint
reason: string
}
+export interface AnnounceEnd {
+ kind: Msg.AnnounceEnd
+ namespace: string
+}
+
export class Stream {
private decoder: Decoder
private encoder: Encoder
@@ -114,27 +145,53 @@ export class Decoder {
this.r = r
}
- private async type(): Promise {
+ private async msg(): Promise {
const t = await this.r.u53()
- if (t in Type) return t
+ switch (t) {
+ case Id.Subscribe:
+ return Msg.Subscribe
+ case Id.SubscribeOk:
+ return Msg.SubscribeOk
+ case Id.SubscribeReset:
+ return Msg.SubscribeReset
+ case Id.SubscribeEnd:
+ return Msg.SubscribeEnd
+ case Id.Announce:
+ return Msg.Announce
+ case Id.AnnounceOk:
+ return Msg.AnnounceOk
+ case Id.AnnounceReset:
+ return Msg.AnnounceReset
+ case Id.AnnounceEnd:
+ return Msg.AnnounceEnd
+ case Id.GoAway:
+ return Msg.GoAway
+ }
+
throw new Error(`unknown control message type: ${t}`)
}
async message(): Promise {
- const t = await this.type()
+ const t = await this.msg()
switch (t) {
- case Type.Subscribe:
+ case Msg.Subscribe:
return this.subscribe()
- case Type.SubscribeOk:
+ case Msg.SubscribeOk:
return this.subscribe_ok()
- case Type.SubscribeError:
- return this.subscribe_error()
- case Type.Announce:
+ case Msg.SubscribeReset:
+ return this.subscribe_reset()
+ case Msg.SubscribeEnd:
+ return this.subscribe_end()
+ case Msg.Announce:
return this.announce()
- case Type.AnnounceOk:
+ case Msg.AnnounceOk:
return this.announce_ok()
- case Type.AnnounceError:
- return this.announce_error()
+ case Msg.AnnounceReset:
+ return this.announce_reset()
+ case Msg.AnnounceEnd:
+ return this.announce_end()
+ case Msg.GoAway:
+ throw new Error("TODO: implement go away")
}
}
@@ -144,7 +201,7 @@ export class Decoder {
const name = await this.r.string()
return {
- type: Type.Subscribe,
+ kind: Msg.Subscribe,
id,
namespace,
name,
@@ -153,45 +210,58 @@ export class Decoder {
private async subscribe_ok(): Promise {
return {
- type: Type.SubscribeOk,
+ kind: Msg.SubscribeOk,
id: await this.r.u62(),
- expires: await this.r.u62(),
}
}
- private async subscribe_error(): Promise {
+ private async subscribe_reset(): Promise {
return {
- type: Type.SubscribeError,
+ kind: Msg.SubscribeReset,
id: await this.r.u62(),
code: await this.r.u62(),
reason: await this.r.string(),
}
}
+ private async subscribe_end(): Promise {
+ return {
+ kind: Msg.SubscribeEnd,
+ id: await this.r.u62(),
+ }
+ }
+
private async announce(): Promise {
const namespace = await this.r.string()
return {
- type: Type.Announce,
+ kind: Msg.Announce,
namespace,
}
}
private async announce_ok(): Promise {
return {
- type: Type.AnnounceOk,
+ kind: Msg.AnnounceOk,
namespace: await this.r.string(),
}
}
- private async announce_error(): Promise {
+ private async announce_reset(): Promise {
return {
- type: Type.AnnounceError,
+ kind: Msg.AnnounceReset,
namespace: await this.r.string(),
code: await this.r.u62(),
reason: await this.r.string(),
}
}
+
+ private async announce_end(): Promise {
+ return {
+ kind: Msg.AnnounceEnd,
+ namespace: await this.r.string(),
+ }
+ }
}
export class Encoder {
@@ -202,52 +272,69 @@ export class Encoder {
}
async message(m: Message) {
- await this.w.u53(m.type)
-
- switch (m.type) {
- case Type.Subscribe:
+ switch (m.kind) {
+ case Msg.Subscribe:
return this.subscribe(m)
- case Type.SubscribeOk:
+ case Msg.SubscribeOk:
return this.subscribe_ok(m)
- case Type.SubscribeError:
- return this.subscribe_error(m)
- case Type.Announce:
+ case Msg.SubscribeReset:
+ return this.subscribe_reset(m)
+ case Msg.SubscribeEnd:
+ return this.subscribe_end(m)
+ case Msg.Announce:
return this.announce(m)
- case Type.AnnounceOk:
+ case Msg.AnnounceOk:
return this.announce_ok(m)
- case Type.AnnounceError:
- return this.announce_error(m)
+ case Msg.AnnounceReset:
+ return this.announce_reset(m)
+ case Msg.AnnounceEnd:
+ return this.announce_end(m)
}
}
async subscribe(s: Subscribe) {
+ await this.w.u53(Id.Subscribe)
await this.w.u62(s.id)
await this.w.string(s.namespace)
await this.w.string(s.name)
}
async subscribe_ok(s: SubscribeOk) {
+ await this.w.u53(Id.SubscribeOk)
await this.w.u62(s.id)
- await this.w.u62(s.expires ?? 0n)
}
- async subscribe_error(s: SubscribeError) {
+ async subscribe_reset(s: SubscribeReset) {
+ await this.w.u53(Id.SubscribeReset)
await this.w.u62(s.id)
await this.w.u62(s.code)
await this.w.string(s.reason)
}
+ async subscribe_end(s: SubscribeEnd) {
+ await this.w.u53(Id.SubscribeEnd)
+ await this.w.u62(s.id)
+ }
+
async announce(a: Announce) {
+ await this.w.u53(Id.Announce)
await this.w.string(a.namespace)
}
async announce_ok(a: AnnounceOk) {
+ await this.w.u53(Id.AnnounceOk)
await this.w.string(a.namespace)
}
- async announce_error(a: AnnounceError) {
+ async announce_reset(a: AnnounceReset) {
+ await this.w.u53(Id.AnnounceReset)
await this.w.string(a.namespace)
await this.w.u62(a.code)
await this.w.string(a.reason)
}
+
+ async announce_end(a: AnnounceEnd) {
+ await this.w.u53(Id.AnnounceEnd)
+ await this.w.string(a.namespace)
+ }
}
diff --git a/lib/transport/object.ts b/lib/transport/object.ts
index 139b6c0..4b9980c 100644
--- a/lib/transport/object.ts
+++ b/lib/transport/object.ts
@@ -9,9 +9,9 @@ export { Reader, Writer }
export interface Header {
track: bigint
- group: bigint
sequence: bigint
- send_order: number // i32
+ priority: number // i32
+ expires: number // 0 means never
// followed by payload
}
@@ -23,6 +23,7 @@ export class Objects {
}
async send(header: Header): Promise> {
+ //console.debug("sending object: ", header)
const stream = await this.quic.createUnidirectionalStream()
await this.#encode(stream, header)
return stream
@@ -38,6 +39,7 @@ export class Objects {
const stream = value
const header = await this.#decode(stream)
+ //console.debug("received object: ", header)
return { header, stream }
}
@@ -48,15 +50,15 @@ export class Objects {
if (type !== 0) throw new Error(`OBJECT type must be 0, got ${type}`)
const track = await r.u62()
- const group = await r.u62()
const sequence = await r.u62()
- const send_order = await r.i32()
+ const priority = await r.i32()
+ const expires = await r.u53()
return {
track,
- group,
sequence,
- send_order,
+ priority,
+ expires,
}
}
@@ -64,8 +66,8 @@ export class Objects {
const w = new Writer(s)
await w.u8(0)
await w.u62(h.track)
- await w.u62(h.group)
await w.u62(h.sequence)
- await w.i32(h.send_order)
+ await w.i32(h.priority)
+ await w.u53(h.expires)
}
}
diff --git a/lib/transport/publisher.ts b/lib/transport/publisher.ts
index fccb634..8eb27c4 100644
--- a/lib/transport/publisher.ts
+++ b/lib/transport/publisher.ts
@@ -31,7 +31,7 @@ export class Publisher {
this.#announce.set(namespace, announce)
await this.#control.send({
- type: Control.Type.Announce,
+ kind: Control.Msg.Announce,
namespace,
})
@@ -52,7 +52,7 @@ export class Publisher {
announce.onOk()
}
- recvAnnounceError(msg: Control.AnnounceError) {
+ recvAnnounceReset(msg: Control.AnnounceReset) {
const announce = this.#announce.get(msg.namespace)
if (!announce) {
// TODO debug this
@@ -72,7 +72,7 @@ export class Publisher {
this.#subscribe.set(msg.id, subscribe)
await this.#subscribeQueue.push(subscribe)
- await this.#control.send({ type: Control.Type.SubscribeOk, id: msg.id })
+ await this.#control.send({ kind: Control.Msg.SubscribeOk, id: msg.id })
}
}
@@ -158,7 +158,7 @@ export class SubscribeRecv {
this.#state = "ack"
// Send the control message.
- return this.#control.send({ type: Control.Type.SubscribeOk, id: this.#id })
+ return this.#control.send({ kind: Control.Msg.SubscribeOk, id: this.#id })
}
// Close the subscription with an error.
@@ -166,11 +166,11 @@ export class SubscribeRecv {
if (this.#state === "closed") return
this.#state = "closed"
- return this.#control.send({ type: Control.Type.SubscribeError, id: this.#id, code, reason })
+ return this.#control.send({ kind: Control.Msg.SubscribeReset, id: this.#id, code, reason })
}
// Create a writable data stream
- async data(header: { group: bigint; sequence: bigint; send_order: number }) {
+ async data(header: { sequence: bigint; priority: number; expires: number }) {
return this.#objects.send({ track: this.#id, ...header })
}
}
diff --git a/lib/transport/setup.ts b/lib/transport/setup.ts
index 2337835..ea85152 100644
--- a/lib/transport/setup.ts
+++ b/lib/transport/setup.ts
@@ -5,6 +5,7 @@ export type Role = "publisher" | "subscriber" | "both"
export enum Version {
DRAFT_00 = 0xff00,
+ KIXEL_00 = 0xbad00,
}
// NOTE: These are forked from moq-transport-00.
@@ -15,7 +16,6 @@ export enum Version {
export interface Client {
versions: Version[]
role: Role
- path?: string // not used with WebTransport
}
export interface Server {
@@ -53,12 +53,10 @@ export class Decoder {
}
const role = await this.role()
- const path = (await this.r.string()) || undefined
return {
versions,
role,
- path,
}
}
@@ -104,7 +102,6 @@ export class Encoder {
}
await this.role(c.role)
- await this.w.string(c.path ?? "")
}
async server(s: Server) {
diff --git a/lib/transport/subscriber.ts b/lib/transport/subscriber.ts
index f82776c..9534c0c 100644
--- a/lib/transport/subscriber.ts
+++ b/lib/transport/subscriber.ts
@@ -31,7 +31,7 @@ export class Subscriber {
throw new Error(`duplicate announce for namespace: ${msg.namespace}`)
}
- await this.#control.send({ type: Control.Type.AnnounceOk, namespace: msg.namespace })
+ await this.#control.send({ kind: Control.Msg.AnnounceOk, namespace: msg.namespace })
const announce = new AnnounceRecv(this.#control, msg.namespace)
this.#announce.set(msg.namespace, announce)
@@ -46,7 +46,7 @@ export class Subscriber {
this.#subscribe.set(id, subscribe)
await this.#control.send({
- type: Control.Type.Subscribe,
+ kind: Control.Msg.Subscribe,
id,
namespace,
name: track,
@@ -64,7 +64,7 @@ export class Subscriber {
subscribe.onOk()
}
- async recvSubscribeError(msg: Control.SubscribeError) {
+ async recvSubscribeReset(msg: Control.SubscribeReset) {
const subscribe = this.#subscribe.get(msg.id)
if (!subscribe) {
throw new Error(`subscribe error for unknown id: ${msg.id}`)
@@ -102,14 +102,14 @@ export class AnnounceRecv {
this.#state = "ack"
// Send the control message.
- return this.#control.send({ type: Control.Type.AnnounceOk, namespace: this.namespace })
+ return this.#control.send({ kind: Control.Msg.AnnounceOk, namespace: this.namespace })
}
async close(code = 0n, reason = "") {
if (this.#state === "closed") return
this.#state = "closed"
- return this.#control.send({ type: Control.Type.AnnounceError, namespace: this.namespace, code, reason })
+ return this.#control.send({ kind: Control.Msg.AnnounceReset, namespace: this.namespace, code, reason })
}
}
diff --git a/web/connect.tsx b/web/connect.tsx
deleted file mode 100644
index d1867c9..0000000
--- a/web/connect.tsx
+++ /dev/null
@@ -1,34 +0,0 @@
-import { Client, Connection } from "@kixelated/moq/transport"
-import { createFetch } from "./common"
-import { Accessor, createEffect, createMemo, onCleanup } from "solid-js"
-
-export function connect(
- server: string | Accessor | undefined,
- role: "subscriber" | "publisher" | "both",
-): [Accessor, Accessor] {
- const connection = createFetch((server: string) => {
- const url = `https://${server}`
-
- // Special case localhost to fetch the TLS fingerprint from the server.
- // TODO remove this when WebTransport correctly supports self-signed certificates
- const fingerprint = server.startsWith("localhost") ? url + "/fingerprint" : undefined
-
- const client = new Client({
- url,
- fingerprint,
- role: role,
- })
-
- return client.connect()
- }, server)
-
- createEffect(() => {
- // Close the connection when the component is unmounted.
- onCleanup(() => connection()?.close())
- })
-
- const closed = createFetch((connection: Connection) => connection.closed(), connection)
- const error = createMemo(() => connection.error() ?? closed())
-
- return [connection, error]
-}
diff --git a/web/listing.tsx b/web/listing.tsx
index 5a7d486..ca0ea52 100644
--- a/web/listing.tsx
+++ b/web/listing.tsx
@@ -1,4 +1,4 @@
-import { For, Show, Switch, Match, createMemo, createSignal } from "solid-js"
+import { For, Show, Switch, Match } from "solid-js"
import {
AudioCatalogTrack,
Catalog,
@@ -6,60 +6,22 @@ import {
isAudioCatalogTrack,
isVideoCatalogTrack,
} from "@kixelated/moq/media"
-import { A, useSearchParams } from "@solidjs/router"
-import { createFetch } from "./common"
-import { connect } from "./connect"
+import { A } from "@solidjs/router"
export function Listings() {
- const [params] = useSearchParams<{ server?: string }>()
- const server = params.server || process.env.RELAY_HOST
-
- const [connection, connectionError] = connect(server, "subscriber")
- const [announced, setAnnounced] = createSignal()
-
- const fetch = createFetch(async (connection) => {
- setAnnounced([])
-
- let [announced, next] = connection.announced().value()
- setAnnounced(announced.map((a) => a.namespace))
-
- while (next) {
- ;[announced, next] = await next
- setAnnounced(announced.map((a) => a.namespace))
- }
- }, connection)
-
- const error = createMemo(() => connectionError() || fetch.error())
-
return (
<>
-
-
- {error()!.name}: {error()!.message}
-
-
-
Watch a PUBLIC broadcast. Report any abuse pls.
-
- No live broadcasts. Somebody should PUBLISH.
-
- }
- >
- {(broadcast) => {
- const catalog = createFetch(async (connection) => {
- return await Catalog.fetch(connection, broadcast)
- }, connection)
-
- return
- }}
-
+
+ Public broadcasts will be listed here; we're busy setting up the CDN.
+
+
+ In the meantime, PUBLISH and share the resulting link.
+
>
)
}
diff --git a/web/publish.tsx b/web/publish.tsx
index 0c7190b..7809841 100644
--- a/web/publish.tsx
+++ b/web/publish.tsx
@@ -6,7 +6,7 @@ import { useSearchParams } from "@solidjs/router"
import { Listing } from "./listing"
import { createFetch } from "./common"
-import { connect } from "./connect"
+import { Client } from "@kixelated/moq/transport"
import { Notice } from "./issues"
interface GeneralConfig {
@@ -122,17 +122,11 @@ export function Publish() {
const [audio, setAudio] = createStore(AUDIO_DEFAULT)
const [video, setVideo] = createStore(VIDEO_DEFAULT)
- // TODO make a replacement for store that uses accessors instead of magic.
- const server = createMemo(() => general.server || process.env.RELAY_HOST)
-
- // eslint-disable-next-line solid/reactivity
- const [connection, connectionError] = connect(server, "publisher")
-
// Start loading the selected media device.
- const media = createFetch(async () => {
+ const broadcast = createFetch(async () => {
const width = Math.ceil((video.height * 16) / 9)
- return await window.navigator.mediaDevices.getUserMedia({
+ const mediaPromise = window.navigator.mediaDevices.getUserMedia({
audio: {
sampleRate: { ideal: audio.sampleRate },
channelCount: { max: 2, ideal: 2 },
@@ -146,25 +140,35 @@ export function Publish() {
deviceId: video.deviceId,
},
})
- })
-
- const broadcast = createMemo(() => {
- const m = media()
- const c = connection()
- if (!m || !c) return
+ const server = general.server || process.env.RELAY_HOST
const name = general.name != "" ? general.name : crypto.randomUUID()
+ const url = `https://${server}/${name}`
+
+ // Special case localhost to fetch the TLS fingerprint from the server.
+ // TODO remove this when WebTransport correctly supports self-signed certificates
+ const fingerprint = server.startsWith("localhost") ? `https://${server}/fingerprint` : undefined
+
+ const client = new Client({
+ url,
+ fingerprint,
+ role: "publisher",
+ })
+
+ const connection = await client.connect()
+ const media = await mediaPromise
+
return new Broadcast({
- connection: c,
- media: m,
+ connection,
+ media,
name,
audio: { codec: "opus", bitrate: 128_000 },
video: { codec: video.codec, bitrate: video.bitrate },
})
})
- const broadcastClosed = createFetch((b) => b?.closed(), broadcast)
+ const broadcastClosed = createFetch((b) => b.closed(), broadcast)
createEffect(() => {
// Close the broadcast when the component is unmounted.
@@ -180,12 +184,12 @@ export function Publish() {
const start = (e: Event) => {
e.preventDefault()
- media.fetch(true)
+ broadcast.fetch(true)
}
// Return a single error when something fails, in order of importance
const error = createMemo(() => {
- return connectionError() ?? media.error() ?? broadcastClosed()
+ return broadcast.error() ?? broadcastClosed()
})
// Report errors to terminal too so we'll get stack traces
@@ -213,7 +217,7 @@ export function Publish() {
-
+