ReactiveX/RxJava

View on GitHub
src/main/java/rx/Scheduler.java

Summary

Maintainability
A
0 mins
Test Coverage
/**
 * Copyright 2014 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package rx;

import java.util.concurrent.TimeUnit;

import rx.functions.*;
import rx.internal.schedulers.*;
import rx.schedulers.Schedulers;

/**
 * A {@code Scheduler} is an object that schedules units of work. You can find common implementations of this
 * class in {@link Schedulers}.
 */
public abstract class Scheduler {
/*
 * Design note: why is this an abstract class rather than an interface?
 *
 *  - Java has no extension methods, yet many of the overloads here need default implementations.
 *
 *  - Virtual extension methods only become available with Java 8, which RxJava will not adopt as its
 *    minimum target for a long time.
 *
 *  - With a plain interface, every Scheduler implementation would have to extend a companion
 *    AbstractScheduler class to get the shared functionality, or else copy/paste that functionality.
 *
 *  - Without virtual extension methods even purely additive changes are breaking, which severely impedes
 *    library maintenance.
 */
    /**
     * Returns a {@link Scheduler.Worker}, either an existing one or a newly created one, that represents
     * serial execution of actions.
     * <p>
     * Work submitted to a single {@link Scheduler.Worker} is guaranteed to run sequentially.
     * <p>
     * Once the work is completed, the Worker should be released via {@link Scheduler.Worker#unsubscribe()}.
     *
     * @return a Worker representing a serial queue of actions to be executed
     */
    public abstract Worker createWorker();

    /**
     * Reports the current time, in milliseconds, as seen by this Scheduler. This base implementation
     * returns {@link System#currentTimeMillis()}.
     *
     * @return the scheduler's notion of current absolute time in milliseconds
     */
    public long now() {
        return System.currentTimeMillis();
    }

    /**
     * Allows operators to control the timing of when actions scheduled on workers are actually executed,
     * which makes it possible to layer additional behavior on top of this {@link Scheduler}. The only
     * parameter is a function that flattens an {@link Observable} of {@link Observable}s of
     * {@link Completable}s into a single {@link Completable}. There must be a chain of operators connecting
     * the returned value to the source {@link Observable}; otherwise any work scheduled on the returned
     * {@link Scheduler} will not be executed.
     * <p>
     * When {@link Scheduler#createWorker()} is invoked, an {@link Observable} of {@link Completable}s is
     * onNext'd to the combinator to be flattened. If that inner {@link Observable} is not immediately
     * subscribed to, any calls to {@link Worker#schedule} are buffered. Once the {@link Observable} is
     * subscribed to, actions are then onNext'd as {@link Completable}s.
     * <p>
     * Finally, the actions are scheduled on the parent {@link Scheduler} when the innermost
     * {@link Completable}s are subscribed to.
     * <p>
     * When the {@link Worker} is unsubscribed, the {@link Completable} emits an onComplete and triggers any
     * behavior in the flattening operator. The {@link Observable} and all {@link Completable}s given to the
     * flattening function never onError.
     * <p>
     * Example: limit the amount of concurrency to two at a time without creating a new fixed-size thread
     * pool:
     *
     * <pre>
     * Scheduler limitScheduler = Schedulers.computation().when(workers -> {
     *     // use merge max concurrent to limit the number of concurrent
     *     // callbacks two at a time
     *     return Completable.merge(Observable.merge(workers), 2);
     * });
     * </pre>
     * <p>
     * The next example is a slightly different way to limit the concurrency, with some interesting benefits
     * and drawbacks compared to the method above. It works by limiting the number of concurrent
     * {@link Worker}s rather than individual actions. Generally each {@link Observable} uses its own
     * {@link Worker}, so this essentially limits the number of concurrent subscribes. The danger comes from
     * using operators like {@link Observable#zip(Observable, Observable, rx.functions.Func2)}, where
     * subscribing to the first {@link Observable} could deadlock the subscription to the second.
     *
     * <pre>
     * Scheduler limitScheduler = Schedulers.computation().when(workers -> {
     *     // use merge max concurrent to limit the number of concurrent
     *     // Observables two at a time
     *     return Completable.merge(Observable.merge(workers, 2));
     * });
     * </pre>
     *
     * Example: slow down the rate to no more than 1 a second. This suffers from the same problem as the
     * example above (an {@link Observable} operator that limits the rate without dropping values, aka the
     * leaky bucket algorithm, is not used here).
     *
     * <pre>
     * Scheduler slowScheduler = Schedulers.computation().when(workers -> {
     *     // use concatenate to make each worker happen one at a time.
     *     return Completable.concat(workers.map(actions -> {
     *         // delay the starting of the next worker by 1 second.
     *         return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
     *    }));
     * });
     * </pre>
     *
     * @param <S> a Scheduler and a Subscription
     * @param combine the function that takes a two-level nested Observable sequence of a Completable and
     * returns the Completable that will be subscribed to and should trigger the execution of the scheduled
     * Actions.
     * @return the Scheduler with the customized execution behavior
     * @since 1.3
     */
    @SuppressWarnings("unchecked")
    public <S extends Scheduler & Subscription> S when(Func1<Observable<Observable<Completable>>, Completable> combine) {
        // Hand the combinator and this scheduler to SchedulerWhen; the cast is unchecked because the
        // concrete type S is chosen by the caller.
        final S customized = (S) new SchedulerWhen(combine, this);
        return customized;
    }

