Skip to content

Commit

Permalink
SyncAgent: Channels (#310)
Browse files Browse the repository at this point in the history
Move some stuff around, add channel scaffolding
  • Loading branch information
texuf authored Jun 28, 2024
1 parent 2a955b4 commit f56394b
Show file tree
Hide file tree
Showing 40 changed files with 1,027 additions and 439 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
"name": "Jest: current file in 'packages/sdk/' (single entitlements)",
"type": "node",
"request": "launch",
"env": { "NODE_ENV": "development", "NODE_TLS_REJECT_UNAUTHORIZED": "0", "RIVER_ENV": "local_single", "DEBUG": "csb:*,test:*", "DEBUG_DEPTH":"10" },
"env": { "NODE_ENV": "development", "NODE_TLS_REJECT_UNAUTHORIZED": "0", "RIVER_ENV": "local_single", "DEBUG": "csb:*,test:*", "DEBUG_DEPTH":"1" },
"program": "${workspaceFolder}/node_modules/.bin/jest",
"runtimeArgs": ["--experimental-vm-modules", "--experimental-wasm-modules"],
"args": ["${file}", "--config", "${workspaceFolder}/packages/sdk/jest.config.ts", "-i", "--no-cache", "--forceExit"],
Expand Down
1 change: 1 addition & 0 deletions packages/encryption/src/decryptionExtensions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ export abstract class BaseDecryptionExtensions {
sessions: UserInboxPayload_GroupEncryptionSessions,
_senderId: string,
): void {
this.log.info('enqueueNewGroupSessions', sessions)
this.queues.newGroupSession.push(sessions)
this.checkStartTicking()
}
Expand Down
1 change: 0 additions & 1 deletion packages/playground/src/routes/root.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ const ConnectedContent = () => {
const { data: nodeUrls } = useSyncValue((s) => s.riverStreamNodeUrls, {
onUpdate: (data) => console.log('onUpdate', data),
onError: (error) => console.error('onError', error),
onSaved: (data) => console.log('onSaved', data),
})
return (
<div className="max-w-xl rounded-sm border border-zinc-200 bg-zinc-100 p-2">
Expand Down
10 changes: 0 additions & 10 deletions packages/react-sdk/src/useObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ export type ObservableConfig<T> = {
fireImmediately?: boolean
onUpdate?: (data: T) => void
onError?: (error: Error) => void
onSaved?: (data: T) => void
}

type ObservableReturn<T> = {
Expand All @@ -15,8 +14,6 @@ type ObservableReturn<T> = {
status: PersistedModel<T>['status']
isLoading: boolean
isError: boolean
isSaving: boolean
isSaved: boolean
isLoaded: boolean
}

Expand Down Expand Up @@ -65,9 +62,6 @@ export function useObservable<T>(
if (value.status === 'error') {
opts.onError?.(value.error)
}
if (value.status === 'saved') {
opts.onSaved?.(value.data)
}
},
[opts],
)
Expand All @@ -90,9 +84,7 @@ export function useObservable<T>(
status: 'loading',
isLoading: true,
isError: false,
isSaving: false,
isLoaded: false,
isSaved: false,
}
}
const { data, status } = value
Expand All @@ -102,9 +94,7 @@ export function useObservable<T>(
status,
isLoading: status === 'loading',
isError: status === 'error',
isSaving: status === 'saving',
isLoaded: status === 'loaded',
isSaved: status === 'saved',
}
}, [value]) satisfies ObservableReturn<T>

