diff --git a/packages/client/src/Call.ts b/packages/client/src/Call.ts index 31ef31985a..4d959f3c9c 100644 --- a/packages/client/src/Call.ts +++ b/packages/client/src/Call.ts @@ -760,7 +760,10 @@ export class Call { const localParticipant = this.state.localParticipant; if (strategy === 'fast') { - sfuClient.close(); + sfuClient.close( + StreamSfuClient.ERROR_CONNECTION_BROKEN, + 'js-client: attempting fast reconnect', + ); } else if (strategy === 'full') { // in migration or recovery scenarios, we don't want to // wait before attempting to reconnect to an SFU server @@ -774,15 +777,23 @@ export class Call { this.statsReporter?.stop(); this.statsReporter = undefined; - sfuClient.close(); // clean up current connection + // clean up current connection + sfuClient.close( + StreamSfuClient.NORMAL_CLOSURE, + 'js-client: attempting full reconnect', + ); } await this.join({ ...data, ...(strategy === 'migrate' && { migrating_from: sfuServer.edge_name }), }); + // clean up previous connection if (strategy === 'migrate') { - sfuClient?.close(); // clean up previous connection + sfuClient.close( + StreamSfuClient.NORMAL_CLOSURE, + 'js-client: attempting migration', + ); } this.logger( @@ -853,7 +864,7 @@ export class Call { // the upcoming re-join will register a new handler anyway unregisterGoAway(); // do nothing if the connection was closed on purpose - if (e.code === KnownCodes.WS_CLOSED_SUCCESS) return; + if (e.code === StreamSfuClient.NORMAL_CLOSURE) return; // do nothing if the connection was closed because of a policy violation // e.g., the user has been blocked by an admin or moderator if (e.code === KnownCodes.WS_POLICY_VIOLATION) return; @@ -863,7 +874,13 @@ export class Call { // to reconnect to the old SFU, but rather to the new one. const isMigratingAway = e.code === KnownCodes.WS_CLOSED_ABRUPTLY && sfuClient.isMigratingAway; - if (isMigratingAway) return; + const isFastReconnecting = + e.code === KnownCodes.WS_CLOSED_ABRUPTLY && + sfuClient.isFastReconnecting; + if (isMigratingAway || isFastReconnecting) return; + + // do nothing if the connection was closed because of a fast reconnect + if (e.code === StreamSfuClient.ERROR_CONNECTION_BROKEN) return; if (this.reconnectAttempts < this.maxReconnectAttempts) { sfuClient.isFastReconnecting = this.reconnectAttempts === 0; diff --git a/packages/client/src/StreamSfuClient.ts b/packages/client/src/StreamSfuClient.ts index 4580191d76..d049c8283c 100644 --- a/packages/client/src/StreamSfuClient.ts +++ b/packages/client/src/StreamSfuClient.ts @@ -119,6 +119,24 @@ export class StreamSfuClient { private readonly unsubscribeIceTrickle: () => void; private readonly logger: Logger; + /** + * The normal closure code. Used for controlled shutdowns. + */ + static NORMAL_CLOSURE = 1000; + /** + * The error code used when the SFU connection is unhealthy. + * Usually, this means that no message has been received from the SFU for + * a certain amount of time (`connectionCheckTimeout`). + */ + static ERROR_CONNECTION_UNHEALTHY = 4001; + + /** + * The error code used when the SFU connection is broken. + * Usually, this means that the WS connection has been closed unexpectedly. + * This error code is used to announce a fast-reconnect. + */ + static ERROR_CONNECTION_BROKEN = 4002; // used in fast-reconnects + /** * Constructs a new SFU client. * @@ -193,11 +211,13 @@ export class StreamSfuClient { } close = ( - code: number = 1000, - reason: string = 'Requested signal connection close', + code: number = StreamSfuClient.NORMAL_CLOSURE, + reason: string = 'js-client: requested signal connection close', ) => { this.logger('debug', 'Closing SFU WS connection', code, reason); - this.signalWs.close(code, reason); + if (this.signalWs.readyState === this.signalWs.CLOSED) { + this.signalWs.close(code, reason); + } this.unsubscribeIceTrickle(); clearInterval(this.keepAliveInterval); @@ -312,9 +332,7 @@ export class StreamSfuClient { }; private keepAlive = () => { - if (this.keepAliveInterval) { - clearInterval(this.keepAliveInterval); - } + clearInterval(this.keepAliveInterval); this.keepAliveInterval = setInterval(() => { this.logger('trace', 'Sending healthCheckRequest to SFU'); const message = SfuRequest.create({ @@ -323,15 +341,14 @@ export class StreamSfuClient { healthCheckRequest: {}, }, }); - void this.send(message); + this.send(message).catch((e) => { + this.logger('error', 'Error sending healthCheckRequest to SFU', e); + }); }, this.pingIntervalInMs); }; private scheduleConnectionCheck = () => { - if (this.connectionCheckTimeout) { - clearTimeout(this.connectionCheckTimeout); - } - + clearTimeout(this.connectionCheckTimeout); this.connectionCheckTimeout = setTimeout(() => { if (this.lastMessageTimestamp) { const timeSinceLastMessage = @@ -339,8 +356,8 @@ export class StreamSfuClient { if (timeSinceLastMessage > this.unhealthyTimeoutInMs) { this.close( - 4001, - `SFU connection unhealthy. Didn't receive any healthcheck messages for ${this.unhealthyTimeoutInMs}ms`, + StreamSfuClient.ERROR_CONNECTION_UNHEALTHY, + `SFU connection unhealthy. Didn't receive any message for ${this.unhealthyTimeoutInMs}ms`, ); } }