Skip to content

Commit

Permalink
Revert changes to paginate miniblocks (#1844)
Browse files Browse the repository at this point in the history
We're seeing the route return 0 for the response.fromInclusive when it
shouldn't be...

Revert "Updated SDK to paginate miniblocks (#1790)"

This reverts commit ecefe23.


Revert "Fixes for new getMiniblocks function (#1840)"

This reverts commit cd79d8d.
  • Loading branch information
texuf authored Dec 17, 2024
1 parent 5b03e56 commit accb4bd
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 93 deletions.
23 changes: 13 additions & 10 deletions packages/sdk/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import {
makeSessionKeys,
type EncryptionDeviceInitOpts,
} from '@river-build/encryption'
import { getMaxTimeoutMs, StreamRpcClient, getMiniblocks } from './makeStreamRpcClient'
import { getMaxTimeoutMs, StreamRpcClient } from './makeStreamRpcClient'
import { errorContains, getRpcErrorProperty } from './rpcInterceptors'
import { assert, isDefined } from './check'
import EventEmitter from 'events'
Expand Down Expand Up @@ -86,6 +86,7 @@ import {
checkEventSignature,
makeEvent,
UnpackEnvelopeOpts,
unpackMiniblock,
unpackStream,
unpackStreamEx,
} from './sign'
Expand Down Expand Up @@ -1908,23 +1909,25 @@ export class Client
}
}

const { miniblocks, terminus } = await getMiniblocks(
this.rpcClient,
streamId,
const response = await this.rpcClient.getMiniblocks({
streamId: streamIdAsBytes(streamId),
fromInclusive,
toExclusive,
this.unpackEnvelopeOpts,
)
})

const unpackedMiniblocks: ParsedMiniblock[] = []
for (const miniblock of response.miniblocks) {
const unpackedMiniblock = await unpackMiniblock(miniblock, this.unpackEnvelopeOpts)
unpackedMiniblocks.push(unpackedMiniblock)
}
await this.persistenceStore.saveMiniblocks(
streamIdAsString(streamId),
miniblocks,
unpackedMiniblocks,
'backward',
)

return {
terminus: terminus,
miniblocks: [...miniblocks, ...cachedMiniblocks],
terminus: response.terminus,
miniblocks: [...unpackedMiniblocks, ...cachedMiniblocks],
}
}

Expand Down
74 changes: 0 additions & 74 deletions packages/sdk/src/makeStreamRpcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,7 @@ import {
retryInterceptor,
type RetryParams,
} from './rpcInterceptors'
import { UnpackEnvelopeOpts, unpackMiniblock } from './sign'
import { RpcOptions, createHttp2ConnectTransport } from './rpcCommon'
import { streamIdAsBytes } from './id'
import { ParsedMiniblock } from './types'

const logInfo = dlog('csb:rpc:info')
let nextRpcClientNum = 0
Expand Down Expand Up @@ -73,74 +70,3 @@ export function getMaxTimeoutMs(opts: StreamRpcClientOptions): number {
}
return maxTimeoutMs
}

export async function getMiniblocks(
client: StreamRpcClient,
streamId: string | Uint8Array,
fromInclusive: bigint,
toExclusive: bigint,
unpackEnvelopeOpts: UnpackEnvelopeOpts | undefined,
): Promise<{ miniblocks: ParsedMiniblock[]; terminus: boolean }> {
const allMiniblocks: ParsedMiniblock[] = []
let currentFromInclusive = fromInclusive
let reachedTerminus = false

while (currentFromInclusive < toExclusive) {
const { miniblocks, terminus, nextFromInclusive } = await fetchMiniblocksFromRpc(
client,
streamId,
currentFromInclusive,
toExclusive,
unpackEnvelopeOpts,
)

allMiniblocks.push(...miniblocks)

// Set the terminus to true if we got at least one response with reached terminus
// The behaviour around this flag is not implemented yet
if (terminus && !reachedTerminus) {
reachedTerminus = true
}

if (currentFromInclusive === nextFromInclusive) {
break
}

currentFromInclusive = nextFromInclusive
}

return {
miniblocks: allMiniblocks,
terminus: reachedTerminus,
}
}

export async function fetchMiniblocksFromRpc(
client: StreamRpcClient,
streamId: string | Uint8Array,
fromInclusive: bigint,
toExclusive: bigint,
unpackEnvelopeOpts: UnpackEnvelopeOpts | undefined,
): Promise<{
miniblocks: ParsedMiniblock[]
terminus: boolean
nextFromInclusive: bigint
}> {
const response = await client.getMiniblocks({
streamId: streamIdAsBytes(streamId),
fromInclusive,
toExclusive,
})

const miniblocks: ParsedMiniblock[] = []
for (const miniblock of response.miniblocks) {
const unpackedMiniblock = await unpackMiniblock(miniblock, unpackEnvelopeOpts)
miniblocks.push(unpackedMiniblock)
}

return {
miniblocks: miniblocks,
terminus: response.terminus,
nextFromInclusive: response.fromInclusive + BigInt(response.miniblocks.length),
}
}
21 changes: 12 additions & 9 deletions packages/sdk/src/unauthenticatedClient.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import debug from 'debug'
import { DLogger, check, dlog, dlogError } from '@river-build/dlog'
import { hasElements, isDefined } from './check'
import { StreamRpcClient, getMiniblocks } from './makeStreamRpcClient'
import { UnpackEnvelopeOpts, unpackStream } from './sign'
import { StreamRpcClient } from './makeStreamRpcClient'
import { UnpackEnvelopeOpts, unpackMiniblock, unpackStream } from './sign'
import { StreamStateView } from './streamStateView'
import { ParsedMiniblock, StreamTimelineEvent } from './types'
import { streamIdAsString, streamIdAsBytes, userIdFromAddress, makeUserStreamId } from './id'
Expand Down Expand Up @@ -204,17 +204,20 @@ export class UnauthenticatedClient {
}
}

const { miniblocks, terminus } = await getMiniblocks(
this.rpcClient,
streamId,
const response = await this.rpcClient.getMiniblocks({
streamId: streamIdAsBytes(streamId),
fromInclusive,
toExclusive,
this.unpackEnvelopeOpts,
)
})

const unpackedMiniblocks: ParsedMiniblock[] = []
for (const miniblock of response.miniblocks) {
const unpackedMiniblock = await unpackMiniblock(miniblock, this.unpackEnvelopeOpts)
unpackedMiniblocks.push(unpackedMiniblock)
}
return {
terminus: terminus,
miniblocks: miniblocks,
terminus: response.terminus,
miniblocks: unpackedMiniblocks,
}
}

Expand Down

0 comments on commit accb4bd

Please sign in to comment.