Expand Down
16 changes: 8 additions & 8 deletions packages/sdk/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ import {
} from './id'
import { makeEvent, unpackMiniblock, unpackStream, unpackStreamEx } from './sign'
import { StreamEvents } from './streamEvents'
import { StreamStateView } from './streamStateView'
import { IStreamStateView, StreamStateView } from './streamStateView'
import {
make_UserDeviceKeyPayload_Inception,
make_ChannelPayload_Inception,
Expand Down Expand Up @@ -123,7 +123,7 @@ import { SyncedStream } from './syncedStream'
import { SyncedStreamsExtension } from './syncedStreamsExtension'
import { SignerContext } from './signerContext'

type ClientEvents = StreamEvents & DecryptionEvents
export type ClientEvents = StreamEvents & DecryptionEvents

type SendChannelMessageOptions = {
beforeSendEventHook?: Promise<void>
Expand Down Expand Up @@ -1178,7 +1178,7 @@ export class Client
): Promise<{ eventId: string }> {
const stream = this.stream(streamId)
check(stream !== undefined, 'stream not found')
const localId = stream.view.appendLocalEvent(payload, 'sending', this)
const localId = stream.appendLocalEvent(payload, 'sending')
opts?.onLocalEventAppended?.(localId)
if (opts?.beforeSendEventHook) {
await opts?.beforeSendEventHook
Expand Down Expand Up @@ -1656,7 +1656,7 @@ export class Client
* that are in the room but that have been blocked.
*/
async getDevicesInStream(stream_id: string): Promise<UserDeviceCollection> {
let stream: StreamStateView | undefined
let stream: IStreamStateView | undefined
stream = this.stream(stream_id)?.view
if (!stream || !stream.isInitialized) {
stream = await this.getStream(stream_id)
Expand Down Expand Up @@ -1796,7 +1796,7 @@ export class Client
// when we have a localId, we need to update the local event with the eventId
const stream = this.streams.get(streamId)
assert(stream !== undefined, 'unknown stream ' + streamIdAsString(streamId))
stream.view.updateLocalEvent(localId, eventId, 'sending', this)
stream.updateLocalEvent(localId, eventId, 'sending')
}

if (cleartext) {
Expand All @@ -1812,7 +1812,7 @@ export class Client
})
if (localId) {
const stream = this.streams.get(streamId)
stream?.view.updateLocalEvent(localId, eventId, 'sent', this)
stream?.updateLocalEvent(localId, eventId, 'sent')
}
return { prevMiniblockHash, eventId, error }
} catch (err) {
Expand Down Expand Up @@ -1844,7 +1844,7 @@ export class Client
} else {
if (localId) {
const stream = this.streams.get(streamId)
stream?.view.updateLocalEvent(localId, eventId, 'failed', this)
stream?.updateLocalEvent(localId, eventId, 'failed')
}
throw err
}
Expand Down Expand Up @@ -1957,7 +1957,7 @@ export class Client
check(isEncryptedContentKind(kind), `invalid kind ${kind}`)
const cleartext = await this.cleartextForGroupEvent(streamId, eventId, encryptedData)
const decryptedContent = toDecryptedContent(kind, cleartext)
stream.view.updateDecryptedContent(eventId, decryptedContent, this)
stream.updateDecryptedContent(eventId, decryptedContent)
}

private async cleartextForGroupEvent(
Expand Down
16 changes: 6 additions & 10 deletions packages/sdk/src/clientDecryptionExtensions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,16 +180,12 @@ export class ClientDecryptionExtensions extends BaseDecryptionExtensions {
}

public onDecryptionError(item: EncryptedContentItem, err: DecryptionSessionError): void {
this.client.stream(item.streamId)?.view.updateDecryptedContentError(
item.eventId,
{
missingSession: err.missingSession,
kind: err.kind,
encryptedData: item.encryptedData,
error: err,
},
this.client,
)
this.client.stream(item.streamId)?.updateDecryptedContentError(item.eventId, {
missingSession: err.missingSession,
kind: err.kind,
encryptedData: item.encryptedData,
error: err,
})
}

public async ackNewGroupSession(
Expand Down
7 changes: 7 additions & 0 deletions packages/sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,24 @@ export * from './streamStateView_UserSettings'
export * from './streamUtils'
export * from './sync-agent/db'
export * from './sync-agent/entitlements/entitlements'
export * from './sync-agent/river-connection/models/authStatus'
export * from './sync-agent/river-connection/models/streamNodeUrls'
export * from './sync-agent/river-connection/models/transactionalClient'
export * from './sync-agent/river-connection/riverConnection'
export * from './sync-agent/spaces/models/channel'
export * from './sync-agent/spaces/models/space'
export * from './sync-agent/spaces/spaces'
export * from './sync-agent/streams/models/streamConnectionStatus'
export * from './sync-agent/syncAgent'
export * from './sync-agent/timeline/models/timelineEvent'
export * from './sync-agent/timeline/models/timelineEvents'
export * from './sync-agent/timeline/timeline'
export * from './sync-agent/user/models/userDeviceKeys'
export * from './sync-agent/user/models/userInbox'
export * from './sync-agent/user/models/userMemberships'
export * from './sync-agent/user/models/userSettings'
export * from './sync-agent/user/user'
export * from './sync-agent/utils/bot'
export * from './sync-agent/utils/promiseQueue'
export * from './sync-agent/utils/providers'
export * from './sync-agent/utils/spaceUtils'
Expand Down
57 changes: 51 additions & 6 deletions packages/sdk/src/observable/observable.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
interface Subscription<T> {
id: number
fn: (value: T) => void
condition: (value: T) => boolean
once: boolean
}

export class Observable<T> {
protected subscribers: ((value: T) => void)[] = []
private _nextId = 0
protected subscribers: Subscription<T>[] = []
protected _value: T

constructor(value: T) {
Expand All @@ -15,19 +23,56 @@ export class Observable<T> {
this.notify()
}

subscribe(subscriber: (newValue: T) => void, opts: { fireImediately?: boolean } = {}): this {
this.subscribers.push(subscriber)
subscribe(
subscriber: (newValue: T) => void,
opts: { fireImediately?: boolean; once?: boolean; condition?: (value: T) => boolean } = {},
): this {
const sub = {
id: this._nextId++,
fn: subscriber,
once: opts?.once ?? false,
condition: opts?.condition ?? (() => true),
} satisfies Subscription<T>
this.subscribers.push(sub)
if (opts.fireImediately) {
subscriber(this.value)
this._notify(sub, this.value)
}
return this
}

when(
condition: (value: T) => boolean,
opts: { timeoutMs: number } = { timeoutMs: 5000 },
): Promise<T> {
return new Promise((resolve, reject) => {
const timeoutHandle = setTimeout(() => {
reject(new Error('Timeout waiting for condition'))
}, opts.timeoutMs)
this.subscribe(
(value) => {
clearTimeout(timeoutHandle)
resolve(value)
},
{ fireImediately: true, condition: condition, once: true },
)
})
}

unsubscribe(subscriber: (value: T) => void) {
this.subscribers = this.subscribers.filter((sub) => sub !== subscriber)
this.subscribers = this.subscribers.filter((sub) => sub.fn !== subscriber)
}

private notify() {
this.subscribers.forEach((sub) => sub(this.value))
const subscriptions = this.subscribers
subscriptions.forEach((sub) => this._notify(sub, this.value))
}

private _notify(sub: Subscription<T>, value: T) {
if (sub.condition(value)) {
sub.fn(value)
if (sub.once) {
this.subscribers = this.subscribers.filter((s) => s !== sub)
}
}
}
}
44 changes: 20 additions & 24 deletions packages/sdk/src/observable/persistedObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ export type PersistedModel<T> =
| { status: 'loading'; data: T }
| { status: 'loaded'; data: T }
| { status: 'error'; data: T; error: Error }
| { status: 'saving'; data: T }
| { status: 'saved'; data: T }

interface Storable {}

Expand All @@ -29,8 +27,11 @@ export function persistedObservable(options: PersistedOpts) {
super(...args)
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
;(this as any).tableName = options.tableName
// eslint-disable-next-line @typescript-eslint/no-unsafe-call, @typescript-eslint/no-unsafe-member-access
;(this as any).load()
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
;((this as any).store as Store).withTransaction(`new-${options.tableName}`, () => {
// eslint-disable-next-line @typescript-eslint/no-unsafe-call, @typescript-eslint/no-unsafe-member-access
;(this as any).load()
})
}
static tableName = options.tableName
}
Expand Down Expand Up @@ -85,28 +86,23 @@ export class PersistedObservable<T extends Identifiable>
// must be called in a store transaction
setData(newDataPartial: Partial<T>) {
check(isDefined(newDataPartial), 'value is undefined')
const prevData = this.data
const newData = { ...this.data, ...newDataPartial }
check(newData.id === this.data.id, 'id mismatch')
super.setValue({ status: 'saving', data: newData })
this.store
.withTransaction(`update-${this.tableName}:${this.data.id}`, () => {
this.store.save(
this.tableName,
newData,
() => {
super.setValue({ status: 'saved', data: newData })
},
(e) => {
super.setValue({ status: 'error', data: newData, error: e })
},
async () => {
await this.onSaved()
},
)
})
.catch((e) => {
super.setValue({ status: 'error', data: this.data, error: e })
})
super.setValue({ status: 'loaded', data: newData })
this.store.withTransaction(`update-${this.tableName}:${this.data.id}`, () => {
this.store.save(
this.tableName,
newData,
() => {},
(e) => {
super.setValue({ status: 'error', data: prevData, error: e })
},
async () => {
await this.onSaved()
},
)
})
}

protected async onLoaded() {
Expand Down
21 changes: 14 additions & 7 deletions packages/sdk/src/store/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,11 @@ export class Store {
}

newTransactionGroup(name: string) {
log(`newTransactionGroup "${name}"`)
check(this.transactionGroup === undefined, 'transaction already in progress')
// log(`newTransactionGroup "${name}"`)
check(
this.transactionGroup === undefined,
`transaction already in progress named: ${this.transactionGroup?.name}`,
)
this.transactionGroup = new TransactionGroup(name)
}

Expand All @@ -82,7 +85,7 @@ export class Store {
this.transactionGroup = undefined
// if no ops, return
if (!tGroup.hasOps) {
log(`commitTransaction "${tGroup.name}" skipped (empty)`)
// log(`commitTransaction "${tGroup.name}" skipped (empty)`)
return
}
log(
Expand Down Expand Up @@ -111,13 +114,17 @@ export class Store {
log(`commitTransaction "${tGroup.name}" done`, 'elapsedMs:', Date.now() - time)
}

async withTransaction(name: string, fn: () => void) {
withTransaction<T>(name: string, fn: () => T): T {
if (this.transactionGroup !== undefined) {
fn()
return fn()
} else {
this.newTransactionGroup(name)
fn()
await this.commitTransaction()
const result = fn()
this.commitTransaction().catch((e) => {
log(`uncaught commitTransaction error in groun ${name}`, e)
throw e
})
return result
}
}

Expand Down
Loading

0 comments on commit f56394b

Please sign in to comment.