diff --git a/packages/client/src/Call.ts b/packages/client/src/Call.ts index a8cb2b1eba..bd10494d15 100644 --- a/packages/client/src/Call.ts +++ b/packages/client/src/Call.ts @@ -214,6 +214,7 @@ export class Call { private reconnectAttempts = 0; private reconnectStrategy = WebsocketReconnectStrategy.UNSPECIFIED; private fastReconnectDeadlineSeconds: number = 0; + private disconnectionTimeoutSeconds: number = 0; private lastOfflineTimestamp: number = 0; private networkAvailableTask: PromiseWithResolvers | undefined; // maintain the order of publishing tracks to restore them after a reconnection @@ -1051,6 +1052,9 @@ export class Call { */ private handleSfuSignalClose = (sfuClient: StreamSfuClient) => { this.logger('debug', '[Reconnect] SFU signal connection closed'); + // SFU WS closed before we finished current join, no need to schedule reconnect + // because join operation will fail + if (this.state.callingState === CallingState.JOINING) return; // normal close, no need to reconnect if (sfuClient.isLeaving) return; this.reconnect(WebsocketReconnectStrategy.REJOIN).catch((err) => { @@ -1068,14 +1072,35 @@ export class Call { private reconnect = async ( strategy: WebsocketReconnectStrategy, ): Promise => { + if ( + this.state.callingState === CallingState.RECONNECTING || + this.state.callingState === CallingState.RECONNECTING_FAILED + ) + return; + return withoutConcurrency(this.reconnectConcurrencyTag, async () => { this.logger( 'info', `[Reconnect] Reconnecting with strategy ${WebsocketReconnectStrategy[strategy]}`, ); + let reconnectStartTime = Date.now(); this.reconnectStrategy = strategy; + do { + if ( + this.disconnectionTimeoutSeconds > 0 && + (Date.now() - reconnectStartTime) / 1000 > + this.disconnectionTimeoutSeconds + ) { + this.logger( + 'warn', + '[Reconnect] Stopping reconnection attempts after reaching disconnection timeout', + ); + this.state.setCallingState(CallingState.RECONNECTING_FAILED); + return; + } + // we don't increment reconnect attempts for the FAST strategy. if (this.reconnectStrategy !== WebsocketReconnectStrategy.FAST) { this.reconnectAttempts++; @@ -2337,4 +2362,13 @@ export class Call { ); this.dynascaleManager.applyTrackSubscriptions(); }; + + /** + * Sets the maximum amount of time a user can remain waiting for a reconnect + * after a network disruption + * @param timeoutSeconds Timeout in seconds, or 0 to keep reconnecting indefinetely + */ + setDisconnectionTimeout = (timeoutSeconds: number) => { + this.disconnectionTimeoutSeconds = timeoutSeconds; + }; } diff --git a/packages/client/src/StreamSfuClient.ts b/packages/client/src/StreamSfuClient.ts index a189345846..40f80898e7 100644 --- a/packages/client/src/StreamSfuClient.ts +++ b/packages/client/src/StreamSfuClient.ts @@ -25,11 +25,10 @@ import { } from './gen/video/sfu/signal_rpc/signal'; import { ICETrickle, TrackType } from './gen/video/sfu/models/models'; import { StreamClient } from './coordinator/connection/client'; -import { generateUUIDv4, sleep } from './coordinator/connection/utils'; +import { generateUUIDv4 } from './coordinator/connection/utils'; import { Credentials } from './gen/coordinator'; import { Logger } from './coordinator/connection/types'; import { getLogger, getLogLevel } from './logger'; -import { withoutConcurrency } from './helpers/concurrency'; import { promiseWithResolvers, PromiseWithResolvers, @@ -118,7 +117,6 @@ export class StreamSfuClient { private pingIntervalInMs = 10 * 1000; private unhealthyTimeoutInMs = this.pingIntervalInMs + 5 * 1000; private lastMessageTimestamp?: Date; - private readonly restoreWebSocketConcurrencyTag = Symbol('recoverWebSocket'); private readonly unsubscribeIceTrickle: () => void; private readonly unsubscribeNetworkChanged: () => void; private readonly onSignalClose: (() => void) | undefined; @@ -229,7 +227,6 @@ export class StreamSfuClient { }); this.signalWs.addEventListener('close', this.handleWebSocketClose); - this.signalWs.addEventListener('error', this.restoreWebSocket); this.signalReady = makeSafePromise( Promise.race([ @@ -252,20 +249,9 @@ export class StreamSfuClient { }; private cleanUpWebSocket = () => { - this.signalWs.removeEventListener('error', this.restoreWebSocket); this.signalWs.removeEventListener('close', this.handleWebSocketClose); }; - private restoreWebSocket = () => { - withoutConcurrency(this.restoreWebSocketConcurrencyTag, async () => { - await this.networkAvailableTask?.promise; - this.logger('debug', 'Restoring SFU WS connection'); - this.cleanUpWebSocket(); - await sleep(500); - this.createWebSocket(); - }).catch((err) => this.logger('debug', `Can't restore WS connection`, err)); - }; - get isHealthy() { return this.signalWs.readyState === WebSocket.OPEN; }