Scheduler
If you want to introduce multithreading into your cascade of Observable operators, you can do so by instructing those operators (or particular Observables) to operate on particular Schedulers.
Many of the RxJava Observable operators have varieties that take a Scheduler as a parameter. These instruct the operator to do some or all of its work on a particular Scheduler.
You can make an Observable act on a particular Scheduler by means of the `observeOn` or `subscribeOn` operators. `observeOn` instructs an Observable to call its observer's `onNext`, `onError`, and `onCompleted` methods on a particular Scheduler; `subscribeOn` takes this a step further and instructs the Observable to do all of its processing (including the sending of items and notifications to observers) on a particular Scheduler.
You obtain a Scheduler from the factory methods described in the `Schedulers` class. The following table shows the varieties of Scheduler that are available to you by means of these methods:
Scheduler | purpose |
---|---|
`Schedulers.computation( )` | meant for computational work such as event-loops and callback processing; do not use this scheduler for I/O (use `Schedulers.io( )` instead) |
`Schedulers.from(executor)` | uses the specified `Executor` as a Scheduler |
`Schedulers.immediate( )` | schedules work to begin immediately in the current thread |
`Schedulers.io( )` | meant for I/O-bound work such as asynchronous performance of blocking I/O; this scheduler is backed by a thread-pool that will grow as needed; for ordinary computational work, switch to `Schedulers.computation( )` |
`Schedulers.newThread( )` | creates a new thread for each unit of work |
`Schedulers.test( )` | useful for testing purposes (see below) |
`Schedulers.trampoline( )` | queues work to begin on the current thread after any already-queued work |
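The difference between the `immediate` and `trampoline` rows shows up when a task schedules more work while it is still running. The following is a minimal single-threaded sketch that uses only the JDK. `TrampolineSketch` and its method names are hypothetical and exist only for illustration; this is not RxJava's implementation. It assumes that "immediate" means running the task inline in the caller, and that "trampoline" means appending the task to a queue that is drained on the current thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration only; not RxJava code.
final class TrampolineSketch {
    private final Deque<Runnable> queue = new ArrayDeque<>();
    private boolean draining = false;

    // Trampoline-style: enqueue the task; only the outermost call drains the queue.
    void scheduleTrampoline(Runnable task) {
        queue.addLast(task);
        if (draining) {
            return; // an enclosing call is already draining, so the task waits its turn
        }
        draining = true;
        try {
            Runnable next;
            while ((next = queue.pollFirst()) != null) {
                next.run();
            }
        } finally {
            draining = false;
        }
    }

    // Immediate-style: run the task right away, inside the caller.
    void scheduleImmediate(Runnable task) {
        task.run();
    }

    // Records the order in which an outer task and a nested task execute.
    static List<String> trampolineOrder() {
        List<String> log = new ArrayList<>();
        TrampolineSketch s = new TrampolineSketch();
        s.scheduleTrampoline(() -> {
            log.add("outer start");
            s.scheduleTrampoline(() -> log.add("inner"));
            log.add("outer end");
        });
        return log;
    }

    static List<String> immediateOrder() {
        List<String> log = new ArrayList<>();
        TrampolineSketch s = new TrampolineSketch();
        s.scheduleImmediate(() -> {
            log.add("outer start");
            s.scheduleImmediate(() -> log.add("inner"));
            log.add("outer end");
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(trampolineOrder()); // [outer start, outer end, inner]
        System.out.println(immediateOrder());  // [outer start, inner, outer end]
    }
}
```

In the trampoline case the nested task runs only after the outer task has finished; in the immediate case it runs in the middle of the outer task.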
Some Observable operators in RxJava have alternate forms that allow you to set which Scheduler the operator will use for (at least some part of) its operation. Others do not operate on any particular Scheduler, or operate on a particular default Scheduler. Those that have a particular default Scheduler include:
operator | Scheduler |
---|---|
`buffer(timespan)` | computation |
`buffer(timespan, count)` | computation |
`buffer(timespan, timeshift)` | computation |
`debounce(timeout, unit)` | computation |
`delay(delay, unit)` | computation |
`delaySubscription(delay, unit)` | computation |
`interval` | computation |
`parallel` | computation |
`repeat` | trampoline |
`repeatWhen` | trampoline |
`replay(time, unit)` | computation |
`replay(buffersize, time, unit)` | computation |
`replay(selector, time, unit)` | computation |
`replay(selector, buffersize, time, unit)` | computation |
`retry` | trampoline |
`sample(period, unit)` | computation |
`skip(time, unit)` | computation |
`skipLast(time, unit)` | computation |
`take(time, unit)` | computation |
`takeLast(time, unit)` | computation |
`takeLast(count, time, unit)` | computation |
`takeLastBuffer(time, unit)` | computation |
`takeLastBuffer(count, time, unit)` | computation |
`throttleFirst` | computation |
`throttleLast` | computation |
`throttleWithTimeout` | computation |
`timeInterval` | immediate |
`timeout(timeoutSelector)` | immediate |
`timeout(firstTimeoutSelector, timeoutSelector)` | immediate |
`timeout(timeoutSelector, other)` | immediate |
`timeout(timeout, timeUnit)` | computation |
`timeout(firstTimeoutSelector, timeoutSelector, other)` | immediate |
`timeout(timeout, timeUnit, other)` | computation |
`timer` | computation |
`timestamp` | immediate |
`window(timespan)` | computation |
`window(timespan, count)` | computation |
`window(timespan, timeshift)` | computation |
Aside from passing these Schedulers in to RxJava Observable operators, you can also use them to schedule your own work. The following example obtains a `Worker` from the `newThread` Scheduler and uses the `schedule( )` method of the `Scheduler.Worker` class to schedule work on it:
```java
import rx.Scheduler;
import rx.functions.Action0;
import rx.schedulers.Schedulers;

// yourWork() is a placeholder for your own code
final Scheduler.Worker worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {
    @Override
    public void call() {
        yourWork();
    }
});
// some time later...
worker.unsubscribe();
```
To schedule recursive calls, you can use `schedule( )` and then `schedule(this)` on the Worker object:
```java
// final so that the anonymous Action0 can refer to the worker
final Scheduler.Worker worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {
    @Override
    public void call() {
        yourWork();
        // recurse until unsubscribed (schedule will do nothing if unsubscribed)
        worker.schedule(this);
    }
});
// some time later...
worker.unsubscribe();
```
Objects of the `Worker` class implement the `Subscription` interface, with its `isUnsubscribed( )` and `unsubscribe( )` methods, so you can stop work when a subscription is cancelled, or you can cancel the subscription from within the scheduled task:
```java
// status, QUIT, and yourWork() are placeholders for your own code
final Scheduler.Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {
    @Override
    public void call() {
        // keep working until the worker is unsubscribed, from here or from elsewhere
        while (!worker.isUnsubscribed()) {
            status = yourWork();
            if (QUIT == status) { worker.unsubscribe(); }
        }
    }
});
```
The `Worker` is also a `Subscription`, and so you can (and should, eventually) call its `unsubscribe( )` method to signal that it can halt work and release resources:
```java
worker.unsubscribe();
```
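The cancellation behaviour described above (scheduling becomes a no-op after `unsubscribe( )`, so a recursive task stops rescheduling itself) can be modeled with plain JDK classes. This is a sketch under stated assumptions, not RxJava's `Worker`: `WorkerSketch` and `countToFive` are hypothetical names, and a single-thread `ExecutorService` stands in for the thread that a `newThread` Worker would own.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; not RxJava code.
final class WorkerSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicBoolean unsubscribed = new AtomicBoolean(false);

    // Queues the action unless the worker has been unsubscribed.
    void schedule(final Runnable action) {
        if (unsubscribed.get()) {
            return; // scheduling after unsubscribe does nothing
        }
        try {
            executor.execute(() -> {
                if (!unsubscribed.get()) {
                    action.run();
                }
            });
        } catch (RejectedExecutionException e) {
            // lost a race with unsubscribe(); treat it as a no-op
        }
    }

    boolean isUnsubscribed() {
        return unsubscribed.get();
    }

    // Stops accepting work and releases the underlying thread.
    void unsubscribe() {
        if (unsubscribed.compareAndSet(false, true)) {
            executor.shutdownNow();
        }
    }

    // Reschedules a task recursively and cancels from inside the task on the fifth run.
    static int countToFive() {
        final WorkerSketch worker = new WorkerSketch();
        final AtomicInteger runs = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(1);
        worker.schedule(new Runnable() {
            @Override
            public void run() {
                if (runs.incrementAndGet() == 5) {
                    worker.unsubscribe();
                    done.countDown();
                }
                worker.schedule(this); // no-op once unsubscribed
            }
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            worker.unsubscribe(); // idempotent; ensures the thread is released
        }
        return runs.get();
    }
}
```

Calling `WorkerSketch.countToFive()` runs the task five times; the sixth `schedule(this)` call is ignored because the worker has already been unsubscribed.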
You can also use a version of `schedule( )` that delays your action until a certain timespan has passed. The following example schedules `someAction` to be performed on `someWorker` (a Worker obtained from `someScheduler.createWorker()`) after 500ms have passed according to that Scheduler's clock:
```java
someWorker.schedule(someAction, 500, TimeUnit.MILLISECONDS);
```
Another `Worker` method allows you to schedule an action to take place at regular intervals. The following example schedules `someAction` to be performed on `someWorker` (a Worker obtained from `someScheduler.createWorker()`) after 500ms have passed, and then every 250ms thereafter:
```java
someWorker.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);
```
A `TestScheduler` allows you to exercise fine-tuned manual control over how the scheduler’s clock behaves. This can be useful for testing interactions that depend on precise arrangements of actions in time. This scheduler has four additional methods:
- `advanceTimeTo(time, unit)`: advances the Scheduler’s clock to a particular point in time
- `advanceTimeBy(time, unit)`: advances the Scheduler’s clock forward by a particular amount of time
- `triggerActions( )`: starts any unstarted actions that have been scheduled for a time equal to or earlier than the present time according to the Scheduler’s clock
- `triggerActions(time)` (the time unit is nanoseconds): starts any unstarted actions that have been scheduled for a time equal to or earlier than the specified time
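To make the idea of a manually controlled clock concrete, here is a minimal JDK-only sketch of a virtual-time scheduler. It is a model written for illustration, not the `TestScheduler` source: `VirtualClockSketch`, its `schedule` method, and the `demo` helper are hypothetical names, and the sketch assumes that actions run synchronously on the calling thread whenever the clock reaches their due time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not RxJava code.
final class VirtualClockSketch {
    private static final class Timed implements Comparable<Timed> {
        final long dueNanos;
        final long seq; // preserves scheduling order among actions due at the same time
        final Runnable action;

        Timed(long dueNanos, long seq, Runnable action) {
            this.dueNanos = dueNanos;
            this.seq = seq;
            this.action = action;
        }

        @Override
        public int compareTo(Timed other) {
            int byTime = Long.compare(dueNanos, other.dueNanos);
            return byTime != 0 ? byTime : Long.compare(seq, other.seq);
        }
    }

    private final PriorityQueue<Timed> queue = new PriorityQueue<>();
    private long nowNanos = 0;
    private long counter = 0;

    // Registers an action relative to the virtual clock; nothing runs yet.
    void schedule(Runnable action, long delay, TimeUnit unit) {
        queue.add(new Timed(nowNanos + unit.toNanos(delay), counter++, action));
    }

    void advanceTimeBy(long delay, TimeUnit unit) {
        runUntil(nowNanos + unit.toNanos(delay));
    }

    void advanceTimeTo(long time, TimeUnit unit) {
        runUntil(unit.toNanos(time));
    }

    // Runs whatever is already due without moving the clock.
    void triggerActions() {
        runUntil(nowNanos);
    }

    private void runUntil(long targetNanos) {
        while (!queue.isEmpty() && queue.peek().dueNanos <= targetNanos) {
            Timed next = queue.poll();
            nowNanos = Math.max(nowNanos, next.dueNanos);
            next.action.run();
        }
        nowNanos = Math.max(nowNanos, targetNanos); // the clock never moves backwards
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        VirtualClockSketch clock = new VirtualClockSketch();
        clock.schedule(() -> log.add("a (due 500ms)"), 500, TimeUnit.MILLISECONDS);
        clock.schedule(() -> log.add("b (due 750ms)"), 750, TimeUnit.MILLISECONDS);
        clock.advanceTimeBy(499, TimeUnit.MILLISECONDS); // nothing is due yet
        log.add("checkpoint 499ms");
        clock.advanceTimeBy(1, TimeUnit.MILLISECONDS);   // a becomes due
        log.add("checkpoint 500ms");
        clock.advanceTimeTo(1, TimeUnit.SECONDS);        // b becomes due
        return log;
    }
}
```

Because time only advances when the test says so, the order of entries returned by `demo()` is fully deterministic, which is the property that makes this style of scheduler useful for testing time-dependent operators.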
Copyright (c) 2016-present, RxJava Contributors.