From 183425f728356381d0dd12b96ce9bda8dcb88de3 Mon Sep 17 00:00:00 2001 From: texuf Date: Thu, 27 Jun 2024 11:24:57 -0700 Subject: [PATCH] SyncAgent, stub out spaces and channels (#296) --- packages/sdk/src/index.ts | 3 + .../sdk/src/observable/persistedObservable.ts | 6 +- packages/sdk/src/store/store.ts | 14 +++++ packages/sdk/src/sync-agent/db.ts | 6 ++ .../river-connection/riverConnection.ts | 5 ++ .../src/sync-agent/spaces/models/channel.ts | 35 +++++++++++ .../sdk/src/sync-agent/spaces/models/space.ts | 29 +++++++++ .../sdk/src/sync-agent/spaces/spaces.test.ts | 31 ++++++++++ packages/sdk/src/sync-agent/spaces/spaces.ts | 60 +++++++++++++++++++ packages/sdk/src/sync-agent/syncAgent.ts | 6 +- .../sdk/src/sync-agent/syncAgents.test.ts | 35 +++++++++++ 11 files changed, 226 insertions(+), 4 deletions(-) create mode 100644 packages/sdk/src/sync-agent/spaces/models/channel.ts create mode 100644 packages/sdk/src/sync-agent/spaces/models/space.ts create mode 100644 packages/sdk/src/sync-agent/spaces/spaces.test.ts create mode 100644 packages/sdk/src/sync-agent/spaces/spaces.ts create mode 100644 packages/sdk/src/sync-agent/syncAgents.test.ts diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index db1a61b6d..c2660f88c 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -43,6 +43,9 @@ export * from './sync-agent/db' export * from './sync-agent/entitlements/entitlements' export * from './sync-agent/river-connection/models/streamNodeUrls' export * from './sync-agent/river-connection/riverConnection' +export * from './sync-agent/spaces/models/channel' +export * from './sync-agent/spaces/models/space' +export * from './sync-agent/spaces/spaces' export * from './sync-agent/syncAgent' export * from './sync-agent/user/models/userDeviceKeys' export * from './sync-agent/user/models/userInbox' diff --git a/packages/sdk/src/observable/persistedObservable.ts b/packages/sdk/src/observable/persistedObservable.ts index 4eb899f68..75a85cb9d 100644 --- a/packages/sdk/src/observable/persistedObservable.ts +++ b/packages/sdk/src/observable/persistedObservable.ts @@ -41,9 +41,9 @@ export class PersistedObservable extends Observable> implements Storable { - private tableName: string = '' - private readonly store: Store - private readonly loadPriority: LoadPriority + protected tableName: string = '' + protected readonly store: Store + protected readonly loadPriority: LoadPriority // must be called in a store transaction constructor(initialValue: T, store: Store, loadPriority: LoadPriority = LoadPriority.low) { diff --git a/packages/sdk/src/store/store.ts b/packages/sdk/src/store/store.ts index da748cd74..fe06bc145 100644 --- a/packages/sdk/src/store/store.ts +++ b/packages/sdk/src/store/store.ts @@ -54,6 +54,7 @@ function makeSchema(classes: any[]) { export class Store { private db: Dexie private transactionGroup?: TransactionGroup + private isLoadedMap: Record> = {} constructor(name: string, version: number, classes: any[]) { const schema = makeSchema(classes) @@ -131,6 +132,8 @@ export class Store { log('+enqueue load', tableName, id, loadPriority) this.checkTableName(tableName) check(this.transactionGroup !== undefined, 'transaction not started') + check(!this.isLoaded(tableName, id), `model already loaded table: ${tableName} id: ${id}`) + this.setIsLoaded(tableName, id) const bundler = this.transactionGroup[loadPriority] bundler.tableNames.push(tableName) const dbOp = async () => { @@ -170,4 +173,15 @@ export class Store { bundler.dbOps.push(dbOp) bundler.onCommitted.push(onCommitted) } + + private isLoaded(tableName: string, id: string): boolean { + return this.isLoadedMap[tableName]?.has(id) ?? false + } + + private setIsLoaded(tableName: string, id: string) { + if (this.isLoadedMap[tableName] === undefined) { + this.isLoadedMap[tableName] = new Set() + } + this.isLoadedMap[tableName].add(id) + } } diff --git a/packages/sdk/src/sync-agent/db.ts b/packages/sdk/src/sync-agent/db.ts index 0d396f4ee..f28ec1c3e 100644 --- a/packages/sdk/src/sync-agent/db.ts +++ b/packages/sdk/src/sync-agent/db.ts @@ -1,4 +1,7 @@ import { StreamNodeUrls } from './river-connection/models/streamNodeUrls' +import { Channel } from './spaces/models/channel' +import { Space } from './spaces/models/space' +import { Spaces } from './spaces/spaces' import { UserDeviceKeys } from './user/models/userDeviceKeys' import { UserInbox } from './user/models/userInbox' import { UserMemberships } from './user/models/userMemberships' @@ -7,6 +10,9 @@ import { User } from './user/user' export const DB_VERSION = 1 export const DB_MODELS = [ + Channel, + Space, + Spaces, StreamNodeUrls, User, UserDeviceKeys, diff --git a/packages/sdk/src/sync-agent/river-connection/riverConnection.ts b/packages/sdk/src/sync-agent/river-connection/riverConnection.ts index 3083a2af8..6159773b5 100644 --- a/packages/sdk/src/sync-agent/river-connection/riverConnection.ts +++ b/packages/sdk/src/sync-agent/river-connection/riverConnection.ts @@ -8,6 +8,7 @@ import { CryptoStore, EntitlementsDelegate } from '@river-build/encryption' import { Client } from '../../client' import { SignerContext } from '../../signerContext' import { PersistedModel } from '../../observable/persistedObservable' +import { userIdFromAddress } from '../../id' const logger = dlogger('csb:riverConnection') @@ -41,6 +42,10 @@ export class RiverConnection { this.streamNodeUrls.subscribe(this.onNodeUrlsChanged, { fireImediately: true }) } + get userId(): string { + return userIdFromAddress(this.clientParams.signerContext.creatorAddress) + } + async stop() { this.stopped = true this.streamNodeUrls.unsubscribe(this.onNodeUrlsChanged) diff --git a/packages/sdk/src/sync-agent/spaces/models/channel.ts b/packages/sdk/src/sync-agent/spaces/models/channel.ts new file mode 100644 index 000000000..15eea7244 --- /dev/null +++ b/packages/sdk/src/sync-agent/spaces/models/channel.ts @@ -0,0 +1,35 @@ +import { Client } from '../../../client' +import { PersistedObservable, persistedObservable } from '../../../observable/persistedObservable' +import { Identifiable, Store } from '../../../store/store' +import { RiverConnection } from '../../river-connection/riverConnection' + +export interface ChannelMetadata { + name: string +} + +export interface ChannelModel extends Identifiable { + id: string + spaceId: string + isJoined: boolean + metadata?: ChannelMetadata +} + +@persistedObservable({ tableName: 'channel' }) +export class Channel extends PersistedObservable { + constructor( + id: string, + spaceId: string, + private riverConnection: RiverConnection, + store: Store, + ) { + super({ id, spaceId, isJoined: false }, store) + } + + protected override async onLoaded() { + this.riverConnection.registerView(this.onClientStarted) + } + + private onClientStarted = (_client: Client) => { + return () => {} + } +} diff --git a/packages/sdk/src/sync-agent/spaces/models/space.ts b/packages/sdk/src/sync-agent/spaces/models/space.ts new file mode 100644 index 000000000..4284d584a --- /dev/null +++ b/packages/sdk/src/sync-agent/spaces/models/space.ts @@ -0,0 +1,29 @@ +import { Client } from '../../../client' +import { PersistedObservable, persistedObservable } from '../../../observable/persistedObservable' +import { Identifiable, Store } from '../../../store/store' +import { RiverConnection } from '../../river-connection/riverConnection' + +export interface SpaceMetadata { + name: string +} + +export interface SpaceModel extends Identifiable { + id: string + channelIds: string[] + metadata?: SpaceMetadata +} + +@persistedObservable({ tableName: 'space' }) +export class Space extends PersistedObservable { + constructor(id: string, private riverConnection: RiverConnection, store: Store) { + super({ id, channelIds: [] }, store) + } + + protected override async onLoaded() { + this.riverConnection.registerView(this.onClientStarted) + } + + private onClientStarted = (_client: Client) => { + return () => {} + } +} diff --git a/packages/sdk/src/sync-agent/spaces/spaces.test.ts b/packages/sdk/src/sync-agent/spaces/spaces.test.ts new file mode 100644 index 000000000..ffc5eb54b --- /dev/null +++ b/packages/sdk/src/sync-agent/spaces/spaces.test.ts @@ -0,0 +1,31 @@ +/** + * @group with-entitilements + */ +import { dlogger } from '@river-build/dlog' +import { TestUser } from '../utils/testUser.test' + +const logger = dlogger('csb:test:spaces') + +describe('spaces.test.ts', () => { + logger.log('start') + const testUser = new TestUser() + + test('create/leave/join space', async () => { + const syncAgent = await testUser.makeSyncAgent() + await syncAgent.start() + expect(syncAgent.spaces.value.status).not.toBe('loading') + const { spaceId } = await syncAgent.user.createSpace( + { spaceName: 'BlastOff' }, + testUser.signer, + ) + expect(syncAgent.spaces.data.spaceIds.length).toBe(1) + expect(syncAgent.spaces.data.spaceIds[0]).toBe(spaceId) + // expect(bob.spaces.getSpaces().length).toBe(1) + // expect(bob.spaces.getSpaces()[0].id).toBe(spaceId) + // expect(bob.spaces.getSpace(spaceId)).toBeDefined() + // const space = bob.spaces.getSpace(spaceId)! + // expect(space.data.spaceIds.length).toBe(1) + // await waitFor(() => expect(space.data.spaceIds.length).toBe(1) + await syncAgent.stop() + }) +}) diff --git a/packages/sdk/src/sync-agent/spaces/spaces.ts b/packages/sdk/src/sync-agent/spaces/spaces.ts new file mode 100644 index 000000000..033948405 --- /dev/null +++ b/packages/sdk/src/sync-agent/spaces/spaces.ts @@ -0,0 +1,60 @@ +import { Identifiable, Store } from '../../store/store' +import { + PersistedModel, + PersistedObservable, + persistedObservable, +} from '../../observable/persistedObservable' +import { Space } from './models/space' +import { User } from '../user/user' +import { UserMembershipsModel } from '../user/models/userMemberships' +import { MembershipOp } from '@river-build/proto' +import { isSpaceStreamId } from '../../id' +import { RiverConnection } from '../river-connection/riverConnection' + +export interface SpacesModel extends Identifiable { + id: '0' // single data blobs need a fixed key + spaceIds: string[] // joined spaces +} + +@persistedObservable({ tableName: 'spaces' }) +export class Spaces extends PersistedObservable { + private spaces: Record = {} + private user: User + private riverConnection: RiverConnection + + constructor(riverConnection: RiverConnection, user: User, store: Store) { + super({ id: '0', spaceIds: [] }, store) + this.riverConnection = riverConnection + this.user = user + } + + protected override async onLoaded() { + this.user.streams.memberships.subscribe( + (userMemberships) => { + this.onUserDataChanged(userMemberships) + }, + { fireImediately: true }, + ) + } + + getSpace(spaceId: string): Space | undefined { + return this.spaces[spaceId] + } + + private onUserDataChanged(userData: PersistedModel) { + if (userData.status === 'loading') { + return + } + const spaceIds = Object.values(userData.data.memberships) + .filter((m) => isSpaceStreamId(m.streamId) && m.op === MembershipOp.SO_JOIN) + .map((m) => m.streamId) + + this.setData({ spaceIds }) + + for (const spaceId of spaceIds) { + if (!this.spaces[spaceId]) { + this.spaces[spaceId] = new Space(spaceId, this.riverConnection, this.store) + } + } + } +} diff --git a/packages/sdk/src/sync-agent/syncAgent.ts b/packages/sdk/src/sync-agent/syncAgent.ts index 4dc1374c7..07844f57e 100644 --- a/packages/sdk/src/sync-agent/syncAgent.ts +++ b/packages/sdk/src/sync-agent/syncAgent.ts @@ -18,6 +18,7 @@ import { UserInboxModel } from './user/models/userInbox' import { DB_MODELS, DB_VERSION } from './db' import { UserDeviceKeysModel } from './user/models/userDeviceKeys' import { UserSettingsModel } from './user/models/userSettings' +import { Spaces, SpacesModel } from './spaces/spaces' export interface SyncAgentConfig { context: SignerContext @@ -37,11 +38,12 @@ export class SyncAgent { riverConnection: RiverConnection store: Store user: User - //spaces: Spaces + spaces: Spaces // flattened observables - just pointers to the observable objects in the models observables: { riverStreamNodeUrls: PersistedObservable + spaces: PersistedObservable user: PersistedObservable userAuthStatus: Observable userMemberships: PersistedObservable @@ -71,10 +73,12 @@ export class SyncAgent { rpcRetryParams: config.retryParams, }) this.user = new User(this.userId, this.store, this.riverConnection, this.spaceDapp) + this.spaces = new Spaces(this.riverConnection, this.user, this.store) // flatten out the observables this.observables = { riverStreamNodeUrls: this.riverConnection.streamNodeUrls, + spaces: this.spaces, user: this.user, userAuthStatus: this.user.authStatus, userMemberships: this.user.streams.memberships, diff --git a/packages/sdk/src/sync-agent/syncAgents.test.ts b/packages/sdk/src/sync-agent/syncAgents.test.ts new file mode 100644 index 000000000..8720e152b --- /dev/null +++ b/packages/sdk/src/sync-agent/syncAgents.test.ts @@ -0,0 +1,35 @@ +/** + * @group with-entitilements + */ +import { dlogger } from '@river-build/dlog' +import { SyncAgent } from './syncAgent' +import { TestUser } from './utils/testUser.test' + +const logger = dlogger('csb:test:syncAgents') + +describe('syncAgents.test.ts', () => { + let bobUser: TestUser + let aliceUser: TestUser + let bob: SyncAgent + let alice: SyncAgent + + beforeEach(async () => { + bobUser = new TestUser() + aliceUser = new TestUser() + bob = await bobUser.makeSyncAgent() + alice = await aliceUser.makeSyncAgent() + }) + + afterEach(async () => { + await bob.stop() + await alice.stop() + }) + + test('syncAgents', async () => { + logger.log('syncAgents') + await Promise.all([bob.start(), alice.start()]) + + const { spaceId } = await bob.user.createSpace({ spaceName: 'BlastOff' }, bobUser.signer) + expect(spaceId).toBeDefined() + }) +})