Skip to content

Commit

Permalink
HNT-9124 Add SYNC_DOWN message to synced streams (#444)
Browse files Browse the repository at this point in the history
  • Loading branch information
texuf authored Jul 26, 2024
1 parent 72b9d74 commit 48387c4
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 33 deletions.
4 changes: 4 additions & 0 deletions packages/sdk/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2127,4 +2127,8 @@ export class Client
const jsonStr = event.toJsonString()
await this.rpcClient.info({ debug: ['add_event', streamId, jsonStr] })
}

public async debugDropStream(syncId: string, streamId: string): Promise<void> {
await this.rpcClient.info({ debug: ['drop_stream', syncId, streamId] })
}
}
132 changes: 101 additions & 31 deletions packages/sdk/src/syncedStreams.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@
/* eslint-disable jest/no-commented-out-tests */
import { makeEvent, unpackStream } from './sign'
import { SyncState, SyncedStreams, stateConstraints } from './syncedStreams'
import { makeDonePromise, makeRandomUserContext, makeTestRpcClient } from './util.test'
import { makeUserStreamId, streamIdToBytes, userIdFromAddress } from './id'
import { make_UserPayload_Inception } from './types'
import { makeDonePromise, makeRandomUserContext, makeTestRpcClient, waitFor } from './util.test'
import { makeUserInboxStreamId, streamIdToBytes, userIdFromAddress } from './id'
import { make_UserInboxPayload_Ack, make_UserInboxPayload_Inception } from './types'
import { dlog } from '@river-build/dlog'
import TypedEmitter from 'typed-emitter'
import EventEmitter from 'events'
import { StreamEvents } from './streamEvents'
import { SyncedStream } from './syncedStream'
import { StubPersistenceStore } from './persistenceStore'
import { PartialMessage, PlainMessage } from '@bufbuild/protobuf'
import { Envelope, StreamEvent } from '@river-build/proto'

const log = dlog('csb:test:syncedStreams')

