ReactiveX/RxJava

View on GitHub
src/main/java/rx/schedulers/TestScheduler.java

Summary

Maintainability
A
3 hrs
Test Coverage
/**
 * Copyright 2014 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package rx.schedulers;

import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.internal.schedulers.SchedulePeriodicHelper;
import rx.internal.schedulers.SchedulePeriodicHelper.NowNanoSupplier;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;

/**
 * The {@code TestScheduler} is useful for debugging. It allows you to test schedules of events by manually
 * advancing the clock at whatever pace you choose.
 * <p>
 * Scheduled actions are kept in a priority queue ordered by due time (ties are broken by scheduling order)
 * and are only executed when the clock is moved with {@link #advanceTimeBy}, {@link #advanceTimeTo} or
 * {@link #triggerActions()}; they run synchronously inside that call.
 * <p>
 * The queue and the virtual clock are read and written without any synchronization, so a given instance
 * should be driven from one thread at a time.
 */
public class TestScheduler extends Scheduler {
    /** Pending actions, ordered by due time and then by the order in which they were scheduled. */
    final Queue<TimedAction> queue = new PriorityQueue<TimedAction>(11, new CompareActionsByTime());

    /**
     * Source of the sequence numbers that keep same-time actions in scheduling order. The field is static and
     * therefore shared by every {@code TestScheduler}; it is only advanced through {@link #nextCount()}.
     */
    static long counter;

    // Storing time in nanoseconds internally.
    long time;

    /**
     * Returns the next sequence number and advances {@link #counter}.
     * <p>
     * The increment is synchronized because the counter is static: without it, two independent schedulers
     * used on different threads could lose updates, making the counter appear to go backwards and breaking
     * the scheduling-order tie-break for actions due at the same time.
     *
     * @return a sequence number that is larger than every number handed out before it
     */
    private static synchronized long nextCount() {
        return counter++;
    }

    /**
     * Adds two {@code long} values, clamping the result to {@link Long#MAX_VALUE} or {@link Long#MIN_VALUE}
     * instead of wrapping around on overflow.
     *
     * @param a
     *          the first operand
     * @param b
     *          the second operand
     * @return {@code a + b}, or the nearest representable {@code long} if the exact sum does not fit
     */
    private static long saturatedAdd(long a, long b) {
        long sum = a + b;
        // Overflow happened iff both operands share a sign that the sum does not have.
        if (((a ^ sum) & (b ^ sum)) < 0) {
            return a < 0 ? Long.MIN_VALUE : Long.MAX_VALUE;
        }
        return sum;
    }

    /** An action together with its due time, the worker that scheduled it and its scheduling sequence number. */
    static final class TimedAction {

        /** Due time in nanoseconds of virtual time; {@code 0} marks an action scheduled without a delay. */
        final long time;
        final Action0 action;
        /** The worker that scheduled this action; its unsubscribed state is checked before the action runs. */
        final Worker scheduler;
        private final long count = nextCount(); // for differentiating tasks at same time

        TimedAction(Worker scheduler, long time, Action0 action) {
            this.time = time;
            this.action = action;
            this.scheduler = scheduler;
        }

        @Override
        public String toString() {
            // String.valueOf keeps this debugging aid from throwing when the action is null.
            return String.format("TimedAction(time = %d, action = %s)", time, String.valueOf(action));
        }
    }

    /** Orders {@link TimedAction}s by due time, falling back to the scheduling sequence number on ties. */
    static final class CompareActionsByTime implements Comparator<TimedAction> {

        @Override
        public int compare(TimedAction action1, TimedAction action2) {
            if (action1.time != action2.time) {
                return action1.time < action2.time ? -1 : 1;
            }
            if (action1.count != action2.count) {
                return action1.count < action2.count ? -1 : 1;
            }
            return 0;
        }
    }

    /**
     * Returns the current virtual time.
     *
     * @return the scheduler's clock, converted from the internal nanosecond value to milliseconds
     */
    @Override
    public long now() {
        return TimeUnit.NANOSECONDS.toMillis(time);
    }

