Skip to content

Commit

Permalink
perf(client): share replay of computed observables (#1095)
Browse files Browse the repository at this point in the history
Adds `shareReplay` to computed observables which when previously
subscribed to by multiple components would be evaluated separately,
i.e., each source data emission would evaluate defined pipeline for each
subscription individually.

"Futureproofs" `RxUtils.getCurrentValue` method through usage of
`combineLatest` function which will always take the last emitted value
even from "pure" observables unlike `take` operator which "resolves"
with first emitted value.

Fixes `noop` sorting (\w) test where "sort" mutates source data - when
we move to TS5.2 we should replace that with `Array.toSorted`.

### Implementation notes

I mentioned using `share` operator which uses `refCount` and `Subject`
under the hood meaning that if amount of subscriptions on such
observable reaches 0 (refCount === 0) - it resets the `Subject` and
re-evaluates the pipeline and assignes new value on next subscription.
Using it in our SDK proved to be difficult and was behaving unexpectedly
(broke tests and certain observables would return "undefined" instead of
initial values) though on paper it makes perfect sense. I opted to use
`shareReplay` which by default (with only buffer size as argument) works
like a `ReplaySubject` but continues running defined pipeline (never
unsubscribes from the source observable) even after the `refCount`
reached zero. With the overloaded variant `shareReplay({ bufferSize: 1,
refCount: true })` it behaves just as expected and does not break our
SDK - meaning the values (pipelines) are being computed only once per
multiple subscriptions and only if it has at least one subscription.

### Sources: 
- [difference between `share` and `shareReplay`
operators](https://www.bitovi.com/blog/always-know-when-to-use-share-vs.-sharereplay)
- though I found a discrepancy in the post - they mention that the
`shareReplay` with `refCount` does not re-evaluate source observable but
through my testing I found that to be false (yet it does something
differently under the hood)
- [`shareReplay`
operator](https://rxjs.dev/api/index/function/shareReplay)
- [`share` operator](https://rxjs.dev/api/index/function/share)

---------

Co-authored-by: Oliver Lazoroski <[email protected]>
  • Loading branch information
arnautov-anton and oliverlaz authored Sep 26, 2023
1 parent a253cbf commit 759d9a2
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 15 deletions.
3 changes: 2 additions & 1 deletion packages/client/src/helpers/DynascaleManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ export class DynascaleManager {
),
takeWhile((participant) => !!participant),
distinctUntilChanged(),
shareReplay(1),
shareReplay({ bufferSize: 1, refCount: true }),
);

/**
Expand Down Expand Up @@ -339,6 +339,7 @@ export class DynascaleManager {
),
takeWhile((p) => !!p),
distinctUntilChanged(),
shareReplay({ bufferSize: 1, refCount: true }),
);

const updateMediaStreamSubscription = participant$
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ describe('DynascaleManager', () => {

cleanup?.();

expect(updateSubscription).toHaveBeenCalledWith(
expect(updateSubscription).toHaveBeenLastCalledWith(
'videoTrack',
{ 'session-id': { dimension: undefined } },
DebounceType.FAST,
Expand Down Expand Up @@ -278,7 +278,7 @@ describe('DynascaleManager', () => {

cleanup?.();

expect(updateSubscription).toHaveBeenCalledWith(
expect(updateSubscription).toHaveBeenLastCalledWith(
'videoTrack',
{ 'session-id': { dimension: undefined } },
DebounceType.FAST,
Expand Down Expand Up @@ -366,7 +366,7 @@ describe('DynascaleManager', () => {

cleanup?.();

expect(updateSubscription).toHaveBeenCalledWith(
expect(updateSubscription).toHaveBeenLastCalledWith(
'videoTrack',
{ 'session-id': { dimension: undefined } },
DebounceType.FAST,
Expand Down Expand Up @@ -436,7 +436,7 @@ describe('DynascaleManager', () => {

cleanup?.();

expect(updateSubscription).toHaveBeenCalledWith(
expect(updateSubscription).toHaveBeenLastCalledWith(
'videoTrack',
{ 'session-id': { dimension: undefined } },
DebounceType.FAST,
Expand Down
2 changes: 1 addition & 1 deletion packages/client/src/sorting/comparator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export const descending = <T>(comparator: Comparator<T>): Comparator<T> => {
* Creates a new comparator which conditionally applies the given comparator.
*
* @example
* const shouldSortByValue = () => return false; // to turn it off
* const shouldSortByValue = (a, b) => a % 2 === 0; // return false to turn it off
* const byValue = (a, b) => a < b ? - 1 : a > b ? 1 : 0;
* const comparator = conditional(shouldSortByValue)(byValue);
*
Expand Down
13 changes: 10 additions & 3 deletions packages/client/src/store/CallState.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { BehaviorSubject, Observable } from 'rxjs';
import { distinctUntilChanged, map } from 'rxjs/operators';
import { distinctUntilChanged, map, shareReplay } from 'rxjs/operators';
import type { Patch } from './rxUtils';
import * as RxUtils from './rxUtils';
import {
Expand Down Expand Up @@ -320,24 +320,30 @@ export class CallState {
*/
constructor() {
this.logger = getLogger(['CallState']);
this.participants$ = this.participantsSubject.pipe(
map((ps) => ps.sort(this.sortParticipantsBy)),
this.participants$ = this.participantsSubject.asObservable().pipe(
// TODO: replace with Array.toSorted once available
map((ps) => [...ps].sort(this.sortParticipantsBy)),
shareReplay({ bufferSize: 1, refCount: true }),
);

this.localParticipant$ = this.participants$.pipe(
map((participants) => participants.find(isStreamVideoLocalParticipant)),
shareReplay({ bufferSize: 1, refCount: true }),
);

this.remoteParticipants$ = this.participants$.pipe(
map((participants) => participants.filter((p) => !p.isLocalParticipant)),
shareReplay({ bufferSize: 1, refCount: true }),
);

this.pinnedParticipants$ = this.participants$.pipe(
map((participants) => participants.filter((p) => !!p.pin)),
shareReplay({ bufferSize: 1, refCount: true }),
);

this.dominantSpeaker$ = this.participants$.pipe(
map((participants) => participants.find((p) => p.isDominantSpeaker)),
shareReplay({ bufferSize: 1, refCount: true }),
);

this.hasOngoingScreenShare$ = this.participants$.pipe(
Expand All @@ -347,6 +353,7 @@ export class CallState {
),
),
distinctUntilChanged(),
shareReplay({ bufferSize: 1, refCount: true }),
);

this.startedAt$ = this.startedAtSubject.asObservable();
Expand Down
3 changes: 2 additions & 1 deletion packages/client/src/store/__tests__/CallState.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ describe('CallState', () => {
});

const ps2 = state.participants;
expect(ps2.map((p) => p.name)).toEqual(['F', 'B', 'E', 'A', 'C', 'D']);
// should resolve in initial - non-mutated state as set at the beginning
expect(ps2.map((p) => p.name)).toEqual(['A', 'B', 'C', 'D', 'E', 'F']);
});

it('should support custom sorting', () => {
Expand Down
8 changes: 3 additions & 5 deletions packages/client/src/store/rxUtils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Observable, Subject } from 'rxjs';
import { take } from 'rxjs/operators';
import { Observable, Subject, combineLatest } from 'rxjs';

/**
* A value or a function which takes the current value and returns a new value.
Expand All @@ -15,10 +14,9 @@ export type Patch<T> = T | ((currentValue: T) => T);
export const getCurrentValue = <T>(observable$: Observable<T>) => {
let value!: T;
let err: Error | undefined = undefined;
observable$
.pipe(take(1))
combineLatest([observable$])
.subscribe({
next: (v) => {
next: ([v]) => {
value = v;
},
error: (e) => {
Expand Down

0 comments on commit 759d9a2

Please sign in to comment.