From f4801c70fd08aeba72ea2e676341cc059641937f Mon Sep 17 00:00:00 2001 From: texuf Date: Wed, 26 Jun 2024 11:48:21 -0700 Subject: [PATCH] SyncAgent - create space, user streams, observable dock (#287) slowly copying code over from the stress client, leaving places for the app view logic to go --- packages/sdk/src/client.ts | 2 +- packages/sdk/src/observable/observable.ts | 8 +- .../sdk/src/observable/persistedObservable.ts | 58 ++++++------ packages/sdk/src/streamEvents.ts | 7 ++ .../sdk/src/streamStateView_UserDeviceKey.ts | 8 +- packages/sdk/src/streamStateView_UserInbox.ts | 16 +++- .../{riverNodeUrls.ts => streamNodeUrls.ts} | 6 +- .../river-connection/riverConnection.test.ts | 28 +++--- .../river-connection/riverConnection.ts | 43 ++++----- packages/sdk/src/sync-agent/syncAgent.test.ts | 44 ++++++++-- packages/sdk/src/sync-agent/syncAgent.ts | 42 +++++++-- packages/sdk/src/sync-agent/syncAgentStore.ts | 17 ++++ .../sync-agent/user/models/userDeviceKeys.ts | 79 +++++++++++++++-- .../src/sync-agent/user/models/userInbox.ts | 86 ++++++++++++++++-- .../sync-agent/user/models/userMemberships.ts | 31 +++---- .../sync-agent/user/models/userSettings.ts | 56 ++++++++++-- packages/sdk/src/sync-agent/user/user.test.ts | 63 +++++++------ packages/sdk/src/sync-agent/user/user.ts | 88 ++++++++++++++----- .../sdk/src/sync-agent/utils/spaceUtils.ts | 39 ++++++++ .../sync-agent/utils/syncAgentUtils.test.ts | 40 ++++++++- packages/web3/src/LocalhostWeb3Provider.ts | 4 + 21 files changed, 576 insertions(+), 189 deletions(-) rename packages/sdk/src/sync-agent/river-connection/models/{riverNodeUrls.ts => streamNodeUrls.ts} (90%) create mode 100644 packages/sdk/src/sync-agent/syncAgentStore.ts create mode 100644 packages/sdk/src/sync-agent/utils/spaceUtils.ts diff --git a/packages/sdk/src/client.ts b/packages/sdk/src/client.ts index a91e7590e..da802c0d9 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -234,7 +234,7 @@ export class Client return this.syncedStreamsExtensions.initStatus } - get cryptoEnabled(): boolean { + get cryptoInitialized(): boolean { return this.cryptoBackend !== undefined } diff --git a/packages/sdk/src/observable/observable.ts b/packages/sdk/src/observable/observable.ts index 081c70adc..aeab01f20 100644 --- a/packages/sdk/src/observable/observable.ts +++ b/packages/sdk/src/observable/observable.ts @@ -1,6 +1,6 @@ export class Observable { - subscribers: ((value: T) => void)[] = [] - private _value: T + protected subscribers: ((value: T) => void)[] = [] + protected _value: T constructor(value: T) { this._value = value @@ -10,8 +10,8 @@ export class Observable { return this._value } - set(value: T) { - this._value = value + setValue(newValue: T) { + this._value = newValue this.notify() } diff --git a/packages/sdk/src/observable/persistedObservable.ts b/packages/sdk/src/observable/persistedObservable.ts index d6cb28e9c..4eb899f68 100644 --- a/packages/sdk/src/observable/persistedObservable.ts +++ b/packages/sdk/src/observable/persistedObservable.ts @@ -14,10 +14,7 @@ export type PersistedModel = | { status: 'saving'; data: T } | { status: 'saved'; data: T } -interface Storable { - tableName: string - load(): void -} +interface Storable {} const all_tables = new Set() @@ -30,8 +27,10 @@ export function persistedObservable(options: PersistedOpts) { constructor(...args: any[]) { // eslint-disable-next-line @typescript-eslint/no-unsafe-argument super(...args) - this.tableName = options.tableName - this.load() + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + ;(this as any).tableName = options.tableName + // eslint-disable-next-line @typescript-eslint/no-unsafe-call, @typescript-eslint/no-unsafe-member-access + ;(this as any).load() } static tableName = options.tableName } @@ -42,30 +41,28 @@ export class PersistedObservable extends Observable> implements Storable { + private tableName: string = '' private readonly store: Store - tableName: string = '' - readonly loadPriority: LoadPriority - readonly id: string + private readonly loadPriority: LoadPriority // must be called in a store transaction constructor(initialValue: T, store: Store, loadPriority: LoadPriority = LoadPriority.low) { super({ status: 'loading', data: initialValue }) - this.id = initialValue.id this.loadPriority = loadPriority this.store = store } - load() { - check(this.value.status === 'loading', 'already loaded') + protected load() { + check(super.value.status === 'loading', 'already loaded') this.store.load( this.tableName, - this.id, + this.data.id, this.loadPriority, (data?: T) => { - super.set({ status: 'loaded', data: data ?? this.data }) + super.setValue({ status: 'loaded', data: data ?? this.data }) }, (error: Error) => { - super.set({ status: 'error', data: this.data, error }) + super.setValue({ status: 'error', data: this.data, error }) }, async () => { await this.onLoaded() @@ -73,29 +70,34 @@ export class PersistedObservable ) } - get data(): T { - return this.value.data + override get value(): PersistedModel { + return super.value } - set(_: PersistedModel) { - throw new Error('use update method to update') + override setValue(_newValue: PersistedModel) { + throw new Error('use updateData instead of set value') + } + + get data(): T { + return super.value.data } // must be called in a store transaction - update(data: T) { - check(isDefined(data), 'value is undefined') - check(data.id === this.id, 'id mismatch') - super.set({ status: 'saving', data: data }) + setData(newDataPartial: Partial) { + check(isDefined(newDataPartial), 'value is undefined') + const newData = { ...this.data, ...newDataPartial } + check(newData.id === this.data.id, 'id mismatch') + super.setValue({ status: 'saving', data: newData }) this.store - .withTransaction(`update-${this.tableName}:${this.id}`, () => { + .withTransaction(`update-${this.tableName}:${this.data.id}`, () => { this.store.save( this.tableName, - data, + newData, () => { - super.set({ status: 'saved', data: data }) + super.setValue({ status: 'saved', data: newData }) }, (e) => { - super.set({ status: 'error', data: data, error: e }) + super.setValue({ status: 'error', data: newData, error: e }) }, async () => { await this.onSaved() @@ -103,7 +105,7 @@ export class PersistedObservable ) }) .catch((e) => { - super.set({ status: 'error', data: this.data, error: e }) + super.setValue({ status: 'error', data: this.data, error: e }) }) } diff --git a/packages/sdk/src/streamEvents.ts b/packages/sdk/src/streamEvents.ts index 45a6de09a..af37653c3 100644 --- a/packages/sdk/src/streamEvents.ts +++ b/packages/sdk/src/streamEvents.ts @@ -4,6 +4,7 @@ import { UserInboxPayload_GroupEncryptionSessions, UserSettingsPayload_UserBlock, UserPayload_UserMembership, + UserInboxPayload_Snapshot_DeviceSummary, } from '@river-build/proto' import { @@ -62,6 +63,12 @@ export type StreamStateEvents = { userInvitedToStream: (streamId: string) => void userLeftStream: (streamId: string) => void userStreamMembershipChanged: (streamId: string, payload: UserPayload_UserMembership) => void + userInboxDeviceSummaryUpdated: ( + streamId: string, + deviceKey: string, + summary: UserInboxPayload_Snapshot_DeviceSummary, + ) => void + userDeviceKeysUpdated: (streamId: string, deviceKeys: UserDevice[]) => void spaceChannelCreated: (spaceId: string, channelId: string) => void spaceChannelUpdated: (spaceId: string, channelId: string) => void spaceChannelDeleted: (spaceId: string, channelId: string) => void diff --git a/packages/sdk/src/streamStateView_UserDeviceKey.ts b/packages/sdk/src/streamStateView_UserDeviceKey.ts index 3f3c3066a..d4e65bd6b 100644 --- a/packages/sdk/src/streamStateView_UserDeviceKey.ts +++ b/packages/sdk/src/streamStateView_UserDeviceKey.ts @@ -33,7 +33,7 @@ export class StreamStateView_UserDeviceKeys extends StreamStateView_AbstractCont ): void { // dispatch events for all device keys, todo this seems inefficient? for (const value of content.encryptionDevices) { - this.addUserDeviceKey(value, encryptionEmitter) + this.addUserDeviceKey(value, encryptionEmitter, undefined) } } @@ -50,7 +50,7 @@ export class StreamStateView_UserDeviceKeys extends StreamStateView_AbstractCont event: RemoteTimelineEvent, _cleartext: string | undefined, encryptionEmitter: TypedEmitter | undefined, - _stateEmitter: TypedEmitter | undefined, + stateEmitter: TypedEmitter | undefined, ): void { check(event.remoteEvent.event.payload.case === 'userDeviceKeyPayload') const payload: UserDeviceKeyPayload = event.remoteEvent.event.payload.value @@ -58,7 +58,7 @@ export class StreamStateView_UserDeviceKeys extends StreamStateView_AbstractCont case 'inception': break case 'encryptionDevice': - this.addUserDeviceKey(payload.content.value, encryptionEmitter) + this.addUserDeviceKey(payload.content.value, encryptionEmitter, stateEmitter) break case undefined: break @@ -70,6 +70,7 @@ export class StreamStateView_UserDeviceKeys extends StreamStateView_AbstractCont private addUserDeviceKey( value: UserDeviceKeyPayload_EncryptionDevice, encryptionEmitter: TypedEmitter | undefined, + stateEmitter: TypedEmitter | undefined, ) { const device = { deviceKey: value.deviceKey, @@ -81,5 +82,6 @@ export class StreamStateView_UserDeviceKeys extends StreamStateView_AbstractCont } this.deviceKeys.push(device) encryptionEmitter?.emit('userDeviceKeyMessage', this.streamId, this.streamCreatorId, device) + stateEmitter?.emit('userDeviceKeysUpdated', this.streamId, this.deviceKeys) } } diff --git a/packages/sdk/src/streamStateView_UserInbox.ts b/packages/sdk/src/streamStateView_UserInbox.ts index c885d6ddb..25e4f958a 100644 --- a/packages/sdk/src/streamStateView_UserInbox.ts +++ b/packages/sdk/src/streamStateView_UserInbox.ts @@ -76,7 +76,7 @@ export class StreamStateView_UserInbox extends StreamStateView_AbstractContent { event: RemoteTimelineEvent, _cleartext: string | undefined, _encryptionEmitter: TypedEmitter | undefined, - _stateEmitter: TypedEmitter | undefined, + stateEmitter: TypedEmitter | undefined, ): void { check(event.remoteEvent.event.payload.case === 'userInboxPayload') const payload: UserInboxPayload = event.remoteEvent.event.payload.value @@ -90,7 +90,7 @@ export class StreamStateView_UserInbox extends StreamStateView_AbstractContent { } break case 'ack': - this.updateDeviceSummary(event.remoteEvent, payload.content.value) + this.updateDeviceSummary(event.remoteEvent, payload.content.value, stateEmitter) break case undefined: break @@ -119,7 +119,11 @@ export class StreamStateView_UserInbox extends StreamStateView_AbstractContent { encryptionEmitter?.emit('newGroupSessions', content, creatorUserId) } - private updateDeviceSummary(event: ParsedEvent, content: UserInboxPayload_Ack) { + private updateDeviceSummary( + event: ParsedEvent, + content: UserInboxPayload_Ack, + stateEmitter: TypedEmitter | undefined, + ) { const summary = this.deviceSummary[content.deviceKey] if (summary) { if (summary.upperBound <= content.miniblockNum) { @@ -128,5 +132,11 @@ export class StreamStateView_UserInbox extends StreamStateView_AbstractContent { summary.lowerBound = content.miniblockNum + 1n } } + stateEmitter?.emit( + 'userInboxDeviceSummaryUpdated', + this.streamId, + content.deviceKey, + summary, + ) } } diff --git a/packages/sdk/src/sync-agent/river-connection/models/riverNodeUrls.ts b/packages/sdk/src/sync-agent/river-connection/models/streamNodeUrls.ts similarity index 90% rename from packages/sdk/src/sync-agent/river-connection/models/riverNodeUrls.ts rename to packages/sdk/src/sync-agent/river-connection/models/streamNodeUrls.ts index 309b15ec4..7fc540098 100644 --- a/packages/sdk/src/sync-agent/river-connection/models/riverNodeUrls.ts +++ b/packages/sdk/src/sync-agent/river-connection/models/streamNodeUrls.ts @@ -8,7 +8,7 @@ import { dlogger } from '@river-build/dlog' const log = dlogger('csb:riverNodeUrls') // Define a data model, this is what will be stored in the database -export interface RiverNodeUrlsModel { +export interface StreamNodeUrlsModel { id: '0' // single data blobs need a fixed key urls: string // here's some data we're trying to keep track of } @@ -17,7 +17,7 @@ export interface RiverNodeUrlsModel { @persistedObservable({ tableName: 'riverNodeUrls', // this is the name of the table in the database }) -export class RiverNodeUrls extends PersistedObservable { +export class StreamNodeUrls extends PersistedObservable { private riverRegistry: RiverRegistry // store any member variables required for logic // The constructor is where we set up the class, we pass in the store and any other dependencies @@ -41,7 +41,7 @@ export class RiverNodeUrls extends PersistedObservable { .getOperationalNodeUrls() // here we are fetching the node urls .then((urls) => { if (urls !== this.data.urls) { - this.update({ ...this.data, urls }) // if the data is new, update our own state + this.setData({ urls }) // if the data is new, update our own state } }) .catch((e) => { diff --git a/packages/sdk/src/sync-agent/river-connection/riverConnection.test.ts b/packages/sdk/src/sync-agent/river-connection/riverConnection.test.ts index d08607182..bca71f44a 100644 --- a/packages/sdk/src/sync-agent/river-connection/riverConnection.test.ts +++ b/packages/sdk/src/sync-agent/river-connection/riverConnection.test.ts @@ -1,12 +1,12 @@ /** - * @group main + * @group with-entitilements */ import { providers } from 'ethers' import { genShortId } from '../../id' import { Store } from '../../store/store' import { makeRiverConfig } from '../../riverConfig' -import { RiverNodeUrls } from './models/riverNodeUrls' +import { StreamNodeUrls } from './models/streamNodeUrls' import { RiverRegistry, SpaceDapp } from '@river-build/web3' import { RiverConnection } from './riverConnection' import { makeRandomUserContext, waitFor } from '../../util.test' @@ -25,48 +25,50 @@ describe('RiverConnection.test.ts', () => { // init const context = await makeRandomUserContext() const clientParams = makeClientParams({ context, riverConfig }, spaceDapp) - const store = new Store(databaseName, 1, [RiverNodeUrls]) + const store = new Store(databaseName, 1, [StreamNodeUrls]) store.newTransactionGroup('init') const riverRegistry = new RiverRegistry(riverConfig.river.chainConfig, riverProvider) const riverConnection = new RiverConnection(store, riverRegistry, clientParams) // check initial state - expect(riverConnection.nodeUrls.data.urls).toBe('') - expect(riverConnection.client.value).toBeUndefined() + expect(riverConnection.streamNodeUrls.data.urls).toBe('') + expect(riverConnection.client).toBeUndefined() // load await store.commitTransaction() // we should get there await waitFor(() => { - expect(riverConnection.nodeUrls.data.urls).not.toBe('') + expect(riverConnection.streamNodeUrls.data.urls).not.toBe('') }) await waitFor(() => { - expect(riverConnection.client.value).toBeDefined() + expect(riverConnection.client).toBeDefined() }) await waitFor(() => { - expect(riverConnection.nodeUrls.value.status).toBe('saved') + expect(riverConnection.streamNodeUrls.value.status).toBe('saved') }) + await riverConnection.stop() }) // test that a riverConnection will instantly be defined if data exists in local store test('riverConnection loads from db', async () => { // init const context = await makeRandomUserContext() const clientParams = makeClientParams({ context, riverConfig }, spaceDapp) - const store = new Store(databaseName, 1, [RiverNodeUrls]) + const store = new Store(databaseName, 1, [StreamNodeUrls]) store.newTransactionGroup('init') const riverRegistry = new RiverRegistry(riverConfig.river.chainConfig, riverProvider) const riverConnection = new RiverConnection(store, riverRegistry, clientParams) // check initial state - expect(riverConnection.nodeUrls.data.urls).toBe('') - expect(riverConnection.client.value).toBeUndefined() + expect(riverConnection.streamNodeUrls.data.urls).toBe('') + expect(riverConnection.client).toBeUndefined() // load await store.commitTransaction() // should still be defined before we even start! - expect(riverConnection.nodeUrls.data.urls).not.toBe('') - expect(riverConnection.client.value).toBeDefined() + expect(riverConnection.streamNodeUrls.data.urls).not.toBe('') + expect(riverConnection.client).toBeDefined() + await riverConnection.stop() }) }) diff --git a/packages/sdk/src/sync-agent/river-connection/riverConnection.ts b/packages/sdk/src/sync-agent/river-connection/riverConnection.ts index e18549e15..3083a2af8 100644 --- a/packages/sdk/src/sync-agent/river-connection/riverConnection.ts +++ b/packages/sdk/src/sync-agent/river-connection/riverConnection.ts @@ -1,7 +1,6 @@ import { RiverRegistry } from '@river-build/web3' import { RetryParams, makeStreamRpcClient } from '../../makeStreamRpcClient' -import { Observable } from '../../observable/observable' -import { RiverNodeUrls, RiverNodeUrlsModel } from './models/riverNodeUrls' +import { StreamNodeUrls, StreamNodeUrlsModel } from './models/streamNodeUrls' import { Store } from '../../store/store' import { dlogger } from '@river-build/dlog' import { PromiseQueue } from '../utils/promiseQueue' @@ -23,31 +22,28 @@ export interface ClientParams { } export type OnStoppedFn = () => void - -export interface RiverView { - onClientStarted: (client: Client) => OnStoppedFn -} +export type onClientStartedFn = (client: Client) => OnStoppedFn export class RiverConnection { - client = new Observable(undefined) - nodeUrls: RiverNodeUrls + client?: Client + streamNodeUrls: StreamNodeUrls private riverRegistryDapp: RiverRegistry private clientParams: ClientParams private clientQueue = new PromiseQueue() - private views: RiverView[] = [] + private views: onClientStartedFn[] = [] private onStoppedFns: OnStoppedFn[] = [] private stopped = false constructor(store: Store, riverRegistryDapp: RiverRegistry, clientParams: ClientParams) { this.riverRegistryDapp = riverRegistryDapp this.clientParams = clientParams - this.nodeUrls = new RiverNodeUrls(store, riverRegistryDapp) - this.nodeUrls.subscribe(this.onNodeUrlsChanged, { fireImediately: true }) + this.streamNodeUrls = new StreamNodeUrls(store, riverRegistryDapp) + this.streamNodeUrls.subscribe(this.onNodeUrlsChanged, { fireImediately: true }) } async stop() { this.stopped = true - this.nodeUrls.unsubscribe(this.onNodeUrlsChanged) + this.streamNodeUrls.unsubscribe(this.onNodeUrlsChanged) for (const fn of this.onStoppedFns) { fn() } @@ -55,25 +51,24 @@ export class RiverConnection { } call(fn: (client: Client) => Promise) { - const client = this.client.value - if (client) { - return fn(client) + if (this.client) { + return fn(this.client) } else { // Enqueue the request if client is not available return this.clientQueue.enqueue(fn) } } - registerView(view: RiverView) { - if (this.client.value) { - const onStopFn = view.onClientStarted(this.client.value) + registerView(viewFn: onClientStartedFn) { + if (this.client) { + const onStopFn = viewFn(this.client) this.onStoppedFns.push(onStopFn) } - this.views.push(view) + this.views.push(viewFn) } - private onNodeUrlsChanged = (value: PersistedModel) => { - if (this.client.value !== undefined) { + private onNodeUrlsChanged = (value: PersistedModel) => { + if (this.client !== undefined) { logger.log('RiverConnection: rpc urls changed, client already set', value) return } @@ -97,11 +92,11 @@ export class RiverConnection { this.clientParams.logNamespaceFilter, this.clientParams.highPriorityStreamIds, ) - this.client.set(client) + this.client = client this.clientQueue.flush(client) // New rpcClient is available, resolve all queued requests // initialize views - this.views.forEach((view) => { - const onStopFn = view.onClientStarted(client) + this.views.forEach((viewFn) => { + const onStopFn = viewFn(client) this.onStoppedFns.push(onStopFn) }) } diff --git a/packages/sdk/src/sync-agent/syncAgent.test.ts b/packages/sdk/src/sync-agent/syncAgent.test.ts index 971169544..6ca7ab4dc 100644 --- a/packages/sdk/src/sync-agent/syncAgent.test.ts +++ b/packages/sdk/src/sync-agent/syncAgent.test.ts @@ -1,28 +1,53 @@ /** - * @group main + * @group with-entitilements */ import { Wallet } from 'ethers' import { makeSignerContext } from '../signerContext' import { makeRiverConfig } from '../riverConfig' import { SyncAgent } from './syncAgent' +import { AuthStatus } from './user/user' +import { dlogger } from '@river-build/dlog' +import { waitFor } from '../util.test' +import { LocalhostWeb3Provider } from '@river-build/web3' +import { MembershipOp } from '@river-build/proto' + +const logger = dlogger('csb:test:syncAgent') describe('syncAgent.test.ts', () => { + const riverConfig = makeRiverConfig() const rootWallet = Wallet.createRandom() const delegateWallet = Wallet.createRandom() - const riverConfig = makeRiverConfig() + const web3Provider = new LocalhostWeb3Provider(riverConfig.base.rpcUrl, rootWallet) + test('syncAgent', async () => { + await web3Provider.fundWallet() const signerContext = await makeSignerContext(rootWallet, delegateWallet, { days: 1 }) const syncAgent = new SyncAgent({ context: signerContext, riverConfig }) expect(syncAgent.user.value.status).toBe('loading') await syncAgent.start() expect(syncAgent.user.value.status).toBe('loaded') - expect(syncAgent.user.value.data.initialized).toBe(false) - syncAgent.store.newTransactionGroup('initializeUser') - await syncAgent.user.initialize() - expect(syncAgent.user.value.data.initialized).toBe(true) + expect(syncAgent.user.data.initialized).toBe(false) + expect(syncAgent.user.authStatus.value).toBe(AuthStatus.None) + expect(Object.keys(syncAgent.user.streams.memberships.data.memberships).length).toBe(0) + syncAgent.store.newTransactionGroup('createSpace') + const { spaceId, defaultChannelId } = await syncAgent.user.createSpace( + { spaceName: 'BlastOff' }, + web3Provider.signer, + ) + logger.log('spaceId', spaceId) + expect(Object.keys(syncAgent.user.streams.memberships.data.memberships).length).toBe(2) + expect(syncAgent.user.streams.memberships.data.memberships[spaceId].op).toBe( + MembershipOp.SO_JOIN, + ) + expect(syncAgent.user.streams.memberships.data.memberships[defaultChannelId].op).toBe( + MembershipOp.SO_JOIN, + ) + expect(syncAgent.user.authStatus.value).toBe(AuthStatus.ConnectedToRiver) + expect(syncAgent.user.data.initialized).toBe(true) expect(syncAgent.user.value.status).toBe('saving') await syncAgent.store.commitTransaction() expect(syncAgent.user.value.status).toBe('saved') + await syncAgent.stop() }) test('syncAgent loads again', async () => { const signerContext = await makeSignerContext(rootWallet, delegateWallet, { days: 1 }) @@ -30,6 +55,11 @@ describe('syncAgent.test.ts', () => { expect(syncAgent.user.value.status).toBe('loading') await syncAgent.start() expect(syncAgent.user.value.status).toBe('loaded') - expect(syncAgent.user.value.data.initialized).toBe(true) + expect(syncAgent.user.data.initialized).toBe(true) + expect(syncAgent.user.authStatus.value).toBe(AuthStatus.EvaluatingCredentials) + await waitFor(() => { + expect(syncAgent.user.authStatus.value).toBe(AuthStatus.ConnectedToRiver) + }) + await syncAgent.stop() }) }) diff --git a/packages/sdk/src/sync-agent/syncAgent.ts b/packages/sdk/src/sync-agent/syncAgent.ts index dc80864f4..edf8c296e 100644 --- a/packages/sdk/src/sync-agent/syncAgent.ts +++ b/packages/sdk/src/sync-agent/syncAgent.ts @@ -6,12 +6,18 @@ import { RetryParams } from '../makeStreamRpcClient' import { Store } from '../store/store' import { SignerContext } from '../signerContext' import { userIdFromAddress } from '../id' -import { RiverNodeUrls } from './river-connection/models/riverNodeUrls' -import { User } from './user/user' +import { StreamNodeUrlsModel } from './river-connection/models/streamNodeUrls' +import { AuthStatus, User, UserModel } from './user/user' import { makeBaseProvider, makeRiverProvider } from './utils/providers' -import { UserMemberships } from './user/models/userMemberships' +import { UserMembershipsModel } from './user/models/userMemberships' import { RiverDbManager } from '../riverDbManager' import { Entitlements } from './entitlements/entitlements' +import { PersistedObservable } from '../observable/persistedObservable' +import { Observable } from '../observable/observable' +import { UserInboxModel } from './user/models/userInbox' +import { SyncAgentStore } from './syncAgentStore' +import { UserDeviceKeysModel } from './user/models/userDeviceKeys' +import { UserSettingsModel } from './user/models/userSettings' export interface SyncAgentConfig { context: SignerContext @@ -33,6 +39,17 @@ export class SyncAgent { user: User //spaces: Spaces + // flattened observables - just pointers to the observable objects in the models + observables: { + riverStreamNodeUrls: PersistedObservable + user: PersistedObservable + userAuthStatus: Observable + userMemberships: PersistedObservable + userInbox: PersistedObservable + userDeviceKeys: PersistedObservable + userSettings: PersistedObservable + } + constructor(config: SyncAgentConfig) { this.userId = userIdFromAddress(config.context.creatorAddress) this.config = config @@ -40,11 +57,7 @@ export class SyncAgent { const river = config.riverConfig.river this.baseProvider = makeBaseProvider(config.riverConfig) this.riverProvider = makeRiverProvider(config.riverConfig) - this.store = new Store(`syncAgent-${this.userId}`, 1, [ - RiverNodeUrls, - User, - UserMemberships, - ]) + this.store = new SyncAgentStore(this.userId) this.store.newTransactionGroup('SyncAgent::initalization') this.spaceDapp = new SpaceDapp(base.chainConfig, this.baseProvider) this.riverRegistryDapp = new RiverRegistry(river.chainConfig, this.riverProvider) @@ -57,7 +70,18 @@ export class SyncAgent { highPriorityStreamIds: this.config.highPriorityStreamIds, rpcRetryParams: config.retryParams, }) - this.user = new User(this.userId, this.store, this.riverConnection) + this.user = new User(this.userId, this.store, this.riverConnection, this.spaceDapp) + + // flatten out the observables + this.observables = { + riverStreamNodeUrls: this.riverConnection.streamNodeUrls, + user: this.user, + userAuthStatus: this.user.authStatus, + userMemberships: this.user.streams.memberships, + userInbox: this.user.streams.inbox, + userDeviceKeys: this.user.streams.deviceKeys, + userSettings: this.user.streams.settings, + } } async start() { diff --git a/packages/sdk/src/sync-agent/syncAgentStore.ts b/packages/sdk/src/sync-agent/syncAgentStore.ts new file mode 100644 index 000000000..0b04fd0d0 --- /dev/null +++ b/packages/sdk/src/sync-agent/syncAgentStore.ts @@ -0,0 +1,17 @@ +import { Store } from '../store/store' +import { StreamNodeUrls } from './river-connection/models/streamNodeUrls' +import { UserDeviceKeys } from './user/models/userDeviceKeys' +import { UserInbox } from './user/models/userInbox' +import { UserMemberships } from './user/models/userMemberships' +import { UserSettings } from './user/models/userSettings' +import { User } from './user/user' + +const VERSION = 1 +const DB_NAME = (userId: string) => `syncAgent-${userId}` +const MODELS = [StreamNodeUrls, User, UserDeviceKeys, UserInbox, UserMemberships, UserSettings] + +export class SyncAgentStore extends Store { + constructor(userId: string) { + super(DB_NAME(userId), VERSION, MODELS) + } +} diff --git a/packages/sdk/src/sync-agent/user/models/userDeviceKeys.ts b/packages/sdk/src/sync-agent/user/models/userDeviceKeys.ts index a7a2d9f95..183c500ca 100644 --- a/packages/sdk/src/sync-agent/user/models/userDeviceKeys.ts +++ b/packages/sdk/src/sync-agent/user/models/userDeviceKeys.ts @@ -1,15 +1,78 @@ -import { dlogger } from '@river-build/dlog' -import { Store } from '../../../store/store' +import { check, dlogger } from '@river-build/dlog' +import { LoadPriority, Store } from '../../../store/store' +import { UserDevice } from '@river-build/encryption' +import { PersistedObservable, persistedObservable } from '../../../observable/persistedObservable' +import { makeUserDeviceKeyStreamId } from '../../../id' +import { RiverConnection } from '../../river-connection/riverConnection' +import { StreamStateView } from '../../../streamStateView' +import { isDefined } from '../../../check' +import { Client } from '../../../client' const logger = dlogger('csb:userDeviceKeys') -export class UserDeviceKeys { - constructor(id: string, store: Store) { - logger.log('new', id, store) +export interface UserDeviceKeysModel { + id: string + streamId: string + initialized: boolean + deviceId?: string + deviceKeys: UserDevice[] +} + +@persistedObservable({ tableName: 'userDeviceKeys' }) +export class UserDeviceKeys extends PersistedObservable { + constructor(id: string, store: Store, private riverConnection: RiverConnection) { + super( + { id, streamId: makeUserDeviceKeyStreamId(id), initialized: false, deviceKeys: [] }, + store, + LoadPriority.high, + ) + } + + protected override async onLoaded() { + this.riverConnection.registerView(this.onClientStarted) + } + + private onClientStarted = (client: Client) => { + logger.log('onClientStarted') + if (this.riverConnection.client?.cryptoInitialized) { + const deviceId = this.riverConnection.client.userDeviceKey().deviceKey + const streamView = this.riverConnection.client.stream(this.data.streamId)?.view + if (streamView && deviceId) { + this.initialize(deviceId, streamView) + } else if (deviceId) { + this.setData({ deviceId }) + } + } + client.addListener('userDeviceKeysUpdated', this.onUserDeviceKeysUpdated) + client.addListener('streamInitialized', this.onStreamInitialized) + return () => { + client.removeListener('userDeviceKeysUpdated', this.onUserDeviceKeysUpdated) + client.removeListener('streamInitialized', this.onStreamInitialized) + } + } + + private onStreamInitialized = (streamId: string) => { + if (streamId === this.data.streamId) { + const streamView = this.riverConnection.client?.stream(this.data.streamId)?.view + const deviceId = this.riverConnection.client?.userDeviceKey().deviceKey + check(isDefined(deviceId), 'deviceId is not defined') + check(isDefined(streamView), 'streamView is not defined') + this.initialize(deviceId, streamView) + } + } + + private onUserDeviceKeysUpdated = (streamId: string, deviceKeys: UserDevice[]) => { + if (streamId === this.data.streamId) { + logger.log('updated', streamId, deviceKeys) + this.setData({ deviceKeys }) + } } - async initialize(metadata?: { spaceId: Uint8Array }) { - logger.log('initialize', metadata) - return Promise.resolve() + private initialize(deviceId: string, streamView: StreamStateView) { + this.setData({ + initialized: true, + deviceId, + deviceKeys: streamView.userDeviceKeyContent.deviceKeys, + }) } } diff --git a/packages/sdk/src/sync-agent/user/models/userInbox.ts b/packages/sdk/src/sync-agent/user/models/userInbox.ts index 2189ef84d..db088e16c 100644 --- a/packages/sdk/src/sync-agent/user/models/userInbox.ts +++ b/packages/sdk/src/sync-agent/user/models/userInbox.ts @@ -1,15 +1,85 @@ -import { dlogger } from '@river-build/dlog' -import { Store } from '../../../store/store' +import { check, dlogger } from '@river-build/dlog' +import { Identifiable, LoadPriority, Store } from '../../../store/store' +import { UserInboxPayload_Snapshot_DeviceSummary } from '@river-build/proto' +import { PersistedObservable, persistedObservable } from '../../../observable/persistedObservable' +import { makeUserInboxStreamId } from '../../../id' +import { RiverConnection } from '../../river-connection/riverConnection' +import { Client } from '../../../client' +import { StreamStateView } from '../../../streamStateView' +import { isDefined } from '../../../check' const logger = dlogger('csb:userInbox') -export class UserInbox { - constructor(id: string, store: Store) { - logger.log('new', id, store) +export interface UserInboxModel extends Identifiable { + id: string + streamId: string + initialized: boolean + deviceId?: string + deviceSummary?: UserInboxPayload_Snapshot_DeviceSummary +} + +@persistedObservable({ tableName: 'userInbox' }) +export class UserInbox extends PersistedObservable { + constructor(id: string, store: Store, private riverConnection: RiverConnection) { + super( + { id, streamId: makeUserInboxStreamId(id), initialized: false }, + store, + LoadPriority.high, + ) + } + + protected override async onLoaded() { + this.riverConnection.registerView(this.onClientStarted) + } + + private onClientStarted = (client: Client) => { + logger.log('onClientStarted') + if (this.riverConnection.client?.cryptoInitialized) { + const deviceId = this.riverConnection.client.userDeviceKey().deviceKey + const streamView = this.riverConnection.client.stream(this.data.streamId)?.view + if (streamView && deviceId) { + this.initialize(deviceId, streamView) + } else if (deviceId) { + this.setData({ deviceId }) + } + } + client.addListener('userInboxDeviceSummaryUpdated', this.onUserInboxDeviceSummaryUpdated) + client.addListener('streamInitialized', this.onStreamInitialized) + return () => { + client.removeListener( + 'userInboxDeviceSummaryUpdated', + this.onUserInboxDeviceSummaryUpdated, + ) + client.removeListener('streamInitialized', this.onStreamInitialized) + } + } + + private onStreamInitialized = (streamId: string) => { + if (streamId === this.data.streamId) { + const streamView = this.riverConnection.client?.stream(this.data.streamId)?.view + const deviceId = this.riverConnection.client?.userDeviceKey().deviceKey + check(isDefined(deviceId), 'deviceId is not defined') + check(isDefined(streamView), 'streamView is not defined') + this.initialize(deviceId, streamView) + } + } + + private onUserInboxDeviceSummaryUpdated = ( + streamId: string, + deviceId: string, + deviceSummary: UserInboxPayload_Snapshot_DeviceSummary, + ) => { + if (streamId === this.data.streamId) { + logger.log('onUserInboxDeviceSummaryUpdated', deviceId, deviceSummary) + this.setData({ deviceId, deviceSummary }) + } } - async initialize(metadata?: { spaceId: Uint8Array }) { - logger.log('initialize', metadata) - return Promise.resolve() + private initialize(deviceId: string, streamView: StreamStateView) { + this.setData({ + initialized: true, + deviceId, + deviceSummary: streamView.userInboxContent.deviceSummary[deviceId], + }) } } diff --git a/packages/sdk/src/sync-agent/user/models/userMemberships.ts b/packages/sdk/src/sync-agent/user/models/userMemberships.ts index 7e408feeb..e5fa3e330 100644 --- a/packages/sdk/src/sync-agent/user/models/userMemberships.ts +++ b/packages/sdk/src/sync-agent/user/models/userMemberships.ts @@ -19,35 +19,31 @@ export interface UserMembership { export interface UserMembershipsModel { id: string + streamId: string initialized: boolean memberships: Record } -export type UserMembershipEvents = { - userJoinedStream: (streamId: string) => void - userInvitedToStream: (streamId: string) => void - userLeftStream: (streamId: string) => void - userStreamMembershipChanged: (streamId: string) => void -} - @persistedObservable({ tableName: 'userMemberships' }) export class UserMemberships extends PersistedObservable { private riverConnection: RiverConnection - private streamId: string constructor(id: string, store: Store, riverConnection: RiverConnection) { - super({ id, initialized: false, memberships: {} }, store, LoadPriority.high) + super( + { id, streamId: makeUserStreamId(id), initialized: false, memberships: {} }, + store, + LoadPriority.high, + ) this.riverConnection = riverConnection - this.streamId = makeUserStreamId(id) } override async onLoaded() { - this.riverConnection.registerView(this) + this.riverConnection.registerView(this.onClientStarted) } - onClientStarted(client: Client) { + private onClientStarted = (client: Client) => { logger.log('onClientStarted') - const streamView = this.riverConnection.client.value?.stream(this.streamId)?.view + const streamView = this.riverConnection.client?.stream(this.data.streamId)?.view if (streamView) { this.initialize(streamView) } @@ -79,12 +75,12 @@ export class UserMemberships extends PersistedObservable { }, {} as Record, ) - this.update({ ...this.data, memberships, initialized: true }) + this.setData({ memberships, initialized: true }) } private onStreamInitialized = (streamId: string) => { - if (streamId === this.streamId) { - const streamView = this.riverConnection.client.value?.stream(this.streamId)?.view + if (streamId === this.data.streamId) { + const streamView = this.riverConnection.client?.stream(this.data.streamId)?.view check(isDefined(streamView), 'streamView is not defined') this.initialize(streamView) } @@ -94,8 +90,7 @@ export class UserMemberships extends PersistedObservable { streamId: string, payload: UserPayload_UserMembership, ) => { - this.update({ - ...this.data, + this.setData({ memberships: { ...this.data.memberships, [streamId]: toUserMembership(payload), diff --git a/packages/sdk/src/sync-agent/user/models/userSettings.ts b/packages/sdk/src/sync-agent/user/models/userSettings.ts index 00a4c9c23..dd35acc33 100644 --- a/packages/sdk/src/sync-agent/user/models/userSettings.ts +++ b/packages/sdk/src/sync-agent/user/models/userSettings.ts @@ -1,15 +1,55 @@ -import { dlogger } from '@river-build/dlog' -import { Store } from '../../../store/store' +import { check, dlogger } from '@river-build/dlog' +import { Identifiable, LoadPriority, Store } from '../../../store/store' +import { PersistedObservable, persistedObservable } from '../../../observable/persistedObservable' +import { RiverConnection } from '../../river-connection/riverConnection' +import { makeUserSettingsStreamId } from '../../../id' +import { StreamStateView } from '../../../streamStateView' +import { Client } from '../../../client' +import { isDefined } from '../../../check' const logger = dlogger('csb:userSettings') -export class UserSettings { - constructor(id: string, store: Store) { - logger.log('new', id, store) +export interface UserSettingsModel extends Identifiable { + id: string + streamId: string + initialized: boolean +} + +@persistedObservable({ tableName: 'userSettings' }) +export class UserSettings extends PersistedObservable { + constructor(id: string, store: Store, private riverConnection: RiverConnection) { + super( + { id, streamId: makeUserSettingsStreamId(id), initialized: false }, + store, + LoadPriority.high, + ) + } + + protected override async onLoaded() { + this.riverConnection.registerView(this.onClientStarted) + } + + private onClientStarted = (client: Client) => { + logger.log('onClientStarted') + const streamView = client.stream(this.data.streamId)?.view + if (streamView) { + this.initialize(streamView) + } + client.addListener('streamInitialized', this.onStreamInitialized) + return () => { + client.removeListener('streamInitialized', this.onStreamInitialized) + } + } + + private onStreamInitialized = (streamId: string) => { + if (streamId === this.data.streamId) { + const streamView = this.riverConnection.client?.stream(this.data.streamId)?.view + check(isDefined(streamView), 'streamView is not defined') + this.initialize(streamView) + } } - async initialize(metadata?: { spaceId: Uint8Array }) { - logger.log('initialize', metadata) - return Promise.resolve() + private initialize = (_streamView: StreamStateView) => { + this.setData({ initialized: true }) } } diff --git a/packages/sdk/src/sync-agent/user/user.test.ts b/packages/sdk/src/sync-agent/user/user.test.ts index 8cea0af8a..d8fb4775b 100644 --- a/packages/sdk/src/sync-agent/user/user.test.ts +++ b/packages/sdk/src/sync-agent/user/user.test.ts @@ -1,76 +1,81 @@ /* eslint-disable @typescript-eslint/no-unnecessary-type-assertion */ /** - * @group main + * @group with-entitilements */ import { dlogger } from '@river-build/dlog' -import { Store } from '../../store/store' import { makeRiverConfig } from '../../riverConfig' -import { genShortId } from '../../id' import { Wallet, providers } from 'ethers' -import { RiverNodeUrls } from '../river-connection/models/riverNodeUrls' import { RiverConnection } from '../river-connection/riverConnection' -import { RiverRegistry, SpaceDapp } from '@river-build/web3' +import { LocalhostWeb3Provider, RiverRegistry, SpaceDapp } from '@river-build/web3' import { User } from './user' -import { UserMemberships } from './models/userMemberships' import { makeUserContextFromWallet } from '../../util.test' import { makeClientParams } from '../utils/syncAgentUtils.test' +import { SyncAgentStore } from '../syncAgentStore' const logger = dlogger('csb:test:user') describe('User.test.ts', () => { logger.log('start') + const rootWallet = Wallet.createRandom() + const userId = rootWallet.address const riverConfig = makeRiverConfig() - const store = new Store(genShortId(), 1, [RiverNodeUrls, UserMemberships, User]) + const store = new SyncAgentStore(userId) store.newTransactionGroup('init') const river = riverConfig.river const riverProvider = new providers.StaticJsonRpcProvider(river.rpcUrl) const base = riverConfig.base const baseProvider = new providers.StaticJsonRpcProvider(base.rpcUrl) + const web3Provider = new LocalhostWeb3Provider(riverConfig.base.rpcUrl, rootWallet) const riverRegistryDapp = new RiverRegistry(river.chainConfig, riverProvider) const spaceDapp = new SpaceDapp(base.chainConfig, baseProvider) - const userWallet = Wallet.createRandom() - const userId = userWallet.address - - test('User initializes from empty', async () => { - const context = await makeUserContextFromWallet(userWallet) + test('User initializes', async () => { + await web3Provider.fundWallet() + const context = await makeUserContextFromWallet(rootWallet) const clientParams = makeClientParams({ context, riverConfig }, spaceDapp) const riverConnection = new RiverConnection(store, riverRegistryDapp, clientParams) - const user = new User(userId, store, riverConnection) + const user = new User(userId, store, riverConnection, spaceDapp) expect(user.data.id).toBe(userId) expect(user.data.initialized).toBe(false) - expect(user.memberships.data.initialized).toBe(false) - //expect(user.inbox.data.initialized).toBe(false) - //expect(user.deviceKeys.data.initialized).toBe(false) - //expect(user.settings.data.initialized).toBe(false) + expect(user.streams.memberships.data.initialized).toBe(false) + expect(user.streams.inbox.data.initialized).toBe(false) + expect(user.streams.deviceKeys.data.initialized).toBe(false) + expect(user.streams.settings.data.initialized).toBe(false) await store.commitTransaction() expect(user.data.id).toBe(userId) expect(user.data.initialized).toBe(false) - expect(user.memberships.data.initialized).toBe(false) - //expect(user.inbox.data.initialized).toBe(false) - //expect(user.deviceKeys.data.initialized).toBe(false) - //expect(user.settings.data.initialized).toBe(false) + expect(user.streams.memberships.data.initialized).toBe(false) + expect(user.streams.inbox.data.initialized).toBe(false) + expect(user.streams.deviceKeys.data.initialized).toBe(false) + expect(user.streams.settings.data.initialized).toBe(false) + + const { spaceId } = await user.createSpace({ spaceName: 'bobs-space' }, web3Provider.signer) + logger.log('created spaceId', spaceId) - await user.initialize() // if we run against non entitled backend, we don't need to pass spaceid expect(user.data.initialized).toBe(true) - expect(user.memberships.data.initialized).toBe(true) - //expect(user.inbox.data.initialized).toBe(false) - //expect(user.deviceKeys.data.initialized).toBe(false) - //expect(user.settings.data.initialized).toBe(false) + expect(user.streams.memberships.data.initialized).toBe(true) + expect(user.streams.inbox.data.initialized).toBe(true) + expect(user.streams.deviceKeys.data.initialized).toBe(true) + expect(user.streams.settings.data.initialized).toBe(true) + await riverConnection.stop() }) test('User loads from db', async () => { store.newTransactionGroup('init2') - const context = await makeUserContextFromWallet(userWallet) + const context = await makeUserContextFromWallet(rootWallet) const clientParams = makeClientParams({ context, riverConfig }, spaceDapp) const riverConnection = new RiverConnection(store, riverRegistryDapp, clientParams) - const user = new User(userId, store, riverConnection) + const user = new User(userId, store, riverConnection, spaceDapp) expect(user.value.status).toBe('loading') await store.commitTransaction() expect(user.value.status).toBe('loaded') expect(user.data.initialized).toBe(true) - expect(user.memberships.data.initialized).toBe(true) + expect(user.streams.memberships.data.initialized).toBe(true) + expect(user.streams.inbox.data.initialized).toBe(true) + expect(user.streams.deviceKeys.data.initialized).toBe(true) + expect(user.streams.settings.data.initialized).toBe(true) + await riverConnection.stop() }) }) diff --git a/packages/sdk/src/sync-agent/user/user.ts b/packages/sdk/src/sync-agent/user/user.ts index 367296075..9c33ad47b 100644 --- a/packages/sdk/src/sync-agent/user/user.ts +++ b/packages/sdk/src/sync-agent/user/user.ts @@ -8,6 +8,10 @@ import { UserDeviceKeys } from './models/userDeviceKeys' import { UserInbox } from './models/userInbox' import { UserMemberships } from './models/userMemberships' import { UserSettings } from './models/userSettings' +import { CreateSpaceParams, SpaceDapp } from '@river-build/web3' +import { ethers } from 'ethers' +import { makeDefaultChannelStreamId, makeSpaceStreamId } from '../../id' +import { makeDefaultMembershipInfo } from '../utils/spaceUtils' const logger = dlogger('csb:user') @@ -27,6 +31,9 @@ export enum AuthStatus { Credentialed = 'Credentialed', /** User authenticated with a valid credential and with an active river river client. */ ConnectedToRiver = 'ConnectedToRiver', + /** Disconnected, client was stopped */ + Disconnected = 'Disconnected', + /** Error state: User failed to authenticate or connect to river client. */ Error = 'Error', } @@ -36,49 +43,87 @@ class LoginContext { @persistedObservable({ tableName: 'user' }) export class User extends PersistedObservable { - id: string - memberships: UserMemberships - inbox: UserInbox - deviceKeys: UserDeviceKeys - settings: UserSettings + streams: { + memberships: UserMemberships + inbox: UserInbox + deviceKeys: UserDeviceKeys + settings: UserSettings + } authStatus = new Observable(AuthStatus.None) loginError?: Error private riverConnection: RiverConnection + private spaceDapp: SpaceDapp - constructor(id: string, store: Store, riverConnection: RiverConnection) { + constructor(id: string, store: Store, riverConnection: RiverConnection, spaceDapp: SpaceDapp) { super({ id, initialized: false }, store, LoadPriority.high) - this.id = id + this.streams = { + memberships: new UserMemberships(id, store, riverConnection), + inbox: new UserInbox(id, store, riverConnection), + deviceKeys: new UserDeviceKeys(id, store, riverConnection), + settings: new UserSettings(id, store, riverConnection), + } this.riverConnection = riverConnection - this.memberships = new UserMemberships(id, store, riverConnection) - this.inbox = new UserInbox(id, store) - this.deviceKeys = new UserDeviceKeys(id, store) - this.settings = new UserSettings(id, store) + this.spaceDapp = spaceDapp } - override async onLoaded() { - this.riverConnection.registerView(this) + protected override async onLoaded() { + this.riverConnection.registerView(this.onClientStarted) } - async initialize(newUserMetadata?: { spaceId: Uint8Array | string }) { + private async initialize(newUserMetadata?: { spaceId: Uint8Array | string }) { await this.riverConnection.call(async (client) => { await client.initializeUser(newUserMetadata) client.startSync() }) - this.update({ ...this.data, initialized: true }) - this.authStatus.set(AuthStatus.ConnectedToRiver) + this.setData({ initialized: true }) + this.authStatus.setValue(AuthStatus.ConnectedToRiver) } - onClientStarted(client: Client) { - this.authStatus.set(AuthStatus.EvaluatingCredentials) + private onClientStarted = (client: Client) => { + this.authStatus.setValue(AuthStatus.EvaluatingCredentials) const loginContext = new LoginContext(client, false) this.loginWithRetries(loginContext).catch((err) => { logger.error('login failed', err) this.loginError = err - this.authStatus.set(AuthStatus.Error) + this.authStatus.setValue(AuthStatus.Error) }) return () => { loginContext.cancelled = true + this.authStatus.setValue(AuthStatus.Disconnected) + } + } + + async createSpace( + params: Partial> & { spaceName: string }, + signer: ethers.Signer, + ) { + const membershipInfo = + params.membership ?? (await makeDefaultMembershipInfo(this.spaceDapp, this.data.id)) + const transaction = await this.spaceDapp.createSpace( + { + spaceName: params.spaceName, + spaceMetadata: params.spaceMetadata ?? params.spaceName, + channelName: params.channelName ?? 'general', + membership: membershipInfo, + }, + signer, + ) + const receipt = await transaction.wait() + logger.log('transaction receipt', receipt) + const spaceAddress = this.spaceDapp.getSpaceAddress(receipt) + if (!spaceAddress) { + throw new Error('Space address not found') } + logger.log('spaceAddress', spaceAddress) + const spaceId = makeSpaceStreamId(spaceAddress) + const defaultChannelId = makeDefaultChannelStreamId(spaceAddress) + logger.log('spaceId, defaultChannelId', { spaceId, defaultChannelId }) + await this.initialize({ spaceId }) + await this.riverConnection.call((client) => client.createSpace(spaceId)) + await this.riverConnection.call((client) => + client.createChannel(spaceId, 'general', '', defaultChannelId), + ) + return { spaceId, defaultChannelId } } private async loginWithRetries(loginContext: LoginContext) { @@ -90,11 +135,10 @@ export class User extends PersistedObservable { if (this.data.initialized) { await this.initialize() } else { - const canInitialize = await loginContext.client.userExists(this.id) + const canInitialize = await loginContext.client.userExists(this.data.id) if (canInitialize) { await this.initialize() } - loginContext.client.startSync() } break } catch (err) { @@ -105,7 +149,7 @@ export class User extends PersistedObservable { throw err } else { const retryDelay = getRetryDelay(retryCount) - logger.log('******* retrying', { retryDelay, retryCount }) + logger.log('retrying', { retryDelay, retryCount }) // sleep await new Promise((resolve) => setTimeout(resolve, retryDelay)) } diff --git a/packages/sdk/src/sync-agent/utils/spaceUtils.ts b/packages/sdk/src/sync-agent/utils/spaceUtils.ts new file mode 100644 index 000000000..2d94c0039 --- /dev/null +++ b/packages/sdk/src/sync-agent/utils/spaceUtils.ts @@ -0,0 +1,39 @@ +import { + ETH_ADDRESS, + MembershipStruct, + NoopRuleData, + Permission, + SpaceDapp, + getDynamicPricingModule, + getFixedPricingModule, +} from '@river-build/web3' + +export async function makeDefaultMembershipInfo( + spaceDapp: SpaceDapp, + feeRecipient: string, + pricing: 'dynamic' | 'fixed' = 'dynamic', +) { + const pricingModule = + pricing == 'dynamic' + ? await getDynamicPricingModule(spaceDapp) + : await getFixedPricingModule(spaceDapp) + return { + settings: { + name: 'Everyone', + symbol: 'MEMBER', + price: 0, + maxSupply: 1000, + duration: 0, + currency: ETH_ADDRESS, + feeRecipient: feeRecipient, + freeAllocation: 0, + pricingModule: pricingModule.module, + }, + permissions: [Permission.Read, Permission.Write], + requirements: { + everyone: true, + users: [], + ruleData: NoopRuleData, + }, + } satisfies MembershipStruct +} diff --git a/packages/sdk/src/sync-agent/utils/syncAgentUtils.test.ts b/packages/sdk/src/sync-agent/utils/syncAgentUtils.test.ts index b6ca5ed87..516b8e1aa 100644 --- a/packages/sdk/src/sync-agent/utils/syncAgentUtils.test.ts +++ b/packages/sdk/src/sync-agent/utils/syncAgentUtils.test.ts @@ -5,7 +5,15 @@ import { makeRiverConfig } from '../../riverConfig' import { RiverDbManager } from '../../riverDbManager' import { userIdFromAddress } from '../../id' import { Entitlements } from '../entitlements/entitlements' -import { SpaceDapp } from '@river-build/web3' +import { + ETH_ADDRESS, + MembershipStruct, + NoopRuleData, + Permission, + SpaceDapp, + getDynamicPricingModule, + getFixedPricingModule, +} from '@river-build/web3' export async function makeRandomSyncAgentConfig(): Promise { const context = await makeRandomUserContext() @@ -48,3 +56,33 @@ export function makeTestDbName(prefix: string, userId: string, deviceId?: string const suffix = deviceId ? `-${deviceId}` : '' return `${prefix}-${userId}${suffix}` } + +export async function makeTestMembershipInfo( + spaceDapp: SpaceDapp, + feeRecipient: string, + pricing: 'dynamic' | 'fixed' = 'dynamic', +) { + const pricingModule = + pricing == 'dynamic' + ? await getDynamicPricingModule(spaceDapp) + : await getFixedPricingModule(spaceDapp) + return { + settings: { + name: 'Everyone', + symbol: 'MEMBER', + price: 0, + maxSupply: 1000, + duration: 0, + currency: ETH_ADDRESS, + feeRecipient: feeRecipient, + freeAllocation: 0, + pricingModule: pricingModule.module, + }, + permissions: [Permission.Read, Permission.Write], + requirements: { + everyone: true, + users: [], + ruleData: NoopRuleData, + }, + } satisfies MembershipStruct +} diff --git a/packages/web3/src/LocalhostWeb3Provider.ts b/packages/web3/src/LocalhostWeb3Provider.ts index fbea1b36b..a3d6557eb 100644 --- a/packages/web3/src/LocalhostWeb3Provider.ts +++ b/packages/web3/src/LocalhostWeb3Provider.ts @@ -14,6 +14,10 @@ export class LocalhostWeb3Provider extends ethers.providers.JsonRpcProvider { return true } + public get signer(): ethers.Signer { + return this.wallet + } + constructor(rpcUrl: string, wallet?: ethers.Wallet) { super(rpcUrl) this.wallet = (wallet ?? ethers.Wallet.createRandom()).connect(this)