diff --git a/packages/client/src/Call.ts b/packages/client/src/Call.ts index d3a3f4a546..4d959f3c9c 100644 --- a/packages/client/src/Call.ts +++ b/packages/client/src/Call.ts @@ -62,7 +62,7 @@ import { UpdateUserPermissionsRequest, UpdateUserPermissionsResponse, } from './gen/coordinator'; -import { join, reconcileParticipantLocalState } from './rtc/flows/join'; +import { join } from './rtc/flows/join'; import { AudioTrackType, CallConstructor, @@ -653,7 +653,7 @@ export class Call { * * @returns a promise which resolves once the call join-flow has finished. */ - join = async (data?: JoinCallData) => { + join = async (data?: JoinCallData): Promise => { const callingState = this.state.callingState; if ([CallingState.JOINED, CallingState.JOINING].includes(callingState)) { this.logger( @@ -670,6 +670,7 @@ export class Call { } const isMigrating = callingState === CallingState.MIGRATING; + const isReconnecting = callingState === CallingState.RECONNECTING; this.state.setCallingState(CallingState.JOINING); this.logger('debug', 'Starting join flow'); @@ -686,13 +687,21 @@ export class Call { let sfuToken: string; let connectionConfig: RTCConfiguration | undefined; try { - const call = await join(this.streamClient, this.type, this.id, data); - this.state.updateFromCallResponse(call.metadata); - this.state.setMembers(call.members); - this.state.setOwnCapabilities(call.ownCapabilities); - connectionConfig = call.connectionConfig; - sfuServer = call.sfuServer; - sfuToken = call.token; + if (this.sfuClient?.isFastReconnecting) { + // use previous SFU configuration and values + connectionConfig = this.publisher?.connectionConfiguration; + sfuServer = this.sfuClient.sfuServer; + sfuToken = this.sfuClient.token; + } else { + // full join flow - let the Coordinator pick a new SFU for us + const call = await join(this.streamClient, this.type, this.id, data); + this.state.updateFromCallResponse(call.metadata); + this.state.setMembers(call.members); + this.state.setOwnCapabilities(call.ownCapabilities); + connectionConfig = call.connectionConfig; + sfuServer = call.sfuServer; + sfuToken = call.token; + } if (this.streamClient._hasConnectionID()) { this.watching = true; @@ -715,13 +724,25 @@ export class Call { /** * A closure which hides away the re-connection logic. */ - const rejoin = async ({ migrate = false } = {}) => { + const reconnect = async ( + strategy: 'full' | 'fast' | 'migrate' = 'full', + ): Promise => { + const currentState = this.state.callingState; + if ( + currentState === CallingState.MIGRATING || + currentState === CallingState.RECONNECTING + ) { + // prevent parallel reconnection attempts + return; + } this.reconnectAttempts++; this.state.setCallingState( - migrate ? CallingState.MIGRATING : CallingState.RECONNECTING, + strategy === 'migrate' + ? CallingState.MIGRATING + : CallingState.RECONNECTING, ); - if (migrate) { + if (strategy === 'migrate') { this.logger( 'debug', `[Migration]: migrating call ${this.cid} away from ${sfuServer.edge_name}`, @@ -730,7 +751,7 @@ export class Call { } else { this.logger( 'debug', - `[Rejoin]: Rejoining call ${this.cid} (${this.reconnectAttempts})...`, + `[Rejoin]: ${strategy} rejoin call ${this.cid} (${this.reconnectAttempts})...`, ); } @@ -738,31 +759,41 @@ export class Call { // we'll need it for restoring the previous publishing state later const localParticipant = this.state.localParticipant; - const disconnectFromPreviousSfu = () => { - if (!migrate) { - this.subscriber?.close(); - this.subscriber = undefined; - this.publisher?.close({ stopTracks: false }); - this.publisher = undefined; - this.statsReporter?.stop(); - this.statsReporter = undefined; - } - previousSfuClient?.close(); // clean up previous connection - }; - - if (!migrate) { + if (strategy === 'fast') { + sfuClient.close( + StreamSfuClient.ERROR_CONNECTION_BROKEN, + 'js-client: attempting fast reconnect', + ); + } else if (strategy === 'full') { // in migration or recovery scenarios, we don't want to // wait before attempting to reconnect to an SFU server await sleep(retryInterval(this.reconnectAttempts)); - disconnectFromPreviousSfu(); + + // in full-reconnect, we need to dispose all Peer Connections + this.subscriber?.close(); + this.subscriber = undefined; + this.publisher?.close({ stopTracks: false }); + this.publisher = undefined; + this.statsReporter?.stop(); + this.statsReporter = undefined; + + // clean up current connection + sfuClient.close( + StreamSfuClient.NORMAL_CLOSURE, + 'js-client: attempting full reconnect', + ); } await this.join({ ...data, - ...(migrate && { migrating_from: sfuServer.edge_name }), + ...(strategy === 'migrate' && { migrating_from: sfuServer.edge_name }), }); - if (migrate) { - disconnectFromPreviousSfu(); + // clean up previous connection + if (strategy === 'migrate') { + sfuClient.close( + StreamSfuClient.NORMAL_CLOSURE, + 'js-client: attempting migration', + ); } this.logger( @@ -772,7 +803,7 @@ export class Call { // we shouldn't be republishing the streams if we're migrating // as the underlying peer connection will take care of it as part // of the ice-restart process - if (localParticipant && !migrate) { + if (localParticipant && strategy === 'full') { const { audioStream, videoStream, @@ -799,11 +830,12 @@ export class Call { }); } if (screenShare) await this.publishScreenShareStream(screenShare); + + this.logger( + 'info', + `[Rejoin]: State restored. Attempt: ${this.reconnectAttempts}`, + ); } - this.logger( - 'info', - `[Rejoin]: State restored. Attempt: ${this.reconnectAttempts}`, - ); }; // reconnect if the connection was closed unexpectedly. example: @@ -818,7 +850,7 @@ export class Call { 'info', `[Migration]: Going away from SFU... Reason: ${GoAwayReason[reason]}`, ); - rejoin({ migrate: true }).catch((err) => { + reconnect('migrate').catch((err) => { this.logger( 'warn', `[Migration]: Failed to migrate to another SFU.`, @@ -832,7 +864,7 @@ export class Call { // the upcoming re-join will register a new handler anyway unregisterGoAway(); // do nothing if the connection was closed on purpose - if (e.code === KnownCodes.WS_CLOSED_SUCCESS) return; + if (e.code === StreamSfuClient.NORMAL_CLOSURE) return; // do nothing if the connection was closed because of a policy violation // e.g., the user has been blocked by an admin or moderator if (e.code === KnownCodes.WS_POLICY_VIOLATION) return; @@ -840,16 +872,23 @@ export class Call { // While we migrate to another SFU, we might have the WS connection // to the old SFU closed abruptly. In this case, we don't want // to reconnect to the old SFU, but rather to the new one. - if ( + const isMigratingAway = + e.code === KnownCodes.WS_CLOSED_ABRUPTLY && sfuClient.isMigratingAway; + const isFastReconnecting = e.code === KnownCodes.WS_CLOSED_ABRUPTLY && - sfuClient.isMigratingAway - ) - return; + sfuClient.isFastReconnecting; + if (isMigratingAway || isFastReconnecting) return; + + // do nothing if the connection was closed because of a fast reconnect + if (e.code === StreamSfuClient.ERROR_CONNECTION_BROKEN) return; + if (this.reconnectAttempts < this.maxReconnectAttempts) { - rejoin().catch((err) => { + sfuClient.isFastReconnecting = this.reconnectAttempts === 0; + const strategy = sfuClient.isFastReconnecting ? 'fast' : 'full'; + reconnect(strategy).catch((err) => { this.logger( 'error', - `[Rejoin]: Rejoin failed for ${this.reconnectAttempts} times. Giving up.`, + `[Rejoin]: ${strategy} rejoin failed for ${this.reconnectAttempts} times. Giving up.`, err, ); this.state.setCallingState(CallingState.RECONNECTING_FAILED); @@ -867,25 +906,37 @@ export class Call { // handlers for connection online/offline events const unsubscribeOnlineEvent = this.streamClient.on( 'connection.changed', - (e) => { + async (e) => { if (e.type !== 'connection.changed') return; if (!e.online) return; unsubscribeOnlineEvent(); const currentCallingState = this.state.callingState; - if ( + const shouldReconnect = currentCallingState === CallingState.OFFLINE || - currentCallingState === CallingState.RECONNECTING_FAILED - ) { - this.logger('info', '[Rejoin]: Going online...'); - rejoin().catch((err) => { + currentCallingState === CallingState.RECONNECTING_FAILED; + if (!shouldReconnect) return; + this.logger('info', '[Rejoin]: Going online...'); + let isFirstReconnectAttempt = true; + do { + try { + sfuClient.isFastReconnecting = isFirstReconnectAttempt; + await reconnect(isFirstReconnectAttempt ? 'fast' : 'full'); + return; // break the loop if rejoin is successful + } catch (err) { this.logger( 'error', - `[Rejoin]: Rejoin failed for ${this.reconnectAttempts} times. Giving up.`, + `[Rejoin][Network]: Rejoin failed for attempt ${this.reconnectAttempts}`, err, ); - this.state.setCallingState(CallingState.RECONNECTING_FAILED); - }); - } + } + // wait for a bit before trying to reconnect again + await sleep(retryInterval(this.reconnectAttempts)); + isFirstReconnectAttempt = false; + } while (this.reconnectAttempts < this.maxReconnectAttempts); + + // if we're here, it means that we've exhausted all the reconnect attempts + this.logger('error', `[Rejoin][Network]: Rejoin failed. Giving up.`); + this.state.setCallingState(CallingState.RECONNECTING_FAILED); }, ); const unsubscribeOfflineEvent = this.streamClient.on( @@ -927,7 +978,7 @@ export class Call { }); } - if (!isMigrating) { + if (!this.statsReporter) { this.statsReporter = createStatsReporter({ subscriber: this.subscriber, publisher: this.publisher, @@ -957,17 +1008,37 @@ export class Call { subscriberSdp: sdp || '', clientDetails: getClientDetails(), migration, - fastReconnect: false, + fastReconnect: previousSfuClient?.isFastReconnecting ?? false, }); }); // 2. in parallel, wait for the SFU to send us the "joinResponse" // this will throw an error if the SFU rejects the join request or // fails to respond in time - const { callState } = await this.waitForJoinResponse(); + const { callState, reconnected } = await this.waitForJoinResponse(); + if (isReconnecting) { + this.logger('debug', '[Rejoin] fast reconnected:', reconnected); + } if (isMigrating) { await this.subscriber.migrateTo(sfuClient, connectionConfig); await this.publisher.migrateTo(sfuClient, connectionConfig); + } else if (isReconnecting) { + if (reconnected) { + // update the SFU client instance on the subscriber and publisher + this.subscriber.setSfuClient(sfuClient); + this.publisher.setSfuClient(sfuClient); + // and perform a full ICE restart on the publisher + await this.publisher.restartIce(); + } else if (previousSfuClient?.isFastReconnecting) { + // reconnection wasn't possible, so we need to do a full rejoin + return await reconnect('full').catch((err) => { + this.logger( + 'error', + `[Rejoin]: Rejoin failed forced full rejoin.`, + err, + ); + }); + } } const currentParticipants = callState?.participants || []; const participantCount = callState?.participantCount; @@ -977,22 +1048,19 @@ export class Call { const pins = callState?.pins ?? []; this.state.setParticipants(() => { const participantLookup = this.state.getParticipantLookupBySessionId(); - return currentParticipants.map((p) => { - const participant: StreamVideoParticipant = Object.assign(p, { - isLocalParticipant: p.sessionId === sfuClient.sessionId, - viewportVisibilityState: { - videoTrack: VisibilityState.UNKNOWN, - screenShareTrack: VisibilityState.UNKNOWN, - }, - }); + return currentParticipants.map((p) => { // We need to preserve the local state of the participant // (e.g. videoDimension, visibilityState, pinnedAt, etc.) // as it doesn't exist on the server. const existingParticipant = participantLookup[p.sessionId]; - return reconcileParticipantLocalState( - participant, - existingParticipant, - ); + return Object.assign(p, existingParticipant, { + isLocalParticipant: p.sessionId === sfuClient.sessionId, + viewportVisibilityState: + existingParticipant?.viewportVisibilityState ?? { + videoTrack: VisibilityState.UNKNOWN, + screenShareTrack: VisibilityState.UNKNOWN, + }, + } satisfies Partial); }); }); this.state.setParticipantCount(participantCount?.total || 0); @@ -1031,7 +1099,7 @@ export class Call { `[Rejoin]: Rejoin ${this.reconnectAttempts} failed.`, err, ); - await rejoin(); + await reconnect(); this.logger( 'info', `[Rejoin]: Rejoin ${this.reconnectAttempts} successful!`, @@ -1185,9 +1253,6 @@ export class Call { * Stops publishing the given track type to the call, if it is currently being published. * Underlying track will be stopped and removed from the publisher. * - * The `audioDeviceId`/`videoDeviceId` property of the [`localParticipant$`](./StreamVideoClient.md/#readonlystatestore) won't be updated, you can do that by calling the [`setAudioDevice`](#setaudiodevice)/[`setVideoDevice`](#setvideodevice) method. - * - * * @param trackType the track type to stop publishing. * @param stopTrack if `true` the track will be stopped, else it will be just disabled */ diff --git a/packages/client/src/StreamSfuClient.ts b/packages/client/src/StreamSfuClient.ts index f2dd38f95e..d049c8283c 100644 --- a/packages/client/src/StreamSfuClient.ts +++ b/packages/client/src/StreamSfuClient.ts @@ -104,6 +104,12 @@ export class StreamSfuClient { */ isMigratingAway = false; + /** + * A flag indicating that the client connection is broken for the current + * client and that a fast-reconnect with a new client should be attempted. + */ + isFastReconnecting = false; + private readonly rpc: SignalServerClient; private keepAliveInterval?: NodeJS.Timeout; private connectionCheckTimeout?: NodeJS.Timeout; @@ -113,6 +119,24 @@ export class StreamSfuClient { private readonly unsubscribeIceTrickle: () => void; private readonly logger: Logger; + /** + * The normal closure code. Used for controlled shutdowns. + */ + static NORMAL_CLOSURE = 1000; + /** + * The error code used when the SFU connection is unhealthy. + * Usually, this means that no message has been received from the SFU for + * a certain amount of time (`connectionCheckTimeout`). + */ + static ERROR_CONNECTION_UNHEALTHY = 4001; + + /** + * The error code used when the SFU connection is broken. + * Usually, this means that the WS connection has been closed unexpectedly. + * This error code is used to announce a fast-reconnect. + */ + static ERROR_CONNECTION_BROKEN = 4002; // used in fast-reconnects + /** * Constructs a new SFU client. * @@ -187,11 +211,13 @@ export class StreamSfuClient { } close = ( - code: number = 1000, - reason: string = 'Requested signal connection close', + code: number = StreamSfuClient.NORMAL_CLOSURE, + reason: string = 'js-client: requested signal connection close', ) => { this.logger('debug', 'Closing SFU WS connection', code, reason); - this.signalWs.close(code, reason); + if (this.signalWs.readyState === this.signalWs.CLOSED) { + this.signalWs.close(code, reason); + } this.unsubscribeIceTrickle(); clearInterval(this.keepAliveInterval); @@ -293,8 +319,9 @@ export class StreamSfuClient { ); }; - send = (message: SfuRequest) => { + send = async (message: SfuRequest) => { return this.signalReady.then((signal) => { + if (signal.readyState !== signal.OPEN) return; this.logger( 'debug', `Sending message to: ${this.edgeName}`, @@ -305,9 +332,7 @@ export class StreamSfuClient { }; private keepAlive = () => { - if (this.keepAliveInterval) { - clearInterval(this.keepAliveInterval); - } + clearInterval(this.keepAliveInterval); this.keepAliveInterval = setInterval(() => { this.logger('trace', 'Sending healthCheckRequest to SFU'); const message = SfuRequest.create({ @@ -316,15 +341,14 @@ export class StreamSfuClient { healthCheckRequest: {}, }, }); - void this.send(message); + this.send(message).catch((e) => { + this.logger('error', 'Error sending healthCheckRequest to SFU', e); + }); }, this.pingIntervalInMs); }; private scheduleConnectionCheck = () => { - if (this.connectionCheckTimeout) { - clearTimeout(this.connectionCheckTimeout); - } - + clearTimeout(this.connectionCheckTimeout); this.connectionCheckTimeout = setTimeout(() => { if (this.lastMessageTimestamp) { const timeSinceLastMessage = @@ -332,8 +356,8 @@ export class StreamSfuClient { if (timeSinceLastMessage > this.unhealthyTimeoutInMs) { this.close( - 4001, - `SFU connection unhealthy. Didn't receive any healthcheck messages for ${this.unhealthyTimeoutInMs}ms`, + StreamSfuClient.ERROR_CONNECTION_UNHEALTHY, + `SFU connection unhealthy. Didn't receive any message for ${this.unhealthyTimeoutInMs}ms`, ); } } diff --git a/packages/client/src/rtc/Publisher.ts b/packages/client/src/rtc/Publisher.ts index 735ed7c6ac..102c5f4401 100644 --- a/packages/client/src/rtc/Publisher.ts +++ b/packages/client/src/rtc/Publisher.ts @@ -15,7 +15,7 @@ import { } from './videoLayers'; import { getPreferredCodecs } from './codecs'; import { trackTypeToParticipantStreamKey } from './helpers/tracks'; -import { CallState } from '../store'; +import { CallingState, CallState } from '../store'; import { PublishOptions } from '../types'; import { isReactNative } from '../helpers/platforms'; import { enableHighQualityAudio, toggleDtx } from '../helpers/sdp-munging'; @@ -96,6 +96,7 @@ export class Publisher { private readonly iceRestartDelay: number; private isIceRestarting = false; + private iceRestartTimeout?: NodeJS.Timeout; /** * The SFU client instance to use for publishing and signaling. @@ -158,6 +159,15 @@ export class Publisher { return pc; }; + /** + * Returns the current connection configuration. + * + * @internal + */ + get connectionConfiguration() { + return this.pc.getConfiguration(); + } + /** * Closes the publisher PeerConnection and cleans up the resources. */ @@ -174,6 +184,7 @@ export class Publisher { }); } + clearTimeout(this.iceRestartTimeout); this.unsubscribeOnIceRestart(); this.pc.removeEventListener('negotiationneeded', this.onNegotiationNeeded); this.pc.close(); @@ -523,6 +534,15 @@ export class Publisher { }); }; + /** + * Sets the SFU client to use. + * + * @param sfuClient the SFU client to use. + */ + setSfuClient = (sfuClient: StreamSfuClient) => { + this.sfuClient = sfuClient; + }; + /** * Performs a migration of this publisher instance to a new SFU. * @@ -752,23 +772,28 @@ export class Publisher { const errorMessage = e instanceof RTCPeerConnectionIceErrorEvent && `${e.errorCode}: ${e.errorText}`; - logger('error', `ICE Candidate error`, errorMessage); + const logLevel = + this.pc.iceConnectionState === 'connected' ? 'debug' : 'error'; + logger(logLevel, `ICE Candidate error`, errorMessage); }; private onIceConnectionStateChange = () => { const state = this.pc.iceConnectionState; logger('debug', `ICE Connection state changed to`, state); + const hasNetworkConnection = + this.state.callingState !== CallingState.OFFLINE; + if (state === 'failed') { logger('warn', `Attempting to restart ICE`); this.restartIce().catch((e) => { logger('error', `ICE restart error`, e); }); - } else if (state === 'disconnected') { + } else if (state === 'disconnected' && hasNetworkConnection) { // when in `disconnected` state, the browser may recover automatically, // hence, we delay the ICE restart logger('warn', `Scheduling ICE restart in ${this.iceRestartDelay} ms.`); - setTimeout(() => { + this.iceRestartTimeout = setTimeout(() => { // check if the state is still `disconnected` or `failed` // as the connection may have recovered (or failed) in the meantime if ( diff --git a/packages/client/src/rtc/Subscriber.ts b/packages/client/src/rtc/Subscriber.ts index f50e6ccf66..84d54418de 100644 --- a/packages/client/src/rtc/Subscriber.ts +++ b/packages/client/src/rtc/Subscriber.ts @@ -4,7 +4,7 @@ import { PeerType } from '../gen/video/sfu/models/models'; import { SubscriberOffer } from '../gen/video/sfu/event/events'; import { Dispatcher } from './Dispatcher'; import { getLogger } from '../logger'; -import { CallState } from '../store'; +import { CallingState, CallState } from '../store'; export type SubscriberOpts = { sfuClient: StreamSfuClient; @@ -23,7 +23,6 @@ const logger = getLogger(['Subscriber']); export class Subscriber { private pc: RTCPeerConnection; private sfuClient: StreamSfuClient; - private dispatcher: Dispatcher; private state: CallState; private readonly unregisterOnSubscriberOffer: () => void; @@ -31,6 +30,7 @@ export class Subscriber { private readonly iceRestartDelay: number; private isIceRestarting = false; + private iceRestartTimeout?: NodeJS.Timeout; /** * Constructs a new `Subscriber` instance. @@ -49,7 +49,6 @@ export class Subscriber { iceRestartDelay = 2500, }: SubscriberOpts) { this.sfuClient = sfuClient; - this.dispatcher = dispatcher; this.state = state; this.iceRestartDelay = iceRestartDelay; @@ -102,6 +101,7 @@ export class Subscriber { * Closes the `RTCPeerConnection` and unsubscribes from the dispatcher. */ close = () => { + clearTimeout(this.iceRestartTimeout); this.unregisterOnSubscriberOffer(); this.unregisterOnIceRestart(); this.pc.close(); @@ -116,6 +116,15 @@ export class Subscriber { return this.pc.getStats(selector); }; + /** + * Sets the SFU client to use. + * + * @param sfuClient the SFU client to use. + */ + setSfuClient = (sfuClient: StreamSfuClient) => { + this.sfuClient = sfuClient; + }; + /** * Migrates the subscriber to a new SFU client. * @@ -126,7 +135,7 @@ export class Subscriber { sfuClient: StreamSfuClient, connectionConfig?: RTCConfiguration, ) => { - this.sfuClient = sfuClient; + this.setSfuClient(sfuClient); // when migrating, we want to keep the previous subscriber open // until the new one is connected @@ -338,16 +347,19 @@ export class Subscriber { // do nothing when ICE is restarting if (this.isIceRestarting) return; + const hasNetworkConnection = + this.state.callingState !== CallingState.OFFLINE; + if (state === 'failed') { logger('warn', `Attempting to restart ICE`); this.restartIce().catch((e) => { logger('error', `ICE restart failed`, e); }); - } else if (state === 'disconnected') { + } else if (state === 'disconnected' && hasNetworkConnection) { // when in `disconnected` state, the browser may recover automatically, // hence, we delay the ICE restart logger('warn', `Scheduling ICE restart in ${this.iceRestartDelay} ms.`); - setTimeout(() => { + this.iceRestartTimeout = setTimeout(() => { // check if the state is still `disconnected` or `failed` // as the connection may have recovered (or failed) in the meantime if ( diff --git a/packages/client/src/rtc/flows/join.ts b/packages/client/src/rtc/flows/join.ts index 983c21fda4..62e8d814dc 100644 --- a/packages/client/src/rtc/flows/join.ts +++ b/packages/client/src/rtc/flows/join.ts @@ -3,7 +3,7 @@ import { JoinCallRequest, JoinCallResponse, } from '../../gen/coordinator'; -import { JoinCallData, StreamVideoParticipant } from '../../types'; +import { JoinCallData } from '../../types'; import { StreamClient } from '../../coordinator/connection/client'; /** @@ -61,19 +61,3 @@ const toRtcConfiguration = (config?: ICEServer[]) => { }; return rtcConfig; }; - -/** - * Reconciles the local state of the source participant into the target participant. - * - * @param target the participant to reconcile into. - * @param source the participant to reconcile from. - */ -export const reconcileParticipantLocalState = ( - target: StreamVideoParticipant, - source?: StreamVideoParticipant, -) => { - if (!source) return target; - - // copy everything from source to target - return Object.assign(target, source); -}; diff --git a/packages/client/src/rtc/signal.ts b/packages/client/src/rtc/signal.ts index 2d795f94d0..4b11b0da93 100644 --- a/packages/client/src/rtc/signal.ts +++ b/packages/client/src/rtc/signal.ts @@ -4,7 +4,7 @@ import { getLogger } from '../logger'; export const createWebSocketSignalChannel = (opts: { endpoint: string; - onMessage?: (message: SfuEvent) => void; + onMessage: (message: SfuEvent) => void; }) => { const logger = getLogger(['sfu-client']); const { endpoint, onMessage } = opts; @@ -23,23 +23,21 @@ export const createWebSocketSignalChannel = (opts: { logger('info', 'Signaling WS channel is open', e); }); - if (onMessage) { - ws.addEventListener('message', (e) => { - try { - const message = - e.data instanceof ArrayBuffer - ? SfuEvent.fromBinary(new Uint8Array(e.data)) - : SfuEvent.fromJsonString(e.data.toString()); + ws.addEventListener('message', (e) => { + try { + const message = + e.data instanceof ArrayBuffer + ? SfuEvent.fromBinary(new Uint8Array(e.data)) + : SfuEvent.fromJsonString(e.data.toString()); - onMessage(message); - } catch (err) { - logger( - 'error', - 'Failed to decode a message. Check whether the Proto models match.', - { event: e, error: err }, - ); - } - }); - } + onMessage(message); + } catch (err) { + logger( + 'error', + 'Failed to decode a message. Check whether the Proto models match.', + { event: e, error: err }, + ); + } + }); return ws; };