diff --git a/packages/client/src/Call.ts b/packages/client/src/Call.ts index 7b51a12680..a8cb2b1eba 100644 --- a/packages/client/src/Call.ts +++ b/packages/client/src/Call.ts @@ -120,9 +120,10 @@ import { getSdkSignature } from './stats/utils'; import { withoutConcurrency } from './helpers/concurrency'; import { ensureExhausted } from './helpers/ensureExhausted'; import { + makeSafePromise, PromiseWithResolvers, promiseWithResolvers, -} from './helpers/withResolvers'; +} from './helpers/promise'; /** * An object representation of a `Call`. @@ -1192,7 +1193,7 @@ export class Call { currentSubscriber?.detachEventHandlers(); currentPublisher?.detachEventHandlers(); - const migrationTask = currentSfuClient.enterMigration(); + const migrationTask = makeSafePromise(currentSfuClient.enterMigration()); try { const currentSfu = currentSfuClient.edgeName; @@ -1210,7 +1211,7 @@ export class Call { // Wait for the migration to complete, then close the previous SFU client // and the peer connection instances. In case of failure, the migration // task would throw an error and REJOIN would be attempted. - await migrationTask; + await migrationTask(); // in MIGRATE, we can consider the call as joined only after // `participantMigrationComplete` event is received, signaled by diff --git a/packages/client/src/StreamSfuClient.ts b/packages/client/src/StreamSfuClient.ts index eca72c36c4..a189345846 100644 --- a/packages/client/src/StreamSfuClient.ts +++ b/packages/client/src/StreamSfuClient.ts @@ -33,7 +33,9 @@ import { withoutConcurrency } from './helpers/concurrency'; import { promiseWithResolvers, PromiseWithResolvers, -} from './helpers/withResolvers'; + makeSafePromise, + SafePromise, +} from './helpers/promise'; export type StreamSfuClientConstructor = { /** @@ -101,7 +103,7 @@ export class StreamSfuClient { /** * Promise that resolves when the WebSocket connection is ready (open). */ - private signalReady!: Promise; + private signalReady!: SafePromise; /** * Flag to indicate if the client is in the process of leaving the call. @@ -124,7 +126,7 @@ export class StreamSfuClient { private readonly logTag: string; private readonly credentials: Credentials; private readonly dispatcher: Dispatcher; - private readonly joinResponseTimeout?: number; + private readonly joinResponseTimeout: number; private networkAvailableTask: PromiseWithResolvers | undefined; /** * Promise that resolves when the JoinResponse is received. @@ -229,13 +231,24 @@ export class StreamSfuClient { this.signalWs.addEventListener('close', this.handleWebSocketClose); this.signalWs.addEventListener('error', this.restoreWebSocket); - this.signalReady = new Promise((resolve) => { - const onOpen = () => { - this.signalWs.removeEventListener('open', onOpen); - resolve(this.signalWs); - }; - this.signalWs.addEventListener('open', onOpen); - }); + this.signalReady = makeSafePromise( + Promise.race([ + new Promise((resolve) => { + const onOpen = () => { + this.signalWs.removeEventListener('open', onOpen); + resolve(this.signalWs); + }; + this.signalWs.addEventListener('open', onOpen); + }), + + new Promise((resolve, reject) => { + setTimeout( + () => reject(new Error('SFU WS connection timed out')), + this.joinResponseTimeout, + ); + }), + ]), + ); }; private cleanUpWebSocket = () => { @@ -409,7 +422,7 @@ export class StreamSfuClient { data: Omit, ): Promise => { // wait for the signal web socket to be ready before sending "joinRequest" - await this.signalReady; + await this.signalReady(); if (this.joinResponseTask.isResolved || this.joinResponseTask.isRejected) { // we need to lock the RPC requests until we receive a JoinResponse. // that's why we have this primitive lock mechanism. @@ -478,7 +491,7 @@ export class StreamSfuClient { }; private send = async (message: SfuRequest) => { - await this.signalReady; // wait for the signal ws to be open + await this.signalReady(); // wait for the signal ws to be open const msgJson = SfuRequest.toJson(message); if (this.signalWs.readyState !== WebSocket.OPEN) { this.logger('debug', 'Signal WS is not open. Skipping message', msgJson); diff --git a/packages/client/src/helpers/promise.ts b/packages/client/src/helpers/promise.ts index 482bc7c07a..f607ce5058 100644 --- a/packages/client/src/helpers/promise.ts +++ b/packages/client/src/helpers/promise.ts @@ -45,3 +45,47 @@ export function makeSafePromise(promise: Promise): SafePromise { unwrapPromise.checkPending = () => isPending; return unwrapPromise; } + +export type PromiseWithResolvers = { + promise: Promise; + resolve: (value: T | PromiseLike) => void; + reject: (reason: any) => void; + isResolved: boolean; + isRejected: boolean; +}; + +/** + * Creates a new promise with resolvers. + * + * Based on: + * - https://github.com/tc39/proposal-promise-with-resolvers/blob/main/polyfills.js + */ +export const promiseWithResolvers = (): PromiseWithResolvers => { + let resolve: (value: T | PromiseLike) => void; + let reject: (reason: any) => void; + const promise = new Promise((_resolve, _reject) => { + resolve = _resolve; + reject = _reject; + }); + + let isResolved = false; + let isRejected = false; + + const resolver = (value: T | PromiseLike) => { + isResolved = true; + resolve(value); + }; + + const rejecter = (reason: any) => { + isRejected = true; + reject(reason); + }; + + return { + promise, + resolve: resolver, + reject: rejecter, + isResolved, + isRejected, + }; +}; diff --git a/packages/client/src/helpers/withResolvers.ts b/packages/client/src/helpers/withResolvers.ts deleted file mode 100644 index dde8182242..0000000000 --- a/packages/client/src/helpers/withResolvers.ts +++ /dev/null @@ -1,43 +0,0 @@ -export type PromiseWithResolvers = { - promise: Promise; - resolve: (value: T | PromiseLike) => void; - reject: (reason: any) => void; - isResolved: boolean; - isRejected: boolean; -}; - -/** - * Creates a new promise with resolvers. - * - * Based on: - * - https://github.com/tc39/proposal-promise-with-resolvers/blob/main/polyfills.js - */ -export const promiseWithResolvers = (): PromiseWithResolvers => { - let resolve: (value: T | PromiseLike) => void; - let reject: (reason: any) => void; - const promise = new Promise((_resolve, _reject) => { - resolve = _resolve; - reject = _reject; - }); - - let isResolved = false; - let isRejected = false; - - const resolver = (value: T | PromiseLike) => { - isResolved = true; - resolve(value); - }; - - const rejecter = (reason: any) => { - isRejected = true; - reject(reason); - }; - - return { - promise, - resolve: resolver, - reject: rejecter, - isResolved, - isRejected, - }; -};