npm i rxjs-autorun
const o = of(1);
const r = combined(() => $(o));
r.subscribe(console.log); // > 1
combined waits for Observable o to emit a value
const o = new Subject();
const r = combined(() => $(o));
r.subscribe(console.log);
o.next('🐈'); // > 🐈
recompute c with latest a and b, only when b updates
const a = new BehaviorSubject('#');
const b = new BehaviorSubject(1);
const c = combined(() => _(a) + $(b));
c.subscribe(console.log); // > #1
a.next('💡'); // ~no update~
b.next(42); // > 💡42
use NEVER to suspend emission till source$ emits again
const source$ = timer(0, 1_000);
const even$ = combined(() => $(source$) % 2 == 0 ? _(source$) : _(NEVER));
fetch data every second
function fetch(x){
// mock delayed fetching of x
return of('📦' + x).pipe(delay(100));
}
const a = timer(0, 1_000);
const b = combined(() => fetch($(a)));
const c = combined(() => $($(b)));
c.subscribe(console.log);
// > 📦0
// > 📦1
// > 📦2
// > …
To run an expression, you must wrap it in one of these:
- combined returns an Observable that will emit evaluation results
- computed returns an Observable that will emit distinct evaluation results with distinctive updates
- autorun internally subscribes to combined and returns the subscription
E.g.:
combined(() => { … });
You can read values from Observables inside combined (or computed, or autorun) in two ways:
- $(O) tells combined that it should be re-evaluated when O emits, with its latest value
- _(O) still provides the latest value to combined, but doesn't trigger re-evaluation when O emits
Both functions will interrupt the expression mid-flight if O has not emitted before and doesn't produce a value synchronously.
If you don't want interruptions, try Observables that always contain a value, such as BehaviorSubjects, of, startWith, etc.
Usually this is all you need to use rxjs-autorun.
Sometimes you need to tweak what happens to the subscription of an Observable that is not currently used.
So we provide three levels of subscription strength:
- normal (default): will unsubscribe if the latest run of the expression didn't use this Observable:
  combined(() => $(a) ? $(b) : 0)
  When a is falsy, b is not used and will be dropped when the expression finishes.
  NOTE: when you use $(…), it applies normal strength, but you can be explicit about that via the $.normal(…) notation.
- strong: will keep the subscription for the life of the expression:
  combined(() => $(a) ? $.strong(b) : 0)
  When a is falsy, b is not used, but the subscription will be kept.
- weak: will unsubscribe eagerly if waiting for another Observable to emit:
  combined(() => $(a) ? $.weak(b) : $.weak(c));
  When a is truthy, c is not used and we'll wait for b to emit; meanwhile c will be unsubscribed eagerly, even before b emits.
  And vice versa: when a is falsy, b is not used and we'll wait for c to emit; meanwhile b will be unsubscribed eagerly, even before c emits.
  Another example:
  combined(() => $(a) ? $(b) + $.weak(c) : $.weak(c))
  When a is falsy, b is not used and will be dropped, while c is used.
  When a becomes truthy, both b and c are used, although c will now have to wait for b to emit, which can take an indefinite time.
  That's when we might want to mark c for eager unsubscription until a or b emits.
See examples for more use-case details
$ and _ remember the Observables that you pass to them. This keeps subscriptions and values alive, so that the same $(O) is not re-subscribed on each re-run.
Therefore if you create a new Observable on each run of the expression:
let a = timer(0, 100);
let b = timer(0, 1000);
let c = combined(() => $(a) + $(fetch($(b))));
function fetch(x: number): Observable<any> {
return ajax.getJSON('…');
}
This might lead to an unexpected fetch on each emission of a!
If that's not what we need, we can go two ways:
- create a separate combined() that will call fetch only when b changes (see the switchMap example for details)
- use some memoization or caching technique on the fetch function, so that it returns the same Observable when called with the same arguments
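As a rough illustration of the memoization option, here is a minimal sketch. The memoize helper and the fetchUncached stand-in are hypothetical names and not part of rxjs-autorun; a plain object stands in for the Observable so the sketch runs without RxJS. Since $ and _ track the Observables passed to them, returning the same instance for the same argument is what avoids a fresh subscription on each re-run.

```typescript
// Hypothetical helper, not part of rxjs-autorun: caches results by argument.
function memoize<A, R>(fn: (arg: A) => R): (arg: A) => R {
  const cache = new Map<A, R>();
  return (arg: A): R => {
    if (!cache.has(arg)) {
      cache.set(arg, fn(arg));
    }
    return cache.get(arg) as R;
  };
}

// Stand-in for a function that builds a new Observable on each call.
// A plain object is used here so the sketch runs without RxJS.
let created = 0;
function fetchUncached(x: number): { id: number; arg: number } {
  created += 1;
  return { id: created, arg: x };
}

const fetchMemo = memoize(fetchUncached);

const first = fetchMemo(1);
const again = fetchMemo(1); // same argument: same instance, nothing new is created
const other = fetchMemo(2); // new argument: a new instance is created

console.log(first === again); // true
console.log(first === other); // false
console.log(created);         // 2
```

Note that this cache grows without bound; a real version would likely need some eviction strategy.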
If an Observable doesn't emit a value synchronously when it is subscribed, the expression will be interrupted mid-flight until the Observable emits.
So if you must perform side effects inside combined, put them after reading from streams:
const o = new Subject();
combined(() => {
console.log('Hello'); // DANGEROUS: perform a side-effect before reading from stream
return $(o); // the run is interrupted here, since o has not emitted yet
}).subscribe(console.log);
o.next('World');
/** OUTPUT:
* > Hello
* > Hello
* > World
*/
While:
const o = new Subject();
combined(() => {
let value = $(o); // the run is interrupted here, since o has not emitted yet
console.log('Hello'); // SAFE: perform a side-effect after reading from stream
return value;
}).subscribe(console.log);
o.next('World');
/** OUTPUT:
* > Hello
* > World
*/
We might introduce alternative APIs to help with this
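To see why a side effect placed before a read runs twice, the following toy model may help. It is only a sketch under stated assumptions: it is NOT rxjs-autorun's implementation, and Cell, INTERRUPT and run are hypothetical names invented for this illustration. The model aborts the expression when a value is missing and re-runs it from the top once the value arrives.

```typescript
// Toy model only: NOT rxjs-autorun's implementation.
// Cell, INTERRUPT and run are hypothetical names used for illustration.

// Sentinel thrown to abort an expression that reads a value-less cell.
const INTERRUPT = { reason: 'no value yet' };

class Cell<T> {
  private hasValue = false;
  private value?: T;
  private listeners: Array<() => void> = [];

  // Store a value and notify everyone waiting on this cell.
  next(value: T): void {
    this.value = value;
    this.hasValue = true;
    this.listeners.slice().forEach((listener) => listener());
  }

  // Return the current value, or abort the caller if there is none yet.
  read(): T {
    if (!this.hasValue) throw INTERRUPT;
    return this.value as T;
  }

  onNext(listener: () => void): void {
    this.listeners.push(listener);
  }
}

// Run `expr`; re-run it from the top every time `cell` emits.
function run<T, R>(cell: Cell<T>, expr: () => R, emit: (result: R) => void): void {
  const attempt = (): void => {
    try {
      emit(expr());
    } catch (e) {
      if (e !== INTERRUPT) throw e; // real errors still propagate
    }
  };
  cell.onNext(attempt);
  attempt();
}

const log: string[] = [];
const o = new Cell<string>();

run(
  o,
  () => {
    log.push('Hello'); // side effect BEFORE the read: repeated on every attempt
    return o.read();   // the first attempt is interrupted here
  },
  (result) => log.push(result),
);

o.next('World');
console.log(log); // [ 'Hello', 'Hello', 'World' ]
```

In this model, moving the side effect below the read means the first, interrupted attempt never reaches it, which mirrors the SAFE variant above.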
Logic branches might lead to a late subscription to a given Observable, because it was not seen on previous runs. If that Observable doesn't produce a value synchronously when subscribed, the expression will be interrupted mid-flight until any Observable visited during this latest run emits a new value.
We might introduce alternative APIs to help with this
Also note that you might want different handling of unused subscriptions; see the strength section for details.
Currently rxjs-autorun will skip synchronous emissions and run the expression only with the latest value emitted, e.g.:
const o = of('a', 'b', 'c');
combined(() => $(o)).subscribe(console.log);
/** OUTPUT:
* > c
*/
This might be fixed in future updates
That will be awesome!
Please create an issue before submitting a PR — we'll be able to discuss it first!
Thanks!