Expand All @@ -40,46 +42,114 @@ describe('syncStreams', () => {

test('starting->syncing->canceling->notSyncing', async () => {
log('starting->syncing->canceling->notSyncing')
/** Arrange */
const alice = await makeTestRpcClient()
const done1 = makeDonePromise()
const alicesContext = await makeRandomUserContext()

const alicesUserId = userIdFromAddress(alicesContext.creatorAddress)
const alicesUserStreamIdStr = makeUserStreamId(alicesUserId)
const alicesUserStreamId = streamIdToBytes(alicesUserStreamIdStr)
// create account for alice
const aliceUserStream = await alice.createStream({
events: [
await makeEvent(
alicesContext,
make_UserPayload_Inception({
streamId: alicesUserStreamId,
}),
),
],
streamId: alicesUserStreamId,
})
const { streamAndCookie } = await unpackStream(aliceUserStream.stream)

// globals setup
const stubPersistenceStore = new StubPersistenceStore()
const done1 = makeDonePromise()
const mockClientEmitter = new EventEmitter() as TypedEmitter<StreamEvents>
mockClientEmitter.on('streamSyncActive', (isActive) => {
mockClientEmitter.on('streamSyncActive', (isActive: boolean) => {
if (isActive) {
done1.done()
}
})
const alicesSyncedStreams = new SyncedStreams(alicesUserId, alice, mockClientEmitter)
const stream = new SyncedStream(
// alice setup
const rpcClient = await makeTestRpcClient()
const alicesContext = await makeRandomUserContext()
const alicesUserId = userIdFromAddress(alicesContext.creatorAddress)
const alicesSyncedStreams = new SyncedStreams(alicesUserId, rpcClient, mockClientEmitter)

// some helper functions
const createStream = async (streamId: Uint8Array, events: PartialMessage<Envelope>[]) => {
const streamResponse = await rpcClient.createStream({
events,
streamId,
})
const response = await unpackStream(streamResponse.stream)
return response
}

// user inbox stream setup
const alicesUserInboxStreamIdStr = makeUserInboxStreamId(alicesUserId)
const alicesUserInboxStreamId = streamIdToBytes(alicesUserInboxStreamIdStr)
const userInboxStreamResponse = await createStream(alicesUserInboxStreamId, [
await makeEvent(
alicesContext,
make_UserInboxPayload_Inception({
streamId: alicesUserInboxStreamId,
}),
),
])
const userInboxStream = new SyncedStream(
alicesUserId,
alicesUserStreamIdStr,
alicesUserInboxStreamIdStr,
mockClientEmitter,
log,
new StubPersistenceStore(),
stubPersistenceStore,
)
await userInboxStream.initializeFromResponse(userInboxStreamResponse)

await alicesSyncedStreams.startSyncStreams()
await done1.promise
alicesSyncedStreams.set(alicesUserStreamIdStr, stream)
await alicesSyncedStreams.addStreamToSync(streamAndCookie.nextSyncCookie)

alicesSyncedStreams.set(alicesUserInboxStreamIdStr, userInboxStream)
await alicesSyncedStreams.addStreamToSync(userInboxStream.view.syncCookie!)

// some helper functions
const addEvent = async (payload: PlainMessage<StreamEvent>['payload']) => {
await rpcClient.addEvent({
streamId: alicesUserInboxStreamId,
event: await makeEvent(
alicesContext,
payload,
userInboxStreamResponse.streamAndCookie.miniblocks[0].hash,
),
})
}

// post an ack (easiest way to put a string in a stream)
await addEvent(
make_UserInboxPayload_Ack({
deviceKey: 'numero uno',
miniblockNum: 1n,
}),
)

// make sure it shows up
await waitFor(() =>
expect(
userInboxStream.view.timeline.find(
(e) =>
e.remoteEvent?.event.payload.case === 'userInboxPayload' &&
e.remoteEvent?.event.payload.value.content.case === 'ack' &&
e.remoteEvent?.event.payload.value.content.value.deviceKey === 'numero uno',
),
).toBeDefined(),
)

// drop the stream
await rpcClient.info({
debug: ['drop_stream', alicesSyncedStreams.getSyncId()!, alicesUserInboxStreamIdStr],
})

// add second event
await addEvent(
make_UserInboxPayload_Ack({
deviceKey: 'numero dos',
miniblockNum: 1n,
}),
)

// make sure it shows up
await waitFor(() =>
expect(
userInboxStream.view.timeline.find(
(e) =>
e.remoteEvent?.event.payload.case === 'userInboxPayload' &&
e.remoteEvent?.event.payload.value.content.case === 'ack' &&
e.remoteEvent?.event.payload.value.content.value.deviceKey === 'numero dos',
),
).toBeDefined(),
)

await alicesSyncedStreams.stopSync()
})
Expand Down
61 changes: 59 additions & 2 deletions packages/sdk/src/syncedStreams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { unpackStream, unpackStreamAndCookie } from './sign'
import { SyncedStreamEvents } from './streamEvents'
import { SyncedStream } from './syncedStream'
import TypedEmitter from 'typed-emitter'
import { isDefined } from './check'
import { isDefined, logNever } from './check'
import { nanoid } from 'nanoid'
import { isMobileSafari } from './utils'
import { streamIdAsBytes, streamIdAsString } from './id'
Expand Down Expand Up @@ -147,6 +147,10 @@ export class SyncedStreams {
return this.streams.size
}

public getSyncId(): string | undefined {
return this.syncId
}

public getStreams(): SyncedStream[] {
return Array.from(this.streams.values())
}
Expand Down Expand Up @@ -458,8 +462,12 @@ export class SyncedStreams {
this.printNonces()
}
break
case SyncOp.SYNC_DOWN:
this.syncDown(value.streamId)
break
default:
this.log(
logNever(
value.syncOp,
`unknown syncOp { syncId: ${this.syncId}, syncOp: ${value.syncOp} }`,
)
break
Expand Down Expand Up @@ -589,6 +597,55 @@ export class SyncedStreams {
}
}

private syncDown(
streamId: Uint8Array,
retryParams?: { syncId: string; retryCount: number },
): void {
if (this.syncId === undefined) {
return
}
if (retryParams !== undefined && retryParams.syncId !== this.syncId) {
return
}
if (streamId === undefined || streamId.length === 0) {
this.logError('syncDown: streamId is empty')
return
}
if (this.syncState !== SyncState.Syncing) {
this.logError('syncDown: invalid state transition', this.syncState)
return
}
const stream = this.streams.get(streamIdAsString(streamId))
if (!stream) {
this.log('syncDown: stream not found', streamIdAsString(streamId))
return
}
const cookie = stream.view.syncCookie
if (!cookie) {
this.logError('syncDown: syncCookie not found', streamIdAsString(streamId))
return
}
const syncId = this.syncId
const retryCount = retryParams?.retryCount ?? 0
this.addStreamToSync(cookie).catch((err) => {
const retryDelay = Math.min(1000 * Math.pow(2, retryCount), 60000)
this.logError(
'syncDown: addStreamToSync error',
err,
'retryParams',
retryParams,
'retryDelay',
retryDelay,
)
setTimeout(() => {
this.syncDown(streamId, {
syncId,
retryCount: retryCount + 1,
})
}, retryDelay)
})
}

private syncClosed() {
this.stopPing()
if (this.syncState === SyncState.Canceling) {
Expand Down

0 comments on commit 48387c4

Please sign in to comment.