Skip to content

Commit

Permalink
SyncAgent - refactor out the promise queue (#256)
Browse files Browse the repository at this point in the history
  • Loading branch information
texuf authored Jun 24, 2024
1 parent a6d8294 commit 981b651
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 24 deletions.
31 changes: 7 additions & 24 deletions packages/sdk/src/sync-agent/river-connection/riverConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,14 @@ import { Observable } from '../../observable/observable'
import { RiverNodeUrls } from './models/riverNodeUrls'
import { Store } from '../../store/store'
import { dlogger } from '@river-build/dlog'
import { PromiseQueue } from '../utils/promiseQueue'

const logger = dlogger('csb:riverConnection')

export class RiverConnection {
rpcClient: Observable<StreamRpcClient | undefined>
nodeUrls: RiverNodeUrls
queue: {
resolve: (value: any) => void
reject: (reason?: any) => void
fn: (rpcClient: StreamRpcClient) => Promise<any>
}[] = []
rpcClientQueue = new PromiseQueue<StreamRpcClient>()

constructor(store: Store, riverRegistryDapp: RiverRegistry, retryParams?: RetryParams) {
this.rpcClient = new Observable<StreamRpcClient | undefined>(undefined)
Expand All @@ -23,13 +20,11 @@ export class RiverConnection {
(value) => {
if (value.data.urls) {
logger.log('RiverConnection: setting rpcClient', value.data.urls)
this.rpcClient.set(
makeStreamRpcClient(value.data.urls, retryParams, () =>
riverRegistryDapp.getOperationalNodeUrls(),
),
const client = makeStreamRpcClient(value.data.urls, retryParams, () =>
riverRegistryDapp.getOperationalNodeUrls(),
)
// New rpcClient is available, resolve all queued requests
this.flushQueue()
this.rpcClient.set(client)
this.rpcClientQueue.flush(client) // New rpcClient is available, resolve all queued requests
} else {
this.rpcClient.set(undefined)
}
Expand All @@ -44,19 +39,7 @@ export class RiverConnection {
return fn(rpcClient)
} else {
// Enqueue the request if rpcClient is not available
return new Promise<T>((resolve, reject) => {
this.queue.push({ resolve, reject, fn })
})
}
}

private flushQueue() {
if (this.rpcClient.value && this.queue.length) {
logger.log('RiverConnection: flushing rpc queue', this.queue.length)
while (this.queue.length > 0) {
const { resolve, reject, fn } = this.queue.shift()!
fn(this.rpcClient.value).then(resolve).catch(reject)
}
return this.rpcClientQueue.enqueue(fn)
}
}
}
27 changes: 27 additions & 0 deletions packages/sdk/src/sync-agent/utils/promiseQueue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { dlogger } from '@river-build/dlog'

const logger = dlogger('csb:promisequeue')

export class PromiseQueue<T> {
private queue: {
resolve: (value: any) => void
reject: (reason?: any) => void
fn: (object: T) => Promise<any>
}[] = []

enqueue<Q>(fn: (object: T) => Promise<Q>) {
return new Promise<Q>((resolve, reject) => {
this.queue.push({ resolve, reject, fn })
})
}

flush(object: T) {
if (this.queue.length) {
logger.log('RiverConnection: flushing rpc queue', this.queue.length)
while (this.queue.length > 0) {
const { resolve, reject, fn } = this.queue.shift()!
fn(object).then(resolve).catch(reject)
}
}
}
}

0 comments on commit 981b651

Please sign in to comment.