diff --git a/.vscode/launch.json b/.vscode/launch.json index 283953988..f3f8069ea 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -82,5 +82,15 @@ "args": ["${file}", "--config", "${workspaceFolder}/packages/sdk/jest.config.ts", "-i", "--no-cache", "--forceExit"], "console": "integratedTerminal", }, + { + "name": "Jest: current file in 'packages/stress/' (single entitlements)", + "type": "node", + "request": "launch", + "env": { "NODE_ENV": "development", "NODE_TLS_REJECT_UNAUTHORIZED": "0", "RIVER_ENV": "local_single", "DEBUG": "csb:*,test:*", "DEBUG_DEPTH":"1" }, + "program": "${workspaceFolder}/node_modules/.bin/jest", + "runtimeArgs": ["--experimental-vm-modules", "--experimental-wasm-modules"], + "args": ["${file}", "--config", "${workspaceFolder}/packages/stress/jest.config.ts", "-i", "--no-cache", "--forceExit"], + "console": "integratedTerminal", + }, ] } diff --git a/packages/sdk/src/client.ts b/packages/sdk/src/client.ts index 884d63b78..34e277196 100644 --- a/packages/sdk/src/client.ts +++ b/packages/sdk/src/client.ts @@ -52,8 +52,9 @@ import { UserDeviceCollection, makeSessionKeys, } from '@river-build/encryption' +import { StreamRpcClient } from './makeStreamRpcClient' +import { errorContains, getRpcErrorProperty } from './rpcInterceptors' import { assert, isDefined } from './check' -import { errorContains, getRpcErrorProperty, StreamRpcClient } from './makeStreamRpcClient' import EventEmitter from 'events' import TypedEmitter from 'typed-emitter' import { diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index a0ab422a3..c9f6220fd 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -16,6 +16,7 @@ export * from './observable/persistedObservable' export * from './persistenceStore' export * from './riverConfig' export * from './riverDbManager' +export * from './rpcInterceptors' export * from './sign' export * from './signerContext' export * from './store/store' diff --git a/packages/sdk/src/makeRiverRpcClient.ts b/packages/sdk/src/makeRiverRpcClient.ts index 0762fa9a6..17fd1c36c 100644 --- a/packages/sdk/src/makeRiverRpcClient.ts +++ b/packages/sdk/src/makeRiverRpcClient.ts @@ -1,5 +1,6 @@ import { RiverChainConfig, createRiverRegistry } from '@river-build/web3' -import { RetryParams, StreamRpcClient, makeStreamRpcClient } from './makeStreamRpcClient' +import { StreamRpcClient, makeStreamRpcClient } from './makeStreamRpcClient' +import { RetryParams } from './rpcInterceptors' import { ethers } from 'ethers' export async function makeRiverRpcClient( diff --git a/packages/sdk/src/makeStreamRpcClient.test.ts b/packages/sdk/src/makeStreamRpcClient.test.ts index 83c3c1a14..5ed6b6d3b 100644 --- a/packages/sdk/src/makeStreamRpcClient.test.ts +++ b/packages/sdk/src/makeStreamRpcClient.test.ts @@ -4,7 +4,7 @@ import { Err, InfoRequest, InfoResponse } from '@river-build/proto' import { makeTestRpcClient } from './util.test' -import { errorContains } from './makeStreamRpcClient' +import { errorContains } from './rpcInterceptors' import { makeRiverRpcClient } from './makeRiverRpcClient' import { LocalhostWeb3Provider } from '@river-build/web3' import { makeRiverChainConfig } from './riverConfig' diff --git a/packages/sdk/src/makeStreamRpcClient.ts b/packages/sdk/src/makeStreamRpcClient.ts index f046e069e..4965d33a2 100644 --- a/packages/sdk/src/makeStreamRpcClient.ts +++ b/packages/sdk/src/makeStreamRpcClient.ts @@ -1,341 +1,12 @@ -import { - UnaryResponse, - StreamResponse, - Interceptor, - PromiseClient, - Transport, - createPromiseClient, - UnaryRequest, - StreamRequest, - Code, -} from '@connectrpc/connect' -import { AnyMessage } from '@bufbuild/protobuf' +import { PromiseClient, Transport, createPromiseClient } from '@connectrpc/connect' import { ConnectTransportOptions, createConnectTransport } from '@connectrpc/connect-web' -import { Err, StreamService } from '@river-build/proto' -import { check, dlog, dlogError } from '@river-build/dlog' -import { genShortId, streamIdAsString } from './id' -import { getEnvVar, isBaseUrlIncluded, isIConnectError } from './utils' +import { StreamService } from '@river-build/proto' +import { dlog } from '@river-build/dlog' +import { getEnvVar, randomUrlSelector } from './utils' +import { loggingInterceptor, retryInterceptor, type RetryParams } from './rpcInterceptors' const logInfo = dlog('csb:rpc:info') -const logCallsHistogram = dlog('csb:rpc:histogram') -const logCalls = dlog('csb:rpc:calls') -const logProtos = dlog('csb:rpc:protos') -const logError = dlogError('csb:rpc:error') - let nextRpcClientNum = 0 -const histogramIntervalMs = 5000 - -export type RetryParams = { - maxAttempts: number - initialRetryDelay: number - maxRetryDelay: number - refreshNodeUrl?: () => Promise -} - -const sortObjectKey = (obj: Record) => { - const sorted: Record = {} - Object.keys(obj) - .sort() - .forEach((key) => { - sorted[key] = obj[key] - }) - return sorted -} - -const retryInterceptor: (retryParams: RetryParams) => Interceptor = (retryParams: RetryParams) => { - return (next) => - async ( - req: UnaryRequest | StreamRequest, - ) => { - let attempt = 0 - // eslint-disable-next-line no-constant-condition - while (true) { - attempt++ - try { - return await next(req) - } catch (e) { - const retryDelay = getRetryDelay(e, attempt, retryParams) - if (retryDelay <= 0) { - throw e - } - if (retryParams.refreshNodeUrl) { - // re-materialize view and check if client is still operational according to network - const urls = await retryParams.refreshNodeUrl() - const isStillNodeUrl = isBaseUrlIncluded(urls.split(','), req.url) - if (!isStillNodeUrl) { - throw new Error(`Node url ${req.url} no longer operationl in registry`) - } - } - logError( - req.method.name, - 'ERROR RETRYING', - attempt, - 'of', - retryParams.maxAttempts, - 'retryDelay:', - retryDelay, - 'error:', - e, - ) - await new Promise((resolve) => setTimeout(resolve, retryDelay)) - } - } - } -} - -const loggingInterceptor: (transportId: number) => Interceptor = (transportId: number) => { - // Histogram data structure - const callHistogram: Record = {} - - // Function to update histogram - const updateHistogram = (methodName: string, suffix?: string, error?: boolean) => { - const name = suffix ? `${methodName} ${suffix}` : methodName - let e = callHistogram[name] - if (!e) { - e = { interval: 0, total: 0 } - callHistogram[name] = e - } - e.interval++ - e.total++ - if (error) { - e.error = (e.error ?? 0) + 1 - } - } - - // Periodic logging - setInterval(() => { - if (Object.keys(callHistogram).length !== 0) { - let interval = 0 - let total = 0 - let error = 0 - for (const key in callHistogram) { - const e = callHistogram[key] - interval += e.interval - total += e.total - error += e.error ?? 0 - } - if (interval > 0) { - logCallsHistogram( - 'RPC stats for transportId=', - transportId, - 'interval=', - interval, - 'total=', - total, - 'error=', - error, - 'intervalMs=', - histogramIntervalMs, - '\n', - sortObjectKey(callHistogram), - ) - for (const key in callHistogram) { - callHistogram[key].interval = 0 - } - } - } - }, histogramIntervalMs) - - return (next) => - async ( - req: UnaryRequest | StreamRequest, - ) => { - let localReq = req - const id = genShortId() - localReq.header.set('x-river-request-id', id) - - let streamId: string | undefined - if (req.stream) { - // to intercept streaming request messages, we wrap - // the AsynchronousIterable with a generator function - localReq = { - ...req, - message: logEachRequest(req.method.name, id, req.message), - } - } else { - const streamIdBytes = req.message['streamId'] as Uint8Array - streamId = streamIdBytes ? streamIdAsString(streamIdBytes) : undefined - if (streamId !== undefined) { - logCalls(req.method.name, streamId, id) - } else { - logCalls(req.method.name, id) - } - logProtos(req.method.name, 'REQUEST', id, req.message) - } - updateHistogram(req.method.name, streamId) - - try { - const res: - | UnaryResponse - | StreamResponse = await next(localReq) - - if (res.stream) { - // to intercept streaming response messages, we wrap - // the AsynchronousIterable with a generator function - return { - ...res, - message: logEachResponse(res.method.name, id, res.message), - } - } else { - logProtos(res.method.name, 'RESPONSE', id, res.message) - } - return res - } catch (e) { - // ignore NotFound errors for GetStream - if ( - !( - req.method.name === 'GetStream' && - isIConnectError(e) && - e.code === (Code.NotFound as number) - ) - ) { - logError(req.method.name, 'ERROR', id, e) - updateHistogram(req.method.name, streamId, true) - } - throw e - } - } - async function* logEachRequest(name: string, id: string, stream: AsyncIterable) { - try { - for await (const m of stream) { - try { - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - const syncPos = m['syncPos'] - if (syncPos !== undefined) { - const args = [] - for (const p of syncPos) { - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - const s = p['streamId'] - if (s !== undefined) { - args.push(s) - } - } - logCalls(name, 'num=', args.length, id, args) - } else { - logCalls(name, id) - } - updateHistogram(name) - - logProtos(name, 'STREAMING REQUEST', id, m) - yield m - } catch (err) { - logError(name, 'ERROR YIELDING REQUEST', id, err) - updateHistogram(name, undefined, true) - throw err - } - } - } catch (err) { - logError(name, 'ERROR STREAMING REQUEST', id, err) - updateHistogram(name, undefined, true) - throw err - } - logProtos(name, 'STREAMING REQUEST DONE', id) - } - - async function* logEachResponse(name: string, id: string, stream: AsyncIterable) { - try { - for await (const m of stream) { - try { - // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access - const streamId: string | undefined = m.stream?.nextSyncCookie?.streamId - if (streamId !== undefined) { - logCalls(name, 'RECV', streamId, id) - } else { - logCalls(name, 'RECV', id) - } - updateHistogram(`${name} RECV`, streamId) - logProtos(name, 'STREAMING RESPONSE', id, m) - yield m - } catch (err) { - logError(name, 'ERROR YIELDING RESPONSE', id, err) - updateHistogram(`${name} RECV`, undefined, true) - } - } - } catch (err) { - if (err == 'BLIP') { - logCalls(name, 'BLIP', id) - updateHistogram(`${name} BLIP`) - } else if (err == 'SHUTDOWN') { - logCalls(name, 'SHUTDOWN', id) - updateHistogram(`${name} SHUTDOWN`) - } else { - const stack = err instanceof Error && 'stack' in err ? err.stack ?? '' : '' - logError(name, 'ERROR STREAMING RESPONSE', id, err, stack) - updateHistogram(`${name} RECV`, undefined, true) - } - throw err - } - logProtos(name, 'STREAMING RESPONSE DONE', id) - } -} - -/// check to see of the error message contains an Rrc Err defineded in the protocol.proto -export function errorContains(err: unknown, error: Err): boolean { - if (err !== null && typeof err === 'object' && 'message' in err) { - const expected = `${error.valueOf()}:${Err[error]}` - if ((err.message as string).includes(expected)) { - return true - } - } - return false -} - -/// not great way to pull info out of the error messsage -export function getRpcErrorProperty(err: unknown, prop: string): string | undefined { - if (err !== null && typeof err === 'object' && 'message' in err) { - const expected = `${prop} = ` - const parts = (err.message as string).split(expected) - if (parts.length === 2) { - return parts[1].split(' ')[0].trim() - } - } - return undefined -} - -const randomUrlSelector = (urls: string) => { - const u = urls.split(',') - if (u.length === 0) { - throw new Error('No urls for backend provided') - } else if (u.length === 1) { - return u[0] - } else { - return u[Math.floor(Math.random() * u.length)] - } -} - -function getRetryDelay(error: unknown, attempts: number, retryParams: RetryParams): number { - check(attempts >= 1, 'attempts must be >= 1') - // aellis wondering if we should retry forever if there's no internet connection - if (attempts > retryParams.maxAttempts) { - return -1 // no more attempts - } - const retryDelay = Math.min( - retryParams.maxRetryDelay, - retryParams.initialRetryDelay * Math.pow(2, attempts), - ) - // we don't get a lot of info off of these errors... retry the ones that we know we need to - if (error !== null && typeof error === 'object') { - if ('message' in error) { - // this happens in the tests when the server is totally down - if ((error.message as string).toLowerCase().includes('fetch failed')) { - return retryDelay - } - // this happens in the browser when the server is totally down - if ((error.message as string).toLowerCase().includes('failed to fetch')) { - return retryDelay - } - } - - // we can't use the code for anything above 16 cause the connect lib squashes it and returns 2 - // see protocol.proto for description of error codes - if (errorContains(error, Err.RESOURCE_EXHAUSTED)) { - return retryDelay - } else if (errorContains(error, Err.DEBUG_ERROR)) { - return retryDelay - } - } - return -1 -} export type StreamRpcClient = PromiseClient & { url?: string } @@ -345,7 +16,7 @@ export function makeStreamRpcClient( refreshNodeUrl?: () => Promise, ): StreamRpcClient { const transportId = nextRpcClientNum++ - logCallsHistogram('makeStreamRpcClient, transportId =', transportId) + logInfo('makeStreamRpcClient, transportId =', transportId) let transport: Transport let url: string | undefined if (typeof dest === 'string') { diff --git a/packages/sdk/src/rpcInterceptors.ts b/packages/sdk/src/rpcInterceptors.ts new file mode 100644 index 000000000..174f5510c --- /dev/null +++ b/packages/sdk/src/rpcInterceptors.ts @@ -0,0 +1,322 @@ +import type { AnyMessage } from '@bufbuild/protobuf' +import { + type Interceptor, + type UnaryRequest, + type StreamRequest, + type UnaryResponse, + type StreamResponse, + Code, +} from '@connectrpc/connect' +import { Err } from '@river-build/proto' +import { genShortId, streamIdAsString } from './id' +import { isBaseUrlIncluded, isIConnectError } from './utils' +import { dlog, dlogError, check } from '@river-build/dlog' + +export type RetryParams = { + maxAttempts: number + initialRetryDelay: number + maxRetryDelay: number + refreshNodeUrl?: () => Promise +} + +const sortObjectKey = (obj: Record) => { + const sorted: Record = {} + Object.keys(obj) + .sort() + .forEach((key) => { + sorted[key] = obj[key] + }) + return sorted +} + +const logCallsHistogram = dlog('csb:rpc:histogram') +const logCalls = dlog('csb:rpc:calls') +const logProtos = dlog('csb:rpc:protos') +const logError = dlogError('csb:rpc:error') +const histogramIntervalMs = 5000 + +export const retryInterceptor: (retryParams: RetryParams) => Interceptor = ( + retryParams: RetryParams, +) => { + return (next) => + async ( + req: UnaryRequest | StreamRequest, + ) => { + let attempt = 0 + // eslint-disable-next-line no-constant-condition + while (true) { + attempt++ + try { + return await next(req) + } catch (e) { + const retryDelay = getRetryDelay(e, attempt, retryParams) + if (retryDelay <= 0) { + throw e + } + if (retryParams.refreshNodeUrl) { + // re-materialize view and check if client is still operational according to network + const urls = await retryParams.refreshNodeUrl() + const isStillNodeUrl = isBaseUrlIncluded(urls.split(','), req.url) + if (!isStillNodeUrl) { + throw new Error(`Node url ${req.url} no longer operationl in registry`) + } + } + logError( + req.method.name, + 'ERROR RETRYING', + attempt, + 'of', + retryParams.maxAttempts, + 'retryDelay:', + retryDelay, + 'error:', + e, + ) + await new Promise((resolve) => setTimeout(resolve, retryDelay)) + } + } + } +} + +export const loggingInterceptor: (transportId: number) => Interceptor = (transportId: number) => { + // Histogram data structure + const callHistogram: Record = {} + + // Function to update histogram + const updateHistogram = (methodName: string, suffix?: string, error?: boolean) => { + const name = suffix ? `${methodName} ${suffix}` : methodName + let e = callHistogram[name] + if (!e) { + e = { interval: 0, total: 0 } + callHistogram[name] = e + } + e.interval++ + e.total++ + if (error) { + e.error = (e.error ?? 0) + 1 + } + } + + // Periodic logging + setInterval(() => { + if (Object.keys(callHistogram).length !== 0) { + let interval = 0 + let total = 0 + let error = 0 + for (const key in callHistogram) { + const e = callHistogram[key] + interval += e.interval + total += e.total + error += e.error ?? 0 + } + if (interval > 0) { + logCallsHistogram( + 'RPC stats for transportId=', + transportId, + 'interval=', + interval, + 'total=', + total, + 'error=', + error, + 'intervalMs=', + histogramIntervalMs, + '\n', + sortObjectKey(callHistogram), + ) + for (const key in callHistogram) { + callHistogram[key].interval = 0 + } + } + } + }, histogramIntervalMs) + + return (next) => + async ( + req: UnaryRequest | StreamRequest, + ) => { + let localReq = req + const id = genShortId() + localReq.header.set('x-river-request-id', id) + + let streamId: string | undefined + if (req.stream) { + // to intercept streaming request messages, we wrap + // the AsynchronousIterable with a generator function + localReq = { + ...req, + message: logEachRequest(req.method.name, id, req.message), + } + } else { + const streamIdBytes = req.message['streamId'] as Uint8Array + streamId = streamIdBytes ? streamIdAsString(streamIdBytes) : undefined + if (streamId !== undefined) { + logCalls(req.method.name, streamId, id) + } else { + logCalls(req.method.name, id) + } + logProtos(req.method.name, 'REQUEST', id, req.message) + } + updateHistogram(req.method.name, streamId) + + try { + const res: + | UnaryResponse + | StreamResponse = await next(localReq) + + if (res.stream) { + // to intercept streaming response messages, we wrap + // the AsynchronousIterable with a generator function + return { + ...res, + message: logEachResponse(res.method.name, id, res.message), + } + } else { + logProtos(res.method.name, 'RESPONSE', id, res.message) + } + return res + } catch (e) { + // ignore NotFound errors for GetStream + if ( + !( + req.method.name === 'GetStream' && + isIConnectError(e) && + e.code === (Code.NotFound as number) + ) + ) { + logError(req.method.name, 'ERROR', id, e) + updateHistogram(req.method.name, streamId, true) + } + throw e + } + } + async function* logEachRequest(name: string, id: string, stream: AsyncIterable) { + try { + for await (const m of stream) { + try { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + const syncPos = m['syncPos'] + if (syncPos !== undefined) { + const args = [] + for (const p of syncPos) { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + const s = p['streamId'] + if (s !== undefined) { + args.push(s) + } + } + logCalls(name, 'num=', args.length, id, args) + } else { + logCalls(name, id) + } + updateHistogram(name) + + logProtos(name, 'STREAMING REQUEST', id, m) + yield m + } catch (err) { + logError(name, 'ERROR YIELDING REQUEST', id, err) + updateHistogram(name, undefined, true) + throw err + } + } + } catch (err) { + logError(name, 'ERROR STREAMING REQUEST', id, err) + updateHistogram(name, undefined, true) + throw err + } + logProtos(name, 'STREAMING REQUEST DONE', id) + } + + async function* logEachResponse(name: string, id: string, stream: AsyncIterable) { + try { + for await (const m of stream) { + try { + // eslint-disable-next-line @typescript-eslint/no-unsafe-member-access + const streamId: string | undefined = m.stream?.nextSyncCookie?.streamId + if (streamId !== undefined) { + logCalls(name, 'RECV', streamId, id) + } else { + logCalls(name, 'RECV', id) + } + updateHistogram(`${name} RECV`, streamId) + logProtos(name, 'STREAMING RESPONSE', id, m) + yield m + } catch (err) { + logError(name, 'ERROR YIELDING RESPONSE', id, err) + updateHistogram(`${name} RECV`, undefined, true) + } + } + } catch (err) { + if (err == 'BLIP') { + logCalls(name, 'BLIP', id) + updateHistogram(`${name} BLIP`) + } else if (err == 'SHUTDOWN') { + logCalls(name, 'SHUTDOWN', id) + updateHistogram(`${name} SHUTDOWN`) + } else { + const stack = err instanceof Error && 'stack' in err ? err.stack ?? '' : '' + logError(name, 'ERROR STREAMING RESPONSE', id, err, stack) + updateHistogram(`${name} RECV`, undefined, true) + } + throw err + } + logProtos(name, 'STREAMING RESPONSE DONE', id) + } +} + +/// check to see of the error message contains an Rrc Err defineded in the protocol.proto +export function errorContains(err: unknown, error: Err): boolean { + if (err !== null && typeof err === 'object' && 'message' in err) { + const expected = `${error.valueOf()}:${Err[error]}` + if ((err.message as string).includes(expected)) { + return true + } + } + return false +} + +/// not great way to pull info out of the error messsage +export function getRpcErrorProperty(err: unknown, prop: string): string | undefined { + if (err !== null && typeof err === 'object' && 'message' in err) { + const expected = `${prop} = ` + const parts = (err.message as string).split(expected) + if (parts.length === 2) { + return parts[1].split(' ')[0].trim() + } + } + return undefined +} + +function getRetryDelay(error: unknown, attempts: number, retryParams: RetryParams): number { + check(attempts >= 1, 'attempts must be >= 1') + // aellis wondering if we should retry forever if there's no internet connection + if (attempts > retryParams.maxAttempts) { + return -1 // no more attempts + } + const retryDelay = Math.min( + retryParams.maxRetryDelay, + retryParams.initialRetryDelay * Math.pow(2, attempts), + ) + // we don't get a lot of info off of these errors... retry the ones that we know we need to + if (error !== null && typeof error === 'object') { + if ('message' in error) { + // this happens in the tests when the server is totally down + if ((error.message as string).toLowerCase().includes('fetch failed')) { + return retryDelay + } + // this happens in the browser when the server is totally down + if ((error.message as string).toLowerCase().includes('failed to fetch')) { + return retryDelay + } + } + + // we can't use the code for anything above 16 cause the connect lib squashes it and returns 2 + // see protocol.proto for description of error codes + if (errorContains(error, Err.RESOURCE_EXHAUSTED)) { + return retryDelay + } else if (errorContains(error, Err.DEBUG_ERROR)) { + return retryDelay + } + } + return -1 +} diff --git a/packages/sdk/src/sync-agent/river-connection/riverConnection.ts b/packages/sdk/src/sync-agent/river-connection/riverConnection.ts index 2f5ba6a26..2bddd96f0 100644 --- a/packages/sdk/src/sync-agent/river-connection/riverConnection.ts +++ b/packages/sdk/src/sync-agent/river-connection/riverConnection.ts @@ -1,5 +1,5 @@ import { RiverRegistry, SpaceDapp } from '@river-build/web3' -import { RetryParams, makeStreamRpcClient } from '../../makeStreamRpcClient' +import { makeStreamRpcClient } from '../../makeStreamRpcClient' import { StreamNodeUrls, StreamNodeUrlsModel } from './models/streamNodeUrls' import { Identifiable, LoadPriority, Store } from '../../store/store' import { dlogger } from '@river-build/dlog' @@ -16,6 +16,7 @@ import { userIdFromAddress } from '../../id' import { TransactionalClient } from './models/transactionalClient' import { Observable } from '../../observable/observable' import { AuthStatus } from './models/authStatus' +import { RetryParams } from '../../rpcInterceptors' const logger = dlogger('csb:riverConnection') diff --git a/packages/sdk/src/sync-agent/syncAgent.ts b/packages/sdk/src/sync-agent/syncAgent.ts index 05a0336c2..bbf5f0ef6 100644 --- a/packages/sdk/src/sync-agent/syncAgent.ts +++ b/packages/sdk/src/sync-agent/syncAgent.ts @@ -1,7 +1,7 @@ import { RiverConnection, RiverConnectionModel } from './river-connection/riverConnection' import { RiverConfig } from '../riverConfig' import { RiverRegistry, SpaceDapp } from '@river-build/web3' -import { RetryParams } from '../makeStreamRpcClient' +import { RetryParams } from '../rpcInterceptors' import { Store } from '../store/store' import { SignerContext } from '../signerContext' import { userIdFromAddress } from '../id' diff --git a/packages/sdk/src/syncedStreams.ts b/packages/sdk/src/syncedStreams.ts index 4e9b33f87..2c410c3c4 100644 --- a/packages/sdk/src/syncedStreams.ts +++ b/packages/sdk/src/syncedStreams.ts @@ -1,6 +1,6 @@ import { Err, SyncCookie, SyncOp, SyncStreamsResponse } from '@river-build/proto' import { DLogger, dlog, dlogError, shortenHexString } from '@river-build/dlog' -import { StreamRpcClient, errorContains } from './makeStreamRpcClient' +import { StreamRpcClient } from './makeStreamRpcClient' import { unpackStream, unpackStreamAndCookie } from './sign' import { SyncedStreamEvents } from './streamEvents' import { SyncedStream } from './syncedStream' @@ -9,6 +9,7 @@ import { isDefined, logNever } from './check' import { nanoid } from 'nanoid' import { isMobileSafari } from './utils' import { streamIdAsBytes, streamIdAsString } from './id' +import { errorContains } from './rpcInterceptors' export enum SyncState { Canceling = 'Canceling', // syncLoop, maybe syncId if was syncing, not is was starting or retrying diff --git a/packages/sdk/src/utils.ts b/packages/sdk/src/utils.ts index 3c4ff9c6d..e9b8cba6c 100644 --- a/packages/sdk/src/utils.ts +++ b/packages/sdk/src/utils.ts @@ -115,3 +115,14 @@ export function isBaseUrlIncluded(baseUrls: string[], fullUrl: string): boolean return baseUrls.some((baseUrl) => fullUrlBase === baseUrl.trim()) } + +export const randomUrlSelector = (urls: string) => { + const u = urls.split(',') + if (u.length === 0) { + throw new Error('No urls for backend provided') + } else if (u.length === 1) { + return u[0] + } else { + return u[Math.floor(Math.random() * u.length)] + } +} diff --git a/packages/stress/jest.config.ts b/packages/stress/jest.config.ts index 9bd0d229e..8cced526b 100644 --- a/packages/stress/jest.config.ts +++ b/packages/stress/jest.config.ts @@ -34,7 +34,7 @@ const MSGPACKR_FOLDER = findMsgpackrFolder() const config: JestConfigWithTsJest = { preset: 'ts-jest/presets/default-esm', - testEnvironment: './../jest.env.ts', + testEnvironment: 'node', // we're using http2 via @connectrpc/connect-node for the stress tests, which is not supported in the jest-browser setup testEnvironmentOptions: { browsers: ['chrome', 'firefox', 'safari'], url: 'http://localhost:80', diff --git a/packages/stress/package.json b/packages/stress/package.json index 3947af639..6c00b13d6 100644 --- a/packages/stress/package.json +++ b/packages/stress/package.json @@ -16,6 +16,8 @@ "dependencies": { "@babel/node": "^7.23.9", "@bufbuild/protobuf": "^1.9.0", + "@connectrpc/connect": "^1.4.0", + "@connectrpc/connect-node": "^1.4.0", "@river-build/dlog": "workspace:^", "@river-build/encryption": "workspace:^", "@river-build/proto": "workspace:^", diff --git a/packages/stress/src/utils/connection.ts b/packages/stress/src/utils/connection.ts index 580a59197..e5e76a96b 100644 --- a/packages/stress/src/utils/connection.ts +++ b/packages/stress/src/utils/connection.ts @@ -1,11 +1,15 @@ +/* eslint-disable @typescript-eslint/no-unsafe-call */ +/* eslint-disable @typescript-eslint/no-unsafe-argument */ +/* eslint-disable @typescript-eslint/no-unsafe-assignment */ import { check } from '@river-build/dlog' -import { LocalhostWeb3Provider } from '@river-build/web3' +import { LocalhostWeb3Provider, createRiverRegistry } from '@river-build/web3' import { RiverConfig, - makeRiverRpcClient, makeSignerContext, userIdFromAddress, + randomUrlSelector, } from '@river-build/sdk' +import { makeStreamRpcClient } from './rpc-http2' import { ethers } from 'ethers' export async function makeConnection(config: RiverConfig, wallet?: ethers.Wallet) { @@ -16,7 +20,10 @@ export async function makeConnection(config: RiverConfig, wallet?: ethers.Wallet check(userId === wallet.address, `userId !== wallet.address ${userId} !== ${wallet.address}`) const riverProvider = new LocalhostWeb3Provider(config.river.rpcUrl, wallet) const baseProvider = new LocalhostWeb3Provider(config.base.rpcUrl, wallet) - const rpcClient = await makeRiverRpcClient(riverProvider, config.river.chainConfig) + const riverRegistry = createRiverRegistry(riverProvider, config.river.chainConfig) + const urls = await riverRegistry.getOperationalNodeUrls() + const selectedUrl = randomUrlSelector(urls) + const rpcClient = makeStreamRpcClient(selectedUrl, () => riverRegistry.getOperationalNodeUrls()) return { userId, delegateWallet, diff --git a/packages/stress/src/utils/rpc-http2.ts b/packages/stress/src/utils/rpc-http2.ts new file mode 100644 index 000000000..016b8dde6 --- /dev/null +++ b/packages/stress/src/utils/rpc-http2.ts @@ -0,0 +1,43 @@ +/* eslint-disable @typescript-eslint/no-unsafe-call */ +import { PromiseClient, createPromiseClient } from '@connectrpc/connect' +import { ConnectTransportOptions, createConnectTransport } from '@connectrpc/connect-node' +import { StreamService } from '@river-build/proto' +import { loggingInterceptor, retryInterceptor, type RetryParams } from '@river-build/sdk' +import { dlogger } from '@river-build/dlog' + +const logger = dlogger('csb:rpc:info') + +export type StreamRpcClient = PromiseClient & { url?: string } + +let nextRpcClientNum = 0 + +export function makeStreamRpcClient( + url: string, + refreshNodeUrl?: () => Promise, + retryParams: RetryParams = { maxAttempts: 3, initialRetryDelay: 2000, maxRetryDelay: 6000 }, +): StreamRpcClient { + const transportId = nextRpcClientNum++ + logger.info(`makeStreamRpcClient: Connecting to url=${url}`) + const options: ConnectTransportOptions = { + httpVersion: '2', + baseUrl: url, + interceptors: [ + loggingInterceptor(transportId), + retryInterceptor({ ...retryParams, refreshNodeUrl }), + ], + } + if (!process.env.RIVER_DEBUG_TRANSPORT) { + options.useBinaryFormat = true + } else { + options.useBinaryFormat = false + options.jsonOptions = { + emitDefaultValues: true, + useProtoFieldName: true, + } + } + const transport = createConnectTransport(options) + + const client: StreamRpcClient = createPromiseClient(StreamService, transport) as StreamRpcClient + client.url = url + return client +} diff --git a/yarn.lock b/yarn.lock index 7e8a7c86d..238d8652c 100644 --- a/yarn.lock +++ b/yarn.lock @@ -683,6 +683,18 @@ __metadata: languageName: node linkType: hard +"@connectrpc/connect-node@npm:^1.4.0": + version: 1.4.0 + resolution: "@connectrpc/connect-node@npm:1.4.0" + dependencies: + undici: ^5.28.3 + peerDependencies: + "@bufbuild/protobuf": ^1.4.2 + "@connectrpc/connect": 1.4.0 + checksum: d13dc98d25e7321cce9969a009788874046ae9c0848922ecd2690dddc833b732fe756d486213fe0de55b1e4ae68024a45505d193ba0a927c6ec5b8a6d6a599e0 + languageName: node + linkType: hard + "@connectrpc/connect-web@npm:^1.4.0": version: 1.4.0 resolution: "@connectrpc/connect-web@npm:1.4.0" @@ -1482,6 +1494,13 @@ __metadata: languageName: node linkType: hard +"@fastify/busboy@npm:^2.0.0": + version: 2.1.1 + resolution: "@fastify/busboy@npm:2.1.1" + checksum: 42c32ef75e906c9a4809c1e1930a5ca6d4ddc8d138e1a8c8ba5ea07f997db32210617d23b2e4a85fe376316a41a1a0439fc6ff2dedf5126d96f45a9d80754fb2 + languageName: node + linkType: hard + "@floating-ui/core@npm:^1.0.0": version: 1.6.2 resolution: "@floating-ui/core@npm:1.6.2" @@ -3771,6 +3790,8 @@ __metadata: dependencies: "@babel/node": ^7.23.9 "@bufbuild/protobuf": ^1.9.0 + "@connectrpc/connect": ^1.4.0 + "@connectrpc/connect-node": ^1.4.0 "@jest/globals": ^29.6.2 "@river-build/dlog": "workspace:^" "@river-build/encryption": "workspace:^" @@ -18996,6 +19017,15 @@ __metadata: languageName: node linkType: hard +"undici@npm:^5.28.3": + version: 5.28.4 + resolution: "undici@npm:5.28.4" + dependencies: + "@fastify/busboy": ^2.0.0 + checksum: a8193132d84540e4dc1895ecc8dbaa176e8a49d26084d6fbe48a292e28397cd19ec5d13bc13e604484e76f94f6e334b2bdc740d5f06a6e50c44072818d0c19f9 + languageName: node + linkType: hard + "unenv@npm:^1.9.0": version: 1.9.0 resolution: "unenv@npm:1.9.0"