Skip to content

Commit

Permalink
SyncAgent - create space, user streams, observable dock (#287)
Browse files Browse the repository at this point in the history
slowly copying code over from the stress client, leaving places for the
app view logic to go
  • Loading branch information
texuf authored Jun 26, 2024
1 parent bb7c46b commit f4801c7
Show file tree
Hide file tree
Showing 21 changed files with 576 additions and 189 deletions.
2 changes: 1 addition & 1 deletion packages/sdk/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ export class Client
return this.syncedStreamsExtensions.initStatus
}

get cryptoEnabled(): boolean {
get cryptoInitialized(): boolean {
return this.cryptoBackend !== undefined
}

Expand Down
8 changes: 4 additions & 4 deletions packages/sdk/src/observable/observable.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export class Observable<T> {
subscribers: ((value: T) => void)[] = []
private _value: T
protected subscribers: ((value: T) => void)[] = []
protected _value: T

constructor(value: T) {
this._value = value
Expand All @@ -10,8 +10,8 @@ export class Observable<T> {
return this._value
}

set(value: T) {
this._value = value
setValue(newValue: T) {
this._value = newValue
this.notify()
}

Expand Down
58 changes: 30 additions & 28 deletions packages/sdk/src/observable/persistedObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ export type PersistedModel<T> =
| { status: 'saving'; data: T }
| { status: 'saved'; data: T }

interface Storable {
tableName: string
load(): void
}
interface Storable {}

const all_tables = new Set<string>()

Expand All @@ -30,8 +27,10 @@ export function persistedObservable(options: PersistedOpts) {
constructor(...args: any[]) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
super(...args)
this.tableName = options.tableName
this.load()
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
;(this as any).tableName = options.tableName
// eslint-disable-next-line @typescript-eslint/no-unsafe-call, @typescript-eslint/no-unsafe-member-access
;(this as any).load()
}
static tableName = options.tableName
}
Expand All @@ -42,68 +41,71 @@ export class PersistedObservable<T extends Identifiable>
extends Observable<PersistedModel<T>>
implements Storable
{
private tableName: string = ''
private readonly store: Store
tableName: string = ''
readonly loadPriority: LoadPriority
readonly id: string
private readonly loadPriority: LoadPriority

// must be called in a store transaction
constructor(initialValue: T, store: Store, loadPriority: LoadPriority = LoadPriority.low) {
super({ status: 'loading', data: initialValue })
this.id = initialValue.id
this.loadPriority = loadPriority
this.store = store
}

load() {
check(this.value.status === 'loading', 'already loaded')
protected load() {
check(super.value.status === 'loading', 'already loaded')
this.store.load(
this.tableName,
this.id,
this.data.id,
this.loadPriority,
(data?: T) => {
super.set({ status: 'loaded', data: data ?? this.data })
super.setValue({ status: 'loaded', data: data ?? this.data })
},
(error: Error) => {
super.set({ status: 'error', data: this.data, error })
super.setValue({ status: 'error', data: this.data, error })
},
async () => {
await this.onLoaded()
},
)
}

get data(): T {
return this.value.data
override get value(): PersistedModel<T> {
return super.value
}

set(_: PersistedModel<T>) {
throw new Error('use update method to update')
override setValue(_newValue: PersistedModel<T>) {
throw new Error('use updateData instead of set value')
}

get data(): T {
return super.value.data
}

// must be called in a store transaction
update(data: T) {
check(isDefined(data), 'value is undefined')
check(data.id === this.id, 'id mismatch')
super.set({ status: 'saving', data: data })
setData(newDataPartial: Partial<T>) {
check(isDefined(newDataPartial), 'value is undefined')
const newData = { ...this.data, ...newDataPartial }
check(newData.id === this.data.id, 'id mismatch')
super.setValue({ status: 'saving', data: newData })
this.store
.withTransaction(`update-${this.tableName}:${this.id}`, () => {
.withTransaction(`update-${this.tableName}:${this.data.id}`, () => {
this.store.save(
this.tableName,
data,
newData,
() => {
super.set({ status: 'saved', data: data })
super.setValue({ status: 'saved', data: newData })
},
(e) => {
super.set({ status: 'error', data: data, error: e })
super.setValue({ status: 'error', data: newData, error: e })
},
async () => {
await this.onSaved()
},
)
})
.catch((e) => {
super.set({ status: 'error', data: this.data, error: e })
super.setValue({ status: 'error', data: this.data, error: e })
})
}

Expand Down
7 changes: 7 additions & 0 deletions packages/sdk/src/streamEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
UserInboxPayload_GroupEncryptionSessions,
UserSettingsPayload_UserBlock,
UserPayload_UserMembership,
UserInboxPayload_Snapshot_DeviceSummary,
} from '@river-build/proto'

import {
Expand Down Expand Up @@ -62,6 +63,12 @@ export type StreamStateEvents = {
userInvitedToStream: (streamId: string) => void
userLeftStream: (streamId: string) => void
userStreamMembershipChanged: (streamId: string, payload: UserPayload_UserMembership) => void
userInboxDeviceSummaryUpdated: (
streamId: string,
deviceKey: string,
summary: UserInboxPayload_Snapshot_DeviceSummary,
) => void
userDeviceKeysUpdated: (streamId: string, deviceKeys: UserDevice[]) => void
spaceChannelCreated: (spaceId: string, channelId: string) => void
spaceChannelUpdated: (spaceId: string, channelId: string) => void
spaceChannelDeleted: (spaceId: string, channelId: string) => void
Expand Down
8 changes: 5 additions & 3 deletions packages/sdk/src/streamStateView_UserDeviceKey.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export class StreamStateView_UserDeviceKeys extends StreamStateView_AbstractCont
): void {
// dispatch events for all device keys, todo this seems inefficient?
for (const value of content.encryptionDevices) {
this.addUserDeviceKey(value, encryptionEmitter)
this.addUserDeviceKey(value, encryptionEmitter, undefined)
}
}

Expand All @@ -50,15 +50,15 @@ export class StreamStateView_UserDeviceKeys extends StreamStateView_AbstractCont
event: RemoteTimelineEvent,
_cleartext: string | undefined,
encryptionEmitter: TypedEmitter<StreamEncryptionEvents> | undefined,
_stateEmitter: TypedEmitter<StreamStateEvents> | undefined,
stateEmitter: TypedEmitter<StreamStateEvents> | undefined,
): void {
check(event.remoteEvent.event.payload.case === 'userDeviceKeyPayload')
const payload: UserDeviceKeyPayload = event.remoteEvent.event.payload.value
switch (payload.content.case) {
case 'inception':
break
case 'encryptionDevice':
this.addUserDeviceKey(payload.content.value, encryptionEmitter)
this.addUserDeviceKey(payload.content.value, encryptionEmitter, stateEmitter)
break
case undefined:
break
Expand All @@ -70,6 +70,7 @@ export class StreamStateView_UserDeviceKeys extends StreamStateView_AbstractCont
private addUserDeviceKey(
value: UserDeviceKeyPayload_EncryptionDevice,
encryptionEmitter: TypedEmitter<StreamEncryptionEvents> | undefined,
stateEmitter: TypedEmitter<StreamStateEvents> | undefined,
) {
const device = {
deviceKey: value.deviceKey,
Expand All @@ -81,5 +82,6 @@ export class StreamStateView_UserDeviceKeys extends StreamStateView_AbstractCont
}
this.deviceKeys.push(device)
encryptionEmitter?.emit('userDeviceKeyMessage', this.streamId, this.streamCreatorId, device)
stateEmitter?.emit('userDeviceKeysUpdated', this.streamId, this.deviceKeys)
}
}
16 changes: 13 additions & 3 deletions packages/sdk/src/streamStateView_UserInbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ export class StreamStateView_UserInbox extends StreamStateView_AbstractContent {
event: RemoteTimelineEvent,
_cleartext: string | undefined,
_encryptionEmitter: TypedEmitter<StreamEncryptionEvents> | undefined,
_stateEmitter: TypedEmitter<StreamStateEvents> | undefined,
stateEmitter: TypedEmitter<StreamStateEvents> | undefined,
): void {
check(event.remoteEvent.event.payload.case === 'userInboxPayload')
const payload: UserInboxPayload = event.remoteEvent.event.payload.value
Expand All @@ -90,7 +90,7 @@ export class StreamStateView_UserInbox extends StreamStateView_AbstractContent {
}
break
case 'ack':
this.updateDeviceSummary(event.remoteEvent, payload.content.value)
this.updateDeviceSummary(event.remoteEvent, payload.content.value, stateEmitter)
break
case undefined:
break
Expand Down Expand Up @@ -119,7 +119,11 @@ export class StreamStateView_UserInbox extends StreamStateView_AbstractContent {
encryptionEmitter?.emit('newGroupSessions', content, creatorUserId)
}

private updateDeviceSummary(event: ParsedEvent, content: UserInboxPayload_Ack) {
private updateDeviceSummary(
event: ParsedEvent,
content: UserInboxPayload_Ack,
stateEmitter: TypedEmitter<StreamStateEvents> | undefined,
) {
const summary = this.deviceSummary[content.deviceKey]
if (summary) {
if (summary.upperBound <= content.miniblockNum) {
Expand All @@ -128,5 +132,11 @@ export class StreamStateView_UserInbox extends StreamStateView_AbstractContent {
summary.lowerBound = content.miniblockNum + 1n
}
}
stateEmitter?.emit(
'userInboxDeviceSummaryUpdated',
this.streamId,
content.deviceKey,
summary,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { dlogger } from '@river-build/dlog'
const log = dlogger('csb:riverNodeUrls')

// Define a data model, this is what will be stored in the database
export interface RiverNodeUrlsModel {
export interface StreamNodeUrlsModel {
id: '0' // single data blobs need a fixed key
urls: string // here's some data we're trying to keep track of
}
Expand All @@ -17,7 +17,7 @@ export interface RiverNodeUrlsModel {
@persistedObservable({
tableName: 'riverNodeUrls', // this is the name of the table in the database
})
export class RiverNodeUrls extends PersistedObservable<RiverNodeUrlsModel> {
export class StreamNodeUrls extends PersistedObservable<StreamNodeUrlsModel> {
private riverRegistry: RiverRegistry // store any member variables required for logic

// The constructor is where we set up the class, we pass in the store and any other dependencies
Expand All @@ -41,7 +41,7 @@ export class RiverNodeUrls extends PersistedObservable<RiverNodeUrlsModel> {
.getOperationalNodeUrls() // here we are fetching the node urls
.then((urls) => {
if (urls !== this.data.urls) {
this.update({ ...this.data, urls }) // if the data is new, update our own state
this.setData({ urls }) // if the data is new, update our own state
}
})
.catch((e) => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* @group main
* @group with-entitilements
*/

import { providers } from 'ethers'
import { genShortId } from '../../id'
import { Store } from '../../store/store'
import { makeRiverConfig } from '../../riverConfig'
import { RiverNodeUrls } from './models/riverNodeUrls'
import { StreamNodeUrls } from './models/streamNodeUrls'
import { RiverRegistry, SpaceDapp } from '@river-build/web3'
import { RiverConnection } from './riverConnection'
import { makeRandomUserContext, waitFor } from '../../util.test'
Expand All @@ -25,48 +25,50 @@ describe('RiverConnection.test.ts', () => {
// init
const context = await makeRandomUserContext()
const clientParams = makeClientParams({ context, riverConfig }, spaceDapp)
const store = new Store(databaseName, 1, [RiverNodeUrls])
const store = new Store(databaseName, 1, [StreamNodeUrls])
store.newTransactionGroup('init')
const riverRegistry = new RiverRegistry(riverConfig.river.chainConfig, riverProvider)
const riverConnection = new RiverConnection(store, riverRegistry, clientParams)

// check initial state
expect(riverConnection.nodeUrls.data.urls).toBe('')
expect(riverConnection.client.value).toBeUndefined()
expect(riverConnection.streamNodeUrls.data.urls).toBe('')
expect(riverConnection.client).toBeUndefined()

// load
await store.commitTransaction()

// we should get there
await waitFor(() => {
expect(riverConnection.nodeUrls.data.urls).not.toBe('')
expect(riverConnection.streamNodeUrls.data.urls).not.toBe('')
})
await waitFor(() => {
expect(riverConnection.client.value).toBeDefined()
expect(riverConnection.client).toBeDefined()
})
await waitFor(() => {
expect(riverConnection.nodeUrls.value.status).toBe('saved')
expect(riverConnection.streamNodeUrls.value.status).toBe('saved')
})
await riverConnection.stop()
})
// test that a riverConnection will instantly be defined if data exists in local store
test('riverConnection loads from db', async () => {
// init
const context = await makeRandomUserContext()
const clientParams = makeClientParams({ context, riverConfig }, spaceDapp)
const store = new Store(databaseName, 1, [RiverNodeUrls])
const store = new Store(databaseName, 1, [StreamNodeUrls])
store.newTransactionGroup('init')
const riverRegistry = new RiverRegistry(riverConfig.river.chainConfig, riverProvider)
const riverConnection = new RiverConnection(store, riverRegistry, clientParams)

// check initial state
expect(riverConnection.nodeUrls.data.urls).toBe('')
expect(riverConnection.client.value).toBeUndefined()
expect(riverConnection.streamNodeUrls.data.urls).toBe('')
expect(riverConnection.client).toBeUndefined()

// load
await store.commitTransaction()

// should still be defined before we even start!
expect(riverConnection.nodeUrls.data.urls).not.toBe('')
expect(riverConnection.client.value).toBeDefined()
expect(riverConnection.streamNodeUrls.data.urls).not.toBe('')
expect(riverConnection.client).toBeDefined()
await riverConnection.stop()
})
})
Loading

0 comments on commit f4801c7

Please sign in to comment.