Skip to content

Commit

Permalink
fix: prevent races in full-reconnect mode
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverlaz committed Dec 6, 2023
1 parent 909d376 commit 14d794d
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 30 deletions.
75 changes: 51 additions & 24 deletions packages/client/src/Call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ export class Call {
*
* @returns a promise which resolves once the call join-flow has finished.
*/
join = async (data?: JoinCallData) => {
join = async (data?: JoinCallData): Promise<void> => {
const callingState = this.state.callingState;
if ([CallingState.JOINED, CallingState.JOINING].includes(callingState)) {
this.logger(
Expand Down Expand Up @@ -726,7 +726,17 @@ export class Call {
/**
* A closure which hides away the re-connection logic.
*/
const rejoin = async (strategy: 'full' | 'fast' | 'migrate' = 'full') => {
const rejoin = async (
strategy: 'full' | 'fast' | 'migrate' = 'full',
): Promise<void> => {
const currentState = this.state.callingState;
if (
currentState === CallingState.MIGRATING ||
currentState === CallingState.RECONNECTING
) {
// prevent parallel reconnection attempts
return;
}
this.reconnectAttempts++;
this.state.setCallingState(
strategy === 'migrate'
Expand All @@ -752,7 +762,7 @@ export class Call {
const localParticipant = this.state.localParticipant;

if (strategy === 'fast') {
previousSfuClient?.close();
sfuClient.close();
} else if (strategy === 'full') {
// in migration or recovery scenarios, we don't want to
// wait before attempting to reconnect to an SFU server
Expand All @@ -766,15 +776,15 @@ export class Call {
this.statsReporter?.stop();
this.statsReporter = undefined;

previousSfuClient?.close(); // clean up previous connection
sfuClient.close(); // clean up current connection
}
await this.join({
...data,
...(strategy === 'migrate' && { migrating_from: sfuServer.edge_name }),
});

if (strategy === 'migrate') {
previousSfuClient?.close(); // clean up previous connection
sfuClient?.close(); // clean up previous connection
}

this.logger(
Expand Down Expand Up @@ -881,25 +891,34 @@ export class Call {
// handlers for connection online/offline events
const unsubscribeOnlineEvent = this.streamClient.on(
'connection.changed',
(e) => {
async (e) => {
if (e.type !== 'connection.changed') return;
if (!e.online) return;
unsubscribeOnlineEvent();
const currentCallingState = this.state.callingState;
if (
const shouldReconnect =
currentCallingState === CallingState.OFFLINE ||
currentCallingState === CallingState.RECONNECTING_FAILED
) {
this.logger('info', '[Rejoin]: Going online...');
rejoin().catch((err) => {
currentCallingState === CallingState.RECONNECTING_FAILED;
if (!shouldReconnect) return;
this.logger('info', '[Rejoin]: Going online...');
do {
try {
await rejoin();
return; // break the loop if rejoin is successful
} catch (err) {
this.logger(
'error',
`[Rejoin]: Rejoin failed for ${this.reconnectAttempts} times. Giving up.`,
`[Rejoin][Network]: Rejoin failed for attempt ${this.reconnectAttempts}`,
err,
);
this.state.setCallingState(CallingState.RECONNECTING_FAILED);
});
}
}
// wait for a bit before trying to reconnect again
await sleep(retryInterval(this.reconnectAttempts));
} while (this.reconnectAttempts < this.maxReconnectAttempts);

// if we're here, it means that we've exhausted all the reconnect attempts
this.logger('error', `[Rejoin][Network]: Rejoin failed. Giving up.`);
this.state.setCallingState(CallingState.RECONNECTING_FAILED);
},
);
const unsubscribeOfflineEvent = this.streamClient.on(
Expand Down Expand Up @@ -985,12 +1004,23 @@ export class Call {
if (isMigrating) {
await this.subscriber.migrateTo(sfuClient, connectionConfig);
await this.publisher.migrateTo(sfuClient, connectionConfig);
} else if (isReconnecting && reconnected) {
// update the SFU client instance on the subscriber and publisher
this.subscriber.setSfuClient(sfuClient);
this.publisher.setSfuClient(sfuClient);
// and perform a full ICE restart on the publisher
await this.publisher.restartIce();
} else if (isReconnecting) {
if (reconnected) {
// update the SFU client instance on the subscriber and publisher
this.subscriber.setSfuClient(sfuClient);
this.publisher.setSfuClient(sfuClient);
// and perform a full ICE restart on the publisher
await this.publisher.restartIce();
} else if (previousSfuClient?.isFastReconnecting) {
// reconnection wasn't possible, so we need to do a full rejoin
return await rejoin('full').catch((err) => {
this.logger(
'error',
`[Rejoin]: Rejoin failed forced full rejoin.`,
err,
);
});
}
}
const currentParticipants = callState?.participants || [];
const participantCount = callState?.participantCount;
Expand Down Expand Up @@ -1208,9 +1238,6 @@ export class Call {
* Stops publishing the given track type to the call, if it is currently being published.
* Underlying track will be stopped and removed from the publisher.
*
* The `audioDeviceId`/`videoDeviceId` property of the [`localParticipant$`](./StreamVideoClient.md/#readonlystatestore) won't be updated, you can do that by calling the [`setAudioDevice`](#setaudiodevice)/[`setVideoDevice`](#setvideodevice) method.
*
*
* @param trackType the track type to stop publishing.
* @param stopTrack if `true` the track will be stopped, else it will be just disabled
*/
Expand Down
11 changes: 8 additions & 3 deletions packages/client/src/rtc/Publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
} from './videoLayers';
import { getPreferredCodecs } from './codecs';
import { trackTypeToParticipantStreamKey } from './helpers/tracks';
import { CallState } from '../store';
import { CallingState, CallState } from '../store';
import { PublishOptions } from '../types';
import { isReactNative } from '../helpers/platforms';
import { enableHighQualityAudio, toggleDtx } from '../helpers/sdp-munging';
Expand Down Expand Up @@ -96,6 +96,7 @@ export class Publisher {

private readonly iceRestartDelay: number;
private isIceRestarting = false;
private iceRestartTimeout?: NodeJS.Timeout;

/**
* The SFU client instance to use for publishing and signaling.
Expand Down Expand Up @@ -183,6 +184,7 @@ export class Publisher {
});
}

clearTimeout(this.iceRestartTimeout);
this.unsubscribeOnIceRestart();
this.pc.removeEventListener('negotiationneeded', this.onNegotiationNeeded);
this.pc.close();
Expand Down Expand Up @@ -795,16 +797,19 @@ export class Publisher {
const state = this.pc.iceConnectionState;
logger('debug', `ICE Connection state changed to`, state);

const hasNetworkConnection =
this.state.callingState !== CallingState.OFFLINE;

if (state === 'failed') {
logger('warn', `Attempting to restart ICE`);
this.restartIce().catch((e) => {
logger('error', `ICE restart error`, e);
});
} else if (state === 'disconnected') {
} else if (state === 'disconnected' && hasNetworkConnection) {
// when in `disconnected` state, the browser may recover automatically,
// hence, we delay the ICE restart
logger('warn', `Scheduling ICE restart in ${this.iceRestartDelay} ms.`);
setTimeout(() => {
this.iceRestartTimeout = setTimeout(() => {
// check if the state is still `disconnected` or `failed`
// as the connection may have recovered (or failed) in the meantime
if (
Expand Down
11 changes: 8 additions & 3 deletions packages/client/src/rtc/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { PeerType } from '../gen/video/sfu/models/models';
import { SubscriberOffer } from '../gen/video/sfu/event/events';
import { Dispatcher } from './Dispatcher';
import { getLogger } from '../logger';
import { CallState } from '../store';
import { CallingState, CallState } from '../store';

export type SubscriberOpts = {
sfuClient: StreamSfuClient;
Expand All @@ -30,6 +30,7 @@ export class Subscriber {

private readonly iceRestartDelay: number;
private isIceRestarting = false;
private iceRestartTimeout?: NodeJS.Timeout;

/**
* Constructs a new `Subscriber` instance.
Expand Down Expand Up @@ -100,6 +101,7 @@ export class Subscriber {
* Closes the `RTCPeerConnection` and unsubscribes from the dispatcher.
*/
close = () => {
clearTimeout(this.iceRestartTimeout);
this.unregisterOnSubscriberOffer();
this.unregisterOnIceRestart();
this.pc.close();
Expand Down Expand Up @@ -345,16 +347,19 @@ export class Subscriber {
// do nothing when ICE is restarting
if (this.isIceRestarting) return;

const hasNetworkConnection =
this.state.callingState !== CallingState.OFFLINE;

if (state === 'failed') {
logger('warn', `Attempting to restart ICE`);
this.restartIce().catch((e) => {
logger('error', `ICE restart failed`, e);
});
} else if (state === 'disconnected') {
} else if (state === 'disconnected' && hasNetworkConnection) {
// when in `disconnected` state, the browser may recover automatically,
// hence, we delay the ICE restart
logger('warn', `Scheduling ICE restart in ${this.iceRestartDelay} ms.`);
setTimeout(() => {
this.iceRestartTimeout = setTimeout(() => {
// check if the state is still `disconnected` or `failed`
// as the connection may have recovered (or failed) in the meantime
if (
Expand Down

0 comments on commit 14d794d

Please sign in to comment.