    /**
     * Moves the Scheduler's clock forward by a specified amount of time.
     * <p>
     * The new clock value is clamped rather than allowed to overflow, so passing a very large amount cannot
     * wrap the clock around to a negative time.
     *
     * @param delayTime
     *          the amount of time to move the Scheduler's clock forward
     * @param unit
     *          the units of time that {@code delayTime} is expressed in
     */
    public void advanceTimeBy(long delayTime, TimeUnit unit) {
        advanceTimeTo(saturatedAdd(time, unit.toNanos(delayTime)), TimeUnit.NANOSECONDS);
    }

    /**
     * Moves the Scheduler's clock to a particular moment in time.
     * <p>
     * Despite its name, {@code delayTime} is an absolute point on the virtual clock, not an offset from the
     * current time. The clock is set to that point unconditionally, so a value earlier than the current time
     * moves the clock backwards.
     *
     * @param delayTime
     *          the point in time to move the Scheduler's clock to
     * @param unit
     *          the units of time that {@code delayTime} is expressed in
     */
    public void advanceTimeTo(long delayTime, TimeUnit unit) {
        long targetTime = unit.toNanos(delayTime);
        triggerActions(targetTime);
    }

    /**
     * Triggers any actions that have not yet been triggered and that are scheduled to be triggered at or
     * before this Scheduler's present time.
     */
    public void triggerActions() {
        triggerActions(time);
    }

    /**
     * Runs every queued action whose due time is at or before the target, in queue order, then sets the clock
     * to the target.
     * <p>
     * The queue is re-examined after each action, so actions enqueued by a running action are executed by the
     * same call if they are due.
     *
     * @param targetTimeInNanos
     *          the virtual time, in nanoseconds, up to which actions are executed and at which the clock is
     *          left
     */
    private void triggerActions(long targetTimeInNanos) {
        while (!queue.isEmpty()) {
            TimedAction current = queue.peek();
            if (current.time > targetTimeInNanos) {
                break;
            }
            // if scheduled time is 0 (immediate) use current virtual time
            time = current.time == 0 ? time : current.time;
            queue.remove();

            // Only execute if not unsubscribed
            if (!current.scheduler.isUnsubscribed()) {
                current.action.call();
            }
        }
        time = targetTimeInNanos;
    }

    @Override
    public Worker createWorker() {
        return new InnerTestScheduler();
    }

    /**
     * Worker that places its actions on the enclosing scheduler's queue. Unsubscribing the worker does not
     * remove its queued actions; they are skipped when their turn comes.
     */
    final class InnerTestScheduler extends Worker implements NowNanoSupplier {

        private final BooleanSubscription s = new BooleanSubscription();

        @Override
        public void unsubscribe() {
            s.unsubscribe();
        }

        @Override
        public boolean isUnsubscribed() {
            return s.isUnsubscribed();
        }

        /**
         * Queues {@code action} to become due {@code delayTime} after the current virtual time. The due time
         * is clamped rather than allowed to overflow, so a very large delay cannot wrap around and make the
         * action due immediately.
         *
         * @return a subscription that removes the action from the queue when unsubscribed
         */
        @Override
        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
            return enqueue(saturatedAdd(time, unit.toNanos(delayTime)), action);
        }

        /**
         * Queues {@code action} with a due time of {@code 0}, which the scheduler treats as "immediate": it
         * runs on the next trigger without changing the clock.
         *
         * @return a subscription that removes the action from the queue when unsubscribed
         */
        @Override
        public Subscription schedule(Action0 action) {
            return enqueue(0, action);
        }

        /** Adds the action to the shared queue and returns a subscription that takes it back out. */
        private Subscription enqueue(long dueTimeInNanos, Action0 action) {
            final TimedAction timedAction = new TimedAction(this, dueTimeInNanos, action);
            queue.add(timedAction);
            return Subscriptions.create(new Action0() {

                @Override
                public void call() {
                    queue.remove(timedAction);
                }

            });
        }

        /**
         * Delegates to {@link SchedulePeriodicHelper}, passing this worker both as the worker argument and as
         * the {@link NowNanoSupplier} argument.
         */
        @Override
        public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit) {
            return SchedulePeriodicHelper.schedulePeriodically(this,
                    action, initialDelay, period, unit, this);
        }

        @Override
        public long now() {
            return TestScheduler.this.now();
        }

        /** Returns the enclosing scheduler's clock in its internal nanosecond resolution. */
        @Override
        public long nowNanos() {
            return TestScheduler.this.time;
        }

    }

}