Skip to content

Commit

Permalink
fix: better error handling, special connection close codes
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverlaz committed Dec 12, 2023
1 parent e7318d7 commit 07939b9
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 18 deletions.
27 changes: 22 additions & 5 deletions packages/client/src/Call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,10 @@ export class Call {
const localParticipant = this.state.localParticipant;

if (strategy === 'fast') {
sfuClient.close();
sfuClient.close(
StreamSfuClient.ERROR_CONNECTION_BROKEN,
'js-client: attempting fast reconnect',
);
} else if (strategy === 'full') {
// in migration or recovery scenarios, we don't want to
// wait before attempting to reconnect to an SFU server
Expand All @@ -774,15 +777,23 @@ export class Call {
this.statsReporter?.stop();
this.statsReporter = undefined;

sfuClient.close(); // clean up current connection
// clean up current connection
sfuClient.close(
StreamSfuClient.NORMAL_CLOSURE,
'js-client: attempting full reconnect',
);
}
await this.join({
...data,
...(strategy === 'migrate' && { migrating_from: sfuServer.edge_name }),
});

// clean up previous connection
if (strategy === 'migrate') {
sfuClient?.close(); // clean up previous connection
sfuClient.close(
StreamSfuClient.NORMAL_CLOSURE,
'js-client: attempting migration',
);
}

this.logger(
Expand Down Expand Up @@ -853,7 +864,7 @@ export class Call {
// the upcoming re-join will register a new handler anyway
unregisterGoAway();
// do nothing if the connection was closed on purpose
if (e.code === KnownCodes.WS_CLOSED_SUCCESS) return;
if (e.code === StreamSfuClient.NORMAL_CLOSURE) return;
// do nothing if the connection was closed because of a policy violation
// e.g., the user has been blocked by an admin or moderator
if (e.code === KnownCodes.WS_POLICY_VIOLATION) return;
Expand All @@ -863,7 +874,13 @@ export class Call {
// to reconnect to the old SFU, but rather to the new one.
const isMigratingAway =
e.code === KnownCodes.WS_CLOSED_ABRUPTLY && sfuClient.isMigratingAway;
if (isMigratingAway) return;
const isFastReconnecting =
e.code === KnownCodes.WS_CLOSED_ABRUPTLY &&
sfuClient.isFastReconnecting;
if (isMigratingAway || isFastReconnecting) return;

// do nothing if the connection was closed because of a fast reconnect
if (e.code === StreamSfuClient.ERROR_CONNECTION_BROKEN) return;

if (this.reconnectAttempts < this.maxReconnectAttempts) {
sfuClient.isFastReconnecting = this.reconnectAttempts === 0;
Expand Down
43 changes: 30 additions & 13 deletions packages/client/src/StreamSfuClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,24 @@ export class StreamSfuClient {
private readonly unsubscribeIceTrickle: () => void;
private readonly logger: Logger;

/**
* The normal closure code. Used for controlled shutdowns.
*/
static NORMAL_CLOSURE = 1000;
/**
* The error code used when the SFU connection is unhealthy.
* Usually, this means that no message has been received from the SFU for
* a certain amount of time (`connectionCheckTimeout`).
*/
static ERROR_CONNECTION_UNHEALTHY = 4001;

/**
* The error code used when the SFU connection is broken.
* Usually, this means that the WS connection has been closed unexpectedly.
* This error code is used to announce a fast-reconnect.
*/
static ERROR_CONNECTION_BROKEN = 4002; // used in fast-reconnects

/**
* Constructs a new SFU client.
*
Expand Down Expand Up @@ -193,11 +211,13 @@ export class StreamSfuClient {
}

close = (
code: number = 1000,
reason: string = 'Requested signal connection close',
code: number = StreamSfuClient.NORMAL_CLOSURE,
reason: string = 'js-client: requested signal connection close',
) => {
this.logger('debug', 'Closing SFU WS connection', code, reason);
this.signalWs.close(code, reason);
if (this.signalWs.readyState === this.signalWs.CLOSED) {
this.signalWs.close(code, reason);
}

this.unsubscribeIceTrickle();
clearInterval(this.keepAliveInterval);
Expand Down Expand Up @@ -312,9 +332,7 @@ export class StreamSfuClient {
};

private keepAlive = () => {
if (this.keepAliveInterval) {
clearInterval(this.keepAliveInterval);
}
clearInterval(this.keepAliveInterval);
this.keepAliveInterval = setInterval(() => {
this.logger('trace', 'Sending healthCheckRequest to SFU');
const message = SfuRequest.create({
Expand All @@ -323,24 +341,23 @@ export class StreamSfuClient {
healthCheckRequest: {},
},
});
void this.send(message);
this.send(message).catch((e) => {
this.logger('error', 'Error sending healthCheckRequest to SFU', e);
});
}, this.pingIntervalInMs);
};

private scheduleConnectionCheck = () => {
if (this.connectionCheckTimeout) {
clearTimeout(this.connectionCheckTimeout);
}

clearTimeout(this.connectionCheckTimeout);
this.connectionCheckTimeout = setTimeout(() => {
if (this.lastMessageTimestamp) {
const timeSinceLastMessage =
new Date().getTime() - this.lastMessageTimestamp.getTime();

if (timeSinceLastMessage > this.unhealthyTimeoutInMs) {
this.close(
4001,
`SFU connection unhealthy. Didn't receive any healthcheck messages for ${this.unhealthyTimeoutInMs}ms`,
StreamSfuClient.ERROR_CONNECTION_UNHEALTHY,
`SFU connection unhealthy. Didn't receive any message for ${this.unhealthyTimeoutInMs}ms`,
);
}
}
Expand Down

0 comments on commit 07939b9

Please sign in to comment.