-
Notifications
You must be signed in to change notification settings - Fork 7.6k
Scheduler
If you want to introduce multithreading into your cascade of Observable operators, you can do so by instructing those operators (or particular Observables) to operate on particular Schedulers.
You can make an Observable act on a particular Scheduler by means of the observeOn
and subscribeOn
operators. You can also split an operator that works on an Observable onto multiple threads with the parallel
operator.
Many of the RxJava Observable operators have varieties that take a Scheduler as an operator. These instruct the operator to do some or all of its work on a particular Scheduler.
You obtain a Scheduler from the factory methods described in the Schedulers
class. The following table shows the varieties of Scheduler that are available to you by means of these methods:
Scheduler |
purpose |
---|---|
Schedulers.computation( ) |
meant for computational work such as event-loops and callback processing; do not use this scheduler for I/O (use Schedulers.io( ) instead) |
Schedulers.trampoline( ) |
queues work to begin on the current thread after any already-queued work |
Schedulers.executor( ) |
queues work to be done on either an Executor or ScheduledExecutorService (Note that if you use an Executor instead of a ScheduledExecutorService then the Scheduler will use a system-wide Timer to handle delayed events.) |
Schedulers.immediate( ) |
schedules work to begin immediately in the current thread |
Schedulers.io( ) |
meant for I/O-bound work such as asynchronous performance of blocking I/O, this scheduler is backed by an Executor thread-pool that will grow as needed; for ordinary computational work, switch to Schedulers.computation( )
|
Schedulers.newThread( ) |
creates a new thread for each unit of work |
Some Observable operators in RxJava have alternate forms that allow you to set which Scheduler the operator will use for (at least some part of) its operation. For these operators, if you do not set the Scheduler, the operator will use the default computation
Scheduler.
Other operators do not have a form that permits you to set their Schedulers. Some of these, like startWith
, empty
, error
, from
, just
, merge
, and range
do not use a Scheduler. A few others use particular schedulers, as in the following table:
operator | Scheduler |
---|---|
parallelMerge |
currentThread |
repeat |
currentThread |
timeInterval |
immediate |
timestamp |
immediate |
Aside from passing these Schedulers in to RxJava Observable operators, you can also use them to schedule your own work. The following example uses the schedule( )
method of the Scheduler
class to schedule work on the newThread
Scheduler:
Schedulers.newThread().schedule(new Action1<Inner>() {
@Override
public void call(Inner inner) {
doWork();
}
});
The inner
parameter allows you to schedule recursive calls:
Schedulers.newThread().schedule(new Action1<Inner>() {
@Override
public void call(Inner inner) {
doWork();
// recurse until unsubscribed (the schedule will do nothing if unsubscribed)
inner.schedule(this);
}
});
inner
also implements the Subscription
interface, and its isUnsubscribed( )
and unsubscribe( )
methods, so you can stop work when a subscription is cancelled, or you can cancel the subscription from within the scheduled task:
Schedulers.newThread().schedule(new Action1<Inner>() {
@Override
public void call(Inner inner) {
while(!inner.isUnsubscribed()) {
status = doWork();
if(QUIT == status) { inner.unsubscribe(); }
}
}
});
The schedule( )
method returns a Subscription
and so you can call its unsubscribe( )
method to signal that it can halt work.
You can also use a version of schedule( )
that delays your task on the given Scheduler until a certain timespan has passed. The following example schedules someTask
to be performed on someScheduler
after 500ms have passed according to that Scheduler's clock:
someScheduler.schedule(someTask, 500, TimeUnit.MILLISECONDS);
Copyright (c) 2016-present, RxJava Contributors.
Twitter @RxJava | Gitter @RxJava