Skip to content

Commit

Permalink
Share replay of computed observables
Browse files Browse the repository at this point in the history
  • Loading branch information
arnautov-anton committed Sep 20, 2023
1 parent da140cb commit 1092f56
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 11 deletions.
3 changes: 2 additions & 1 deletion packages/client/src/helpers/DynascaleManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ export class DynascaleManager {
),
takeWhile((participant) => !!participant),
distinctUntilChanged(),
shareReplay(1),
shareReplay({ bufferSize: 1, refCount: true }),
);

// keep copy for resize observer handler
Expand Down Expand Up @@ -317,6 +317,7 @@ export class DynascaleManager {
),
takeWhile((p) => !!p),
distinctUntilChanged(),
shareReplay({ bufferSize: 1, refCount: true }),
);

const updateMediaStreamSubscription = participant$
Expand Down
2 changes: 1 addition & 1 deletion packages/client/src/sorting/comparator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export const descending = <T>(comparator: Comparator<T>): Comparator<T> => {
* Creates a new comparator which conditionally applies the given comparator.
*
* @example
* const shouldSortByValue = () => return false; // to turn it off
* const shouldSortByValue = (a, b) => a % 2 === 0; // return false to turn it off
* const byValue = (a, b) => a < b ? - 1 : a > b ? 1 : 0;
* const comparator = conditional(shouldSortByValue)(byValue);
*
Expand Down
13 changes: 10 additions & 3 deletions packages/client/src/store/CallState.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { BehaviorSubject, Observable } from 'rxjs';
import { distinctUntilChanged, map } from 'rxjs/operators';
import { distinctUntilChanged, map, shareReplay } from 'rxjs/operators';
import type { Patch } from './rxUtils';
import * as RxUtils from './rxUtils';
import {
Expand Down Expand Up @@ -320,24 +320,30 @@ export class CallState {
*/
constructor() {
this.logger = getLogger(['CallState']);
this.participants$ = this.participantsSubject.pipe(
map((ps) => ps.sort(this.sortParticipantsBy)),
this.participants$ = this.participantsSubject.asObservable().pipe(
// TODO: replace with Array.toSorted once available
map((ps) => [...ps].sort(this.sortParticipantsBy)),
shareReplay({ bufferSize: 1, refCount: true }),
);

this.localParticipant$ = this.participants$.pipe(
map((participants) => participants.find(isStreamVideoLocalParticipant)),
shareReplay({ bufferSize: 1, refCount: true }),
);

this.remoteParticipants$ = this.participants$.pipe(
map((participants) => participants.filter((p) => !p.isLocalParticipant)),
shareReplay({ bufferSize: 1, refCount: true }),
);

this.pinnedParticipants$ = this.participants$.pipe(
map((participants) => participants.filter((p) => !!p.pin)),
shareReplay({ bufferSize: 1, refCount: true }),
);

this.dominantSpeaker$ = this.participants$.pipe(
map((participants) => participants.find((p) => p.isDominantSpeaker)),
shareReplay({ bufferSize: 1, refCount: true }),
);

this.hasOngoingScreenShare$ = this.participants$.pipe(
Expand All @@ -347,6 +353,7 @@ export class CallState {
),
),
distinctUntilChanged(),
shareReplay({ bufferSize: 1, refCount: true }),
);

this.startedAt$ = this.startedAtSubject.asObservable();
Expand Down
3 changes: 2 additions & 1 deletion packages/client/src/store/__tests__/CallState.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ describe('CallState', () => {
});

const ps2 = state.participants;
expect(ps2.map((p) => p.name)).toEqual(['F', 'B', 'E', 'A', 'C', 'D']);
// should resolve in initial - non-mutated state as set at the beginning
expect(ps2.map((p) => p.name)).toEqual(['A', 'B', 'C', 'D', 'E', 'F']);
});

it('should support custom sorting', () => {
Expand Down
8 changes: 3 additions & 5 deletions packages/client/src/store/rxUtils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Observable, Subject } from 'rxjs';
import { take } from 'rxjs/operators';
import { Observable, Subject, combineLatest } from 'rxjs';

/**
* A value or a function which takes the current value and returns a new value.
Expand All @@ -15,10 +14,9 @@ export type Patch<T> = T | ((currentValue: T) => T);
export const getCurrentValue = <T>(observable$: Observable<T>) => {
let value!: T;
let err: Error | undefined = undefined;
observable$
.pipe(take(1))
combineLatest([observable$])
.subscribe({
next: (v) => {
next: ([v]) => {
value = v;
},
error: (e) => {
Expand Down

0 comments on commit 1092f56

Please sign in to comment.