Skip to content

Commit

Permalink
fix: handle timeout on SFU WS connections
Browse files Browse the repository at this point in the history
  • Loading branch information
myandrienko committed Nov 26, 2024
1 parent 3c81092 commit a98d850
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 58 deletions.
7 changes: 4 additions & 3 deletions packages/client/src/Call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,10 @@ import { getSdkSignature } from './stats/utils';
import { withoutConcurrency } from './helpers/concurrency';
import { ensureExhausted } from './helpers/ensureExhausted';
import {
makeSafePromise,
PromiseWithResolvers,
promiseWithResolvers,
} from './helpers/withResolvers';
} from './helpers/promise';

/**
* An object representation of a `Call`.
Expand Down Expand Up @@ -1192,7 +1193,7 @@ export class Call {
currentSubscriber?.detachEventHandlers();
currentPublisher?.detachEventHandlers();

const migrationTask = currentSfuClient.enterMigration();
const migrationTask = makeSafePromise(currentSfuClient.enterMigration());

try {
const currentSfu = currentSfuClient.edgeName;
Expand All @@ -1210,7 +1211,7 @@ export class Call {
// Wait for the migration to complete, then close the previous SFU client
// and the peer connection instances. In case of failure, the migration
// task would throw an error and REJOIN would be attempted.
await migrationTask;
await migrationTask();

// in MIGRATE, we can consider the call as joined only after
// `participantMigrationComplete` event is received, signaled by
Expand Down
37 changes: 25 additions & 12 deletions packages/client/src/StreamSfuClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ import { withoutConcurrency } from './helpers/concurrency';
import {
promiseWithResolvers,
PromiseWithResolvers,
} from './helpers/withResolvers';
makeSafePromise,
SafePromise,
} from './helpers/promise';

export type StreamSfuClientConstructor = {
/**
Expand Down Expand Up @@ -101,7 +103,7 @@ export class StreamSfuClient {
/**
* Promise that resolves when the WebSocket connection is ready (open).
*/
private signalReady!: Promise<WebSocket>;
private signalReady!: SafePromise<WebSocket>;

/**
* Flag to indicate if the client is in the process of leaving the call.
Expand All @@ -124,7 +126,7 @@ export class StreamSfuClient {
private readonly logTag: string;
private readonly credentials: Credentials;
private readonly dispatcher: Dispatcher;
private readonly joinResponseTimeout?: number;
private readonly joinResponseTimeout: number;
private networkAvailableTask: PromiseWithResolvers<void> | undefined;
/**
* Promise that resolves when the JoinResponse is received.
Expand Down Expand Up @@ -229,13 +231,24 @@ export class StreamSfuClient {
this.signalWs.addEventListener('close', this.handleWebSocketClose);
this.signalWs.addEventListener('error', this.restoreWebSocket);

this.signalReady = new Promise((resolve) => {
const onOpen = () => {
this.signalWs.removeEventListener('open', onOpen);
resolve(this.signalWs);
};
this.signalWs.addEventListener('open', onOpen);
});
this.signalReady = makeSafePromise(
Promise.race<WebSocket>([
new Promise((resolve) => {
const onOpen = () => {
this.signalWs.removeEventListener('open', onOpen);
resolve(this.signalWs);
};
this.signalWs.addEventListener('open', onOpen);
}),

new Promise((resolve, reject) => {
setTimeout(
() => reject(new Error('SFU WS connection timed out')),
this.joinResponseTimeout,
);
}),
]),
);
};

private cleanUpWebSocket = () => {
Expand Down Expand Up @@ -409,7 +422,7 @@ export class StreamSfuClient {
data: Omit<JoinRequest, 'sessionId' | 'token'>,
): Promise<JoinResponse> => {
// wait for the signal web socket to be ready before sending "joinRequest"
await this.signalReady;
await this.signalReady();
if (this.joinResponseTask.isResolved || this.joinResponseTask.isRejected) {
// we need to lock the RPC requests until we receive a JoinResponse.
// that's why we have this primitive lock mechanism.
Expand Down Expand Up @@ -478,7 +491,7 @@ export class StreamSfuClient {
};

private send = async (message: SfuRequest) => {
await this.signalReady; // wait for the signal ws to be open
await this.signalReady(); // wait for the signal ws to be open
const msgJson = SfuRequest.toJson(message);
if (this.signalWs.readyState !== WebSocket.OPEN) {
this.logger('debug', 'Signal WS is not open. Skipping message', msgJson);
Expand Down
44 changes: 44 additions & 0 deletions packages/client/src/helpers/promise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,47 @@ export function makeSafePromise<T>(promise: Promise<T>): SafePromise<T> {
unwrapPromise.checkPending = () => isPending;
return unwrapPromise;
}

export type PromiseWithResolvers<T> = {
promise: Promise<T>;
resolve: (value: T | PromiseLike<T>) => void;
reject: (reason: any) => void;
isResolved: boolean;
isRejected: boolean;
};

/**
* Creates a new promise with resolvers.
*
* Based on:
* - https://github.com/tc39/proposal-promise-with-resolvers/blob/main/polyfills.js
*/
export const promiseWithResolvers = <T = void>(): PromiseWithResolvers<T> => {
let resolve: (value: T | PromiseLike<T>) => void;
let reject: (reason: any) => void;
const promise = new Promise<T>((_resolve, _reject) => {
resolve = _resolve;
reject = _reject;
});

let isResolved = false;
let isRejected = false;

const resolver = (value: T | PromiseLike<T>) => {
isResolved = true;
resolve(value);
};

const rejecter = (reason: any) => {
isRejected = true;
reject(reason);
};

return {
promise,
resolve: resolver,
reject: rejecter,
isResolved,
isRejected,
};
};
43 changes: 0 additions & 43 deletions packages/client/src/helpers/withResolvers.ts

This file was deleted.

0 comments on commit a98d850

Please sign in to comment.