From 759d9a2c403aa11a64e5470aa53622022918e24e Mon Sep 17 00:00:00 2001 From: Anton Arnautov <43254280+arnautov-anton@users.noreply.github.com> Date: Tue, 26 Sep 2023 16:37:13 +0200 Subject: [PATCH] perf(client): share replay of computed observables (#1095) Adds `shareReplay` to computed observables which when previously subscribed to by multiple components would be evaluated separately, i.e., each source data emission would evaluate defined pipeline for each subscription individually. "Futureproofs" `RxUtils.getCurrentValue` method through usage of `combineLatest` function which will always take the last emitted value even from "pure" observables unlike `take` operator which "resolves" with first emitted value. Fixes `noop` sorting (\w) test where "sort" mutates source data - when we move to TS5.2 we should replace that with `Array.toSorted`. ### Implementation notes I mentioned using `share` operator which uses `refCount` and `Subject` under the hood meaning that if amount of subscriptions on such observable reaches 0 (refCount === 0) - it resets the `Subject` and re-evaluates the pipeline and assignes new value on next subscription. Using it in our SDK proved to be difficult and was behaving unexpectedly (broke tests and certain observables would return "undefined" instead of initial values) though on paper it makes perfect sense. I opted to use `shareReplay` which by default (with only buffer size as argument) works like a `ReplaySubject` but continues running defined pipeline (never unsubscribes from the source observable) even after the `refCount` reached zero. With the overloaded variant `shareReplay({ bufferSize: 1, refCount: true })` it behaves just as expected and does not break our SDK - meaning the values (pipelines) are being computed only once per multiple subscriptions and only if it has at least one subscription. ### Sources: - [difference between `share` and `shareReplay` operators](https://www.bitovi.com/blog/always-know-when-to-use-share-vs.-sharereplay) - though I found a discrepancy in the post - they mention that the `shareReplay` with `refCount` does not re-evaluate source observable but through my testing I found that to be false (yet it does something differently under the hood) - [`shareReplay` operator](https://rxjs.dev/api/index/function/shareReplay) - [`share` operator](https://rxjs.dev/api/index/function/share) --------- Co-authored-by: Oliver Lazoroski --- packages/client/src/helpers/DynascaleManager.ts | 3 ++- .../src/helpers/__tests__/DynascaleManager.test.ts | 8 ++++---- packages/client/src/sorting/comparator.ts | 2 +- packages/client/src/store/CallState.ts | 13 ++++++++++--- .../client/src/store/__tests__/CallState.test.ts | 3 ++- packages/client/src/store/rxUtils.ts | 8 +++----- 6 files changed, 22 insertions(+), 15 deletions(-) diff --git a/packages/client/src/helpers/DynascaleManager.ts b/packages/client/src/helpers/DynascaleManager.ts index ab62b5fc83..e8f57593e7 100644 --- a/packages/client/src/helpers/DynascaleManager.ts +++ b/packages/client/src/helpers/DynascaleManager.ts @@ -177,7 +177,7 @@ export class DynascaleManager { ), takeWhile((participant) => !!participant), distinctUntilChanged(), - shareReplay(1), + shareReplay({ bufferSize: 1, refCount: true }), ); /** @@ -339,6 +339,7 @@ export class DynascaleManager { ), takeWhile((p) => !!p), distinctUntilChanged(), + shareReplay({ bufferSize: 1, refCount: true }), ); const updateMediaStreamSubscription = participant$ diff --git a/packages/client/src/helpers/__tests__/DynascaleManager.test.ts b/packages/client/src/helpers/__tests__/DynascaleManager.test.ts index 94c7a3f043..3b8753afc1 100644 --- a/packages/client/src/helpers/__tests__/DynascaleManager.test.ts +++ b/packages/client/src/helpers/__tests__/DynascaleManager.test.ts @@ -227,7 +227,7 @@ describe('DynascaleManager', () => { cleanup?.(); - expect(updateSubscription).toHaveBeenCalledWith( + expect(updateSubscription).toHaveBeenLastCalledWith( 'videoTrack', { 'session-id': { dimension: undefined } }, DebounceType.FAST, @@ -278,7 +278,7 @@ describe('DynascaleManager', () => { cleanup?.(); - expect(updateSubscription).toHaveBeenCalledWith( + expect(updateSubscription).toHaveBeenLastCalledWith( 'videoTrack', { 'session-id': { dimension: undefined } }, DebounceType.FAST, @@ -366,7 +366,7 @@ describe('DynascaleManager', () => { cleanup?.(); - expect(updateSubscription).toHaveBeenCalledWith( + expect(updateSubscription).toHaveBeenLastCalledWith( 'videoTrack', { 'session-id': { dimension: undefined } }, DebounceType.FAST, @@ -436,7 +436,7 @@ describe('DynascaleManager', () => { cleanup?.(); - expect(updateSubscription).toHaveBeenCalledWith( + expect(updateSubscription).toHaveBeenLastCalledWith( 'videoTrack', { 'session-id': { dimension: undefined } }, DebounceType.FAST, diff --git a/packages/client/src/sorting/comparator.ts b/packages/client/src/sorting/comparator.ts index 9ae5df976f..00bdc1b18f 100644 --- a/packages/client/src/sorting/comparator.ts +++ b/packages/client/src/sorting/comparator.ts @@ -39,7 +39,7 @@ export const descending = (comparator: Comparator): Comparator => { * Creates a new comparator which conditionally applies the given comparator. * * @example - * const shouldSortByValue = () => return false; // to turn it off + * const shouldSortByValue = (a, b) => a % 2 === 0; // return false to turn it off * const byValue = (a, b) => a < b ? - 1 : a > b ? 1 : 0; * const comparator = conditional(shouldSortByValue)(byValue); * diff --git a/packages/client/src/store/CallState.ts b/packages/client/src/store/CallState.ts index 956aac21d0..cc3ab81bbc 100644 --- a/packages/client/src/store/CallState.ts +++ b/packages/client/src/store/CallState.ts @@ -1,5 +1,5 @@ import { BehaviorSubject, Observable } from 'rxjs'; -import { distinctUntilChanged, map } from 'rxjs/operators'; +import { distinctUntilChanged, map, shareReplay } from 'rxjs/operators'; import type { Patch } from './rxUtils'; import * as RxUtils from './rxUtils'; import { @@ -320,24 +320,30 @@ export class CallState { */ constructor() { this.logger = getLogger(['CallState']); - this.participants$ = this.participantsSubject.pipe( - map((ps) => ps.sort(this.sortParticipantsBy)), + this.participants$ = this.participantsSubject.asObservable().pipe( + // TODO: replace with Array.toSorted once available + map((ps) => [...ps].sort(this.sortParticipantsBy)), + shareReplay({ bufferSize: 1, refCount: true }), ); this.localParticipant$ = this.participants$.pipe( map((participants) => participants.find(isStreamVideoLocalParticipant)), + shareReplay({ bufferSize: 1, refCount: true }), ); this.remoteParticipants$ = this.participants$.pipe( map((participants) => participants.filter((p) => !p.isLocalParticipant)), + shareReplay({ bufferSize: 1, refCount: true }), ); this.pinnedParticipants$ = this.participants$.pipe( map((participants) => participants.filter((p) => !!p.pin)), + shareReplay({ bufferSize: 1, refCount: true }), ); this.dominantSpeaker$ = this.participants$.pipe( map((participants) => participants.find((p) => p.isDominantSpeaker)), + shareReplay({ bufferSize: 1, refCount: true }), ); this.hasOngoingScreenShare$ = this.participants$.pipe( @@ -347,6 +353,7 @@ export class CallState { ), ), distinctUntilChanged(), + shareReplay({ bufferSize: 1, refCount: true }), ); this.startedAt$ = this.startedAtSubject.asObservable(); diff --git a/packages/client/src/store/__tests__/CallState.test.ts b/packages/client/src/store/__tests__/CallState.test.ts index 9a14ef40f2..b6fce97e38 100644 --- a/packages/client/src/store/__tests__/CallState.test.ts +++ b/packages/client/src/store/__tests__/CallState.test.ts @@ -66,7 +66,8 @@ describe('CallState', () => { }); const ps2 = state.participants; - expect(ps2.map((p) => p.name)).toEqual(['F', 'B', 'E', 'A', 'C', 'D']); + // should resolve in initial - non-mutated state as set at the beginning + expect(ps2.map((p) => p.name)).toEqual(['A', 'B', 'C', 'D', 'E', 'F']); }); it('should support custom sorting', () => { diff --git a/packages/client/src/store/rxUtils.ts b/packages/client/src/store/rxUtils.ts index 85f8e230be..fb16b4fc01 100644 --- a/packages/client/src/store/rxUtils.ts +++ b/packages/client/src/store/rxUtils.ts @@ -1,5 +1,4 @@ -import { Observable, Subject } from 'rxjs'; -import { take } from 'rxjs/operators'; +import { Observable, Subject, combineLatest } from 'rxjs'; /** * A value or a function which takes the current value and returns a new value. @@ -15,10 +14,9 @@ export type Patch = T | ((currentValue: T) => T); export const getCurrentValue = (observable$: Observable) => { let value!: T; let err: Error | undefined = undefined; - observable$ - .pipe(take(1)) + combineLatest([observable$]) .subscribe({ - next: (v) => { + next: ([v]) => { value = v; }, error: (e) => {