Skip to content

Commit

Permalink
feat: Fast Reconnection (#1212)
Browse files Browse the repository at this point in the history
Implements the Fast Reconnection flow. This should significantly speed up recovery from call drops caused by a network switch.

### Implementation notes
If the WebSocket connection is interrupted (due to a network switch such as
WiFi -> 5G, or for any other reason), we'll attempt to re-establish a
connection to the existing SFU by reusing the existing credentials and
configuration. If this fails, we will attempt a Full Reconnect, which
involves the Coordinator, as we do today.
  • Loading branch information
oliverlaz authored Dec 15, 2023
1 parent 6a5c05c commit 88a45e9
Show file tree
Hide file tree
Showing 6 changed files with 239 additions and 131 deletions.
209 changes: 137 additions & 72 deletions packages/client/src/Call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ import {
UpdateUserPermissionsRequest,
UpdateUserPermissionsResponse,
} from './gen/coordinator';
import { join, reconcileParticipantLocalState } from './rtc/flows/join';
import { join } from './rtc/flows/join';
import {
AudioTrackType,
CallConstructor,
Expand Down Expand Up @@ -653,7 +653,7 @@ export class Call {
*
* @returns a promise which resolves once the call join-flow has finished.
*/
join = async (data?: JoinCallData) => {
join = async (data?: JoinCallData): Promise<void> => {
const callingState = this.state.callingState;
if ([CallingState.JOINED, CallingState.JOINING].includes(callingState)) {
this.logger(
Expand All @@ -670,6 +670,7 @@ export class Call {
}

const isMigrating = callingState === CallingState.MIGRATING;
const isReconnecting = callingState === CallingState.RECONNECTING;
this.state.setCallingState(CallingState.JOINING);
this.logger('debug', 'Starting join flow');

Expand All @@ -686,13 +687,21 @@ export class Call {
let sfuToken: string;
let connectionConfig: RTCConfiguration | undefined;
try {
const call = await join(this.streamClient, this.type, this.id, data);
this.state.updateFromCallResponse(call.metadata);
this.state.setMembers(call.members);
this.state.setOwnCapabilities(call.ownCapabilities);
connectionConfig = call.connectionConfig;
sfuServer = call.sfuServer;
sfuToken = call.token;
if (this.sfuClient?.isFastReconnecting) {
// use previous SFU configuration and values
connectionConfig = this.publisher?.connectionConfiguration;
sfuServer = this.sfuClient.sfuServer;
sfuToken = this.sfuClient.token;
} else {
// full join flow - let the Coordinator pick a new SFU for us
const call = await join(this.streamClient, this.type, this.id, data);
this.state.updateFromCallResponse(call.metadata);
this.state.setMembers(call.members);
this.state.setOwnCapabilities(call.ownCapabilities);
connectionConfig = call.connectionConfig;
sfuServer = call.sfuServer;
sfuToken = call.token;
}

if (this.streamClient._hasConnectionID()) {
this.watching = true;
Expand All @@ -715,13 +724,25 @@ export class Call {
/**
* A closure which hides away the re-connection logic.
*/
const rejoin = async ({ migrate = false } = {}) => {
const reconnect = async (
strategy: 'full' | 'fast' | 'migrate' = 'full',
): Promise<void> => {
const currentState = this.state.callingState;
if (
currentState === CallingState.MIGRATING ||
currentState === CallingState.RECONNECTING
) {
// prevent parallel reconnection attempts
return;
}
this.reconnectAttempts++;
this.state.setCallingState(
migrate ? CallingState.MIGRATING : CallingState.RECONNECTING,
strategy === 'migrate'
? CallingState.MIGRATING
: CallingState.RECONNECTING,
);

if (migrate) {
if (strategy === 'migrate') {
this.logger(
'debug',
`[Migration]: migrating call ${this.cid} away from ${sfuServer.edge_name}`,
Expand All @@ -730,39 +751,49 @@ export class Call {
} else {
this.logger(
'debug',
`[Rejoin]: Rejoining call ${this.cid} (${this.reconnectAttempts})...`,
`[Rejoin]: ${strategy} rejoin call ${this.cid} (${this.reconnectAttempts})...`,
);
}

// take a snapshot of the current "local participant" state
// we'll need it for restoring the previous publishing state later
const localParticipant = this.state.localParticipant;

const disconnectFromPreviousSfu = () => {
if (!migrate) {
this.subscriber?.close();
this.subscriber = undefined;
this.publisher?.close({ stopTracks: false });
this.publisher = undefined;
this.statsReporter?.stop();
this.statsReporter = undefined;
}
previousSfuClient?.close(); // clean up previous connection
};

if (!migrate) {
if (strategy === 'fast') {
sfuClient.close(
StreamSfuClient.ERROR_CONNECTION_BROKEN,
'js-client: attempting fast reconnect',
);
} else if (strategy === 'full') {
// in migration or recovery scenarios, we don't want to
// wait before attempting to reconnect to an SFU server
await sleep(retryInterval(this.reconnectAttempts));
disconnectFromPreviousSfu();

// in full-reconnect, we need to dispose all Peer Connections
this.subscriber?.close();
this.subscriber = undefined;
this.publisher?.close({ stopTracks: false });
this.publisher = undefined;
this.statsReporter?.stop();
this.statsReporter = undefined;

// clean up current connection
sfuClient.close(
StreamSfuClient.NORMAL_CLOSURE,
'js-client: attempting full reconnect',
);
}
await this.join({
...data,
...(migrate && { migrating_from: sfuServer.edge_name }),
...(strategy === 'migrate' && { migrating_from: sfuServer.edge_name }),
});

if (migrate) {
disconnectFromPreviousSfu();
// clean up previous connection
if (strategy === 'migrate') {
sfuClient.close(
StreamSfuClient.NORMAL_CLOSURE,
'js-client: attempting migration',
);
}

this.logger(
Expand All @@ -772,7 +803,7 @@ export class Call {
// we shouldn't be republishing the streams if we're migrating
// as the underlying peer connection will take care of it as part
// of the ice-restart process
if (localParticipant && !migrate) {
if (localParticipant && strategy === 'full') {
const {
audioStream,
videoStream,
Expand All @@ -799,11 +830,12 @@ export class Call {
});
}
if (screenShare) await this.publishScreenShareStream(screenShare);

this.logger(
'info',
`[Rejoin]: State restored. Attempt: ${this.reconnectAttempts}`,
);
}
this.logger(
'info',
`[Rejoin]: State restored. Attempt: ${this.reconnectAttempts}`,
);
};

// reconnect if the connection was closed unexpectedly. example:
Expand All @@ -818,7 +850,7 @@ export class Call {
'info',
`[Migration]: Going away from SFU... Reason: ${GoAwayReason[reason]}`,
);
rejoin({ migrate: true }).catch((err) => {
reconnect('migrate').catch((err) => {
this.logger(
'warn',
`[Migration]: Failed to migrate to another SFU.`,
Expand All @@ -832,24 +864,31 @@ export class Call {
// the upcoming re-join will register a new handler anyway
unregisterGoAway();
// do nothing if the connection was closed on purpose
if (e.code === KnownCodes.WS_CLOSED_SUCCESS) return;
if (e.code === StreamSfuClient.NORMAL_CLOSURE) return;
// do nothing if the connection was closed because of a policy violation
// e.g., the user has been blocked by an admin or moderator
if (e.code === KnownCodes.WS_POLICY_VIOLATION) return;
// When the SFU is being shut down, it sends a goAway message.
// While we migrate to another SFU, we might have the WS connection
// to the old SFU closed abruptly. In this case, we don't want
// to reconnect to the old SFU, but rather to the new one.
if (
const isMigratingAway =
e.code === KnownCodes.WS_CLOSED_ABRUPTLY && sfuClient.isMigratingAway;
const isFastReconnecting =
e.code === KnownCodes.WS_CLOSED_ABRUPTLY &&
sfuClient.isMigratingAway
)
return;
sfuClient.isFastReconnecting;
if (isMigratingAway || isFastReconnecting) return;

// do nothing if the connection was closed because of a fast reconnect
if (e.code === StreamSfuClient.ERROR_CONNECTION_BROKEN) return;

if (this.reconnectAttempts < this.maxReconnectAttempts) {
rejoin().catch((err) => {
sfuClient.isFastReconnecting = this.reconnectAttempts === 0;
const strategy = sfuClient.isFastReconnecting ? 'fast' : 'full';
reconnect(strategy).catch((err) => {
this.logger(
'error',
`[Rejoin]: Rejoin failed for ${this.reconnectAttempts} times. Giving up.`,
`[Rejoin]: ${strategy} rejoin failed for ${this.reconnectAttempts} times. Giving up.`,
err,
);
this.state.setCallingState(CallingState.RECONNECTING_FAILED);
Expand All @@ -867,25 +906,37 @@ export class Call {
// handlers for connection online/offline events
const unsubscribeOnlineEvent = this.streamClient.on(
'connection.changed',
(e) => {
async (e) => {
if (e.type !== 'connection.changed') return;
if (!e.online) return;
unsubscribeOnlineEvent();
const currentCallingState = this.state.callingState;
if (
const shouldReconnect =
currentCallingState === CallingState.OFFLINE ||
currentCallingState === CallingState.RECONNECTING_FAILED
) {
this.logger('info', '[Rejoin]: Going online...');
rejoin().catch((err) => {
currentCallingState === CallingState.RECONNECTING_FAILED;
if (!shouldReconnect) return;
this.logger('info', '[Rejoin]: Going online...');
let isFirstReconnectAttempt = true;
do {
try {
sfuClient.isFastReconnecting = isFirstReconnectAttempt;
await reconnect(isFirstReconnectAttempt ? 'fast' : 'full');
return; // break the loop if rejoin is successful
} catch (err) {
this.logger(
'error',
`[Rejoin]: Rejoin failed for ${this.reconnectAttempts} times. Giving up.`,
`[Rejoin][Network]: Rejoin failed for attempt ${this.reconnectAttempts}`,
err,
);
this.state.setCallingState(CallingState.RECONNECTING_FAILED);
});
}
}
// wait for a bit before trying to reconnect again
await sleep(retryInterval(this.reconnectAttempts));
isFirstReconnectAttempt = false;
} while (this.reconnectAttempts < this.maxReconnectAttempts);

// if we're here, it means that we've exhausted all the reconnect attempts
this.logger('error', `[Rejoin][Network]: Rejoin failed. Giving up.`);
this.state.setCallingState(CallingState.RECONNECTING_FAILED);
},
);
const unsubscribeOfflineEvent = this.streamClient.on(
Expand Down Expand Up @@ -927,7 +978,7 @@ export class Call {
});
}

if (!isMigrating) {
if (!this.statsReporter) {
this.statsReporter = createStatsReporter({
subscriber: this.subscriber,
publisher: this.publisher,
Expand Down Expand Up @@ -957,17 +1008,37 @@ export class Call {
subscriberSdp: sdp || '',
clientDetails: getClientDetails(),
migration,
fastReconnect: false,
fastReconnect: previousSfuClient?.isFastReconnecting ?? false,
});
});

// 2. in parallel, wait for the SFU to send us the "joinResponse"
// this will throw an error if the SFU rejects the join request or
// fails to respond in time
const { callState } = await this.waitForJoinResponse();
const { callState, reconnected } = await this.waitForJoinResponse();
if (isReconnecting) {
this.logger('debug', '[Rejoin] fast reconnected:', reconnected);
}
if (isMigrating) {
await this.subscriber.migrateTo(sfuClient, connectionConfig);
await this.publisher.migrateTo(sfuClient, connectionConfig);
} else if (isReconnecting) {
if (reconnected) {
// update the SFU client instance on the subscriber and publisher
this.subscriber.setSfuClient(sfuClient);
this.publisher.setSfuClient(sfuClient);
// and perform a full ICE restart on the publisher
await this.publisher.restartIce();
} else if (previousSfuClient?.isFastReconnecting) {
// reconnection wasn't possible, so we need to do a full rejoin
return await reconnect('full').catch((err) => {
this.logger(
'error',
`[Rejoin]: Rejoin failed forced full rejoin.`,
err,
);
});
}
}
const currentParticipants = callState?.participants || [];
const participantCount = callState?.participantCount;
Expand All @@ -977,22 +1048,19 @@ export class Call {
const pins = callState?.pins ?? [];
this.state.setParticipants(() => {
const participantLookup = this.state.getParticipantLookupBySessionId();
return currentParticipants.map((p) => {
const participant: StreamVideoParticipant = Object.assign(p, {
isLocalParticipant: p.sessionId === sfuClient.sessionId,
viewportVisibilityState: {
videoTrack: VisibilityState.UNKNOWN,
screenShareTrack: VisibilityState.UNKNOWN,
},
});
return currentParticipants.map<StreamVideoParticipant>((p) => {
// We need to preserve the local state of the participant
// (e.g. videoDimension, visibilityState, pinnedAt, etc.)
// as it doesn't exist on the server.
const existingParticipant = participantLookup[p.sessionId];
return reconcileParticipantLocalState(
participant,
existingParticipant,
);
return Object.assign(p, existingParticipant, {
isLocalParticipant: p.sessionId === sfuClient.sessionId,
viewportVisibilityState:
existingParticipant?.viewportVisibilityState ?? {
videoTrack: VisibilityState.UNKNOWN,
screenShareTrack: VisibilityState.UNKNOWN,
},
} satisfies Partial<StreamVideoParticipant>);
});
});
this.state.setParticipantCount(participantCount?.total || 0);
Expand Down Expand Up @@ -1031,7 +1099,7 @@ export class Call {
`[Rejoin]: Rejoin ${this.reconnectAttempts} failed.`,
err,
);
await rejoin();
await reconnect();
this.logger(
'info',
`[Rejoin]: Rejoin ${this.reconnectAttempts} successful!`,
Expand Down Expand Up @@ -1185,9 +1253,6 @@ export class Call {
* Stops publishing the given track type to the call, if it is currently being published.
* Underlying track will be stopped and removed from the publisher.
*
* The `audioDeviceId`/`videoDeviceId` property of the [`localParticipant$`](./StreamVideoClient.md/#readonlystatestore) won't be updated, you can do that by calling the [`setAudioDevice`](#setaudiodevice)/[`setVideoDevice`](#setvideodevice) method.
*
*
* @param trackType the track type to stop publishing.
* @param stopTrack if `true` the track will be stopped, else it will be just disabled
*/
Expand Down
Loading

0 comments on commit 88a45e9

Please sign in to comment.