Skip to content

Commit

Permalink
fix: close previous sfu connection, ICE restart on the publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverlaz committed Dec 5, 2023
1 parent 8b8b3fa commit 909d376
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 50 deletions.
94 changes: 46 additions & 48 deletions packages/client/src/Call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,7 @@ export class Call {
}

const isMigrating = callingState === CallingState.MIGRATING;
const isReconnecting = callingState === CallingState.RECONNECTING;
this.state.setCallingState(CallingState.JOINING);
this.logger('debug', 'Starting join flow');

Expand All @@ -688,7 +689,7 @@ export class Call {
let sfuToken: string;
let connectionConfig: RTCConfiguration | undefined;
try {
if (this.sfuClient?.attemptFastReconnect) {
if (this.sfuClient?.isFastReconnecting) {
// use previous SFU configuration and values
connectionConfig = this.publisher?.connectionConfiguration;
sfuServer = this.sfuClient.sfuServer;
Expand Down Expand Up @@ -725,13 +726,15 @@ export class Call {
/**
* A closure which hides away the re-connection logic.
*/
const rejoin = async ({ migrate = false } = {}) => {
const rejoin = async (strategy: 'full' | 'fast' | 'migrate' = 'full') => {
this.reconnectAttempts++;
this.state.setCallingState(
migrate ? CallingState.MIGRATING : CallingState.RECONNECTING,
strategy === 'migrate'
? CallingState.MIGRATING
: CallingState.RECONNECTING,
);

if (migrate) {
if (strategy === 'migrate') {
this.logger(
'debug',
`[Migration]: migrating call ${this.cid} away from ${sfuServer.edge_name}`,
Expand All @@ -740,43 +743,38 @@ export class Call {
} else {
this.logger(
'debug',
`[Rejoin]: ${
sfuClient?.attemptFastReconnect ? 'Fast' : 'Full'
} rejoin call ${this.cid} (${this.reconnectAttempts})...`,
`[Rejoin]: ${strategy} rejoin call ${this.cid} (${this.reconnectAttempts})...`,
);
}

// take a snapshot of the current "local participant" state
// we'll need it for restoring the previous publishing state later
const localParticipant = this.state.localParticipant;

const disconnectFromPreviousSfu = () => {
if (!migrate) {
this.subscriber?.close();
this.subscriber = undefined;
this.publisher?.close({ stopTracks: false });
this.publisher = undefined;
this.statsReporter?.stop();
this.statsReporter = undefined;
}
previousSfuClient?.close(); // clean up previous connection
};

if (sfuClient.attemptFastReconnect) {
sfuClient.close();
} else if (!migrate) {
if (strategy === 'fast') {
previousSfuClient?.close();
} else if (strategy === 'full') {
// in migration or recovery scenarios, we don't want to
// wait before attempting to reconnect to an SFU server
await sleep(retryInterval(this.reconnectAttempts));
disconnectFromPreviousSfu();

// in full-reconnect, we need to dispose all Peer Connections
this.subscriber?.close();
this.subscriber = undefined;
this.publisher?.close({ stopTracks: false });
this.publisher = undefined;
this.statsReporter?.stop();
this.statsReporter = undefined;

previousSfuClient?.close(); // clean up previous connection
}
await this.join({
...data,
...(migrate && { migrating_from: sfuServer.edge_name }),
...(strategy === 'migrate' && { migrating_from: sfuServer.edge_name }),
});

if (migrate) {
disconnectFromPreviousSfu();
if (strategy === 'migrate') {
previousSfuClient?.close(); // clean up previous connection
}

this.logger(
Expand All @@ -786,7 +784,7 @@ export class Call {
// we shouldn't be republishing the streams if we're migrating
// as the underlying peer connection will take care of it as part
// of the ice-restart process
if (localParticipant && !migrate && !sfuClient.attemptFastReconnect) {
if (localParticipant && strategy === 'full') {
const {
audioStream,
videoStream,
Expand All @@ -813,11 +811,12 @@ export class Call {
});
}
if (screenShare) await this.publishScreenShareStream(screenShare);

this.logger(
'info',
`[Rejoin]: State restored. Attempt: ${this.reconnectAttempts}`,
);
}
this.logger(
'info',
`[Rejoin]: State restored. Attempt: ${this.reconnectAttempts}`,
);
};

// reconnect if the connection was closed unexpectedly. example:
Expand All @@ -832,7 +831,7 @@ export class Call {
'info',
`[Migration]: Going away from SFU... Reason: ${GoAwayReason[reason]}`,
);
rejoin({ migrate: true }).catch((err) => {
rejoin('migrate').catch((err) => {
this.logger(
'warn',
`[Migration]: Failed to migrate to another SFU.`,
Expand All @@ -854,18 +853,14 @@ export class Call {
// While we migrate to another SFU, we might have the WS connection
// to the old SFU closed abruptly. In this case, we don't want
// to reconnect to the old SFU, but rather to the new one.
if (
e.code === KnownCodes.WS_CLOSED_ABRUPTLY &&
sfuClient.isMigratingAway
)
return;

// attempt a fast reconnect first,
// in case it fails, we'll try a full reconnect
sfuClient.attemptFastReconnect = this.reconnectAttempts === 0;
const isMigratingAway =
e.code === KnownCodes.WS_CLOSED_ABRUPTLY && sfuClient.isMigratingAway;
if (isMigratingAway) return;

if (this.reconnectAttempts < this.maxReconnectAttempts) {
rejoin().catch((err) => {
sfuClient.isFastReconnecting = this.reconnectAttempts === 0;
const strategy = sfuClient.isFastReconnecting ? 'fast' : 'full';
rejoin(strategy).catch((err) => {
this.logger(
'error',
`[Rejoin]: Rejoin failed for ${this.reconnectAttempts} times. Giving up.`,
Expand Down Expand Up @@ -946,7 +941,7 @@ export class Call {
});
}

if (!isMigrating) {
if (!this.statsReporter) {
this.statsReporter = createStatsReporter({
subscriber: this.subscriber,
publisher: this.publisher,
Expand Down Expand Up @@ -976,23 +971,26 @@ export class Call {
subscriberSdp: sdp || '',
clientDetails: getClientDetails(),
migration,
fastReconnect: previousSfuClient?.attemptFastReconnect ?? false,
fastReconnect: previousSfuClient?.isFastReconnecting ?? false,
});
});

// 2. in parallel, wait for the SFU to send us the "joinResponse"
// this will throw an error if the SFU rejects the join request or
// fails to respond in time
const { callState, reconnected } = await this.waitForJoinResponse();
this.logger('debug', '[Join] fast reconnect:', reconnected);
if (isReconnecting) {
this.logger('debug', '[Rejoin] fast reconnected:', reconnected);
}
if (isMigrating) {
await this.subscriber.migrateTo(sfuClient, connectionConfig);
await this.publisher.migrateTo(sfuClient, connectionConfig);
} else if (reconnected) {
} else if (isReconnecting && reconnected) {
// update the SFU client instance on the subscriber and publisher
// and perform a full ICE restart on the publisher
this.subscriber.setSfuClient(sfuClient);
await this.publisher.migrateTo(sfuClient, connectionConfig);
this.publisher.setSfuClient(sfuClient);
// and perform a full ICE restart on the publisher
await this.publisher.restartIce();
}
const currentParticipants = callState?.participants || [];
const participantCount = callState?.participantCount;
Expand Down
5 changes: 3 additions & 2 deletions packages/client/src/StreamSfuClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ export class StreamSfuClient {
* A flag indicating that the client connection is broken for the current
* client and that a fast-reconnect with a new client should be attempted.
*/
attemptFastReconnect = false;
isFastReconnecting = false;

private readonly rpc: SignalServerClient;
private keepAliveInterval?: NodeJS.Timeout;
Expand Down Expand Up @@ -299,8 +299,9 @@ export class StreamSfuClient {
);
};

send = (message: SfuRequest) => {
send = async (message: SfuRequest) => {
return this.signalReady.then((signal) => {
if (signal.readyState !== signal.OPEN) return;
this.logger(
'debug',
`Sending message to: ${this.edgeName}`,
Expand Down
9 changes: 9 additions & 0 deletions packages/client/src/rtc/Publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,15 @@ export class Publisher {
});
};

/**
* Sets the SFU client to use.
*
* @param sfuClient the SFU client to use.
*/
setSfuClient = (sfuClient: StreamSfuClient) => {
this.sfuClient = sfuClient;
};

/**
* Performs a migration of this publisher instance to a new SFU.
*
Expand Down

0 comments on commit 909d376

Please sign in to comment.