Skip to content

Commit

Permalink
Merge branch 'main' into codec-negotiation
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverlaz committed Dec 3, 2024
2 parents cf4ed2f + 2695bab commit acb4eac
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 78 deletions.
7 changes: 7 additions & 0 deletions packages/client/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

This file was generated using [@jscutlery/semver](https://github.com/jscutlery/semver).

## [1.11.12](https://github.com/GetStream/stream-video-js/compare/@stream-io/video-client-1.11.11...@stream-io/video-client-1.11.12) (2024-12-03)


### Bug Fixes

* handle timeout on SFU WS connections ([#1600](https://github.com/GetStream/stream-video-js/issues/1600)) ([5f2db7b](https://github.com/GetStream/stream-video-js/commit/5f2db7bd5cfdf57cdc04d6a6ed752f43e5b06657))

## [1.11.11](https://github.com/GetStream/stream-video-js/compare/@stream-io/video-client-1.11.10...@stream-io/video-client-1.11.11) (2024-11-29)


Expand Down
2 changes: 1 addition & 1 deletion packages/client/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@stream-io/video-client",
"version": "1.11.11",
"version": "1.11.12",
"packageManager": "[email protected]",
"main": "dist/index.cjs.js",
"module": "dist/index.es.js",
Expand Down
41 changes: 38 additions & 3 deletions packages/client/src/Call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,10 @@ import { getSdkSignature } from './stats/utils';
import { withoutConcurrency } from './helpers/concurrency';
import { ensureExhausted } from './helpers/ensureExhausted';
import {
makeSafePromise,
PromiseWithResolvers,
promiseWithResolvers,
} from './helpers/withResolvers';
} from './helpers/promise';

/**
* An object representation of a `Call`.
Expand Down Expand Up @@ -218,6 +219,7 @@ export class Call {
private reconnectAttempts = 0;
private reconnectStrategy = WebsocketReconnectStrategy.UNSPECIFIED;
private fastReconnectDeadlineSeconds: number = 0;
private disconnectionTimeoutSeconds: number = 0;
private lastOfflineTimestamp: number = 0;
private networkAvailableTask: PromiseWithResolvers<void> | undefined;
// maintain the order of publishing tracks to restore them after a reconnection
Expand Down Expand Up @@ -1110,6 +1112,9 @@ export class Call {
*/
private handleSfuSignalClose = (sfuClient: StreamSfuClient) => {
this.logger('debug', '[Reconnect] SFU signal connection closed');
// SFU WS closed before we finished current join, no need to schedule reconnect
// because join operation will fail
if (this.state.callingState === CallingState.JOINING) return;
// normal close, no need to reconnect
if (sfuClient.isLeaving) return;
this.reconnect(WebsocketReconnectStrategy.REJOIN).catch((err) => {
Expand All @@ -1127,14 +1132,35 @@ export class Call {
private reconnect = async (
strategy: WebsocketReconnectStrategy,
): Promise<void> => {
if (
this.state.callingState === CallingState.RECONNECTING ||
this.state.callingState === CallingState.RECONNECTING_FAILED
)
return;

return withoutConcurrency(this.reconnectConcurrencyTag, async () => {
this.logger(
'info',
`[Reconnect] Reconnecting with strategy ${WebsocketReconnectStrategy[strategy]}`,
);

let reconnectStartTime = Date.now();
this.reconnectStrategy = strategy;

do {
if (
this.disconnectionTimeoutSeconds > 0 &&
(Date.now() - reconnectStartTime) / 1000 >
this.disconnectionTimeoutSeconds
) {
this.logger(
'warn',
'[Reconnect] Stopping reconnection attempts after reaching disconnection timeout',
);
this.state.setCallingState(CallingState.RECONNECTING_FAILED);
return;
}

// we don't increment reconnect attempts for the FAST strategy.
if (this.reconnectStrategy !== WebsocketReconnectStrategy.FAST) {
this.reconnectAttempts++;
Expand Down Expand Up @@ -1252,7 +1278,7 @@ export class Call {
currentSubscriber?.detachEventHandlers();
currentPublisher?.detachEventHandlers();

const migrationTask = currentSfuClient.enterMigration();
const migrationTask = makeSafePromise(currentSfuClient.enterMigration());

try {
const currentSfu = currentSfuClient.edgeName;
Expand All @@ -1270,7 +1296,7 @@ export class Call {
// Wait for the migration to complete, then close the previous SFU client
// and the peer connection instances. In case of failure, the migration
// task would throw an error and REJOIN would be attempted.
await migrationTask;
await migrationTask();

// in MIGRATE, we can consider the call as joined only after
// `participantMigrationComplete` event is received, signaled by
Expand Down Expand Up @@ -2396,4 +2422,13 @@ export class Call {
);
this.dynascaleManager.applyTrackSubscriptions();
};

/**
* Sets the maximum amount of time a user can remain waiting for a reconnect
* after a network disruption
* @param timeoutSeconds Timeout in seconds, or 0 to keep reconnecting indefinetely
*/
setDisconnectionTimeout = (timeoutSeconds: number) => {
this.disconnectionTimeoutSeconds = timeoutSeconds;
};
}
55 changes: 27 additions & 28 deletions packages/client/src/StreamSfuClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,16 @@ import {
} from './gen/video/sfu/signal_rpc/signal';
import { ICETrickle, TrackType } from './gen/video/sfu/models/models';
import { StreamClient } from './coordinator/connection/client';
import { generateUUIDv4, sleep } from './coordinator/connection/utils';
import { generateUUIDv4 } from './coordinator/connection/utils';
import { Credentials } from './gen/coordinator';
import { Logger } from './coordinator/connection/types';
import { getLogger, getLogLevel } from './logger';
import { withoutConcurrency } from './helpers/concurrency';
import {
promiseWithResolvers,
PromiseWithResolvers,
} from './helpers/withResolvers';
makeSafePromise,
SafePromise,
} from './helpers/promise';

export type StreamSfuClientConstructor = {
/**
Expand Down Expand Up @@ -101,7 +102,7 @@ export class StreamSfuClient {
/**
* Promise that resolves when the WebSocket connection is ready (open).
*/
private signalReady!: Promise<WebSocket>;
private signalReady!: SafePromise<WebSocket>;

/**
* Flag to indicate if the client is in the process of leaving the call.
Expand All @@ -116,15 +117,14 @@ export class StreamSfuClient {
private pingIntervalInMs = 10 * 1000;
private unhealthyTimeoutInMs = this.pingIntervalInMs + 5 * 1000;
private lastMessageTimestamp?: Date;
private readonly restoreWebSocketConcurrencyTag = Symbol('recoverWebSocket');
private readonly unsubscribeIceTrickle: () => void;
private readonly unsubscribeNetworkChanged: () => void;
private readonly onSignalClose: (() => void) | undefined;
private readonly logger: Logger;
private readonly logTag: string;
private readonly credentials: Credentials;
private readonly dispatcher: Dispatcher;
private readonly joinResponseTimeout?: number;
private readonly joinResponseTimeout: number;
private networkAvailableTask: PromiseWithResolvers<void> | undefined;
/**
* Promise that resolves when the JoinResponse is received.
Expand Down Expand Up @@ -227,32 +227,31 @@ export class StreamSfuClient {
});

this.signalWs.addEventListener('close', this.handleWebSocketClose);
this.signalWs.addEventListener('error', this.restoreWebSocket);

this.signalReady = new Promise((resolve) => {
const onOpen = () => {
this.signalWs.removeEventListener('open', onOpen);
resolve(this.signalWs);
};
this.signalWs.addEventListener('open', onOpen);
});

this.signalReady = makeSafePromise(
Promise.race<WebSocket>([
new Promise((resolve) => {
const onOpen = () => {
this.signalWs.removeEventListener('open', onOpen);
resolve(this.signalWs);
};
this.signalWs.addEventListener('open', onOpen);
}),

new Promise((resolve, reject) => {
setTimeout(
() => reject(new Error('SFU WS connection timed out')),
this.joinResponseTimeout,
);
}),
]),
);
};

private cleanUpWebSocket = () => {
this.signalWs.removeEventListener('error', this.restoreWebSocket);
this.signalWs.removeEventListener('close', this.handleWebSocketClose);
};

private restoreWebSocket = () => {
withoutConcurrency(this.restoreWebSocketConcurrencyTag, async () => {
await this.networkAvailableTask?.promise;
this.logger('debug', 'Restoring SFU WS connection');
this.cleanUpWebSocket();
await sleep(500);
this.createWebSocket();
}).catch((err) => this.logger('debug', `Can't restore WS connection`, err));
};

get isHealthy() {
return this.signalWs.readyState === WebSocket.OPEN;
}
Expand Down Expand Up @@ -409,7 +408,7 @@ export class StreamSfuClient {
data: Omit<JoinRequest, 'sessionId' | 'token'>,
): Promise<JoinResponse> => {
// wait for the signal web socket to be ready before sending "joinRequest"
await this.signalReady;
await this.signalReady();
if (this.joinResponseTask.isResolved || this.joinResponseTask.isRejected) {
// we need to lock the RPC requests until we receive a JoinResponse.
// that's why we have this primitive lock mechanism.
Expand Down Expand Up @@ -478,7 +477,7 @@ export class StreamSfuClient {
};

private send = async (message: SfuRequest) => {
await this.signalReady; // wait for the signal ws to be open
await this.signalReady(); // wait for the signal ws to be open
const msgJson = SfuRequest.toJson(message);
if (this.signalWs.readyState !== WebSocket.OPEN) {
this.logger('debug', 'Signal WS is not open. Skipping message', msgJson);
Expand Down
44 changes: 44 additions & 0 deletions packages/client/src/helpers/promise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,47 @@ export function makeSafePromise<T>(promise: Promise<T>): SafePromise<T> {
unwrapPromise.checkPending = () => isPending;
return unwrapPromise;
}

export type PromiseWithResolvers<T> = {
promise: Promise<T>;
resolve: (value: T | PromiseLike<T>) => void;
reject: (reason: any) => void;
isResolved: boolean;
isRejected: boolean;
};

/**
* Creates a new promise with resolvers.
*
* Based on:
* - https://github.com/tc39/proposal-promise-with-resolvers/blob/main/polyfills.js
*/
export const promiseWithResolvers = <T = void>(): PromiseWithResolvers<T> => {
let resolve: (value: T | PromiseLike<T>) => void;
let reject: (reason: any) => void;
const promise = new Promise<T>((_resolve, _reject) => {
resolve = _resolve;
reject = _reject;
});

let isResolved = false;
let isRejected = false;

const resolver = (value: T | PromiseLike<T>) => {
isResolved = true;
resolve(value);
};

const rejecter = (reason: any) => {
isRejected = true;
reject(reason);
};

return {
promise,
resolve: resolver,
reject: rejecter,
isResolved,
isRejected,
};
};
43 changes: 0 additions & 43 deletions packages/client/src/helpers/withResolvers.ts

This file was deleted.

5 changes: 5 additions & 0 deletions packages/react-bindings/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

This file was generated using [@jscutlery/semver](https://github.com/jscutlery/semver).

## [1.2.6](https://github.com/GetStream/stream-video-js/compare/@stream-io/video-react-bindings-1.2.5...@stream-io/video-react-bindings-1.2.6) (2024-12-03)

### Dependency Updates

* `@stream-io/video-client` updated to version `1.11.12`
## [1.2.5](https://github.com/GetStream/stream-video-js/compare/@stream-io/video-react-bindings-1.2.4...@stream-io/video-react-bindings-1.2.5) (2024-11-29)

### Dependency Updates
Expand Down
2 changes: 1 addition & 1 deletion packages/react-bindings/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@stream-io/video-react-bindings",
"version": "1.2.5",
"version": "1.2.6",
"packageManager": "[email protected]",
"main": "./dist/index.cjs.js",
"module": "./dist/index.es.js",
Expand Down
6 changes: 6 additions & 0 deletions packages/react-native-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

This file was generated using [@jscutlery/semver](https://github.com/jscutlery/semver).

## [1.4.8](https://github.com/GetStream/stream-video-js/compare/@stream-io/video-react-native-sdk-1.4.7...@stream-io/video-react-native-sdk-1.4.8) (2024-12-03)

### Dependency Updates

* `@stream-io/video-client` updated to version `1.11.12`
* `@stream-io/video-react-bindings` updated to version `1.2.6`
## [1.4.7](https://github.com/GetStream/stream-video-js/compare/@stream-io/video-react-native-sdk-1.4.6...@stream-io/video-react-native-sdk-1.4.7) (2024-11-29)

### Dependency Updates
Expand Down
2 changes: 1 addition & 1 deletion packages/react-native-sdk/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@stream-io/video-react-native-sdk",
"version": "1.4.7",
"version": "1.4.8",
"packageManager": "[email protected]",
"main": "dist/commonjs/index.js",
"module": "dist/module/index.js",
Expand Down
6 changes: 6 additions & 0 deletions packages/react-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

This file was generated using [@jscutlery/semver](https://github.com/jscutlery/semver).

## [1.7.28](https://github.com/GetStream/stream-video-js/compare/@stream-io/video-react-sdk-1.7.27...@stream-io/video-react-sdk-1.7.28) (2024-12-03)

### Dependency Updates

* `@stream-io/video-client` updated to version `1.11.12`
* `@stream-io/video-react-bindings` updated to version `1.2.6`
## [1.7.27](https://github.com/GetStream/stream-video-js/compare/@stream-io/video-react-sdk-1.7.26...@stream-io/video-react-sdk-1.7.27) (2024-11-29)

### Dependency Updates
Expand Down
2 changes: 1 addition & 1 deletion packages/react-sdk/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@stream-io/video-react-sdk",
"version": "1.7.27",
"version": "1.7.28",
"packageManager": "[email protected]",
"main": "./dist/index.cjs.js",
"module": "./dist/index.es.js",
Expand Down

0 comments on commit acb4eac

Please sign in to comment.