diff --git a/.vscode/launch.json b/.vscode/launch.json index 586f6ab06..a6f149c4b 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -65,7 +65,7 @@ "name": "Jest: current file in 'packages/sdk/' (single entitlements)", "type": "node", "request": "launch", - "env": { "NODE_ENV": "development", "NODE_TLS_REJECT_UNAUTHORIZED": "0", "RIVER_ENV": "local_single", "DEBUG": "csb:*,test:*", "DEBUG_DEPTH":"10" }, + "env": { "NODE_ENV": "development", "NODE_TLS_REJECT_UNAUTHORIZED": "0", "RIVER_ENV": "local_single", "DEBUG": "csb:*,test:*", "DEBUG_DEPTH":"1" }, "program": "${workspaceFolder}/node_modules/.bin/jest", "runtimeArgs": ["--experimental-vm-modules", "--experimental-wasm-modules"], "args": ["${file}", "--config", "${workspaceFolder}/packages/sdk/jest.config.ts", "-i", "--no-cache", "--forceExit"], diff --git a/packages/encryption/src/decryptionExtensions.ts b/packages/encryption/src/decryptionExtensions.ts index 9b02863a5..68b491963 100644 --- a/packages/encryption/src/decryptionExtensions.ts +++ b/packages/encryption/src/decryptionExtensions.ts @@ -212,6 +212,7 @@ export abstract class BaseDecryptionExtensions { sessions: UserInboxPayload_GroupEncryptionSessions, _senderId: string, ): void { + this.log.info('enqueueNewGroupSessions', sessions) this.queues.newGroupSession.push(sessions) this.checkStartTicking() } diff --git a/packages/playground/src/routes/root.tsx b/packages/playground/src/routes/root.tsx index 2276210f4..fc64cfc30 100644 --- a/packages/playground/src/routes/root.tsx +++ b/packages/playground/src/routes/root.tsx @@ -76,7 +76,6 @@ const ConnectedContent = () => { const { data: nodeUrls } = useSyncValue((s) => s.riverStreamNodeUrls, { onUpdate: (data) => console.log('onUpdate', data), onError: (error) => console.error('onError', error), - onSaved: (data) => console.log('onSaved', data), }) return (
diff --git a/packages/react-sdk/src/useObservable.ts b/packages/react-sdk/src/useObservable.ts index 4ff08f11d..c7436fe7a 100644 --- a/packages/react-sdk/src/useObservable.ts +++ b/packages/react-sdk/src/useObservable.ts @@ -6,7 +6,6 @@ export type ObservableConfig = { fireImmediately?: boolean onUpdate?: (data: T) => void onError?: (error: Error) => void - onSaved?: (data: T) => void } type ObservableReturn = { @@ -15,8 +14,6 @@ type ObservableReturn = { status: PersistedModel['status'] isLoading: boolean isError: boolean - isSaving: boolean - isSaved: boolean isLoaded: boolean } @@ -65,9 +62,6 @@ export function useObservable( if (value.status === 'error') { opts.onError?.(value.error) } - if (value.status === 'saved') { - opts.onSaved?.(value.data) - } }, [opts], ) @@ -90,9 +84,7 @@ export function useObservable( status: 'loading', isLoading: true, isError: false, - isSaving: false, isLoaded: false, - isSaved: false, } } const { data, status } = value @@ -102,9 +94,7 @@ export function useObservable( status, isLoading: status === 'loading', isError: status === 'error', - isSaving: status === 'saving', isLoaded: status === 'loaded', - isSaved: status === 'saved', } }, [value]) satisfies ObservableReturn diff --git a/packages/sdk/src/client.ts b/packages/sdk/src/client.ts index 1ecd8ae0e..5b547f479 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -72,7 +72,7 @@ import { } from './id' import { makeEvent, unpackMiniblock, unpackStream, unpackStreamEx } from './sign' import { StreamEvents } from './streamEvents' -import { StreamStateView } from './streamStateView' +import { IStreamStateView, StreamStateView } from './streamStateView' import { make_UserDeviceKeyPayload_Inception, make_ChannelPayload_Inception, @@ -123,7 +123,7 @@ import { SyncedStream } from './syncedStream' import { SyncedStreamsExtension } from './syncedStreamsExtension' import { SignerContext } from './signerContext' -type ClientEvents = StreamEvents & DecryptionEvents +export type ClientEvents = StreamEvents & DecryptionEvents type SendChannelMessageOptions = { beforeSendEventHook?: Promise @@ -1178,7 +1178,7 @@ export class Client ): Promise<{ eventId: string }> { const stream = this.stream(streamId) check(stream !== undefined, 'stream not found') - const localId = stream.view.appendLocalEvent(payload, 'sending', this) + const localId = stream.appendLocalEvent(payload, 'sending') opts?.onLocalEventAppended?.(localId) if (opts?.beforeSendEventHook) { await opts?.beforeSendEventHook @@ -1656,7 +1656,7 @@ export class Client * that are in the room but that have been blocked. */ async getDevicesInStream(stream_id: string): Promise { - let stream: StreamStateView | undefined + let stream: IStreamStateView | undefined stream = this.stream(stream_id)?.view if (!stream || !stream.isInitialized) { stream = await this.getStream(stream_id) @@ -1796,7 +1796,7 @@ export class Client // when we have a localId, we need to update the local event with the eventId const stream = this.streams.get(streamId) assert(stream !== undefined, 'unknown stream ' + streamIdAsString(streamId)) - stream.view.updateLocalEvent(localId, eventId, 'sending', this) + stream.updateLocalEvent(localId, eventId, 'sending') } if (cleartext) { @@ -1812,7 +1812,7 @@ export class Client }) if (localId) { const stream = this.streams.get(streamId) - stream?.view.updateLocalEvent(localId, eventId, 'sent', this) + stream?.updateLocalEvent(localId, eventId, 'sent') } return { prevMiniblockHash, eventId, error } } catch (err) { @@ -1844,7 +1844,7 @@ export class Client } else { if (localId) { const stream = this.streams.get(streamId) - stream?.view.updateLocalEvent(localId, eventId, 'failed', this) + stream?.updateLocalEvent(localId, eventId, 'failed') } throw err } @@ -1957,7 +1957,7 @@ export class Client check(isEncryptedContentKind(kind), `invalid kind ${kind}`) const cleartext = await this.cleartextForGroupEvent(streamId, eventId, encryptedData) const decryptedContent = toDecryptedContent(kind, cleartext) - stream.view.updateDecryptedContent(eventId, decryptedContent, this) + stream.updateDecryptedContent(eventId, decryptedContent) } private async cleartextForGroupEvent( diff --git a/packages/sdk/src/clientDecryptionExtensions.ts b/packages/sdk/src/clientDecryptionExtensions.ts index 8636db200..b89657d1d 100644 --- a/packages/sdk/src/clientDecryptionExtensions.ts +++ b/packages/sdk/src/clientDecryptionExtensions.ts @@ -180,16 +180,12 @@ export class ClientDecryptionExtensions extends BaseDecryptionExtensions { } public onDecryptionError(item: EncryptedContentItem, err: DecryptionSessionError): void { - this.client.stream(item.streamId)?.view.updateDecryptedContentError( - item.eventId, - { - missingSession: err.missingSession, - kind: err.kind, - encryptedData: item.encryptedData, - error: err, - }, - this.client, - ) + this.client.stream(item.streamId)?.updateDecryptedContentError(item.eventId, { + missingSession: err.missingSession, + kind: err.kind, + encryptedData: item.encryptedData, + error: err, + }) } public async ackNewGroupSession( diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index c2660f88c..a0ab422a3 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -41,17 +41,24 @@ export * from './streamStateView_UserSettings' export * from './streamUtils' export * from './sync-agent/db' export * from './sync-agent/entitlements/entitlements' +export * from './sync-agent/river-connection/models/authStatus' export * from './sync-agent/river-connection/models/streamNodeUrls' +export * from './sync-agent/river-connection/models/transactionalClient' export * from './sync-agent/river-connection/riverConnection' export * from './sync-agent/spaces/models/channel' export * from './sync-agent/spaces/models/space' export * from './sync-agent/spaces/spaces' +export * from './sync-agent/streams/models/streamConnectionStatus' export * from './sync-agent/syncAgent' +export * from './sync-agent/timeline/models/timelineEvent' +export * from './sync-agent/timeline/models/timelineEvents' +export * from './sync-agent/timeline/timeline' export * from './sync-agent/user/models/userDeviceKeys' export * from './sync-agent/user/models/userInbox' export * from './sync-agent/user/models/userMemberships' export * from './sync-agent/user/models/userSettings' export * from './sync-agent/user/user' +export * from './sync-agent/utils/bot' export * from './sync-agent/utils/promiseQueue' export * from './sync-agent/utils/providers' export * from './sync-agent/utils/spaceUtils' diff --git a/packages/sdk/src/observable/observable.ts b/packages/sdk/src/observable/observable.ts index aeab01f20..b951dc212 100644 --- a/packages/sdk/src/observable/observable.ts +++ b/packages/sdk/src/observable/observable.ts @@ -1,5 +1,13 @@ +interface Subscription { + id: number + fn: (value: T) => void + condition: (value: T) => boolean + once: boolean +} + export class Observable { - protected subscribers: ((value: T) => void)[] = [] + private _nextId = 0 + protected subscribers: Subscription[] = [] protected _value: T constructor(value: T) { @@ -15,19 +23,56 @@ export class Observable { this.notify() } - subscribe(subscriber: (newValue: T) => void, opts: { fireImediately?: boolean } = {}): this { - this.subscribers.push(subscriber) + subscribe( + subscriber: (newValue: T) => void, + opts: { fireImediately?: boolean; once?: boolean; condition?: (value: T) => boolean } = {}, + ): this { + const sub = { + id: this._nextId++, + fn: subscriber, + once: opts?.once ?? false, + condition: opts?.condition ?? (() => true), + } satisfies Subscription + this.subscribers.push(sub) if (opts.fireImediately) { - subscriber(this.value) + this._notify(sub, this.value) } return this } + when( + condition: (value: T) => boolean, + opts: { timeoutMs: number } = { timeoutMs: 5000 }, + ): Promise { + return new Promise((resolve, reject) => { + const timeoutHandle = setTimeout(() => { + reject(new Error('Timeout waiting for condition')) + }, opts.timeoutMs) + this.subscribe( + (value) => { + clearTimeout(timeoutHandle) + resolve(value) + }, + { fireImediately: true, condition: condition, once: true }, + ) + }) + } + unsubscribe(subscriber: (value: T) => void) { - this.subscribers = this.subscribers.filter((sub) => sub !== subscriber) + this.subscribers = this.subscribers.filter((sub) => sub.fn !== subscriber) } private notify() { - this.subscribers.forEach((sub) => sub(this.value)) + const subscriptions = this.subscribers + subscriptions.forEach((sub) => this._notify(sub, this.value)) + } + + private _notify(sub: Subscription, value: T) { + if (sub.condition(value)) { + sub.fn(value) + if (sub.once) { + this.subscribers = this.subscribers.filter((s) => s !== sub) + } + } } } diff --git a/packages/sdk/src/observable/persistedObservable.ts b/packages/sdk/src/observable/persistedObservable.ts index 75a85cb9d..61bd68200 100644 --- a/packages/sdk/src/observable/persistedObservable.ts +++ b/packages/sdk/src/observable/persistedObservable.ts @@ -11,8 +11,6 @@ export type PersistedModel = | { status: 'loading'; data: T } | { status: 'loaded'; data: T } | { status: 'error'; data: T; error: Error } - | { status: 'saving'; data: T } - | { status: 'saved'; data: T } interface Storable {} @@ -29,8 +27,11 @@ export function persistedObservable(options: PersistedOpts) { super(...args) // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access ;(this as any).tableName = options.tableName - // eslint-disable-next-line @typescript-eslint/no-unsafe-call, @typescript-eslint/no-unsafe-member-access - ;(this as any).load() + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + ;((this as any).store as Store).withTransaction(`new-${options.tableName}`, () => { + // eslint-disable-next-line @typescript-eslint/no-unsafe-call, @typescript-eslint/no-unsafe-member-access + ;(this as any).load() + }) } static tableName = options.tableName } @@ -85,28 +86,23 @@ export class PersistedObservable // must be called in a store transaction setData(newDataPartial: Partial) { check(isDefined(newDataPartial), 'value is undefined') + const prevData = this.data const newData = { ...this.data, ...newDataPartial } check(newData.id === this.data.id, 'id mismatch') - super.setValue({ status: 'saving', data: newData }) - this.store - .withTransaction(`update-${this.tableName}:${this.data.id}`, () => { - this.store.save( - this.tableName, - newData, - () => { - super.setValue({ status: 'saved', data: newData }) - }, - (e) => { - super.setValue({ status: 'error', data: newData, error: e }) - }, - async () => { - await this.onSaved() - }, - ) - }) - .catch((e) => { - super.setValue({ status: 'error', data: this.data, error: e }) - }) + super.setValue({ status: 'loaded', data: newData }) + this.store.withTransaction(`update-${this.tableName}:${this.data.id}`, () => { + this.store.save( + this.tableName, + newData, + () => {}, + (e) => { + super.setValue({ status: 'error', data: prevData, error: e }) + }, + async () => { + await this.onSaved() + }, + ) + }) } protected async onLoaded() { diff --git a/packages/sdk/src/store/store.ts b/packages/sdk/src/store/store.ts index fe06bc145..019521ef0 100644 --- a/packages/sdk/src/store/store.ts +++ b/packages/sdk/src/store/store.ts @@ -68,8 +68,11 @@ export class Store { } newTransactionGroup(name: string) { - log(`newTransactionGroup "${name}"`) - check(this.transactionGroup === undefined, 'transaction already in progress') + // log(`newTransactionGroup "${name}"`) + check( + this.transactionGroup === undefined, + `transaction already in progress named: ${this.transactionGroup?.name}`, + ) this.transactionGroup = new TransactionGroup(name) } @@ -82,7 +85,7 @@ export class Store { this.transactionGroup = undefined // if no ops, return if (!tGroup.hasOps) { - log(`commitTransaction "${tGroup.name}" skipped (empty)`) + // log(`commitTransaction "${tGroup.name}" skipped (empty)`) return } log( @@ -111,13 +114,17 @@ export class Store { log(`commitTransaction "${tGroup.name}" done`, 'elapsedMs:', Date.now() - time) } - async withTransaction(name: string, fn: () => void) { + withTransaction(name: string, fn: () => T): T { if (this.transactionGroup !== undefined) { - fn() + return fn() } else { this.newTransactionGroup(name) - fn() - await this.commitTransaction() + const result = fn() + this.commitTransaction().catch((e) => { + log(`uncaught commitTransaction error in groun ${name}`, e) + throw e + }) + return result } } diff --git a/packages/sdk/src/stream.ts b/packages/sdk/src/stream.ts index b0d2094ac..ccf990468 100644 --- a/packages/sdk/src/stream.ts +++ b/packages/sdk/src/stream.ts @@ -1,16 +1,21 @@ -import { MembershipOp, Snapshot, SyncCookie } from '@river-build/proto' +import { ChannelMessage, MembershipOp, Snapshot, SyncCookie } from '@river-build/proto' import { DLogger } from '@river-build/dlog' import EventEmitter from 'events' import TypedEmitter from 'typed-emitter' -import { StreamStateView } from './streamStateView' -import { ParsedEvent, ParsedMiniblock, isLocalEvent } from './types' +import { IStreamStateView, StreamStateView } from './streamStateView' +import { LocalEventStatus, ParsedEvent, ParsedMiniblock, isLocalEvent } from './types' import { StreamEvents } from './streamEvents' +import { DecryptedContent } from './encryptedContentTypes' +import { DecryptionSessionError } from '@river-build/encryption' export class Stream extends (EventEmitter as new () => TypedEmitter) { readonly clientEmitter: TypedEmitter readonly logEmitFromStream: DLogger readonly userId: string - view: StreamStateView + _view: StreamStateView + get view(): IStreamStateView { + return this._view + } private stopped = false constructor( @@ -23,11 +28,11 @@ export class Stream extends (EventEmitter as new () => TypedEmitter TypedEmitter | undefined, ): void { // grab any local events from the previous view that haven't been processed - const localEvents = this.view.timeline + const localEvents = this._view.timeline .filter(isLocalEvent) .filter((e) => e.hashStr.startsWith('~')) - this.view = new StreamStateView(this.userId, this.streamId) - this.view.initialize( + this._view = new StreamStateView(this.userId, this.streamId) + this._view.initialize( nextSyncCookie, minipoolEvents, snapshot, @@ -71,7 +76,7 @@ export class Stream extends (EventEmitter as new () => TypedEmitter | undefined, ): Promise { - this.view.appendEvents(events, nextSyncCookie, cleartexts, this) + this._view.appendEvents(events, nextSyncCookie, cleartexts, this) } prependEvents( @@ -79,7 +84,23 @@ export class Stream extends (EventEmitter as new () => TypedEmitter | undefined, terminus: boolean, ) { - this.view.prependEvents(miniblocks, cleartexts, terminus, this, this) + this._view.prependEvents(miniblocks, cleartexts, terminus, this, this) + } + + appendLocalEvent(channelMessage: ChannelMessage, status: LocalEventStatus) { + return this._view.appendLocalEvent(channelMessage, status, this) + } + + updateDecryptedContent(eventId: string, content: DecryptedContent) { + return this._view.updateDecryptedContent(eventId, content, this) + } + + updateDecryptedContentError(eventId: string, content: DecryptionSessionError) { + return this._view.updateDecryptedContentError(eventId, content, this) + } + + updateLocalEvent(localId: string, parsedEventHash: string, status: LocalEventStatus) { + return this._view.updateLocalEvent(localId, parsedEventHash, status, this) } emit(event: E, ...args: Parameters): boolean { @@ -97,14 +118,14 @@ export class Stream extends (EventEmitter as new () => TypedEmitter { return ( (userId === undefined || userId === iUserId) && - this.view.getMembers().isMember(membership, userId ?? this.userId) + this._view.getMembers().isMember(membership, userId ?? this.userId) ) }) } diff --git a/packages/sdk/src/streamEvents.ts b/packages/sdk/src/streamEvents.ts index 4af926524..756b46ca9 100644 --- a/packages/sdk/src/streamEvents.ts +++ b/packages/sdk/src/streamEvents.ts @@ -71,7 +71,7 @@ export type StreamStateEvents = { ) => void userDeviceKeysUpdated: (streamId: string, deviceKeys: UserDevice[]) => void spaceChannelCreated: (spaceId: string, channelId: string) => void - spaceChannelUpdated: (spaceId: string, channelId: string) => void + spaceChannelUpdated: (spaceId: string, channelId: string, updatedAtEventNum: bigint) => void spaceChannelDeleted: (spaceId: string, channelId: string) => void channelPinAdded: (channelId: string, pin: Pin) => void channelPinRemoved: (channelId: string, pin: Pin, index: number) => void diff --git a/packages/sdk/src/streamStateView.ts b/packages/sdk/src/streamStateView.ts index dddca55dc..60e6b91fc 100644 --- a/packages/sdk/src/streamStateView.ts +++ b/packages/sdk/src/streamStateView.ts @@ -57,7 +57,40 @@ import { migrateSnapshot } from './migrations/migrateSnapshot' const log = dlog('csb:streams') const logError = dlogError('csb:streams:error') -export class StreamStateView { +// it's very important that the Stream is the emitter for all events +// for any mutations, go through the stream +export interface IStreamStateView { + readonly streamId: string + readonly userId: string + readonly contentKind: SnapshotCaseType + readonly timeline: StreamTimelineEvent[] + readonly events: Map + isInitialized: boolean + snapshot?: Snapshot + prevMiniblockHash?: Uint8Array + lastEventNum: bigint + prevSnapshotMiniblockNum: bigint + miniblockInfo?: { max: bigint; min: bigint; terminusReached: boolean } + syncCookie?: SyncCookie + membershipContent: StreamStateView_Members + get spaceContent(): StreamStateView_Space + get channelContent(): StreamStateView_Channel + get dmChannelContent(): StreamStateView_DMChannel + get gdmChannelContent(): StreamStateView_GDMChannel + get userContent(): StreamStateView_User + get userSettingsContent(): StreamStateView_UserSettings + get userDeviceKeyContent(): StreamStateView_UserDeviceKeys + get userInboxContent(): StreamStateView_UserInbox + get mediaContent(): StreamStateView_Media + getMembers(): StreamStateView_Members + getUserMetadata(): StreamStateView_UserMetadata + getChannelMetadata(): StreamStateView_ChannelMetadata | undefined + getContent(): StreamStateView_AbstractContent + userIsEntitledToKeyExchange(userId: string): boolean + getUsersEntitledToKeyExchange(): Set +} + +export class StreamStateView implements IStreamStateView { readonly streamId: string readonly userId: string readonly contentKind: SnapshotCaseType diff --git a/packages/sdk/src/streamStateView_Space.ts b/packages/sdk/src/streamStateView_Space.ts index b6f09d641..88f74a2fe 100644 --- a/packages/sdk/src/streamStateView_Space.ts +++ b/packages/sdk/src/streamStateView_Space.ts @@ -124,7 +124,12 @@ export class StreamStateView_Space extends StreamStateView_AbstractContent { isDefault: isDefaultChannelId(channelId), updatedAtEventNum, }) - stateEmitter?.emit('spaceChannelUpdated', this.streamId, channelId) + stateEmitter?.emit( + 'spaceChannelUpdated', + this.streamId, + channelId, + updatedAtEventNum, + ) break } default: diff --git a/packages/sdk/src/sync-agent/db.ts b/packages/sdk/src/sync-agent/db.ts index f28ec1c3e..3f80d729f 100644 --- a/packages/sdk/src/sync-agent/db.ts +++ b/packages/sdk/src/sync-agent/db.ts @@ -1,4 +1,5 @@ import { StreamNodeUrls } from './river-connection/models/streamNodeUrls' +import { RiverConnection } from './river-connection/riverConnection' import { Channel } from './spaces/models/channel' import { Space } from './spaces/models/space' import { Spaces } from './spaces/spaces' @@ -14,6 +15,7 @@ export const DB_MODELS = [ Space, Spaces, StreamNodeUrls, + RiverConnection, User, UserDeviceKeys, UserInbox, diff --git a/packages/sdk/src/sync-agent/river-connection/models/authStatus.ts b/packages/sdk/src/sync-agent/river-connection/models/authStatus.ts new file mode 100644 index 000000000..5546dfc26 --- /dev/null +++ b/packages/sdk/src/sync-agent/river-connection/models/authStatus.ts @@ -0,0 +1,17 @@ +export enum AuthStatus { + /** Fetching river urls. */ + Initializing = 'Initializing', + /** Transition state: None -> EvaluatingCredentials -> [Credentialed OR ConnectedToRiver] + * if a river user is found, will connect to river client, otherwise will just validate credentials. + */ + EvaluatingCredentials = 'EvaluatingCredentials', + /** User authenticated with a valid credential but without an active river stream client. */ + Credentialed = 'Credentialed', + /** User authenticated with a valid credential and with an active river river client. */ + ConnectingToRiver = 'ConnectingToRiver', + ConnectedToRiver = 'ConnectedToRiver', + /** Disconnected, client was stopped */ + Disconnected = 'Disconnected', + /** Error state: User failed to authenticate or connect to river client. */ + Error = 'Error', +} diff --git a/packages/sdk/src/sync-agent/river-connection/models/streamNodeUrls.ts b/packages/sdk/src/sync-agent/river-connection/models/streamNodeUrls.ts index 7fc540098..a96142cb7 100644 --- a/packages/sdk/src/sync-agent/river-connection/models/streamNodeUrls.ts +++ b/packages/sdk/src/sync-agent/river-connection/models/streamNodeUrls.ts @@ -15,7 +15,7 @@ export interface StreamNodeUrlsModel { // Define a class that will manage the data model, decorate it to give it store properties @persistedObservable({ - tableName: 'riverNodeUrls', // this is the name of the table in the database + tableName: 'streamNodeUrls', // this is the name of the table in the database }) export class StreamNodeUrls extends PersistedObservable { private riverRegistry: RiverRegistry // store any member variables required for logic diff --git a/packages/sdk/src/sync-agent/river-connection/models/transactionalClient.ts b/packages/sdk/src/sync-agent/river-connection/models/transactionalClient.ts new file mode 100644 index 000000000..62bb9761e --- /dev/null +++ b/packages/sdk/src/sync-agent/river-connection/models/transactionalClient.ts @@ -0,0 +1,39 @@ +import { CryptoStore, EntitlementsDelegate } from '@river-build/encryption' +import { Client, ClientEvents } from '../../../client' +import { StreamRpcClient } from '../../../makeStreamRpcClient' +import { SignerContext } from '../../../signerContext' +import { Store } from '../../../store/store' + +export class TransactionalClient extends Client { + store: Store + constructor( + store: Store, + signerContext: SignerContext, + rpcClient: StreamRpcClient, + cryptoStore: CryptoStore, + entitlementsDelegate: EntitlementsDelegate, + persistenceStoreName?: string, + logNamespaceFilter?: string, + highPriorityStreamIds?: string[], + ) { + super( + signerContext, + rpcClient, + cryptoStore, + entitlementsDelegate, + persistenceStoreName, + logNamespaceFilter, + highPriorityStreamIds, + ) + this.store = store + } + + override emit( + event: E, + ...args: Parameters + ): boolean { + return this.store.withTransaction(event.toLocaleString(), () => { + return super.emit(event, ...args) + }) + } +} diff --git a/packages/sdk/src/sync-agent/river-connection/riverConnection.test.ts b/packages/sdk/src/sync-agent/river-connection/riverConnection.test.ts index 492b8f1c6..3f58870d3 100644 --- a/packages/sdk/src/sync-agent/river-connection/riverConnection.test.ts +++ b/packages/sdk/src/sync-agent/river-connection/riverConnection.test.ts @@ -3,10 +3,10 @@ */ import { waitFor } from '../../util.test' -import { TestUser } from '../utils/testUser.test' +import { Bot } from '../utils/bot' describe('RiverConnection.test.ts', () => { - const testUser = new TestUser() + const testUser = new Bot() // test that a riverConnection will eventually be defined if passed valid config test('riverConnection initializes from empty', async () => { @@ -28,7 +28,7 @@ describe('RiverConnection.test.ts', () => { expect(riverConnection.client).toBeDefined() }) await waitFor(() => { - expect(riverConnection.streamNodeUrls.value.status).toBe('saved') + expect(riverConnection.streamNodeUrls.value.status).toBe('loaded') }) // cleanup await syncAgent.stop() diff --git a/packages/sdk/src/sync-agent/river-connection/riverConnection.ts b/packages/sdk/src/sync-agent/river-connection/riverConnection.ts index ef7edb109..177b2b82a 100644 --- a/packages/sdk/src/sync-agent/river-connection/riverConnection.ts +++ b/packages/sdk/src/sync-agent/river-connection/riverConnection.ts @@ -1,14 +1,21 @@ -import { RiverRegistry } from '@river-build/web3' +import { RiverRegistry, SpaceDapp } from '@river-build/web3' import { RetryParams, makeStreamRpcClient } from '../../makeStreamRpcClient' import { StreamNodeUrls, StreamNodeUrlsModel } from './models/streamNodeUrls' -import { Store } from '../../store/store' +import { Identifiable, LoadPriority, Store } from '../../store/store' import { dlogger } from '@river-build/dlog' import { PromiseQueue } from '../utils/promiseQueue' import { CryptoStore, EntitlementsDelegate } from '@river-build/encryption' import { Client } from '../../client' import { SignerContext } from '../../signerContext' -import { PersistedModel } from '../../observable/persistedObservable' +import { + PersistedModel, + PersistedObservable, + persistedObservable, +} from '../../observable/persistedObservable' import { userIdFromAddress } from '../../id' +import { TransactionalClient } from './models/transactionalClient' +import { Observable } from '../../observable/observable' +import { AuthStatus } from './models/authStatus' const logger = dlogger('csb:riverConnection') @@ -25,20 +32,39 @@ export interface ClientParams { export type OnStoppedFn = () => void export type onClientStartedFn = (client: Client) => OnStoppedFn -export class RiverConnection { - client?: Client +export interface RiverConnectionModel extends Identifiable { + id: '0' + userExists: boolean +} + +class LoginContext { + constructor(public client: Client, public cancelled: boolean = false) {} +} + +@persistedObservable({ tableName: 'riverConnection' }) +export class RiverConnection extends PersistedObservable { + client?: TransactionalClient streamNodeUrls: StreamNodeUrls - private riverRegistryDapp: RiverRegistry - private clientParams: ClientParams + authStatus = new Observable(AuthStatus.Initializing) + loginError?: Error private clientQueue = new PromiseQueue() private views: onClientStartedFn[] = [] private onStoppedFns: OnStoppedFn[] = [] private stopped = false + public newUserMetadata?: { spaceId: Uint8Array | string } + private loginPromise?: { promise: Promise; context: LoginContext } - constructor(store: Store, riverRegistryDapp: RiverRegistry, clientParams: ClientParams) { - this.riverRegistryDapp = riverRegistryDapp - this.clientParams = clientParams + constructor( + store: Store, + public spaceDapp: SpaceDapp, + public riverRegistryDapp: RiverRegistry, + private clientParams: ClientParams, + ) { + super({ id: '0', userExists: false }, store, LoadPriority.high) this.streamNodeUrls = new StreamNodeUrls(store, riverRegistryDapp) + } + + override async onLoaded() { this.streamNodeUrls.subscribe(this.onNodeUrlsChanged, { fireImediately: true }) } @@ -54,6 +80,7 @@ export class RiverConnection { } this.onStoppedFns = [] await this.client?.stop() + this.authStatus.setValue(AuthStatus.Disconnected) } call(fn: (client: Client) => Promise) { @@ -92,7 +119,8 @@ export class RiverConnection { const rpcClient = makeStreamRpcClient(urls, this.clientParams.rpcRetryParams, () => this.riverRegistryDapp.getOperationalNodeUrls(), ) - const client = new Client( + const client = new TransactionalClient( + this.store, this.clientParams.signerContext, rpcClient, this.clientParams.cryptoStore, @@ -102,12 +130,89 @@ export class RiverConnection { this.clientParams.highPriorityStreamIds, ) this.client = client - // initialize views - this.views.forEach((viewFn) => { - const onStopFn = viewFn(client) - this.onStoppedFns.push(onStopFn) + // try to log in + logger.log('attempting login after new client') + this.login().catch((err) => { + logger.log('error logging in', err) }) - // New rpcClient is available, resolve all queued requests - this.clientQueue.flush(client) } + + async login(newUserMetadata?: { spaceId: Uint8Array | string }) { + if (!this.client) { + return + } + this.newUserMetadata = this.newUserMetadata ?? newUserMetadata + logger.log('login', { newUserMetadata }) + const loginContext = new LoginContext(this.client) + await this.loginWithRetries(loginContext) + } + + private async loginWithRetries(loginContext: LoginContext) { + logger.log('login', { authStatus: this.authStatus.value, promise: this.loginPromise }) + if (this.loginPromise) { + this.loginPromise.context.cancelled = true + await this.loginPromise.promise + } + if (this.authStatus.value === AuthStatus.ConnectedToRiver) { + return + } + this.authStatus.setValue(AuthStatus.EvaluatingCredentials) + const login = async () => { + let retryCount = 0 + const MAX_RETRY_COUNT = 20 + while (!loginContext.cancelled) { + try { + logger.log('logging in') + const { client } = loginContext + const canInitialize = + this.data.userExists || + this.newUserMetadata !== undefined || + (await client.userExists(this.userId)) + logger.log('canInitialize', canInitialize) + if (canInitialize) { + this.authStatus.setValue(AuthStatus.ConnectingToRiver) + await client.initializeUser(this.newUserMetadata) + client.startSync() + this.setData({ userExists: true }) + // initialize views + this.store.withTransaction('RiverConnection::login', () => { + this.views.forEach((viewFn) => { + const onStopFn = viewFn(client) + this.onStoppedFns.push(onStopFn) + }) + }) + this.authStatus.setValue(AuthStatus.ConnectedToRiver) + // New rpcClient is available, resolve all queued requests + this.clientQueue.flush(client) + } else { + this.authStatus.setValue(AuthStatus.Credentialed) + } + break + } catch (err) { + retryCount++ + this.loginError = err as Error + logger.log('encountered exception while initializing', err) + if (retryCount >= MAX_RETRY_COUNT) { + this.authStatus.setValue(AuthStatus.Error) + throw err + } else { + const retryDelay = getRetryDelay(retryCount) + logger.log('retrying', { retryDelay, retryCount }) + // sleep + await new Promise((resolve) => setTimeout(resolve, retryDelay)) + } + } finally { + logger.log('exiting login loop') + this.loginPromise = undefined + } + } + } + this.loginPromise = { promise: login(), context: loginContext } + return this.loginPromise.promise + } +} + +// exponentially back off, but never wait more than 20 seconds +function getRetryDelay(retryCount: number) { + return Math.min(1000 * 2 ** retryCount, 20000) } diff --git a/packages/sdk/src/sync-agent/spaces/models/channel.ts b/packages/sdk/src/sync-agent/spaces/models/channel.ts index 15eea7244..0b5351a2a 100644 --- a/packages/sdk/src/sync-agent/spaces/models/channel.ts +++ b/packages/sdk/src/sync-agent/spaces/models/channel.ts @@ -1,7 +1,13 @@ -import { Client } from '../../../client' +import { PlainMessage } from '@bufbuild/protobuf' import { PersistedObservable, persistedObservable } from '../../../observable/persistedObservable' import { Identifiable, Store } from '../../../store/store' import { RiverConnection } from '../../river-connection/riverConnection' +import { ChannelMessage_Post_Attachment, ChannelMessage_Post_Mention } from '@river-build/proto' +import { Timeline } from '../../timeline/timeline' +import { check } from '@river-build/dlog' +import { isDefined } from '../../../check' +import { Observable } from '../../../observable/observable' +import { StreamConnectionStatus } from '../../streams/models/streamConnectionStatus' export interface ChannelMetadata { name: string @@ -16,6 +22,8 @@ export interface ChannelModel extends Identifiable { @persistedObservable({ tableName: 'channel' }) export class Channel extends PersistedObservable { + connectionStatus = new Observable(StreamConnectionStatus.connecting) + timeline: Timeline constructor( id: string, spaceId: string, @@ -23,13 +31,70 @@ export class Channel extends PersistedObservable { store: Store, ) { super({ id, spaceId, isJoined: false }, store) + this.timeline = new Timeline(riverConnection.userId) } protected override async onLoaded() { - this.riverConnection.registerView(this.onClientStarted) + this.riverConnection.registerView((client) => { + this.connectionStatus.setValue(StreamConnectionStatus.connecting) + if ( + client.streams.has(this.data.id) && + client.streams.get(this.data.id)?.view.isInitialized + ) { + this.onStreamInitialized(this.data.id) + } + client.on('streamInitialized', this.onStreamInitialized) + return () => { + client.off('streamInitialized', this.onStreamInitialized) + this.connectionStatus.setValue(StreamConnectionStatus.disconnected) + } + }) } - private onClientStarted = (_client: Client) => { - return () => {} + async sendMessage( + message: string, + options?: { + threadId?: string + replyId?: string + mentions?: PlainMessage[] + attachments?: PlainMessage[] + }, + ) { + await this.connectionStatus.when((status) => status === StreamConnectionStatus.connected) + const channelId = this.data.id + const eventId = await this.riverConnection.call((client) => + client.sendChannelMessage_Text(channelId, { + threadId: options?.threadId, + threadPreview: options?.threadId ? '🙉' : undefined, + replyId: options?.replyId, + replyPreview: options?.replyId ? '🙈' : undefined, + content: { + body: message, + mentions: options?.mentions ?? [], + attachments: [], + }, + }), + ) + return eventId + } + + async sendReaction(refEventId: string, reaction: string) { + const channelId = this.data.id + const eventId = await this.riverConnection.call((client) => + client.sendChannelMessage_Reaction(channelId, { + reaction, + refEventId, + }), + ) + return eventId + } + + private onStreamInitialized = (streamId: string) => { + if (streamId === this.data.id) { + const stream = this.riverConnection.client?.stream(this.data.id) + check(isDefined(stream), 'stream is not defined') + this.timeline.initialize(stream) + this.connectionStatus.setValue(StreamConnectionStatus.connected) + } } } diff --git a/packages/sdk/src/sync-agent/spaces/models/space.ts b/packages/sdk/src/sync-agent/spaces/models/space.ts index 4284d584a..86ccab77d 100644 --- a/packages/sdk/src/sync-agent/spaces/models/space.ts +++ b/packages/sdk/src/sync-agent/spaces/models/space.ts @@ -1,7 +1,14 @@ -import { Client } from '../../../client' +import { check, dlogger } from '@river-build/dlog' +import { isDefined } from '../../../check' +import { makeDefaultChannelStreamId, makeUniqueChannelStreamId } from '../../../id' import { PersistedObservable, persistedObservable } from '../../../observable/persistedObservable' import { Identifiable, Store } from '../../../store/store' import { RiverConnection } from '../../river-connection/riverConnection' +import { Channel } from './channel' +import { ethers } from 'ethers' +import { SpaceDapp } from '@river-build/web3' + +const logger = dlogger('csb:space') export interface SpaceMetadata { name: string @@ -9,21 +16,152 @@ export interface SpaceMetadata { export interface SpaceModel extends Identifiable { id: string + initialized: boolean channelIds: string[] metadata?: SpaceMetadata } @persistedObservable({ tableName: 'space' }) export class Space extends PersistedObservable { - constructor(id: string, private riverConnection: RiverConnection, store: Store) { - super({ id, channelIds: [] }, store) + private channels: Record + constructor( + id: string, + private riverConnection: RiverConnection, + store: Store, + private spaceDapp: SpaceDapp, + ) { + super({ id, channelIds: [], initialized: false }, store) + this.channels = { + [makeDefaultChannelStreamId(id)]: new Channel( + makeDefaultChannelStreamId(id), + id, + riverConnection, + store, + ), + } } protected override async onLoaded() { - this.riverConnection.registerView(this.onClientStarted) + this.riverConnection.registerView((client) => { + if ( + client.streams.has(this.data.id) && + client.streams.get(this.data.id)?.view.isInitialized + ) { + this.onStreamInitialized(this.data.id) + } + client.on('streamInitialized', this.onStreamInitialized) + client.on('spaceChannelCreated', this.onSpaceChannelCreated) + client.on('spaceChannelDeleted', this.onSpaceChannelDeleted) + client.on('spaceChannelUpdated', this.onSpaceChannelUpdated) + return () => { + client.off('spaceChannelCreated', this.onSpaceChannelCreated) + client.off('spaceChannelDeleted', this.onSpaceChannelDeleted) + client.off('spaceChannelUpdated', this.onSpaceChannelUpdated) + client.off('streamInitialized', this.onStreamInitialized) + } + }) + } + + async join(signer: ethers.Signer, opts?: { skipMintMembership?: boolean }) { + const spaceId = this.data.id + if (opts?.skipMintMembership !== true) { + const { issued } = await this.spaceDapp.joinSpace( + spaceId, + this.riverConnection.userId, + signer, + ) + logger.log('joinSpace transaction', issued) + } + await this.riverConnection.login({ spaceId }) + await this.riverConnection.call(async (client) => { + await client.joinStream(spaceId) + await client.joinStream(makeDefaultChannelStreamId(spaceId)) + }) + } + + async createChannel(channelName: string, signer: ethers.Signer) { + const spaceId = this.data.id + const channelId = makeUniqueChannelStreamId(spaceId) + const roles = await this.spaceDapp.getRoles(spaceId) + const tx = await this.spaceDapp.createChannel( + spaceId, + channelName, + '', + channelId, + roles.filter((role) => role.name !== 'Owner').map((role) => role.roleId), + signer, + ) + const receipt = await tx.wait() + logger.log('createChannel receipt', receipt) + await this.riverConnection.call((client) => + client.createChannel(spaceId, channelName, '', channelId), + ) + return channelId + } + + getChannel(channelId: string): Channel { + if (!this.channels[channelId]) { + this.channels[channelId] = new Channel( + channelId, + this.data.id, + this.riverConnection, + this.store, + ) + } + return this.channels[channelId] + } + + getDefaultChannel(): Channel { + return this.channels[makeDefaultChannelStreamId(this.data.id)] + } + + private onStreamInitialized = (streamId: string) => { + if (this.data.id === streamId) { + const stream = this.riverConnection.client?.stream(streamId) + check(isDefined(stream), 'stream is not defined') + const channelIds = stream.view.spaceContent.spaceChannelsMetadata.keys() + for (const channelId of channelIds) { + if (!this.channels[channelId]) { + this.channels[channelId] = new Channel( + channelId, + this.data.id, + this.riverConnection, + this.store, + ) + } + } + this.setData({ initialized: true }) + } + } + + private onSpaceChannelCreated = (streamId: string, channelId: string) => { + if (streamId === this.data.id) { + if (!this.channels[channelId]) { + this.channels[channelId] = new Channel( + channelId, + this.data.id, + this.riverConnection, + this.store, + ) + } + this.setData({ channelIds: [...this.data.channelIds, channelId] }) + } + } + + private onSpaceChannelDeleted = (streamId: string, channelId: string) => { + if (streamId === this.data.id) { + delete this.channels[channelId] + this.setData({ channelIds: this.data.channelIds.filter((id) => id !== channelId) }) + } } - private onClientStarted = (_client: Client) => { - return () => {} + private onSpaceChannelUpdated = ( + streamId: string, + _channelId: string, + _updatedAtEventNum: bigint, + ) => { + if (streamId === this.data.id) { + // refetch the channel data from on chain + } } } diff --git a/packages/sdk/src/sync-agent/spaces/spaces.test.ts b/packages/sdk/src/sync-agent/spaces/spaces.test.ts index ffc5eb54b..6d5849cf6 100644 --- a/packages/sdk/src/sync-agent/spaces/spaces.test.ts +++ b/packages/sdk/src/sync-agent/spaces/spaces.test.ts @@ -2,30 +2,35 @@ * @group with-entitilements */ import { dlogger } from '@river-build/dlog' -import { TestUser } from '../utils/testUser.test' +import { Bot } from '../utils/bot' +import { waitFor } from '../../util.test' const logger = dlogger('csb:test:spaces') describe('spaces.test.ts', () => { logger.log('start') - const testUser = new TestUser() + const testUser = new Bot() test('create/leave/join space', async () => { const syncAgent = await testUser.makeSyncAgent() await syncAgent.start() expect(syncAgent.spaces.value.status).not.toBe('loading') - const { spaceId } = await syncAgent.user.createSpace( + const { spaceId, defaultChannelId } = await syncAgent.spaces.createSpace( { spaceName: 'BlastOff' }, testUser.signer, ) expect(syncAgent.spaces.data.spaceIds.length).toBe(1) expect(syncAgent.spaces.data.spaceIds[0]).toBe(spaceId) - // expect(bob.spaces.getSpaces().length).toBe(1) - // expect(bob.spaces.getSpaces()[0].id).toBe(spaceId) - // expect(bob.spaces.getSpace(spaceId)).toBeDefined() - // const space = bob.spaces.getSpace(spaceId)! - // expect(space.data.spaceIds.length).toBe(1) - // await waitFor(() => expect(space.data.spaceIds.length).toBe(1) + expect(syncAgent.spaces.getSpace(spaceId)).toBeDefined() + const space = syncAgent.spaces.getSpace(spaceId)! + await waitFor(() => expect(space.value.status).not.toBe('loading')) + await waitFor(() => expect(space.data.channelIds.length).toBe(1)) + expect(space.data.channelIds[0]).toBe(defaultChannelId) + expect(space.getChannel(defaultChannelId)).toBeDefined() + const channel = space.getChannel(defaultChannelId) + await channel.sendMessage('hello world') + expect(channel.timeline.events.value.length).toBeGreaterThan(1) + expect(channel.timeline.events.value.find((x) => x.text === 'hello world')).toBeDefined() await syncAgent.stop() }) }) diff --git a/packages/sdk/src/sync-agent/spaces/spaces.ts b/packages/sdk/src/sync-agent/spaces/spaces.ts index 033948405..1dc00f759 100644 --- a/packages/sdk/src/sync-agent/spaces/spaces.ts +++ b/packages/sdk/src/sync-agent/spaces/spaces.ts @@ -1,15 +1,20 @@ -import { Identifiable, Store } from '../../store/store' +import { Identifiable, LoadPriority, Store } from '../../store/store' import { PersistedModel, PersistedObservable, persistedObservable, } from '../../observable/persistedObservable' import { Space } from './models/space' -import { User } from '../user/user' -import { UserMembershipsModel } from '../user/models/userMemberships' +import { UserMemberships, UserMembershipsModel } from '../user/models/userMemberships' import { MembershipOp } from '@river-build/proto' -import { isSpaceStreamId } from '../../id' +import { isSpaceStreamId, makeDefaultChannelStreamId, makeSpaceStreamId } from '../../id' import { RiverConnection } from '../river-connection/riverConnection' +import { CreateSpaceParams, SpaceDapp } from '@river-build/web3' +import { makeDefaultMembershipInfo } from '../utils/spaceUtils' +import { ethers } from 'ethers' +import { check, dlogger } from '@river-build/dlog' + +const logger = dlogger('csb:spaces') export interface SpacesModel extends Identifiable { id: '0' // single data blobs need a fixed key @@ -19,33 +24,44 @@ export interface SpacesModel extends Identifiable { @persistedObservable({ tableName: 'spaces' }) export class Spaces extends PersistedObservable { private spaces: Record = {} - private user: User - private riverConnection: RiverConnection - constructor(riverConnection: RiverConnection, user: User, store: Store) { - super({ id: '0', spaceIds: [] }, store) - this.riverConnection = riverConnection - this.user = user + constructor( + store: Store, + private riverConnection: RiverConnection, + private userMemberships: UserMemberships, + private spaceDapp: SpaceDapp, + ) { + super({ id: '0', spaceIds: [] }, store, LoadPriority.high) } protected override async onLoaded() { - this.user.streams.memberships.subscribe( - (userMemberships) => { - this.onUserDataChanged(userMemberships) + this.userMemberships.subscribe( + (value) => { + this.onUserMembershipsChanged(value) }, { fireImediately: true }, ) } - getSpace(spaceId: string): Space | undefined { + getSpace(spaceId: string): Space { + check(isSpaceStreamId(spaceId), 'Invalid spaceId') + if (!this.spaces[spaceId]) { + this.spaces[spaceId] = new Space( + spaceId, + this.riverConnection, + this.store, + this.spaceDapp, + ) + } return this.spaces[spaceId] } - private onUserDataChanged(userData: PersistedModel) { - if (userData.status === 'loading') { + private onUserMembershipsChanged(value: PersistedModel) { + if (value.status === 'loading') { return } - const spaceIds = Object.values(userData.data.memberships) + + const spaceIds = Object.values(value.data.memberships) .filter((m) => isSpaceStreamId(m.streamId) && m.op === MembershipOp.SO_JOIN) .map((m) => m.streamId) @@ -53,8 +69,48 @@ export class Spaces extends PersistedObservable { for (const spaceId of spaceIds) { if (!this.spaces[spaceId]) { - this.spaces[spaceId] = new Space(spaceId, this.riverConnection, this.store) + this.spaces[spaceId] = new Space( + spaceId, + this.riverConnection, + this.store, + this.spaceDapp, + ) } } } + + async createSpace( + params: Partial> & { spaceName: string }, + signer: ethers.Signer, + ) { + const membershipInfo = + params.membership ?? + (await makeDefaultMembershipInfo(this.spaceDapp, this.riverConnection.userId)) + const channelName = params.channelName ?? 'general' + const transaction = await this.spaceDapp.createSpace( + { + spaceName: params.spaceName, + spaceMetadata: params.spaceMetadata ?? params.spaceName, + channelName: channelName, + membership: membershipInfo, + }, + signer, + ) + const receipt = await transaction.wait() + logger.log('transaction receipt', receipt) + const spaceAddress = this.spaceDapp.getSpaceAddress(receipt) + if (!spaceAddress) { + throw new Error('Space address not found') + } + logger.log('spaceAddress', spaceAddress) + const spaceId = makeSpaceStreamId(spaceAddress) + const defaultChannelId = makeDefaultChannelStreamId(spaceAddress) + logger.log('spaceId, defaultChannelId', { spaceId, defaultChannelId }) + await this.riverConnection.login({ spaceId }) + await this.riverConnection.call(async (client) => { + await client.createSpace(spaceId) + await client.createChannel(spaceId, channelName, '', defaultChannelId) + }) + return { spaceId, defaultChannelId } + } } diff --git a/packages/sdk/src/sync-agent/streams/models/streamConnectionStatus.ts b/packages/sdk/src/sync-agent/streams/models/streamConnectionStatus.ts new file mode 100644 index 000000000..dff1fb87c --- /dev/null +++ b/packages/sdk/src/sync-agent/streams/models/streamConnectionStatus.ts @@ -0,0 +1,5 @@ +export enum StreamConnectionStatus { + connecting = 'connecting', + connected = 'connected', + disconnected = 'disconnected', +} diff --git a/packages/sdk/src/sync-agent/syncAgent.test.ts b/packages/sdk/src/sync-agent/syncAgent.test.ts index 44f87d029..16acaf3be 100644 --- a/packages/sdk/src/sync-agent/syncAgent.test.ts +++ b/packages/sdk/src/sync-agent/syncAgent.test.ts @@ -1,43 +1,40 @@ /** * @group with-entitilements */ -import { AuthStatus } from './user/user' import { dlogger } from '@river-build/dlog' import { waitFor } from '../util.test' import { MembershipOp } from '@river-build/proto' -import { TestUser } from './utils/testUser.test' +import { Bot } from './utils/bot' +import { AuthStatus } from './river-connection/models/authStatus' const logger = dlogger('csb:test:syncAgent') describe('syncAgent.test.ts', () => { - const testUser = new TestUser() + const testUser = new Bot() test('syncAgent', async () => { const syncAgent = await testUser.makeSyncAgent() expect(syncAgent.user.value.status).toBe('loading') await syncAgent.start() expect(syncAgent.user.value.status).toBe('loaded') - expect(syncAgent.user.data.initialized).toBe(false) - expect(syncAgent.user.authStatus.value).toBe(AuthStatus.None) - expect(Object.keys(syncAgent.user.streams.memberships.data.memberships).length).toBe(0) + expect(syncAgent.riverConnection.authStatus.value).toBe(AuthStatus.Initializing) + expect(Object.keys(syncAgent.user.memberships.data.memberships).length).toBe(0) syncAgent.store.newTransactionGroup('createSpace') - const { spaceId, defaultChannelId } = await syncAgent.user.createSpace( + const { spaceId, defaultChannelId } = await syncAgent.spaces.createSpace( { spaceName: 'BlastOff' }, testUser.signer, ) logger.log('spaceId', spaceId) - expect(Object.keys(syncAgent.user.streams.memberships.data.memberships).length).toBe(2) - expect(syncAgent.user.streams.memberships.data.memberships[spaceId].op).toBe( + expect(Object.keys(syncAgent.user.memberships.data.memberships).length).toBe(2) + expect(syncAgent.user.memberships.data.memberships[spaceId].op).toBe(MembershipOp.SO_JOIN) + expect(syncAgent.user.memberships.data.memberships[defaultChannelId].op).toBe( MembershipOp.SO_JOIN, ) - expect(syncAgent.user.streams.memberships.data.memberships[defaultChannelId].op).toBe( - MembershipOp.SO_JOIN, - ) - expect(syncAgent.user.authStatus.value).toBe(AuthStatus.ConnectedToRiver) - expect(syncAgent.user.data.initialized).toBe(true) - expect(syncAgent.user.value.status).toBe('saving') + expect(syncAgent.riverConnection.authStatus.value).toBe(AuthStatus.ConnectedToRiver) + expect(syncAgent.user.memberships.data.initialized).toBe(true) + expect(syncAgent.user.value.status).toBe('loaded') await syncAgent.store.commitTransaction() - expect(syncAgent.user.value.status).toBe('saved') + expect(syncAgent.user.value.status).toBe('loaded') await syncAgent.stop() }) test('syncAgent loads again', async () => { @@ -45,10 +42,11 @@ describe('syncAgent.test.ts', () => { expect(syncAgent.user.value.status).toBe('loading') await syncAgent.start() expect(syncAgent.user.value.status).toBe('loaded') - expect(syncAgent.user.data.initialized).toBe(true) - expect(syncAgent.user.authStatus.value).toBe(AuthStatus.EvaluatingCredentials) + expect(syncAgent.user.memberships.value.status).toBe('loaded') + expect(syncAgent.user.memberships.data.initialized).toBe(true) + expect(syncAgent.riverConnection.authStatus.value).toBe(AuthStatus.ConnectingToRiver) await waitFor(() => { - expect(syncAgent.user.authStatus.value).toBe(AuthStatus.ConnectedToRiver) + expect(syncAgent.riverConnection.authStatus.value).toBe(AuthStatus.ConnectedToRiver) }) await syncAgent.stop() }) diff --git a/packages/sdk/src/sync-agent/syncAgent.ts b/packages/sdk/src/sync-agent/syncAgent.ts index 07844f57e..05a0336c2 100644 --- a/packages/sdk/src/sync-agent/syncAgent.ts +++ b/packages/sdk/src/sync-agent/syncAgent.ts @@ -1,5 +1,4 @@ -import { providers } from 'ethers' -import { RiverConnection } from './river-connection/riverConnection' +import { RiverConnection, RiverConnectionModel } from './river-connection/riverConnection' import { RiverConfig } from '../riverConfig' import { RiverRegistry, SpaceDapp } from '@river-build/web3' import { RetryParams } from '../makeStreamRpcClient' @@ -7,7 +6,7 @@ import { Store } from '../store/store' import { SignerContext } from '../signerContext' import { userIdFromAddress } from '../id' import { StreamNodeUrlsModel } from './river-connection/models/streamNodeUrls' -import { AuthStatus, User, UserModel } from './user/user' +import { User, UserModel } from './user/user' import { makeBaseProvider, makeRiverProvider } from './utils/providers' import { UserMembershipsModel } from './user/models/userMemberships' import { RiverDbManager } from '../riverDbManager' @@ -19,6 +18,7 @@ import { DB_MODELS, DB_VERSION } from './db' import { UserDeviceKeysModel } from './user/models/userDeviceKeys' import { UserSettingsModel } from './user/models/userSettings' import { Spaces, SpacesModel } from './spaces/spaces' +import { AuthStatus } from './river-connection/models/authStatus' export interface SyncAgentConfig { context: SignerContext @@ -31,10 +31,6 @@ export interface SyncAgentConfig { export class SyncAgent { userId: string config: SyncAgentConfig - baseProvider: providers.StaticJsonRpcProvider - riverProvider: providers.StaticJsonRpcProvider - spaceDapp: SpaceDapp - riverRegistryDapp: RiverRegistry riverConnection: RiverConnection store: Store user: User @@ -42,10 +38,11 @@ export class SyncAgent { // flattened observables - just pointers to the observable objects in the models observables: { + riverAuthStatus: Observable + riverConnection: PersistedObservable riverStreamNodeUrls: PersistedObservable spaces: PersistedObservable user: PersistedObservable - userAuthStatus: Observable userMemberships: PersistedObservable userInbox: PersistedObservable userDeviceKeys: PersistedObservable @@ -57,34 +54,36 @@ export class SyncAgent { this.config = config const base = config.riverConfig.base const river = config.riverConfig.river - this.baseProvider = makeBaseProvider(config.riverConfig) - this.riverProvider = makeRiverProvider(config.riverConfig) + const baseProvider = makeBaseProvider(config.riverConfig) + const riverProvider = makeRiverProvider(config.riverConfig) this.store = new Store(this.syncAgentDbName(), DB_VERSION, DB_MODELS) this.store.newTransactionGroup('SyncAgent::initalization') - this.spaceDapp = new SpaceDapp(base.chainConfig, this.baseProvider) - this.riverRegistryDapp = new RiverRegistry(river.chainConfig, this.riverProvider) - this.riverConnection = new RiverConnection(this.store, this.riverRegistryDapp, { + const spaceDapp = new SpaceDapp(base.chainConfig, baseProvider) + const riverRegistryDapp = new RiverRegistry(river.chainConfig, riverProvider) + this.riverConnection = new RiverConnection(this.store, spaceDapp, riverRegistryDapp, { signerContext: config.context, cryptoStore: RiverDbManager.getCryptoDb(this.userId, this.cryptoDbName()), - entitlementsDelegate: new Entitlements(this.config.riverConfig, this.spaceDapp), + entitlementsDelegate: new Entitlements(this.config.riverConfig, spaceDapp), persistenceStoreName: this.persistenceDbName(), logNamespaceFilter: undefined, highPriorityStreamIds: this.config.highPriorityStreamIds, rpcRetryParams: config.retryParams, }) - this.user = new User(this.userId, this.store, this.riverConnection, this.spaceDapp) - this.spaces = new Spaces(this.riverConnection, this.user, this.store) + + this.user = new User(this.userId, this.store, this.riverConnection) + this.spaces = new Spaces(this.store, this.riverConnection, this.user.memberships, spaceDapp) // flatten out the observables this.observables = { + riverAuthStatus: this.riverConnection.authStatus, + riverConnection: this.riverConnection, riverStreamNodeUrls: this.riverConnection.streamNodeUrls, spaces: this.spaces, user: this.user, - userAuthStatus: this.user.authStatus, - userMemberships: this.user.streams.memberships, - userInbox: this.user.streams.inbox, - userDeviceKeys: this.user.streams.deviceKeys, - userSettings: this.user.streams.settings, + userMemberships: this.user.memberships, + userInbox: this.user.inbox, + userDeviceKeys: this.user.deviceKeys, + userSettings: this.user.settings, } } diff --git a/packages/sdk/src/sync-agent/syncAgents.test.ts b/packages/sdk/src/sync-agent/syncAgents.test.ts index 67e6be920..15b171154 100644 --- a/packages/sdk/src/sync-agent/syncAgents.test.ts +++ b/packages/sdk/src/sync-agent/syncAgents.test.ts @@ -3,20 +3,19 @@ */ import { dlogger } from '@river-build/dlog' import { SyncAgent } from './syncAgent' -import { TestUser } from './utils/testUser.test' +import { Bot } from './utils/bot' +import { waitFor } from '../util.test' const logger = dlogger('csb:test:syncAgents') describe('syncAgents.test.ts', () => { logger.log('start') - let bobUser: TestUser - let aliceUser: TestUser + const bobUser = new Bot() + const aliceUser = new Bot() let bob: SyncAgent let alice: SyncAgent beforeEach(async () => { - bobUser = new TestUser() - aliceUser = new TestUser() bob = await bobUser.makeSyncAgent() alice = await aliceUser.makeSyncAgent() }) @@ -29,10 +28,49 @@ describe('syncAgents.test.ts', () => { test('syncAgents', async () => { await Promise.all([bob.start(), alice.start()]) - const { spaceId } = await bob.user.createSpace({ spaceName: 'BlastOff' }, bobUser.signer) - expect(bob.user.streams.memberships.isJoined(spaceId)).toBe(true) + const { spaceId } = await bob.spaces.createSpace({ spaceName: 'BlastOff' }, bobUser.signer) + expect(bob.user.memberships.isJoined(spaceId)).toBe(true) - await alice.user.joinSpace(spaceId, aliceUser.signer) - expect(alice.user.streams.memberships.isJoined(spaceId)).toBe(true) + await alice.spaces.getSpace(spaceId).join(aliceUser.signer) + expect(alice.user.memberships.isJoined(spaceId)).toBe(true) + }) + + test('syncAgents load async', async () => { + await bob.start() + + const { spaceId } = await bob.spaces.createSpace( + { spaceName: 'OuterSpace' }, + bobUser.signer, + ) + expect(bob.user.memberships.isJoined(spaceId)).toBe(true) + + // queue up a join, then start the client (wow!) + const alicePromise = alice.spaces.getSpace(spaceId).join(aliceUser.signer) + await alice.start() + await alicePromise + expect(alice.user.memberships.isJoined(spaceId)).toBe(true) + }) + + test('syncAgents send a message', async () => { + await Promise.all([bob.start(), alice.start()]) + await waitFor(() => bob.spaces.value.status === 'loaded') + expect(bob.spaces.data.spaceIds.length).toBeGreaterThan(0) + const spaceId = bob.spaces.data.spaceIds[0] + expect(alice.user.memberships.isJoined(spaceId)).toBe(true) // alice joined above + const space = bob.spaces.getSpace(spaceId) + const channel = space.getDefaultChannel() + await channel.sendMessage('Hello, World!') + expect(channel.timeline.events.value.find((e) => e.text === 'Hello, World!')).toBeDefined() + + // sleep for a bit, then check if alice got the message + const aliceChannel = alice.spaces.getSpace(spaceId).getChannel(channel.data.id) + logger.log(aliceChannel.timeline.events.value) + await waitFor( + () => + expect( + aliceChannel.timeline.events.value.find((e) => e.text === 'Hello, World!'), + ).toBeDefined(), + { timeoutMS: 10000 }, + ) }) }) diff --git a/packages/sdk/src/sync-agent/timeline/models/timelineEvent.ts b/packages/sdk/src/sync-agent/timeline/models/timelineEvent.ts new file mode 100644 index 000000000..2cf3727ab --- /dev/null +++ b/packages/sdk/src/sync-agent/timeline/models/timelineEvent.ts @@ -0,0 +1,55 @@ +import { StreamTimelineEvent } from '../../../types' + +export interface TimelineEvent { + eventId: string + text: string +} + +export function toEvent(timelineEvent: StreamTimelineEvent, _userId: string): TimelineEvent { + const eventId = timelineEvent.hashStr + + // temporary until we port the rest of the timeline transforms + return { + eventId, + text: getEventText(timelineEvent), + } +} + +// temportary until we port the rest of the timeline transforms +function getEventText(event: StreamTimelineEvent): string { + if (event.decryptedContent) { + if (event.decryptedContent.kind === 'channelMessage') { + if ( + event.decryptedContent.content.payload.case === 'post' && + event.decryptedContent.content.payload.value.content.case === 'text' + ) { + return event.decryptedContent.content.payload.value.content.value.body + } else { + return event.decryptedContent.content.payload.case ?? 'unset' // temp + } + } else if (event.decryptedContent.kind === 'text') { + return event.decryptedContent.content + } else { + return event.decryptedContent.kind + } + } + + if (event.localEvent) { + if ( + event.localEvent?.channelMessage?.payload.case === 'post' && + event.localEvent?.channelMessage?.payload.value.content.case === 'text' + ) { + return event.localEvent?.channelMessage?.payload.value.content.value.body + } else { + return event.localEvent?.channelMessage?.payload.case ?? 'unset' // temp + } + } + + if (event.remoteEvent) { + const k1 = event.remoteEvent.event.payload.case + const k2 = event.remoteEvent.event.payload.value?.content?.case ?? 'unset' + return `${k1}: ${k2}` + } + + return 'idk...' +} diff --git a/packages/sdk/src/sync-agent/timeline/models/timelineEvents.ts b/packages/sdk/src/sync-agent/timeline/models/timelineEvents.ts new file mode 100644 index 000000000..685676c66 --- /dev/null +++ b/packages/sdk/src/sync-agent/timeline/models/timelineEvents.ts @@ -0,0 +1,12 @@ +import { Observable } from '../../../observable/observable' +import { TimelineEvent } from './timelineEvent' + +export class TimelineEvents extends Observable { + constructor(initialValue: TimelineEvent[] = []) { + super(initialValue) + } + + update(fn: (current: TimelineEvent[]) => TimelineEvent[]): void { + this.setValue(fn(this.value)) + } +} diff --git a/packages/sdk/src/sync-agent/timeline/timeline.ts b/packages/sdk/src/sync-agent/timeline/timeline.ts new file mode 100644 index 000000000..341232e91 --- /dev/null +++ b/packages/sdk/src/sync-agent/timeline/timeline.ts @@ -0,0 +1,113 @@ +import { SnapshotCaseType } from '@river-build/proto' +import { Stream } from '../../stream' +import { StreamChange } from '../../streamEvents' +import { TimelineEvent, toEvent } from './models/timelineEvent' +import { LocalTimelineEvent } from '../../types' +import { TimelineEvents } from './models/timelineEvents' + +export class Timeline { + events = new TimelineEvents() + filterFn: (event: TimelineEvent, kind: SnapshotCaseType) => boolean = (_event, _kind) => { + return true + } + constructor(private userId: string) { + // + } + + initialize(stream: Stream) { + stream.off('streamUpdated', this.onStreamUpdated) + stream.off('streamLocalEventUpdated', this.onStreamLocalEventUpdated) + stream.on('streamUpdated', this.onStreamUpdated) + stream.on('streamLocalEventUpdated', this.onStreamLocalEventUpdated) + const events = stream.view.timeline + .map((event) => toEvent(event, this.userId)) + .filter((event) => this.filterFn(event, stream.view.contentKind)) + this.events.setValue(events) + } + + private onStreamUpdated = (_streamId: string, kind: SnapshotCaseType, change: StreamChange) => { + const { prepended, appended, updated, confirmed } = change + if (prepended) { + const events = prepended + .map((event) => toEvent(event, this.userId)) + .filter((event) => this.filterFn(event, kind)) + this.prependEvents(events, this.userId) + } + if (appended) { + const events = appended + .map((event) => toEvent(event, this.userId)) + .filter((event) => this.filterFn(event, kind)) + this.appendEvents(events, this.userId) + } + if (updated) { + const events = updated + .map((event) => toEvent(event, this.userId)) + .filter((event) => this.filterFn(event, kind)) + this.updateEvents(events, this.userId) + } + if (confirmed) { + const confirmations = confirmed.map((event) => ({ + eventId: event.hashStr, + confirmedInBlockNum: event.miniblockNum, + confirmedEventNum: event.confirmedEventNum, + })) + this.confirmEvents(confirmations) + } + } + + private onStreamLocalEventUpdated = ( + _streamId: string, + kind: SnapshotCaseType, + localEventId: string, + localEvent: LocalTimelineEvent, + ) => { + const event = toEvent(localEvent, this.userId) + if (this.filterFn(event, kind)) { + this.updateEvent(event, this.userId, localEventId) + } + } + + private prependEvents(events: TimelineEvent[], _userId: string) { + this.events.update((current) => [...events, ...current]) + } + + private appendEvents(events: TimelineEvent[], _userId: string) { + this.events.update((current) => [...current, ...events]) + } + + private updateEvents(events: TimelineEvent[], _userId: string) { + this.events.update((current) => { + const newEvents = [...current] + for (const event of events) { + const index = current.findIndex((e) => e.eventId === event.eventId) + if (index !== -1) { + newEvents[index] = event + } + } + return newEvents + }) + } + + private updateEvent(event: TimelineEvent, userId: string, eventId: string) { + this.events.update((current) => { + const index = current.findIndex((e) => e.eventId === eventId) + if (index !== -1) { + const newEvents = [...current] + newEvents[index] = event + return newEvents + } else { + return current + } + }) + } + + private confirmEvents( + _confirmations: { + eventId: string + confirmedInBlockNum: bigint + confirmedEventNum: bigint + }[], + ) { + // + } +} diff --git a/packages/sdk/src/sync-agent/user/models/userDeviceKeys.ts b/packages/sdk/src/sync-agent/user/models/userDeviceKeys.ts index 183c500ca..7ec701607 100644 --- a/packages/sdk/src/sync-agent/user/models/userDeviceKeys.ts +++ b/packages/sdk/src/sync-agent/user/models/userDeviceKeys.ts @@ -4,7 +4,7 @@ import { UserDevice } from '@river-build/encryption' import { PersistedObservable, persistedObservable } from '../../../observable/persistedObservable' import { makeUserDeviceKeyStreamId } from '../../../id' import { RiverConnection } from '../../river-connection/riverConnection' -import { StreamStateView } from '../../../streamStateView' +import { IStreamStateView } from '../../../streamStateView' import { isDefined } from '../../../check' import { Client } from '../../../client' @@ -68,7 +68,7 @@ export class UserDeviceKeys extends PersistedObservable { } } - private initialize(deviceId: string, streamView: StreamStateView) { + private initialize(deviceId: string, streamView: IStreamStateView) { this.setData({ initialized: true, deviceId, diff --git a/packages/sdk/src/sync-agent/user/models/userInbox.ts b/packages/sdk/src/sync-agent/user/models/userInbox.ts index db088e16c..e4ed97268 100644 --- a/packages/sdk/src/sync-agent/user/models/userInbox.ts +++ b/packages/sdk/src/sync-agent/user/models/userInbox.ts @@ -5,7 +5,7 @@ import { PersistedObservable, persistedObservable } from '../../../observable/pe import { makeUserInboxStreamId } from '../../../id' import { RiverConnection } from '../../river-connection/riverConnection' import { Client } from '../../../client' -import { StreamStateView } from '../../../streamStateView' +import { IStreamStateView } from '../../../streamStateView' import { isDefined } from '../../../check' const logger = dlogger('csb:userInbox') @@ -43,13 +43,10 @@ export class UserInbox extends PersistedObservable { this.setData({ deviceId }) } } - client.addListener('userInboxDeviceSummaryUpdated', this.onUserInboxDeviceSummaryUpdated) + client.addListener('userInboxDeviceSummaryUpdated', this.onDeviceSummaryUpdated) client.addListener('streamInitialized', this.onStreamInitialized) return () => { - client.removeListener( - 'userInboxDeviceSummaryUpdated', - this.onUserInboxDeviceSummaryUpdated, - ) + client.removeListener('userInboxDeviceSummaryUpdated', this.onDeviceSummaryUpdated) client.removeListener('streamInitialized', this.onStreamInitialized) } } @@ -64,7 +61,7 @@ export class UserInbox extends PersistedObservable { } } - private onUserInboxDeviceSummaryUpdated = ( + private onDeviceSummaryUpdated = ( streamId: string, deviceId: string, deviceSummary: UserInboxPayload_Snapshot_DeviceSummary, @@ -75,7 +72,7 @@ export class UserInbox extends PersistedObservable { } } - private initialize(deviceId: string, streamView: StreamStateView) { + private initialize(deviceId: string, streamView: IStreamStateView) { this.setData({ initialized: true, deviceId, diff --git a/packages/sdk/src/sync-agent/user/models/userMemberships.ts b/packages/sdk/src/sync-agent/user/models/userMemberships.ts index e5fa3e330..82a9873bd 100644 --- a/packages/sdk/src/sync-agent/user/models/userMemberships.ts +++ b/packages/sdk/src/sync-agent/user/models/userMemberships.ts @@ -5,7 +5,7 @@ import { check, dlogger } from '@river-build/dlog' import { RiverConnection } from '../../river-connection/riverConnection' import { Client } from '../../../client' import { makeUserStreamId, streamIdFromBytes, userIdFromAddress } from '../../../id' -import { StreamStateView } from '../../../streamStateView' +import { IStreamStateView } from '../../../streamStateView' import { isDefined } from '../../../check' const logger = dlogger('csb:userMemberships') @@ -67,7 +67,7 @@ export class UserMemberships extends PersistedObservable { return this.isMember(streamId, MembershipOp.SO_JOIN) } - private initialize = (streamView: StreamStateView) => { + private initialize = (streamView: IStreamStateView) => { const memberships = Object.entries(streamView.userContent.streamMemberships).reduce( (acc, [streamId, payload]) => { acc[streamId] = toUserMembership(payload) diff --git a/packages/sdk/src/sync-agent/user/models/userSettings.ts b/packages/sdk/src/sync-agent/user/models/userSettings.ts index dd35acc33..5d3f9b74c 100644 --- a/packages/sdk/src/sync-agent/user/models/userSettings.ts +++ b/packages/sdk/src/sync-agent/user/models/userSettings.ts @@ -3,7 +3,7 @@ import { Identifiable, LoadPriority, Store } from '../../../store/store' import { PersistedObservable, persistedObservable } from '../../../observable/persistedObservable' import { RiverConnection } from '../../river-connection/riverConnection' import { makeUserSettingsStreamId } from '../../../id' -import { StreamStateView } from '../../../streamStateView' +import { IStreamStateView } from '../../../streamStateView' import { Client } from '../../../client' import { isDefined } from '../../../check' @@ -49,7 +49,7 @@ export class UserSettings extends PersistedObservable { } } - private initialize = (_streamView: StreamStateView) => { + private initialize = (_streamView: IStreamStateView) => { this.setData({ initialized: true }) } } diff --git a/packages/sdk/src/sync-agent/user/user.test.ts b/packages/sdk/src/sync-agent/user/user.test.ts index 76bd7e9ad..dae98451e 100644 --- a/packages/sdk/src/sync-agent/user/user.test.ts +++ b/packages/sdk/src/sync-agent/user/user.test.ts @@ -4,54 +4,57 @@ */ import { dlogger } from '@river-build/dlog' -import { TestUser } from '../utils/testUser.test' +import { Bot } from '../utils/bot' const logger = dlogger('csb:test:user') describe('User.test.ts', () => { logger.log('start') - const testUser = new TestUser() + const testUser = new Bot() test('User initializes', async () => { const syncAgent = await testUser.makeSyncAgent() + const riverConnection = syncAgent.riverConnection const user = syncAgent.user + const spaces = syncAgent.spaces expect(user.data.id).toBe(testUser.userId) - expect(user.data.initialized).toBe(false) - expect(user.streams.memberships.data.initialized).toBe(false) - expect(user.streams.inbox.data.initialized).toBe(false) - expect(user.streams.deviceKeys.data.initialized).toBe(false) - expect(user.streams.settings.data.initialized).toBe(false) + expect(riverConnection.data.userExists).toBe(false) + expect(user.memberships.data.initialized).toBe(false) + expect(user.inbox.data.initialized).toBe(false) + expect(user.deviceKeys.data.initialized).toBe(false) + expect(user.settings.data.initialized).toBe(false) await syncAgent.start() expect(user.data.id).toBe(testUser.userId) - expect(user.data.initialized).toBe(false) - expect(user.streams.memberships.data.initialized).toBe(false) - expect(user.streams.inbox.data.initialized).toBe(false) - expect(user.streams.deviceKeys.data.initialized).toBe(false) - expect(user.streams.settings.data.initialized).toBe(false) + expect(riverConnection.data.userExists).toBe(false) + expect(user.memberships.data.initialized).toBe(false) + expect(user.inbox.data.initialized).toBe(false) + expect(user.deviceKeys.data.initialized).toBe(false) + expect(user.settings.data.initialized).toBe(false) - const { spaceId } = await user.createSpace({ spaceName: 'bobs-space' }, testUser.signer) + const { spaceId } = await spaces.createSpace({ spaceName: 'bobs-space' }, testUser.signer) logger.log('created spaceId', spaceId) - expect(user.data.initialized).toBe(true) - expect(user.streams.memberships.data.initialized).toBe(true) - expect(user.streams.inbox.data.initialized).toBe(true) - expect(user.streams.deviceKeys.data.initialized).toBe(true) - expect(user.streams.settings.data.initialized).toBe(true) + expect(riverConnection.data.userExists).toBe(true) + expect(user.memberships.data.initialized).toBe(true) + expect(user.inbox.data.initialized).toBe(true) + expect(user.deviceKeys.data.initialized).toBe(true) + expect(user.settings.data.initialized).toBe(true) await syncAgent.stop() }) test('User loads from db', async () => { const syncAgent = await testUser.makeSyncAgent() + const riverConnection = syncAgent.riverConnection const user = syncAgent.user expect(user.value.status).toBe('loading') await syncAgent.start() expect(user.value.status).toBe('loaded') - expect(user.data.initialized).toBe(true) - expect(user.streams.memberships.data.initialized).toBe(true) - expect(user.streams.inbox.data.initialized).toBe(true) - expect(user.streams.deviceKeys.data.initialized).toBe(true) - expect(user.streams.settings.data.initialized).toBe(true) + expect(riverConnection.data.userExists).toBe(true) + expect(user.memberships.data.initialized).toBe(true) + expect(user.inbox.data.initialized).toBe(true) + expect(user.deviceKeys.data.initialized).toBe(true) + expect(user.settings.data.initialized).toBe(true) await syncAgent.stop() }) }) diff --git a/packages/sdk/src/sync-agent/user/user.ts b/packages/sdk/src/sync-agent/user/user.ts index 9d86b815d..c811d1bd7 100644 --- a/packages/sdk/src/sync-agent/user/user.ts +++ b/packages/sdk/src/sync-agent/user/user.ts @@ -1,6 +1,3 @@ -import { dlogger } from '@river-build/dlog' -import { Client } from '../../client' -import { Observable } from '../../observable/observable' import { PersistedObservable, persistedObservable } from '../../observable/persistedObservable' import { LoadPriority, Store } from '../../store/store' import { RiverConnection } from '../river-connection/riverConnection' @@ -8,190 +5,23 @@ import { UserDeviceKeys } from './models/userDeviceKeys' import { UserInbox } from './models/userInbox' import { UserMemberships } from './models/userMemberships' import { UserSettings } from './models/userSettings' -import { CreateSpaceParams, SpaceDapp } from '@river-build/web3' -import { ethers } from 'ethers' -import { makeDefaultChannelStreamId, makeSpaceStreamId } from '../../id' -import { makeDefaultMembershipInfo } from '../utils/spaceUtils' - -const logger = dlogger('csb:user') export interface UserModel { id: string - initialized: boolean -} - -export enum AuthStatus { - /** User is not authenticated or connected to the river client. */ - None = 'None', - /** Transition state: None -> EvaluatingCredentials -> [Credentialed OR ConnectedToRiver] - * if a river user is found, will connect to river client, otherwise will just validate credentials. - */ - EvaluatingCredentials = 'EvaluatingCredentials', - /** User authenticated with a valid credential but without an active river stream client. */ - Credentialed = 'Credentialed', - /** User authenticated with a valid credential and with an active river river client. */ - ConnectedToRiver = 'ConnectedToRiver', - /** Disconnected, client was stopped */ - Disconnected = 'Disconnected', - /** Error state: User failed to authenticate or connect to river client. */ - Error = 'Error', -} - -class LoginContext { - constructor( - public client: Client, - public newUserMetadata?: { spaceId: Uint8Array | string }, - public cancelled: boolean = false, - ) {} } @persistedObservable({ tableName: 'user' }) export class User extends PersistedObservable { - streams: { - memberships: UserMemberships - inbox: UserInbox - deviceKeys: UserDeviceKeys - settings: UserSettings + memberships: UserMemberships + inbox: UserInbox + deviceKeys: UserDeviceKeys + settings: UserSettings + + constructor(id: string, store: Store, riverConnection: RiverConnection) { + super({ id }, store, LoadPriority.high) + this.memberships = new UserMemberships(id, store, riverConnection) + this.inbox = new UserInbox(id, store, riverConnection) + this.deviceKeys = new UserDeviceKeys(id, store, riverConnection) + this.settings = new UserSettings(id, store, riverConnection) } - authStatus = new Observable(AuthStatus.None) - loginError?: Error - private riverConnection: RiverConnection - private spaceDapp: SpaceDapp - private loginPromise?: { promise: Promise; context: LoginContext } - - constructor(id: string, store: Store, riverConnection: RiverConnection, spaceDapp: SpaceDapp) { - super({ id, initialized: false }, store, LoadPriority.high) - this.streams = { - memberships: new UserMemberships(id, store, riverConnection), - inbox: new UserInbox(id, store, riverConnection), - deviceKeys: new UserDeviceKeys(id, store, riverConnection), - settings: new UserSettings(id, store, riverConnection), - } - this.riverConnection = riverConnection - this.spaceDapp = spaceDapp - } - - protected override async onLoaded() { - this.riverConnection.registerView(this.onClientStarted) - } - - private onClientStarted = (client: Client) => { - const loginContext = new LoginContext(client) - this.loginWithRetries(loginContext).catch((err) => { - logger.error('login failed', err) - }) - return () => { - loginContext.cancelled = true - this.authStatus.setValue(AuthStatus.Disconnected) - } - } - - async createSpace( - params: Partial> & { spaceName: string }, - signer: ethers.Signer, - ) { - const membershipInfo = - params.membership ?? (await makeDefaultMembershipInfo(this.spaceDapp, this.data.id)) - const channelName = params.channelName ?? 'general' - const transaction = await this.spaceDapp.createSpace( - { - spaceName: params.spaceName, - spaceMetadata: params.spaceMetadata ?? params.spaceName, - channelName: channelName, - membership: membershipInfo, - }, - signer, - ) - const receipt = await transaction.wait() - logger.log('transaction receipt', receipt) - const spaceAddress = this.spaceDapp.getSpaceAddress(receipt) - if (!spaceAddress) { - throw new Error('Space address not found') - } - logger.log('spaceAddress', spaceAddress) - const spaceId = makeSpaceStreamId(spaceAddress) - const defaultChannelId = makeDefaultChannelStreamId(spaceAddress) - logger.log('spaceId, defaultChannelId', { spaceId, defaultChannelId }) - await this.riverConnection.call(async (client) => { - logger.log('createSpace with client') - const context = new LoginContext(client, { spaceId }) - await this.loginWithRetries(context) - await client.createSpace(spaceId) - await client.createChannel(spaceId, channelName, '', defaultChannelId) - }) - return { spaceId, defaultChannelId } - } - - async joinSpace( - spaceId: string, - signer: ethers.Signer, - opts?: { skipMintMembership?: boolean }, - ) { - if (opts?.skipMintMembership !== true) { - const { issued } = await this.spaceDapp.joinSpace(spaceId, this.data.id, signer) - logger.log('joinSpace transaction', issued) - } - await this.riverConnection.call(async (client) => { - const context = new LoginContext(client, { spaceId }) - await this.loginWithRetries(context) - await client.joinStream(spaceId) - await client.joinStream(makeDefaultChannelStreamId(spaceId)) - }) - } - - private async loginWithRetries(loginContext: LoginContext) { - logger.log('login') - if (this.loginPromise) { - this.loginPromise.context.cancelled = true - await this.loginPromise.promise - } - if (this.authStatus.value === AuthStatus.ConnectedToRiver) { - return - } - this.authStatus.setValue(AuthStatus.EvaluatingCredentials) - const login = async () => { - let retryCount = 0 - const MAX_RETRY_COUNT = 20 - while (!loginContext.cancelled) { - try { - logger.log('logging in') - const canInitialize = - this.data.initialized || - loginContext.newUserMetadata || - (await loginContext.client.userExists(this.data.id)) - if (canInitialize) { - await loginContext.client.initializeUser(loginContext.newUserMetadata) - loginContext.client.startSync() - this.setData({ initialized: true }) - this.authStatus.setValue(AuthStatus.ConnectedToRiver) - } else { - this.authStatus.setValue(AuthStatus.Credentialed) - } - break - } catch (err) { - retryCount++ - this.loginError = err as Error - logger.log('encountered exception while initializing', err) - if (retryCount >= MAX_RETRY_COUNT) { - this.authStatus.setValue(AuthStatus.Error) - throw err - } else { - const retryDelay = getRetryDelay(retryCount) - logger.log('retrying', { retryDelay, retryCount }) - // sleep - await new Promise((resolve) => setTimeout(resolve, retryDelay)) - } - } finally { - this.loginPromise = undefined - } - } - } - this.loginPromise = { promise: login(), context: loginContext } - return this.loginPromise.promise - } -} - -// exponentially back off, but never wait more than 20 seconds -function getRetryDelay(retryCount: number) { - return Math.min(1000 * 2 ** retryCount, 20000) } diff --git a/packages/sdk/src/sync-agent/utils/bot.ts b/packages/sdk/src/sync-agent/utils/bot.ts new file mode 100644 index 000000000..38a0b8635 --- /dev/null +++ b/packages/sdk/src/sync-agent/utils/bot.ts @@ -0,0 +1,40 @@ +import { RiverConfig, makeRiverConfig } from '../../riverConfig' +import { ethers } from 'ethers' +import { LocalhostWeb3Provider } from '@river-build/web3' +import { makeSignerContext } from '../../signerContext' +import { SyncAgent } from '../syncAgent' + +export class Bot { + riverConfig: RiverConfig + rootWallet: ethers.Wallet + delegateWallet: ethers.Wallet + web3Provider: LocalhostWeb3Provider + + constructor(rootWallet?: ethers.Wallet, riverConfig?: RiverConfig) { + this.riverConfig = riverConfig || makeRiverConfig() + this.rootWallet = rootWallet || ethers.Wallet.createRandom() + this.delegateWallet = ethers.Wallet.createRandom() + this.web3Provider = new LocalhostWeb3Provider(this.riverConfig.base.rpcUrl, this.rootWallet) + } + + get userId() { + return this.rootWallet.address + } + + get signer(): ethers.Signer { + return this.web3Provider.signer + } + + async makeSyncAgent(opts?: { deviceId?: string }) { + await this.web3Provider.fundWallet() + const signerContext = await makeSignerContext(this.rootWallet, this.delegateWallet, { + days: 1, + }) + const syncAgent = new SyncAgent({ + context: signerContext, + riverConfig: this.riverConfig, + deviceId: opts?.deviceId, + }) + return syncAgent + } +} diff --git a/packages/sdk/src/sync-agent/utils/testUser.test.ts b/packages/sdk/src/sync-agent/utils/testUser.test.ts deleted file mode 100644 index abe615c51..000000000 --- a/packages/sdk/src/sync-agent/utils/testUser.test.ts +++ /dev/null @@ -1,29 +0,0 @@ -import { makeRiverConfig } from '../../riverConfig' -import { ethers } from 'ethers' -import { LocalhostWeb3Provider } from '@river-build/web3' -import { makeSignerContext } from '../../signerContext' -import { SyncAgent } from '../syncAgent' - -export class TestUser { - riverConfig = makeRiverConfig() - rootWallet = ethers.Wallet.createRandom() - delegateWallet = ethers.Wallet.createRandom() - web3Provider = new LocalhostWeb3Provider(this.riverConfig.base.rpcUrl, this.rootWallet) - - get userId() { - return this.rootWallet.address - } - - get signer(): ethers.Signer { - return this.web3Provider.signer - } - - async makeSyncAgent() { - await this.web3Provider.fundWallet() - const signerContext = await makeSignerContext(this.rootWallet, this.delegateWallet, { - days: 1, - }) - const syncAgent = new SyncAgent({ context: signerContext, riverConfig: this.riverConfig }) - return syncAgent - } -} diff --git a/packages/sdk/src/util.test.ts b/packages/sdk/src/util.test.ts index dd5881ed5..4170af967 100644 --- a/packages/sdk/src/util.test.ts +++ b/packages/sdk/src/util.test.ts @@ -11,7 +11,7 @@ import { SyncOp, } from '@river-build/proto' import { PlainMessage } from '@bufbuild/protobuf' -import { StreamStateView } from './streamStateView' +import { IStreamStateView } from './streamStateView' import { Client } from './client' import { makeBaseChainConfig, makeRiverChainConfig } from './riverConfig' import { @@ -344,7 +344,7 @@ export async function createSpaceAndDefaultChannel( ): Promise<{ spaceId: string defaultChannelId: string - userStreamView: StreamStateView + userStreamView: IStreamStateView }> { const transaction = await spaceDapp.createSpace( {