Skip to content

Commit

Permalink
SyncAgent, stub out spaces and channels (#296)
Browse files Browse the repository at this point in the history
  • Loading branch information
texuf authored Jun 27, 2024
1 parent 522b463 commit 183425f
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 4 deletions.
3 changes: 3 additions & 0 deletions packages/sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ export * from './sync-agent/db'
export * from './sync-agent/entitlements/entitlements'
export * from './sync-agent/river-connection/models/streamNodeUrls'
export * from './sync-agent/river-connection/riverConnection'
export * from './sync-agent/spaces/models/channel'
export * from './sync-agent/spaces/models/space'
export * from './sync-agent/spaces/spaces'
export * from './sync-agent/syncAgent'
export * from './sync-agent/user/models/userDeviceKeys'
export * from './sync-agent/user/models/userInbox'
Expand Down
6 changes: 3 additions & 3 deletions packages/sdk/src/observable/persistedObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ export class PersistedObservable<T extends Identifiable>
extends Observable<PersistedModel<T>>
implements Storable
{
private tableName: string = ''
private readonly store: Store
private readonly loadPriority: LoadPriority
protected tableName: string = ''
protected readonly store: Store
protected readonly loadPriority: LoadPriority

// must be called in a store transaction
constructor(initialValue: T, store: Store, loadPriority: LoadPriority = LoadPriority.low) {
Expand Down
14 changes: 14 additions & 0 deletions packages/sdk/src/store/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ function makeSchema(classes: any[]) {
export class Store {
private db: Dexie
private transactionGroup?: TransactionGroup
private isLoadedMap: Record<string, Set<string>> = {}

constructor(name: string, version: number, classes: any[]) {
const schema = makeSchema(classes)
Expand Down Expand Up @@ -131,6 +132,8 @@ export class Store {
log('+enqueue load', tableName, id, loadPriority)
this.checkTableName(tableName)
check(this.transactionGroup !== undefined, 'transaction not started')
check(!this.isLoaded(tableName, id), `model already loaded table: ${tableName} id: ${id}`)
this.setIsLoaded(tableName, id)
const bundler = this.transactionGroup[loadPriority]
bundler.tableNames.push(tableName)
const dbOp = async () => {
Expand Down Expand Up @@ -170,4 +173,15 @@ export class Store {
bundler.dbOps.push(dbOp)
bundler.onCommitted.push(onCommitted)
}

private isLoaded(tableName: string, id: string): boolean {
return this.isLoadedMap[tableName]?.has(id) ?? false
}

private setIsLoaded(tableName: string, id: string) {
if (this.isLoadedMap[tableName] === undefined) {
this.isLoadedMap[tableName] = new Set<string>()
}
this.isLoadedMap[tableName].add(id)
}
}
6 changes: 6 additions & 0 deletions packages/sdk/src/sync-agent/db.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import { StreamNodeUrls } from './river-connection/models/streamNodeUrls'
import { Channel } from './spaces/models/channel'
import { Space } from './spaces/models/space'
import { Spaces } from './spaces/spaces'
import { UserDeviceKeys } from './user/models/userDeviceKeys'
import { UserInbox } from './user/models/userInbox'
import { UserMemberships } from './user/models/userMemberships'
Expand All @@ -7,6 +10,9 @@ import { User } from './user/user'

export const DB_VERSION = 1
export const DB_MODELS = [
Channel,
Space,
Spaces,
StreamNodeUrls,
User,
UserDeviceKeys,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { CryptoStore, EntitlementsDelegate } from '@river-build/encryption'
import { Client } from '../../client'
import { SignerContext } from '../../signerContext'
import { PersistedModel } from '../../observable/persistedObservable'
import { userIdFromAddress } from '../../id'

const logger = dlogger('csb:riverConnection')

Expand Down Expand Up @@ -41,6 +42,10 @@ export class RiverConnection {
this.streamNodeUrls.subscribe(this.onNodeUrlsChanged, { fireImediately: true })
}

get userId(): string {
return userIdFromAddress(this.clientParams.signerContext.creatorAddress)
}

async stop() {
this.stopped = true
this.streamNodeUrls.unsubscribe(this.onNodeUrlsChanged)
Expand Down
35 changes: 35 additions & 0 deletions packages/sdk/src/sync-agent/spaces/models/channel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { Client } from '../../../client'
import { PersistedObservable, persistedObservable } from '../../../observable/persistedObservable'
import { Identifiable, Store } from '../../../store/store'
import { RiverConnection } from '../../river-connection/riverConnection'

export interface ChannelMetadata {
name: string
}

export interface ChannelModel extends Identifiable {
id: string
spaceId: string
isJoined: boolean
metadata?: ChannelMetadata
}

@persistedObservable({ tableName: 'channel' })
export class Channel extends PersistedObservable<ChannelModel> {
constructor(
id: string,
spaceId: string,
private riverConnection: RiverConnection,
store: Store,
) {
super({ id, spaceId, isJoined: false }, store)
}

protected override async onLoaded() {
this.riverConnection.registerView(this.onClientStarted)
}

private onClientStarted = (_client: Client) => {
return () => {}
}
}
29 changes: 29 additions & 0 deletions packages/sdk/src/sync-agent/spaces/models/space.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { Client } from '../../../client'
import { PersistedObservable, persistedObservable } from '../../../observable/persistedObservable'
import { Identifiable, Store } from '../../../store/store'
import { RiverConnection } from '../../river-connection/riverConnection'

export interface SpaceMetadata {
name: string
}

export interface SpaceModel extends Identifiable {
id: string
channelIds: string[]
metadata?: SpaceMetadata
}

@persistedObservable({ tableName: 'space' })
export class Space extends PersistedObservable<SpaceModel> {
constructor(id: string, private riverConnection: RiverConnection, store: Store) {
super({ id, channelIds: [] }, store)
}

protected override async onLoaded() {
this.riverConnection.registerView(this.onClientStarted)
}

private onClientStarted = (_client: Client) => {
return () => {}
}
}
31 changes: 31 additions & 0 deletions packages/sdk/src/sync-agent/spaces/spaces.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* @group with-entitilements
*/
import { dlogger } from '@river-build/dlog'
import { TestUser } from '../utils/testUser.test'

const logger = dlogger('csb:test:spaces')

describe('spaces.test.ts', () => {
logger.log('start')
const testUser = new TestUser()

test('create/leave/join space', async () => {
const syncAgent = await testUser.makeSyncAgent()
await syncAgent.start()
expect(syncAgent.spaces.value.status).not.toBe('loading')
const { spaceId } = await syncAgent.user.createSpace(
{ spaceName: 'BlastOff' },
testUser.signer,
)
expect(syncAgent.spaces.data.spaceIds.length).toBe(1)
expect(syncAgent.spaces.data.spaceIds[0]).toBe(spaceId)
// expect(bob.spaces.getSpaces().length).toBe(1)
// expect(bob.spaces.getSpaces()[0].id).toBe(spaceId)
// expect(bob.spaces.getSpace(spaceId)).toBeDefined()
// const space = bob.spaces.getSpace(spaceId)!
// expect(space.data.spaceIds.length).toBe(1)
// await waitFor(() => expect(space.data.spaceIds.length).toBe(1)
await syncAgent.stop()
})
})
60 changes: 60 additions & 0 deletions packages/sdk/src/sync-agent/spaces/spaces.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { Identifiable, Store } from '../../store/store'
import {
PersistedModel,
PersistedObservable,
persistedObservable,
} from '../../observable/persistedObservable'
import { Space } from './models/space'
import { User } from '../user/user'
import { UserMembershipsModel } from '../user/models/userMemberships'
import { MembershipOp } from '@river-build/proto'
import { isSpaceStreamId } from '../../id'
import { RiverConnection } from '../river-connection/riverConnection'

export interface SpacesModel extends Identifiable {
id: '0' // single data blobs need a fixed key
spaceIds: string[] // joined spaces
}

@persistedObservable({ tableName: 'spaces' })
export class Spaces extends PersistedObservable<SpacesModel> {
private spaces: Record<string, Space> = {}
private user: User
private riverConnection: RiverConnection

constructor(riverConnection: RiverConnection, user: User, store: Store) {
super({ id: '0', spaceIds: [] }, store)
this.riverConnection = riverConnection
this.user = user
}

protected override async onLoaded() {
this.user.streams.memberships.subscribe(
(userMemberships) => {
this.onUserDataChanged(userMemberships)
},
{ fireImediately: true },
)
}

getSpace(spaceId: string): Space | undefined {
return this.spaces[spaceId]
}

private onUserDataChanged(userData: PersistedModel<UserMembershipsModel>) {
if (userData.status === 'loading') {
return
}
const spaceIds = Object.values(userData.data.memberships)
.filter((m) => isSpaceStreamId(m.streamId) && m.op === MembershipOp.SO_JOIN)
.map((m) => m.streamId)

this.setData({ spaceIds })

for (const spaceId of spaceIds) {
if (!this.spaces[spaceId]) {
this.spaces[spaceId] = new Space(spaceId, this.riverConnection, this.store)
}
}
}
}
6 changes: 5 additions & 1 deletion packages/sdk/src/sync-agent/syncAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { UserInboxModel } from './user/models/userInbox'
import { DB_MODELS, DB_VERSION } from './db'
import { UserDeviceKeysModel } from './user/models/userDeviceKeys'
import { UserSettingsModel } from './user/models/userSettings'
import { Spaces, SpacesModel } from './spaces/spaces'

export interface SyncAgentConfig {
context: SignerContext
Expand All @@ -37,11 +38,12 @@ export class SyncAgent {
riverConnection: RiverConnection
store: Store
user: User
//spaces: Spaces
spaces: Spaces

// flattened observables - just pointers to the observable objects in the models
observables: {
riverStreamNodeUrls: PersistedObservable<StreamNodeUrlsModel>
spaces: PersistedObservable<SpacesModel>
user: PersistedObservable<UserModel>
userAuthStatus: Observable<AuthStatus>
userMemberships: PersistedObservable<UserMembershipsModel>
Expand Down Expand Up @@ -71,10 +73,12 @@ export class SyncAgent {
rpcRetryParams: config.retryParams,
})
this.user = new User(this.userId, this.store, this.riverConnection, this.spaceDapp)
this.spaces = new Spaces(this.riverConnection, this.user, this.store)

// flatten out the observables
this.observables = {
riverStreamNodeUrls: this.riverConnection.streamNodeUrls,
spaces: this.spaces,
user: this.user,
userAuthStatus: this.user.authStatus,
userMemberships: this.user.streams.memberships,
Expand Down
35 changes: 35 additions & 0 deletions packages/sdk/src/sync-agent/syncAgents.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* @group with-entitilements
*/
import { dlogger } from '@river-build/dlog'
import { SyncAgent } from './syncAgent'
import { TestUser } from './utils/testUser.test'

const logger = dlogger('csb:test:syncAgents')

describe('syncAgents.test.ts', () => {
let bobUser: TestUser
let aliceUser: TestUser
let bob: SyncAgent
let alice: SyncAgent

beforeEach(async () => {
bobUser = new TestUser()
aliceUser = new TestUser()
bob = await bobUser.makeSyncAgent()
alice = await aliceUser.makeSyncAgent()
})

afterEach(async () => {
await bob.stop()
await alice.stop()
})

test('syncAgents', async () => {
logger.log('syncAgents')
await Promise.all([bob.start(), alice.start()])

const { spaceId } = await bob.user.createSpace({ spaceName: 'BlastOff' }, bobUser.signer)
expect(spaceId).toBeDefined()
})
})

0 comments on commit 183425f

Please sign in to comment.