From 09d2c2680bee28f38ba5622bb90c6af13016f1c8 Mon Sep 17 00:00:00 2001
From: Luke Curley
Date: Fri, 8 Sep 2023 22:22:44 -0700
Subject: [PATCH 1/4] WIP
---
lib/contribute/broadcast.ts | 37 +++++++--------------------
lib/media/catalog.ts | 8 +++---
lib/playback/broadcast.ts | 6 -----
lib/playback/player.ts | 10 +++-----
web/connect.tsx | 34 -------------------------
web/listing.tsx | 51 ++++---------------------------------
web/publish.tsx | 48 ++++++++++++++++++----------------
web/watch.tsx | 38 +++++++++++++++++----------
8 files changed, 71 insertions(+), 161 deletions(-)
delete mode 100644 lib/playback/broadcast.ts
delete mode 100644 web/connect.tsx
diff --git a/lib/contribute/broadcast.ts b/lib/contribute/broadcast.ts
index 659fcbb..a07861e 100644
--- a/lib/contribute/broadcast.ts
+++ b/lib/contribute/broadcast.ts
@@ -11,7 +11,7 @@ import { isAudioTrackSettings, isVideoTrackSettings } from "../common/settings"
export interface BroadcastConfig {
connection: Connection
media: MediaStream
- name: string // name of the broadcast
+ name: string
audio: Audio.EncoderConfig
video: Video.EncoderConfig
@@ -46,7 +46,6 @@ export class Broadcast {
const mp4Catalog: Mp4Track = {
container: "mp4",
- namespace: config.name,
kind: media.kind,
init_track: `${track.name}.mp4`,
data_track: `${track.name}.m4s`,
@@ -87,34 +86,20 @@ export class Broadcast {
}
async #run() {
- // Announce the namespace and wait for an explicit OK.
- const announce = await this.connection.announce(this.config.name)
- await announce.ok()
+ for (;;) {
+ const subscriber = await this.connection.subscribed()
+ if (!subscriber) break
- try {
- for (;;) {
- const subscriber = await this.connection.subscribed()
- if (!subscriber) break
-
- // Run an async task to serve each subscription.
- this.#serveSubscribe(subscriber).catch((e) => {
- const err = asError(e)
- console.warn("failed to serve subscribe", err)
- })
- }
- } catch (e) {
- const err = asError(e)
- await announce.close(1n, `error serving broadcast: ${err.message}`)
+ // Run an async task to serve each subscription.
+ this.#serveSubscribe(subscriber).catch((e) => {
+ const err = asError(e)
+ console.warn("failed to serve subscribe", err)
+ })
}
}
async #serveSubscribe(subscriber: SubscribeRecv) {
try {
- if (subscriber.namespace != this.config.name) {
- // Don't reuse connections if you get this error; we don't demultiplex them.
- throw new Error(`unknown namespace: ${subscriber.namespace}`)
- }
-
const [base, ext] = splitExt(subscriber.track)
if (ext === "catalog") {
await this.#serveCatalog(subscriber, base)
@@ -233,10 +218,6 @@ export class Broadcast {
video.srcObject = this.config.media
}
- get name() {
- return this.config.name
- }
-
close() {
// TODO implement publish close
}
diff --git a/lib/media/catalog.ts b/lib/media/catalog.ts
index bdc3927..de9bb9d 100644
--- a/lib/media/catalog.ts
+++ b/lib/media/catalog.ts
@@ -24,10 +24,10 @@ export class Catalog {
return catalog
}
- static async fetch(connection: Connection, namespace: string): Promise {
+ static async fetch(connection: Connection): Promise {
let raw: Uint8Array
- const subscribe = await connection.subscribe(namespace, ".catalog")
+ const subscribe = await connection.subscribe("", ".catalog")
try {
const segment = await subscribe.data()
if (!segment) throw new Error("no catalog data")
@@ -61,7 +61,6 @@ export function isCatalog(catalog: any): catalog is Catalog {
}
export interface Track {
- namespace: string
kind: string
container: string
}
@@ -91,7 +90,8 @@ export interface VideoTrack extends Track {
}
export function isTrack(track: any): track is Track {
- if (typeof track.namespace !== "string") return false
+ if (typeof track.kind !== "string") return false
+ if (typeof track.container !== "string") return false
return true
}
diff --git a/lib/playback/broadcast.ts b/lib/playback/broadcast.ts
deleted file mode 100644
index ec443fe..0000000
--- a/lib/playback/broadcast.ts
+++ /dev/null
@@ -1,6 +0,0 @@
-import { Catalog } from "../media/catalog"
-
-export interface Broadcast {
- namespace: string
- catalog: Catalog
-}
diff --git a/lib/playback/player.ts b/lib/playback/player.ts
index 68404ec..71f3b0c 100644
--- a/lib/playback/player.ts
+++ b/lib/playback/player.ts
@@ -13,8 +13,6 @@ export type Timeline = Message.Timeline
export interface PlayerConfig {
connection: Connection
- namespace: string
-
canvas: HTMLCanvasElement
}
@@ -35,16 +33,14 @@ export class Player {
#running: Promise
readonly connection: Connection
- readonly namespace: string
constructor(config: PlayerConfig) {
this.#port = new Port(this.#onMessage.bind(this)) // TODO await an async method instead
this.#canvas = config.canvas.transferControlToOffscreen()
this.connection = config.connection
- this.namespace = config.namespace
- this.#catalog = Catalog.fetch(this.connection, this.namespace)
+ this.#catalog = Catalog.fetch(this.connection)
// Async work
this.#running = this.#run()
@@ -106,7 +102,7 @@ export class Player {
}
async #runInit(name: string) {
- const sub = await this.connection.subscribe(this.namespace, name)
+ const sub = await this.connection.subscribe("", name)
try {
const init = await sub.data()
if (!init) throw new Error("no init data")
@@ -129,7 +125,7 @@ export class Player {
throw new Error(`unknown track kind: ${track.kind}`)
}
- const sub = await this.connection.subscribe(this.namespace, track.data_track)
+ const sub = await this.connection.subscribe("", track.data_track)
try {
for (;;) {
const segment = await sub.data()
diff --git a/web/connect.tsx b/web/connect.tsx
deleted file mode 100644
index d1867c9..0000000
--- a/web/connect.tsx
+++ /dev/null
@@ -1,34 +0,0 @@
-import { Client, Connection } from "@kixelated/moq/transport"
-import { createFetch } from "./common"
-import { Accessor, createEffect, createMemo, onCleanup } from "solid-js"
-
-export function connect(
- server: string | Accessor | undefined,
- role: "subscriber" | "publisher" | "both",
-): [Accessor, Accessor] {
- const connection = createFetch((server: string) => {
- const url = `https://${server}`
-
- // Special case localhost to fetch the TLS fingerprint from the server.
- // TODO remove this when WebTransport correctly supports self-signed certificates
- const fingerprint = server.startsWith("localhost") ? url + "/fingerprint" : undefined
-
- const client = new Client({
- url,
- fingerprint,
- role: role,
- })
-
- return client.connect()
- }, server)
-
- createEffect(() => {
- // Close the connection when the component is unmounted.
- onCleanup(() => connection()?.close())
- })
-
- const closed = createFetch((connection: Connection) => connection.closed(), connection)
- const error = createMemo(() => connection.error() ?? closed())
-
- return [connection, error]
-}
diff --git a/web/listing.tsx b/web/listing.tsx
index 682a0f9..b6e4854 100644
--- a/web/listing.tsx
+++ b/web/listing.tsx
@@ -1,4 +1,4 @@
-import { For, Show, Switch, Match, createMemo, createSignal } from "solid-js"
+import { For, Show, Switch, Match } from "solid-js"
import {
AudioCatalogTrack,
Catalog,
@@ -6,60 +6,19 @@ import {
isAudioCatalogTrack,
isVideoCatalogTrack,
} from "@kixelated/moq/media"
-import { A, useSearchParams } from "@solidjs/router"
-import { createFetch } from "./common"
-import { connect } from "./connect"
+import { A } from "@solidjs/router"
export function Listings() {
- const [params] = useSearchParams<{ server?: string }>()
- const server = params.server || process.env.RELAY_HOST
-
- const [connection, connectionError] = connect(server, "subscriber")
- const [announced, setAnnounced] = createSignal()
-
- const fetch = createFetch(async (connection) => {
- setAnnounced([])
-
- let [announced, next] = connection.announced().value()
- setAnnounced(announced.map((a) => a.namespace))
-
- while (next) {
- ;[announced, next] = await next
- setAnnounced(announced.map((a) => a.namespace))
- }
- }, connection)
-
- const error = createMemo(() => connectionError() || fetch.error())
-
return (
<>
-
-
- {error()!.name}: {error()!.message}
-
-
-
Watch a PUBLIC broadcast. Report any abuse pls.
-
- No live broadcasts. Somebody should PUBLISH.
-
- }
- >
- {(broadcast) => {
- const catalog = createFetch(async (connection) => {
- return await Catalog.fetch(connection, broadcast)
- }, connection)
-
- return
- }}
-
+
+ TODO list broadcasts here. In the meantime, PUBLISH and the resulting link.
+
>
)
}
diff --git a/web/publish.tsx b/web/publish.tsx
index 5790f0b..012ce94 100644
--- a/web/publish.tsx
+++ b/web/publish.tsx
@@ -6,7 +6,7 @@ import { useSearchParams } from "@solidjs/router"
import { Listing } from "./listing"
import { createFetch } from "./common"
-import { connect } from "./connect"
+import { Client } from "@kixelated/moq/transport"
interface GeneralConfig {
server: string
@@ -121,17 +121,11 @@ export function Publish() {
const [audio, setAudio] = createStore(AUDIO_DEFAULT)
const [video, setVideo] = createStore(VIDEO_DEFAULT)
- // TODO make a replacement for store that uses accessors instead of magic.
- const server = createMemo(() => general.server || process.env.RELAY_HOST)
-
- // eslint-disable-next-line solid/reactivity
- const [connection, connectionError] = connect(server, "publisher")
-
// Start loading the selected media device.
- const media = createFetch(async () => {
+ const broadcast = createFetch(async () => {
const width = Math.ceil((video.height * 16) / 9)
- return await window.navigator.mediaDevices.getUserMedia({
+ const mediaPromise = window.navigator.mediaDevices.getUserMedia({
audio: {
sampleRate: { ideal: audio.sampleRate },
channelCount: { max: 2, ideal: 2 },
@@ -145,25 +139,35 @@ export function Publish() {
deviceId: video.deviceId,
},
})
- })
-
- const broadcast = createMemo(() => {
- const m = media()
- const c = connection()
- if (!m || !c) return
+ const server = general.server || process.env.RELAY_HOST
const name = general.name != "" ? general.name : crypto.randomUUID()
+ const url = `https://${server}/${name}`
+
+ // Special case localhost to fetch the TLS fingerprint from the server.
+ // TODO remove this when WebTransport correctly supports self-signed certificates
+ const fingerprint = server.startsWith("localhost") ? `https://${server}/fingerprint` : undefined
+
+ const client = new Client({
+ url,
+ fingerprint,
+ role: "publisher",
+ })
+
+ const connection = await client.connect()
+ const media = await mediaPromise
+
return new Broadcast({
- connection: c,
- media: m,
+ connection,
+ media,
name,
audio: { codec: "opus", bitrate: 128_000 },
video: { codec: video.codec, bitrate: video.bitrate },
})
})
- const broadcastClosed = createFetch((b) => b?.closed(), broadcast)
+ const broadcastClosed = createFetch((b) => b.closed(), broadcast)
createEffect(() => {
// Close the broadcast when the component is unmounted.
@@ -179,12 +183,12 @@ export function Publish() {
const start = (e: Event) => {
e.preventDefault()
- media.fetch(true)
+ broadcast.fetch(true)
}
// Return a single error when something fails, in order of importance
const error = createMemo(() => {
- return connectionError() ?? media.error() ?? broadcastClosed()
+ return broadcast.error() ?? broadcastClosed()
})
// Report errors to terminal too so we'll get stack traces
@@ -221,7 +225,7 @@ export function Publish() {
-
+