From 48387c45e2e060cfe8c0bd3ef167d317c1fbadee Mon Sep 17 00:00:00 2001 From: texuf Date: Fri, 26 Jul 2024 13:40:25 -0700 Subject: [PATCH] HNT-9124 Add SYNC_DOWN message to synced streams (#444) --- packages/sdk/src/client.ts | 4 + packages/sdk/src/syncedStreams.test.ts | 132 +++++++++++++++++++------ packages/sdk/src/syncedStreams.ts | 61 +++++++++++- 3 files changed, 164 insertions(+), 33 deletions(-) diff --git a/packages/sdk/src/client.ts b/packages/sdk/src/client.ts index 4cb3735e3..bde274489 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -2127,4 +2127,8 @@ export class Client const jsonStr = event.toJsonString() await this.rpcClient.info({ debug: ['add_event', streamId, jsonStr] }) } + + public async debugDropStream(syncId: string, streamId: string): Promise { + await this.rpcClient.info({ debug: ['drop_stream', syncId, streamId] }) + } } diff --git a/packages/sdk/src/syncedStreams.test.ts b/packages/sdk/src/syncedStreams.test.ts index f2ef56bb0..a56f0b2fd 100644 --- a/packages/sdk/src/syncedStreams.test.ts +++ b/packages/sdk/src/syncedStreams.test.ts @@ -5,15 +5,17 @@ /* eslint-disable jest/no-commented-out-tests */ import { makeEvent, unpackStream } from './sign' import { SyncState, SyncedStreams, stateConstraints } from './syncedStreams' -import { makeDonePromise, makeRandomUserContext, makeTestRpcClient } from './util.test' -import { makeUserStreamId, streamIdToBytes, userIdFromAddress } from './id' -import { make_UserPayload_Inception } from './types' +import { makeDonePromise, makeRandomUserContext, makeTestRpcClient, waitFor } from './util.test' +import { makeUserInboxStreamId, streamIdToBytes, userIdFromAddress } from './id' +import { make_UserInboxPayload_Ack, make_UserInboxPayload_Inception } from './types' import { dlog } from '@river-build/dlog' import TypedEmitter from 'typed-emitter' import EventEmitter from 'events' import { StreamEvents } from './streamEvents' import { SyncedStream } from './syncedStream' import { StubPersistenceStore } from './persistenceStore' +import { PartialMessage, PlainMessage } from '@bufbuild/protobuf' +import { Envelope, StreamEvent } from '@river-build/proto' const log = dlog('csb:test:syncedStreams') @@ -40,46 +42,114 @@ describe('syncStreams', () => { test('starting->syncing->canceling->notSyncing', async () => { log('starting->syncing->canceling->notSyncing') - /** Arrange */ - const alice = await makeTestRpcClient() - const done1 = makeDonePromise() - const alicesContext = await makeRandomUserContext() - - const alicesUserId = userIdFromAddress(alicesContext.creatorAddress) - const alicesUserStreamIdStr = makeUserStreamId(alicesUserId) - const alicesUserStreamId = streamIdToBytes(alicesUserStreamIdStr) - // create account for alice - const aliceUserStream = await alice.createStream({ - events: [ - await makeEvent( - alicesContext, - make_UserPayload_Inception({ - streamId: alicesUserStreamId, - }), - ), - ], - streamId: alicesUserStreamId, - }) - const { streamAndCookie } = await unpackStream(aliceUserStream.stream) + // globals setup + const stubPersistenceStore = new StubPersistenceStore() + const done1 = makeDonePromise() const mockClientEmitter = new EventEmitter() as TypedEmitter - mockClientEmitter.on('streamSyncActive', (isActive) => { + mockClientEmitter.on('streamSyncActive', (isActive: boolean) => { if (isActive) { done1.done() } }) - const alicesSyncedStreams = new SyncedStreams(alicesUserId, alice, mockClientEmitter) - const stream = new SyncedStream( + // alice setup + const rpcClient = await makeTestRpcClient() + const alicesContext = await makeRandomUserContext() + const alicesUserId = userIdFromAddress(alicesContext.creatorAddress) + const alicesSyncedStreams = new SyncedStreams(alicesUserId, rpcClient, mockClientEmitter) + + // some helper functions + const createStream = async (streamId: Uint8Array, events: PartialMessage[]) => { + const streamResponse = await rpcClient.createStream({ + events, + streamId, + }) + const response = await unpackStream(streamResponse.stream) + return response + } + + // user inbox stream setup + const alicesUserInboxStreamIdStr = makeUserInboxStreamId(alicesUserId) + const alicesUserInboxStreamId = streamIdToBytes(alicesUserInboxStreamIdStr) + const userInboxStreamResponse = await createStream(alicesUserInboxStreamId, [ + await makeEvent( + alicesContext, + make_UserInboxPayload_Inception({ + streamId: alicesUserInboxStreamId, + }), + ), + ]) + const userInboxStream = new SyncedStream( alicesUserId, - alicesUserStreamIdStr, + alicesUserInboxStreamIdStr, mockClientEmitter, log, - new StubPersistenceStore(), + stubPersistenceStore, ) + await userInboxStream.initializeFromResponse(userInboxStreamResponse) + await alicesSyncedStreams.startSyncStreams() await done1.promise - alicesSyncedStreams.set(alicesUserStreamIdStr, stream) - await alicesSyncedStreams.addStreamToSync(streamAndCookie.nextSyncCookie) + + alicesSyncedStreams.set(alicesUserInboxStreamIdStr, userInboxStream) + await alicesSyncedStreams.addStreamToSync(userInboxStream.view.syncCookie!) + + // some helper functions + const addEvent = async (payload: PlainMessage['payload']) => { + await rpcClient.addEvent({ + streamId: alicesUserInboxStreamId, + event: await makeEvent( + alicesContext, + payload, + userInboxStreamResponse.streamAndCookie.miniblocks[0].hash, + ), + }) + } + + // post an ack (easiest way to put a string in a stream) + await addEvent( + make_UserInboxPayload_Ack({ + deviceKey: 'numero uno', + miniblockNum: 1n, + }), + ) + + // make sure it shows up + await waitFor(() => + expect( + userInboxStream.view.timeline.find( + (e) => + e.remoteEvent?.event.payload.case === 'userInboxPayload' && + e.remoteEvent?.event.payload.value.content.case === 'ack' && + e.remoteEvent?.event.payload.value.content.value.deviceKey === 'numero uno', + ), + ).toBeDefined(), + ) + + // drop the stream + await rpcClient.info({ + debug: ['drop_stream', alicesSyncedStreams.getSyncId()!, alicesUserInboxStreamIdStr], + }) + + // add second event + await addEvent( + make_UserInboxPayload_Ack({ + deviceKey: 'numero dos', + miniblockNum: 1n, + }), + ) + + // make sure it shows up + await waitFor(() => + expect( + userInboxStream.view.timeline.find( + (e) => + e.remoteEvent?.event.payload.case === 'userInboxPayload' && + e.remoteEvent?.event.payload.value.content.case === 'ack' && + e.remoteEvent?.event.payload.value.content.value.deviceKey === 'numero dos', + ), + ).toBeDefined(), + ) await alicesSyncedStreams.stopSync() }) diff --git a/packages/sdk/src/syncedStreams.ts b/packages/sdk/src/syncedStreams.ts index f48380200..4e9b33f87 100644 --- a/packages/sdk/src/syncedStreams.ts +++ b/packages/sdk/src/syncedStreams.ts @@ -5,7 +5,7 @@ import { unpackStream, unpackStreamAndCookie } from './sign' import { SyncedStreamEvents } from './streamEvents' import { SyncedStream } from './syncedStream' import TypedEmitter from 'typed-emitter' -import { isDefined } from './check' +import { isDefined, logNever } from './check' import { nanoid } from 'nanoid' import { isMobileSafari } from './utils' import { streamIdAsBytes, streamIdAsString } from './id' @@ -147,6 +147,10 @@ export class SyncedStreams { return this.streams.size } + public getSyncId(): string | undefined { + return this.syncId + } + public getStreams(): SyncedStream[] { return Array.from(this.streams.values()) } @@ -458,8 +462,12 @@ export class SyncedStreams { this.printNonces() } break + case SyncOp.SYNC_DOWN: + this.syncDown(value.streamId) + break default: - this.log( + logNever( + value.syncOp, `unknown syncOp { syncId: ${this.syncId}, syncOp: ${value.syncOp} }`, ) break @@ -589,6 +597,55 @@ export class SyncedStreams { } } + private syncDown( + streamId: Uint8Array, + retryParams?: { syncId: string; retryCount: number }, + ): void { + if (this.syncId === undefined) { + return + } + if (retryParams !== undefined && retryParams.syncId !== this.syncId) { + return + } + if (streamId === undefined || streamId.length === 0) { + this.logError('syncDown: streamId is empty') + return + } + if (this.syncState !== SyncState.Syncing) { + this.logError('syncDown: invalid state transition', this.syncState) + return + } + const stream = this.streams.get(streamIdAsString(streamId)) + if (!stream) { + this.log('syncDown: stream not found', streamIdAsString(streamId)) + return + } + const cookie = stream.view.syncCookie + if (!cookie) { + this.logError('syncDown: syncCookie not found', streamIdAsString(streamId)) + return + } + const syncId = this.syncId + const retryCount = retryParams?.retryCount ?? 0 + this.addStreamToSync(cookie).catch((err) => { + const retryDelay = Math.min(1000 * Math.pow(2, retryCount), 60000) + this.logError( + 'syncDown: addStreamToSync error', + err, + 'retryParams', + retryParams, + 'retryDelay', + retryDelay, + ) + setTimeout(() => { + this.syncDown(streamId, { + syncId, + retryCount: retryCount + 1, + }) + }, retryDelay) + }) + } + private syncClosed() { this.stopPing() if (this.syncState === SyncState.Canceling) {