Skip to content

Commit

Permalink
fix: use worker to prevent timer throttling
Browse files Browse the repository at this point in the history
  • Loading branch information
myandrienko committed Nov 5, 2024
1 parent 4ec9488 commit 1daaa7b
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 10 deletions.
16 changes: 16 additions & 0 deletions packages/client/generate-timer-worker.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/usr/bin/env bash

npx tsc src/timers/worker.ts \
--skipLibCheck \
--removeComments \
--module preserve \
--lib ES2020,WebWorker \
--outDir worker-dist

cat <<EOF >src/timers/worker.build.ts
export const timerWorker = {
src: \`$(<worker-dist/worker.js)\`,
};
EOF

rm -r worker-dist
5 changes: 3 additions & 2 deletions packages/client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@
"scripts": {
"clean": "rimraf dist",
"start": "rollup -w -c",
"build": "yarn clean && rollup -c",
"build": "yarn clean && ./generate-timer-worker.sh && rollup -c",
"test": "vitest",
"clean:docs": "rimraf generated-docs",
"test-ci": "vitest --coverage",
"generate:open-api": "./generate-openapi.sh protocol",
"generate:open-api:dev": "./generate-openapi.sh chat"
"generate:open-api:dev": "./generate-openapi.sh chat",
"generate:timer-worker": "./generate-timer-worker.sh"
},
"files": [
"dist",
Expand Down
10 changes: 6 additions & 4 deletions packages/client/src/StreamSfuClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import {
promiseWithResolvers,
PromiseWithResolvers,
} from './helpers/withResolvers';
import { getTimers } from './timers';

export type StreamSfuClientConstructor = {
/**
Expand Down Expand Up @@ -110,7 +111,7 @@ export class StreamSfuClient {
isLeaving = false;

private readonly rpc: SignalServerClient;
private keepAliveInterval?: NodeJS.Timeout;
private keepAliveInterval?: number;
private connectionCheckTimeout?: NodeJS.Timeout;
private migrateAwayTimeout?: NodeJS.Timeout;
private pingIntervalInMs = 10 * 1000;
Expand Down Expand Up @@ -263,7 +264,7 @@ export class StreamSfuClient {

private handleWebSocketClose = () => {
this.signalWs.removeEventListener('close', this.handleWebSocketClose);
clearInterval(this.keepAliveInterval);
getTimers().clearInterval(this.keepAliveInterval);
clearTimeout(this.connectionCheckTimeout);
this.onSignalClose?.();
};
Expand Down Expand Up @@ -489,8 +490,9 @@ export class StreamSfuClient {
};

private keepAlive = () => {
clearInterval(this.keepAliveInterval);
this.keepAliveInterval = setInterval(() => {
const timers = getTimers();
timers.clearInterval(this.keepAliveInterval);
this.keepAliveInterval = timers.setInterval(() => {
this.ping().catch((e) => {
this.logger('error', 'Error sending healthCheckRequest to SFU', e);
});
Expand Down
10 changes: 6 additions & 4 deletions packages/client/src/coordinator/connection/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import type {
UR,
} from './types';
import type { ConnectedEvent, WSAuthMessage } from '../../gen/coordinator';
import { getTimers } from '../../timers';

// Type guards to check WebSocket error type
const isCloseEvent = (
Expand Down Expand Up @@ -58,7 +59,7 @@ export class StableWSConnection {
authenticationSent: boolean;
consecutiveFailures: number;
pingInterval: number;
healthCheckTimeoutRef?: NodeJS.Timeout;
healthCheckTimeoutRef?: number;
isConnecting: boolean;
isDisconnected: boolean;
isHealthy: boolean;
Expand Down Expand Up @@ -249,7 +250,7 @@ export class StableWSConnection {

// start by removing all the listeners
if (this.healthCheckTimeoutRef) {
clearInterval(this.healthCheckTimeoutRef);
getTimers().clearInterval(this.healthCheckTimeoutRef);
}
if (this.connectionCheckTimeoutRef) {
clearInterval(this.connectionCheckTimeoutRef);
Expand Down Expand Up @@ -757,12 +758,13 @@ export class StableWSConnection {
* Schedules a next health check ping for websocket.
*/
scheduleNextPing = () => {
const timers = getTimers();
if (this.healthCheckTimeoutRef) {
clearTimeout(this.healthCheckTimeoutRef);
timers.clearTimeout(this.healthCheckTimeoutRef);
}

// 30 seconds is the recommended interval (messenger uses this)
this.healthCheckTimeoutRef = setTimeout(() => {
this.healthCheckTimeoutRef = timers.setTimeout(() => {
// send the healthcheck..., server replies with a health check event
const data = [{ type: 'health.check', client_id: this.client.clientID }];
// try to send on the connection
Expand Down
116 changes: 116 additions & 0 deletions packages/client/src/timers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import { lazy } from '../helpers/lazy';
import { getLogger } from '../logger';
import { TimerWorkerEvent, TimerWorkerRequest } from './types';
import { timerWorker } from './worker.build';

class TimerWorker {
private currentTimerId = 1;
private callbacks = new Map<number, () => void>();
private worker: Worker | undefined;
private fallback = false;

setup(): void {
try {
const source = timerWorker.src;
const blob = new Blob([source], {
type: 'application/javascript; charset=utf-8',
});
const script = URL.createObjectURL(blob);
this.worker = new Worker(script, { name: 'str-timer-worker' });
this.worker.addEventListener('message', (event) => {
const { type, id } = event.data as TimerWorkerEvent;
if (type === 'tick') {
this.callbacks.get(id)?.();
}
});
} catch (err: any) {
getLogger(['timer-worker'])('error', err);
this.fallback = true;
}
}

destroy(): void {
this.callbacks.clear();
this.worker?.terminate();
this.worker = undefined;
this.fallback = false;
}

get ready() {
return this.fallback || Boolean(this.worker);
}

setInterval(callback: () => void, timeout: number): number {
return this.setTimer('setInterval', callback, timeout);
}

clearInterval(id?: number): void {
this.clearTimer('clearInterval', id);
}

setTimeout(callback: () => void, timeout: number): number {
return this.setTimer('setTimeout', callback, timeout);
}

clearTimeout(id?: number): void {
this.clearTimer('clearTimeout', id);
}

private setTimer(
type: 'setTimeout' | 'setInterval',
callback: () => void,
timeout: number,
) {
if (!this.ready) {
this.setup();
}

if (this.fallback) {
return (type === 'setTimeout' ? setTimeout : setInterval)(
callback,
timeout,
) as unknown as number;
}

const id = this.getTimerId();
this.callbacks.set(id, callback);
this.sendMessage({ type, id, timeout });
return id;
}

private clearTimer(type: 'clearTimeout' | 'clearInterval', id?: number) {
if (!id) {
return;
}

if (!this.ready) {
this.setup();
}

if (this.fallback) {
this.clearInterval(id);
return;
}

this.callbacks.delete(id);
this.sendMessage({ type, id });
}

private getTimerId() {
return this.currentTimerId++;
}

private sendMessage(message: TimerWorkerRequest) {
if (!this.worker) {
throw new Error("Cannot use timer worker before it's set up");
}

this.worker.postMessage(message);
}
}

export const getTimers = lazy(() => {
const instance = new TimerWorker();
instance.setup();
return instance;
});
15 changes: 15 additions & 0 deletions packages/client/src/timers/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
export type TimerWorkerRequest =
| {
type: 'setInterval' | 'setTimeout';
id: number;
timeout: number;
}
| {
type: 'clearInterval' | 'clearTimeout';
id: number;
};

export type TimerWorkerEvent = {
type: 'tick';
id: number;
};
9 changes: 9 additions & 0 deletions packages/client/src/timers/worker.build.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Do not modify this file manually. You can edit worker.ts if necessary
// and the run ./generate-timer-worker.sh
export const timerWorker = {
get src(): string {
throw new Error(
'Timer worker source missing. Did you forget to run generate-timer-worker.sh?',
);
},
};
37 changes: 37 additions & 0 deletions packages/client/src/timers/worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/* eslint-disable */

import type { TimerWorkerEvent, TimerWorkerRequest } from './types';

const timerIdMapping = new Map<number, NodeJS.Timeout>();

self.addEventListener('message', (event: MessageEvent) => {
const request = event.data as TimerWorkerRequest;

switch (request.type) {
case 'setTimeout':
case 'setInterval':
timerIdMapping.set(
request.id,
(request.type === 'setTimeout' ? setTimeout : setInterval)(
() => tick(request.id),
request.timeout,
),
);
break;

case 'clearTimeout':
case 'clearInterval':
(request.type === 'clearTimeout' ? clearTimeout : clearInterval)(
timerIdMapping.get(request.id),
);
timerIdMapping.delete(request.id);
break;
}
});

function tick(id: number) {
const message: TimerWorkerEvent = { type: 'tick', id };
self.postMessage(message);
}

/* eslint-enable */

0 comments on commit 1daaa7b

Please sign in to comment.