Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minimal updates for draft-04 interop with moq-rs #107

Merged
merged 8 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion lib/media/catalog/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ export async function fetch(connection: Connection, namespace: string): Promise<
await segment.close()
await subscribe.close() // we done

return decode(chunk.payload)
if (chunk.payload instanceof Uint8Array) {
return decode(chunk.payload)
} else {
throw new Error("invalid catalog chunk")
}
} catch (e) {
const err = asError(e)

Expand Down
1 change: 1 addition & 0 deletions lib/playback/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ export class Player {
// We don't care what type of reader we get, we just want the payload.
const chunk = await init.read()
if (!chunk) throw new Error("no init chunk")
if (!(chunk.payload instanceof Uint8Array)) throw new Error("invalid init chunk")

this.#backend.init({ data: chunk.payload, name })
} finally {
Expand Down
4 changes: 4 additions & 0 deletions lib/playback/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ class Worker {
break
}

if (!(chunk.payload instanceof Uint8Array)) {
throw new Error(`invalid payload: ${chunk.payload}`)
}

const frames = container.decode(chunk.payload)
for (const frame of frames) {
await segment.write(frame)
Expand Down
4 changes: 2 additions & 2 deletions lib/transport/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ export class Client {
const setup = new Setup.Stream(reader, writer)

// Send the setup message.
await setup.send.client({ versions: [Setup.Version.DRAFT_03], role: this.config.role })
await setup.send.client({ versions: [Setup.Version.DRAFT_04], role: this.config.role })

// Receive the setup message.
// TODO verify the SETUP response.
const server = await setup.recv.server()

if (server.version != Setup.Version.DRAFT_03) {
if (server.version != Setup.Version.DRAFT_04) {
throw new Error(`unsupported server version: ${server.version}`)
}

Expand Down
97 changes: 64 additions & 33 deletions lib/transport/control.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,17 +66,33 @@ export interface Subscribe {
namespace: string
name: string

start_group: Location
start_object: Location
end_group: Location
end_object: Location
location: Location

params?: Parameters
}

export interface Location {
mode: "none" | "absolute" | "latest" | "future"
value?: number // ignored for type=none, otherwise defaults to 0
export type Location = LatestGroup | LatestObject | AbsoluteStart | AbsoluteRange

export interface LatestGroup {
mode: "latest_group"
}

export interface LatestObject {
mode: "latest_object"
}

export interface AbsoluteStart {
mode: "absolute_start"
start_group: number
start_object: number
}

export interface AbsoluteRange {
mode: "absolute_range"
start_group: number
start_object: number
end_group: number
end_object: number
}

export type Parameters = Map<bigint, Uint8Array>
Expand Down Expand Up @@ -245,26 +261,37 @@ export class Decoder {
trackId: await this.r.u62(),
namespace: await this.r.string(),
name: await this.r.string(),
start_group: await this.location(),
start_object: await this.location(),
end_group: await this.location(),
end_object: await this.location(),
location: await this.location(),
params: await this.parameters(),
}
}

private async location(): Promise<Location> {
const mode = await this.r.u62()
if (mode == 0n) {
return { mode: "none", value: 0 }
} else if (mode == 1n) {
return { mode: "absolute", value: await this.r.u53() }
if (mode == 1n) {
return {
mode: "latest_group",
}
} else if (mode == 2n) {
return { mode: "latest", value: await this.r.u53() }
return {
mode: "latest_object",
}
} else if (mode == 3n) {
return { mode: "future", value: await this.r.u53() }
return {
mode: "absolute_start",
start_group: await this.r.u53(),
start_object: await this.r.u53(),
}
} else if (mode == 4n) {
return {
mode: "absolute_range",
start_group: await this.r.u53(),
start_object: await this.r.u53(),
end_group: await this.r.u53(),
end_object: await this.r.u53(),
}
} else {
throw new Error(`invalid location mode: ${mode}`)
throw new Error(`invalid filter type: ${mode}`)
}
}

Expand Down Expand Up @@ -419,25 +446,29 @@ export class Encoder {
await this.w.u62(s.trackId)
await this.w.string(s.namespace)
await this.w.string(s.name)
await this.location(s.start_group)
await this.location(s.start_object)
await this.location(s.end_group)
await this.location(s.end_object)
await this.location(s.location)
await this.parameters(s.params)
}

private async location(l: Location) {
if (l.mode == "none") {
await this.w.u8(0)
} else if (l.mode == "absolute") {
await this.w.u8(1)
await this.w.u53(l.value ?? 0)
} else if (l.mode == "latest") {
await this.w.u8(2)
await this.w.u53(l.value ?? 0)
} else if (l.mode == "future") {
await this.w.u8(3)
await this.w.u53(l.value ?? 0)
switch (l.mode) {
case "latest_group":
await this.w.u62(1n)
break
case "latest_object":
await this.w.u62(2n)
break
case "absolute_start":
await this.w.u62(3n)
await this.w.u53(l.start_group)
await this.w.u53(l.start_object)
break
case "absolute_range":
await this.w.u62(3n)
await this.w.u53(l.start_group)
await this.w.u53(l.start_object)
await this.w.u53(l.end_group)
await this.w.u53(l.end_object)
}
}

Expand Down
50 changes: 42 additions & 8 deletions lib/transport/objects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ export enum StreamType {
Group = 0x51,
}

export enum Status {
OBJECT_NULL = 1,
GROUP_NULL = 2,
GROUP_END = 3,
TRACK_END = 4,
}

export interface TrackHeader {
type: StreamType.Track
sub: bigint
Expand All @@ -17,7 +24,7 @@ export interface TrackHeader {
export interface TrackChunk {
group: number // The group sequence, as a number because 2^53 is enough.
object: number
payload: Uint8Array
payload: Uint8Array | Status
}

export interface GroupHeader {
Expand All @@ -30,7 +37,7 @@ export interface GroupHeader {

export interface GroupChunk {
object: number
payload: Uint8Array
payload: Uint8Array | Status
}

export interface ObjectHeader {
Expand All @@ -40,6 +47,7 @@ export interface ObjectHeader {
group: number
object: number
priority: number
status: number
}

export interface ObjectChunk {
Expand Down Expand Up @@ -75,6 +83,7 @@ export class Objects {
await w.u53(h.group)
await w.u53(h.object)
await w.u53(h.priority)
await w.u53(h.status)

res = new ObjectWriter(h, w) as WriterType<T>
} else if (h.type === StreamType.Group) {
Expand Down Expand Up @@ -132,6 +141,7 @@ export class Objects {
track: await r.u62(),
group: await r.u53(),
object: await r.u53(),
status: await r.u53(),
priority: await r.u53(),
}

Expand All @@ -155,8 +165,15 @@ export class TrackWriter {
async write(c: TrackChunk) {
await this.stream.u53(c.group)
await this.stream.u53(c.object)
await this.stream.u53(c.payload.byteLength)
await this.stream.write(c.payload)

if (c.payload instanceof Uint8Array) {
await this.stream.u53(c.payload.byteLength)
await this.stream.write(c.payload)
} else {
// empty payload with status
await this.stream.u53(0)
await this.stream.u53(c.payload as number)
}
}

async close() {
Expand All @@ -172,8 +189,13 @@ export class GroupWriter {

async write(c: GroupChunk) {
await this.stream.u53(c.object)
await this.stream.u53(c.payload.byteLength)
await this.stream.write(c.payload)
if (c.payload instanceof Uint8Array) {
await this.stream.u53(c.payload.byteLength)
await this.stream.write(c.payload)
} else {
await this.stream.u53(0)
await this.stream.u53(c.payload as number)
}
}

async close() {
Expand Down Expand Up @@ -210,7 +232,13 @@ export class TrackReader {
const group = await this.stream.u53()
const object = await this.stream.u53()
const size = await this.stream.u53()
const payload = await this.stream.read(size)

let payload
if (size == 0) {
payload = (await this.stream.u53()) as Status
} else {
payload = await this.stream.read(size)
}

return {
group,
Expand All @@ -237,7 +265,13 @@ export class GroupReader {

const object = await this.stream.u53()
const size = await this.stream.u53()
const payload = await this.stream.read(size)

let payload
if (size == 0) {
payload = (await this.stream.u53()) as Status
} else {
payload = await this.stream.read(size)
}

return {
object,
Expand Down
1 change: 1 addition & 0 deletions lib/transport/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ export class SubscribeRecv {
group: props.group,
object: props.object,
priority: props.priority ?? 0,
status: 0,
})
}
}
1 change: 1 addition & 0 deletions lib/transport/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export enum Version {
DRAFT_01 = 0xff000001,
DRAFT_02 = 0xff000002,
DRAFT_03 = 0xff000003,
DRAFT_04 = 0xff000004,
KIXEL_00 = 0xbad00,
KIXEL_01 = 0xbad01,
}
Expand Down
7 changes: 3 additions & 4 deletions lib/transport/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,9 @@ export class Subscriber {
trackId: id,
namespace,
name: track,
start_group: { mode: "latest", value: 0 },
start_object: { mode: "absolute", value: 0 },
end_group: { mode: "none" },
end_object: { mode: "none" },
location: {
mode: "latest_group",
},
})

return subscribe
Expand Down
Loading