    /**
     * A sequential scheduler that executes actions on a single thread or event loop.
     * <p>
     * Unsubscribing the {@link Worker} cancels all outstanding work and allows resources to be cleaned up.
     */
    public abstract static class Worker implements Subscription {

        /**
         * Submits an Action for execution.
         *
         * @param action
         *            the Action to schedule
         * @return a subscription that can be used to prevent or cancel the execution of the action
         */
        public abstract Subscription schedule(Action0 action);

        /**
         * Submits an Action for execution at some point in the future.
         * <p>
         * Note to implementors: a non-positive {@code delayTime} should be treated as a non-delayed
         * schedule, i.e., as if {@link #schedule(rx.functions.Action0)} had been called.
         *
         * @param action
         *            the Action to schedule
         * @param delayTime
         *            time to wait before executing the action; non-positive values indicate a non-delayed
         *            schedule
         * @param unit
         *            the time unit of {@code delayTime}
         * @return a subscription that can be used to prevent or cancel the execution of the action
         */
        public abstract Subscription schedule(Action0 action, long delayTime, TimeUnit unit);

        /**
         * Schedules a cancelable action to be executed periodically. This default implementation schedules
         * recursively and waits for each action to complete (instead of potentially executing long-running
         * actions concurrently). Any scheduler that can perform periodic scheduling in a better way should
         * override this method.
         * <p>
         * Note to implementors: non-positive {@code initialDelay} and {@code period} values should be
         * treated as non-delayed scheduling of the first and any subsequent executions, respectively.
         *
         * @param action
         *            the Action to execute periodically
         * @param initialDelay
         *            time to wait before executing the action for the first time; non-positive values
         *            indicate a non-delayed schedule
         * @param period
         *            the time interval to wait each time in between executing the action; non-positive
         *            values indicate no delay between repeated schedules
         * @param unit
         *            the time unit of {@code period}
         * @return a subscription that can be used to prevent or cancel the execution of the action
         */
        public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit) {
            // Delegate to the shared helper, passing this worker as the executor of the action.
            // NOTE(review): the trailing null leaves the helper's last, optional argument unset; confirm its
            // meaning against SchedulePeriodicHelper.
            final Subscription periodicTask =
                    SchedulePeriodicHelper.schedulePeriodically(this, action, initialDelay, period, unit, null);
            return periodicTask;
        }

        /**
         * Reports the current time, in milliseconds, as seen by this Worker. This base implementation
         * returns {@link System#currentTimeMillis()}.
         *
         * @return the scheduler's notion of current absolute time in milliseconds
         */
        public long now() {
            return System.currentTimeMillis();
        }
    }
}