Skip to content

Commit

Permalink
add disconnectionTimeoutSeconds
Browse files Browse the repository at this point in the history
  • Loading branch information
myandrienko committed Nov 27, 2024
1 parent a98d850 commit 2f1b523
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 15 deletions.
34 changes: 34 additions & 0 deletions packages/client/src/Call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ export class Call {
private reconnectAttempts = 0;
private reconnectStrategy = WebsocketReconnectStrategy.UNSPECIFIED;
private fastReconnectDeadlineSeconds: number = 0;
private disconnectionTimeoutSeconds: number = 0;
private lastOfflineTimestamp: number = 0;
private networkAvailableTask: PromiseWithResolvers<void> | undefined;
// maintain the order of publishing tracks to restore them after a reconnection
Expand Down Expand Up @@ -1051,6 +1052,9 @@ export class Call {
*/
private handleSfuSignalClose = (sfuClient: StreamSfuClient) => {
this.logger('debug', '[Reconnect] SFU signal connection closed');
// SFU WS closed before we finished current join, no need to schedule reconnect
// because join operation will fail
if (this.state.callingState === CallingState.JOINING) return;
// normal close, no need to reconnect
if (sfuClient.isLeaving) return;
this.reconnect(WebsocketReconnectStrategy.REJOIN).catch((err) => {
Expand All @@ -1068,14 +1072,35 @@ export class Call {
private reconnect = async (
strategy: WebsocketReconnectStrategy,
): Promise<void> => {
if (
this.state.callingState === CallingState.RECONNECTING ||
this.state.callingState === CallingState.RECONNECTING_FAILED
)
return;

return withoutConcurrency(this.reconnectConcurrencyTag, async () => {
this.logger(
'info',
`[Reconnect] Reconnecting with strategy ${WebsocketReconnectStrategy[strategy]}`,
);

let reconnectStartTime = Date.now();
this.reconnectStrategy = strategy;

do {
if (
this.disconnectionTimeoutSeconds > 0 &&
(Date.now() - reconnectStartTime) / 1000 >
this.disconnectionTimeoutSeconds
) {
this.logger(
'warn',
'[Reconnect] Stopping reconnection attempts after reaching disconnection timeout',
);
this.state.setCallingState(CallingState.RECONNECTING_FAILED);
return;
}

// we don't increment reconnect attempts for the FAST strategy.
if (this.reconnectStrategy !== WebsocketReconnectStrategy.FAST) {
this.reconnectAttempts++;
Expand Down Expand Up @@ -2337,4 +2362,13 @@ export class Call {
);
this.dynascaleManager.applyTrackSubscriptions();
};

/**
* Sets the maximum amount of time a user can remain waiting for a reconnect
* after a network disruption
* @param timeoutSeconds Timeout in seconds, or 0 to keep reconnecting indefinetely
*/
setDisconnectionTimeout = (timeoutSeconds: number) => {
this.disconnectionTimeoutSeconds = timeoutSeconds;
};
}
16 changes: 1 addition & 15 deletions packages/client/src/StreamSfuClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ import {
} from './gen/video/sfu/signal_rpc/signal';
import { ICETrickle, TrackType } from './gen/video/sfu/models/models';
import { StreamClient } from './coordinator/connection/client';
import { generateUUIDv4, sleep } from './coordinator/connection/utils';
import { generateUUIDv4 } from './coordinator/connection/utils';
import { Credentials } from './gen/coordinator';
import { Logger } from './coordinator/connection/types';
import { getLogger, getLogLevel } from './logger';
import { withoutConcurrency } from './helpers/concurrency';
import {
promiseWithResolvers,
PromiseWithResolvers,
Expand Down Expand Up @@ -118,7 +117,6 @@ export class StreamSfuClient {
private pingIntervalInMs = 10 * 1000;
private unhealthyTimeoutInMs = this.pingIntervalInMs + 5 * 1000;
private lastMessageTimestamp?: Date;
private readonly restoreWebSocketConcurrencyTag = Symbol('recoverWebSocket');
private readonly unsubscribeIceTrickle: () => void;
private readonly unsubscribeNetworkChanged: () => void;
private readonly onSignalClose: (() => void) | undefined;
Expand Down Expand Up @@ -229,7 +227,6 @@ export class StreamSfuClient {
});

this.signalWs.addEventListener('close', this.handleWebSocketClose);
this.signalWs.addEventListener('error', this.restoreWebSocket);

this.signalReady = makeSafePromise(
Promise.race<WebSocket>([
Expand All @@ -252,20 +249,9 @@ export class StreamSfuClient {
};

private cleanUpWebSocket = () => {
this.signalWs.removeEventListener('error', this.restoreWebSocket);
this.signalWs.removeEventListener('close', this.handleWebSocketClose);
};

private restoreWebSocket = () => {
withoutConcurrency(this.restoreWebSocketConcurrencyTag, async () => {
await this.networkAvailableTask?.promise;
this.logger('debug', 'Restoring SFU WS connection');
this.cleanUpWebSocket();
await sleep(500);
this.createWebSocket();
}).catch((err) => this.logger('debug', `Can't restore WS connection`, err));
};

get isHealthy() {
return this.signalWs.readyState === WebSocket.OPEN;
}
Expand Down

0 comments on commit 2f1b523

Please sign in to comment.