From 909d37697eb0083055947b6c1e36579758038318 Mon Sep 17 00:00:00 2001 From: Oliver Lazoroski Date: Tue, 5 Dec 2023 17:25:28 +0100 Subject: [PATCH] fix: close previous sfu connection, ICE restart on the publisher --- packages/client/src/Call.ts | 94 +++++++++++++------------- packages/client/src/StreamSfuClient.ts | 5 +- packages/client/src/rtc/Publisher.ts | 9 +++ 3 files changed, 58 insertions(+), 50 deletions(-) diff --git a/packages/client/src/Call.ts b/packages/client/src/Call.ts index 2948202091..f237f7d961 100644 --- a/packages/client/src/Call.ts +++ b/packages/client/src/Call.ts @@ -672,6 +672,7 @@ export class Call { } const isMigrating = callingState === CallingState.MIGRATING; + const isReconnecting = callingState === CallingState.RECONNECTING; this.state.setCallingState(CallingState.JOINING); this.logger('debug', 'Starting join flow'); @@ -688,7 +689,7 @@ export class Call { let sfuToken: string; let connectionConfig: RTCConfiguration | undefined; try { - if (this.sfuClient?.attemptFastReconnect) { + if (this.sfuClient?.isFastReconnecting) { // use previous SFU configuration and values connectionConfig = this.publisher?.connectionConfiguration; sfuServer = this.sfuClient.sfuServer; @@ -725,13 +726,15 @@ export class Call { /** * A closure which hides away the re-connection logic. */ - const rejoin = async ({ migrate = false } = {}) => { + const rejoin = async (strategy: 'full' | 'fast' | 'migrate' = 'full') => { this.reconnectAttempts++; this.state.setCallingState( - migrate ? CallingState.MIGRATING : CallingState.RECONNECTING, + strategy === 'migrate' + ? CallingState.MIGRATING + : CallingState.RECONNECTING, ); - if (migrate) { + if (strategy === 'migrate') { this.logger( 'debug', `[Migration]: migrating call ${this.cid} away from ${sfuServer.edge_name}`, @@ -740,9 +743,7 @@ export class Call { } else { this.logger( 'debug', - `[Rejoin]: ${ - sfuClient?.attemptFastReconnect ? 'Fast' : 'Full' - } rejoin call ${this.cid} (${this.reconnectAttempts})...`, + `[Rejoin]: ${strategy} rejoin call ${this.cid} (${this.reconnectAttempts})...`, ); } @@ -750,33 +751,30 @@ export class Call { // we'll need it for restoring the previous publishing state later const localParticipant = this.state.localParticipant; - const disconnectFromPreviousSfu = () => { - if (!migrate) { - this.subscriber?.close(); - this.subscriber = undefined; - this.publisher?.close({ stopTracks: false }); - this.publisher = undefined; - this.statsReporter?.stop(); - this.statsReporter = undefined; - } - previousSfuClient?.close(); // clean up previous connection - }; - - if (sfuClient.attemptFastReconnect) { - sfuClient.close(); - } else if (!migrate) { + if (strategy === 'fast') { + previousSfuClient?.close(); + } else if (strategy === 'full') { // in migration or recovery scenarios, we don't want to // wait before attempting to reconnect to an SFU server await sleep(retryInterval(this.reconnectAttempts)); - disconnectFromPreviousSfu(); + + // in full-reconnect, we need to dispose all Peer Connections + this.subscriber?.close(); + this.subscriber = undefined; + this.publisher?.close({ stopTracks: false }); + this.publisher = undefined; + this.statsReporter?.stop(); + this.statsReporter = undefined; + + previousSfuClient?.close(); // clean up previous connection } await this.join({ ...data, - ...(migrate && { migrating_from: sfuServer.edge_name }), + ...(strategy === 'migrate' && { migrating_from: sfuServer.edge_name }), }); - if (migrate) { - disconnectFromPreviousSfu(); + if (strategy === 'migrate') { + previousSfuClient?.close(); // clean up previous connection } this.logger( @@ -786,7 +784,7 @@ export class Call { // we shouldn't be republishing the streams if we're migrating // as the underlying peer connection will take care of it as part // of the ice-restart process - if (localParticipant && !migrate && !sfuClient.attemptFastReconnect) { + if (localParticipant && strategy === 'full') { const { audioStream, videoStream, @@ -813,11 +811,12 @@ export class Call { }); } if (screenShare) await this.publishScreenShareStream(screenShare); + + this.logger( + 'info', + `[Rejoin]: State restored. Attempt: ${this.reconnectAttempts}`, + ); } - this.logger( - 'info', - `[Rejoin]: State restored. Attempt: ${this.reconnectAttempts}`, - ); }; // reconnect if the connection was closed unexpectedly. example: @@ -832,7 +831,7 @@ export class Call { 'info', `[Migration]: Going away from SFU... Reason: ${GoAwayReason[reason]}`, ); - rejoin({ migrate: true }).catch((err) => { + rejoin('migrate').catch((err) => { this.logger( 'warn', `[Migration]: Failed to migrate to another SFU.`, @@ -854,18 +853,14 @@ export class Call { // While we migrate to another SFU, we might have the WS connection // to the old SFU closed abruptly. In this case, we don't want // to reconnect to the old SFU, but rather to the new one. - if ( - e.code === KnownCodes.WS_CLOSED_ABRUPTLY && - sfuClient.isMigratingAway - ) - return; - - // attempt a fast reconnect first, - // in case it fails, we'll try a full reconnect - sfuClient.attemptFastReconnect = this.reconnectAttempts === 0; + const isMigratingAway = + e.code === KnownCodes.WS_CLOSED_ABRUPTLY && sfuClient.isMigratingAway; + if (isMigratingAway) return; if (this.reconnectAttempts < this.maxReconnectAttempts) { - rejoin().catch((err) => { + sfuClient.isFastReconnecting = this.reconnectAttempts === 0; + const strategy = sfuClient.isFastReconnecting ? 'fast' : 'full'; + rejoin(strategy).catch((err) => { this.logger( 'error', `[Rejoin]: Rejoin failed for ${this.reconnectAttempts} times. Giving up.`, @@ -946,7 +941,7 @@ export class Call { }); } - if (!isMigrating) { + if (!this.statsReporter) { this.statsReporter = createStatsReporter({ subscriber: this.subscriber, publisher: this.publisher, @@ -976,7 +971,7 @@ export class Call { subscriberSdp: sdp || '', clientDetails: getClientDetails(), migration, - fastReconnect: previousSfuClient?.attemptFastReconnect ?? false, + fastReconnect: previousSfuClient?.isFastReconnecting ?? false, }); }); @@ -984,15 +979,18 @@ export class Call { // this will throw an error if the SFU rejects the join request or // fails to respond in time const { callState, reconnected } = await this.waitForJoinResponse(); - this.logger('debug', '[Join] fast reconnect:', reconnected); + if (isReconnecting) { + this.logger('debug', '[Rejoin] fast reconnected:', reconnected); + } if (isMigrating) { await this.subscriber.migrateTo(sfuClient, connectionConfig); await this.publisher.migrateTo(sfuClient, connectionConfig); - } else if (reconnected) { + } else if (isReconnecting && reconnected) { // update the SFU client instance on the subscriber and publisher - // and perform a full ICE restart on the publisher this.subscriber.setSfuClient(sfuClient); - await this.publisher.migrateTo(sfuClient, connectionConfig); + this.publisher.setSfuClient(sfuClient); + // and perform a full ICE restart on the publisher + await this.publisher.restartIce(); } const currentParticipants = callState?.participants || []; const participantCount = callState?.participantCount; diff --git a/packages/client/src/StreamSfuClient.ts b/packages/client/src/StreamSfuClient.ts index 14c1e7a14d..4580191d76 100644 --- a/packages/client/src/StreamSfuClient.ts +++ b/packages/client/src/StreamSfuClient.ts @@ -108,7 +108,7 @@ export class StreamSfuClient { * A flag indicating that the client connection is broken for the current * client and that a fast-reconnect with a new client should be attempted. */ - attemptFastReconnect = false; + isFastReconnecting = false; private readonly rpc: SignalServerClient; private keepAliveInterval?: NodeJS.Timeout; @@ -299,8 +299,9 @@ export class StreamSfuClient { ); }; - send = (message: SfuRequest) => { + send = async (message: SfuRequest) => { return this.signalReady.then((signal) => { + if (signal.readyState !== signal.OPEN) return; this.logger( 'debug', `Sending message to: ${this.edgeName}`, diff --git a/packages/client/src/rtc/Publisher.ts b/packages/client/src/rtc/Publisher.ts index 613d323651..cbfbfcf0b0 100644 --- a/packages/client/src/rtc/Publisher.ts +++ b/packages/client/src/rtc/Publisher.ts @@ -548,6 +548,15 @@ export class Publisher { }); }; + /** + * Sets the SFU client to use. + * + * @param sfuClient the SFU client to use. + */ + setSfuClient = (sfuClient: StreamSfuClient) => { + this.sfuClient = sfuClient; + }; + /** * Performs a migration of this publisher instance to a new SFU. *