From d9b54ec9683774b09cc57d9c2097d872b9c7748e Mon Sep 17 00:00:00 2001 From: Anton Arnautov Date: Mon, 18 Sep 2023 23:17:15 +0200 Subject: [PATCH 1/2] Share replay of computed observables --- packages/client/src/helpers/DynascaleManager.ts | 3 ++- packages/client/src/sorting/comparator.ts | 2 +- packages/client/src/store/CallState.ts | 13 ++++++++++--- .../client/src/store/__tests__/CallState.test.ts | 3 ++- packages/client/src/store/rxUtils.ts | 8 +++----- 5 files changed, 18 insertions(+), 11 deletions(-) diff --git a/packages/client/src/helpers/DynascaleManager.ts b/packages/client/src/helpers/DynascaleManager.ts index 3d36d8b7a3..f8a625a8f9 100644 --- a/packages/client/src/helpers/DynascaleManager.ts +++ b/packages/client/src/helpers/DynascaleManager.ts @@ -176,7 +176,7 @@ export class DynascaleManager { ), takeWhile((participant) => !!participant), distinctUntilChanged(), - shareReplay(1), + shareReplay({ bufferSize: 1, refCount: true }), ); /** @@ -333,6 +333,7 @@ export class DynascaleManager { ), takeWhile((p) => !!p), distinctUntilChanged(), + shareReplay({ bufferSize: 1, refCount: true }), ); const updateMediaStreamSubscription = participant$ diff --git a/packages/client/src/sorting/comparator.ts b/packages/client/src/sorting/comparator.ts index 9ae5df976f..00bdc1b18f 100644 --- a/packages/client/src/sorting/comparator.ts +++ b/packages/client/src/sorting/comparator.ts @@ -39,7 +39,7 @@ export const descending = (comparator: Comparator): Comparator => { * Creates a new comparator which conditionally applies the given comparator. * * @example - * const shouldSortByValue = () => return false; // to turn it off + * const shouldSortByValue = (a, b) => a % 2 === 0; // return false to turn it off * const byValue = (a, b) => a < b ? - 1 : a > b ? 1 : 0; * const comparator = conditional(shouldSortByValue)(byValue); * diff --git a/packages/client/src/store/CallState.ts b/packages/client/src/store/CallState.ts index 956aac21d0..cc3ab81bbc 100644 --- a/packages/client/src/store/CallState.ts +++ b/packages/client/src/store/CallState.ts @@ -1,5 +1,5 @@ import { BehaviorSubject, Observable } from 'rxjs'; -import { distinctUntilChanged, map } from 'rxjs/operators'; +import { distinctUntilChanged, map, shareReplay } from 'rxjs/operators'; import type { Patch } from './rxUtils'; import * as RxUtils from './rxUtils'; import { @@ -320,24 +320,30 @@ export class CallState { */ constructor() { this.logger = getLogger(['CallState']); - this.participants$ = this.participantsSubject.pipe( - map((ps) => ps.sort(this.sortParticipantsBy)), + this.participants$ = this.participantsSubject.asObservable().pipe( + // TODO: replace with Array.toSorted once available + map((ps) => [...ps].sort(this.sortParticipantsBy)), + shareReplay({ bufferSize: 1, refCount: true }), ); this.localParticipant$ = this.participants$.pipe( map((participants) => participants.find(isStreamVideoLocalParticipant)), + shareReplay({ bufferSize: 1, refCount: true }), ); this.remoteParticipants$ = this.participants$.pipe( map((participants) => participants.filter((p) => !p.isLocalParticipant)), + shareReplay({ bufferSize: 1, refCount: true }), ); this.pinnedParticipants$ = this.participants$.pipe( map((participants) => participants.filter((p) => !!p.pin)), + shareReplay({ bufferSize: 1, refCount: true }), ); this.dominantSpeaker$ = this.participants$.pipe( map((participants) => participants.find((p) => p.isDominantSpeaker)), + shareReplay({ bufferSize: 1, refCount: true }), ); this.hasOngoingScreenShare$ = this.participants$.pipe( @@ -347,6 +353,7 @@ export class CallState { ), ), distinctUntilChanged(), + shareReplay({ bufferSize: 1, refCount: true }), ); this.startedAt$ = this.startedAtSubject.asObservable(); diff --git a/packages/client/src/store/__tests__/CallState.test.ts b/packages/client/src/store/__tests__/CallState.test.ts index 9a14ef40f2..b6fce97e38 100644 --- a/packages/client/src/store/__tests__/CallState.test.ts +++ b/packages/client/src/store/__tests__/CallState.test.ts @@ -66,7 +66,8 @@ describe('CallState', () => { }); const ps2 = state.participants; - expect(ps2.map((p) => p.name)).toEqual(['F', 'B', 'E', 'A', 'C', 'D']); + // should resolve in initial - non-mutated state as set at the beginning + expect(ps2.map((p) => p.name)).toEqual(['A', 'B', 'C', 'D', 'E', 'F']); }); it('should support custom sorting', () => { diff --git a/packages/client/src/store/rxUtils.ts b/packages/client/src/store/rxUtils.ts index 85f8e230be..fb16b4fc01 100644 --- a/packages/client/src/store/rxUtils.ts +++ b/packages/client/src/store/rxUtils.ts @@ -1,5 +1,4 @@ -import { Observable, Subject } from 'rxjs'; -import { take } from 'rxjs/operators'; +import { Observable, Subject, combineLatest } from 'rxjs'; /** * A value or a function which takes the current value and returns a new value. @@ -15,10 +14,9 @@ export type Patch = T | ((currentValue: T) => T); export const getCurrentValue = (observable$: Observable) => { let value!: T; let err: Error | undefined = undefined; - observable$ - .pipe(take(1)) + combineLatest([observable$]) .subscribe({ - next: (v) => { + next: ([v]) => { value = v; }, error: (e) => { From 91d973494e3e21b0dc4358e60ad5ac201bc3f704 Mon Sep 17 00:00:00 2001 From: Anton Arnautov Date: Tue, 19 Sep 2023 10:23:00 +0200 Subject: [PATCH 2/2] Adjust DynascaleManager test --- .../client/src/helpers/__tests__/DynascaleManager.test.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/client/src/helpers/__tests__/DynascaleManager.test.ts b/packages/client/src/helpers/__tests__/DynascaleManager.test.ts index 94c7a3f043..3b8753afc1 100644 --- a/packages/client/src/helpers/__tests__/DynascaleManager.test.ts +++ b/packages/client/src/helpers/__tests__/DynascaleManager.test.ts @@ -227,7 +227,7 @@ describe('DynascaleManager', () => { cleanup?.(); - expect(updateSubscription).toHaveBeenCalledWith( + expect(updateSubscription).toHaveBeenLastCalledWith( 'videoTrack', { 'session-id': { dimension: undefined } }, DebounceType.FAST, @@ -278,7 +278,7 @@ describe('DynascaleManager', () => { cleanup?.(); - expect(updateSubscription).toHaveBeenCalledWith( + expect(updateSubscription).toHaveBeenLastCalledWith( 'videoTrack', { 'session-id': { dimension: undefined } }, DebounceType.FAST, @@ -366,7 +366,7 @@ describe('DynascaleManager', () => { cleanup?.(); - expect(updateSubscription).toHaveBeenCalledWith( + expect(updateSubscription).toHaveBeenLastCalledWith( 'videoTrack', { 'session-id': { dimension: undefined } }, DebounceType.FAST, @@ -436,7 +436,7 @@ describe('DynascaleManager', () => { cleanup?.(); - expect(updateSubscription).toHaveBeenCalledWith( + expect(updateSubscription).toHaveBeenLastCalledWith( 'videoTrack', { 'session-id': { dimension: undefined } }, DebounceType.FAST,