ReactiveX/RxJava

View on GitHub
src/main/java/rx/internal/util/InternalObservableUtils.java

Summary

Maintainability
A
2 hrs
Test Coverage
/**
 * Copyright 2016 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package rx.internal.util;

import java.util.List;
import java.util.concurrent.TimeUnit;

import rx.*;
import rx.Observable.Operator;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.*;
import rx.internal.operators.OperatorAny;
import rx.observables.ConnectableObservable;

/**
 * Holder of named utility classes factored out from Observable to save
 * source space and help with debugging with properly named objects.
 * <p>
 * Declared as an enum without any constants, so no instance of this type exists;
 * every member is static.
 */
public enum InternalObservableUtils {
    ;
    /**
     * A two-argument function that takes a {@code Long} as its first parameter and returns
     * that value plus one; the second parameter is ignored.
     */
    public static final PlusOneLongFunc2 LONG_COUNTER = new PlusOneLongFunc2();

    /**
     * A two-argument function comparing two objects via null-safe equals.
     */
    public static final ObjectEqualsFunc2 OBJECT_EQUALS = new ObjectEqualsFunc2();
    /**
     * A function that converts a List of Observables into an array of Observables.
     */
    public static final ToArrayFunc1 TO_ARRAY = new ToArrayFunc1();

    /**
     * Shared function instance that returns {@code null} for any input.
     */
    static final ReturnsVoidFunc1 RETURNS_VOID = new ReturnsVoidFunc1();

    /**
     * A two-argument function that takes an {@code Integer} as its first parameter and returns
     * that value plus one; the second parameter is ignored.
     */
    public static final PlusOneFunc2 COUNTER = new PlusOneFunc2();

    /**
     * Shared function instance that returns the result of {@code getThrowable()} of the
     * Notification it receives.
     */
    static final NotificationErrorExtractor ERROR_EXTRACTOR = new NotificationErrorExtractor();

    /**
     * Throws an OnErrorNotImplementedException, constructed from the received Throwable, when called.
     */
    public static final Action1<Throwable> ERROR_NOT_IMPLEMENTED = new ErrorNotImplementedAction();

    /**
     * An {@link OperatorAny} instance created with an always-true predicate and {@code true}
     * as its second constructor argument.
     * NOTE(review): judging by the name this is meant to signal whether a source is empty;
     * confirm the exact semantics of the boolean flag against OperatorAny.
     */
    public static final Operator<Boolean, Object> IS_EMPTY = new OperatorAny<Object>(UtilityFunctions.alwaysTrue(), true);

    /**
     * Counting function over {@code Integer}: yields the incoming count incremented by one.
     * The second argument is accepted but never inspected.
     */
    static final class PlusOneFunc2 implements Func2<Integer, Object, Integer> {
        @Override
        public Integer call(Integer count, Object o) {
            // Explicit unbox/box; a null count fails with a NullPointerException here.
            final int next = count.intValue() + 1;
            return Integer.valueOf(next);
        }
    }

    /**
     * Counting function over {@code Long}: yields the incoming count incremented by one.
     * The second argument is accepted but never inspected.
     */
    static final class PlusOneLongFunc2 implements Func2<Long, Object, Long> {
        @Override
        public Long call(Long count, Object o) {
            // Explicit unbox/box; a null count fails with a NullPointerException here.
            final long next = count.longValue() + 1L;
            return Long.valueOf(next);
        }
    }

    /**
     * Null-safe equality check of two objects: identical references (including two nulls)
     * are equal, a null first argument is unequal to anything else, and otherwise the
     * decision is delegated to {@code first.equals(second)}.
     */
    static final class ObjectEqualsFunc2 implements Func2<Object, Object, Boolean> {
        @Override
        public Boolean call(Object first, Object second) {
            if (first == second) {
                return Boolean.TRUE;
            }
            if (first == null) {
                // second is necessarily non-null at this point
                return Boolean.FALSE;
            }
            return Boolean.valueOf(first.equals(second));
        }
    }

    /**
     * Copies the elements of a List of Observables into a newly allocated Observable array
     * whose length equals the size of the list.
     */
    static final class ToArrayFunc1 implements Func1<List<? extends Observable<?>>, Observable<?>[]> {
        @Override
        public Observable<?>[] call(List<? extends Observable<?>> o) {
            final Observable<?>[] target = new Observable<?>[o.size()];
            return o.toArray(target);
        }
    }

    /**
     * Creates a predicate that compares each value it receives against a fixed reference
     * using a null-safe equals check.
     * @param other the fixed reference to compare incoming values with (may be null)
     * @return the comparison function
     */
    public static Func1<Object, Boolean> equalsWith(Object other) {
        final Func1<Object, Boolean> predicate = new EqualsWithFunc1(other);
        return predicate;
    }

    /**
     * Predicate holding a constant reference and testing incoming values against it:
     * the same reference (including null vs. null) matches, a null input otherwise does not,
     * and any other input is decided by {@code t.equals(other)}.
     */
    static final class EqualsWithFunc1 implements Func1<Object, Boolean> {
        /** The constant reference every input is compared with; may be null. */
        final Object other;

        public EqualsWithFunc1(Object other) {
            this.other = other;
        }

        @Override
        public Boolean call(Object t) {
            if (t == other) {
                return Boolean.TRUE;
            }
            if (t == null) {
                return Boolean.FALSE;
            }
            return Boolean.valueOf(t.equals(other));
        }
    }

    /**
     * Creates a predicate that reports whether the value it receives is an instance
     * of the given class.
     * @param clazz the class incoming values are tested against
     * @return the comparison function
     */
    public static Func1<Object, Boolean> isInstanceOf(Class<?> clazz) {
        final Func1<Object, Boolean> predicate = new IsInstanceOfFunc1(clazz);
        return predicate;
    }

    /**
     * Predicate that holds a class and answers, via {@link Class#isInstance(Object)},
     * whether a given value is an instance of it.
     */
    static final class IsInstanceOfFunc1 implements Func1<Object, Boolean> {
        /** The class incoming values are tested against. */
        final Class<?> clazz;

        public IsInstanceOfFunc1(Class<?> other) {
            this.clazz = other;
        }

        @Override
        public Boolean call(Object t) {
            final boolean matches = this.clazz.isInstance(t);
            return Boolean.valueOf(matches);
        }
    }

    /**
     * Wraps a notification handler into a function over a stream of Notifications: the
     * returned function replaces every Notification it sees with {@code null} (typed as Void)
     * and passes the resulting Observable to the handler, returning whatever the handler returns.
     * @param notificationHandler the handler to call with the stream of nulls
     * @return the Func1 instance
     */
    public static Func1<Observable<? extends Notification<?>>, Observable<?>> createRepeatDematerializer(
            Func1<? super Observable<? extends Void>, ? extends Observable<?>> notificationHandler) {
        final Func1<Observable<? extends Notification<?>>, Observable<?>> dematerializer =
                new RepeatNotificationDematerializer(notificationHandler);
        return dematerializer;
    }

    /**
     * Function that maps each element of a Notification stream to {@code null} using
     * {@code RETURNS_VOID} and forwards the mapped stream to the wrapped handler.
     */
    static final class RepeatNotificationDematerializer implements Func1<Observable<? extends Notification<?>>, Observable<?>> {

        /** The handler that receives the stream of null signals. */
        final Func1<? super Observable<? extends Void>, ? extends Observable<?>> notificationHandler;

        public RepeatNotificationDematerializer(
                Func1<? super Observable<? extends Void>, ? extends Observable<?>> notificationHandler) {
            this.notificationHandler = notificationHandler;
        }

        @Override
        public Observable<?> call(Observable<? extends Notification<?>> notifications) {
            // The content of the notifications is discarded; only the signal itself is kept.
            final Observable<Void> signals = notifications.map(RETURNS_VOID);
            return notificationHandler.call(signals);
        }
    }

    /**
     * Function that disregards its argument and always yields {@code null}.
     */
    static final class ReturnsVoidFunc1 implements Func1<Object, Void> {
        @Override
        public Void call(Object t) {
            // null is the only value a Void-typed expression can take
            final Void nothing = null;
            return nothing;
        }
    }

    /**
     * Builds a Func1 that invokes the selector with the Observable it receives and then
     * applies {@code observeOn(scheduler)} to the Observable the selector returned.
     * @param <T> the input value type
     * @param <R> the output value type
     * @param selector the selector function
     * @param scheduler the scheduler passed to observeOn on the selector's output
     * @return the new Func1 instance
     */
    public static <T, R> Func1<Observable<T>, Observable<R>> createReplaySelectorAndObserveOn(
            Func1<? super Observable<T>, ? extends Observable<R>> selector, Scheduler scheduler) {
        final Func1<Observable<T>, Observable<R>> composed =
                new SelectorAndObserveOn<T, R>(selector, scheduler);
        return composed;
    }

    /**
     * Function that first applies a selector to the incoming Observable and then calls
     * {@code observeOn} with the stored scheduler on the selector's result.
     * @param <T> the input value type
     * @param <R> the output value type
     */
    static final class SelectorAndObserveOn<T, R> implements Func1<Observable<T>, Observable<R>> {
        /** The function producing the Observable to be observed on the scheduler. */
        final Func1<? super Observable<T>, ? extends Observable<R>> selector;
        /** The scheduler handed to observeOn. */
        final Scheduler scheduler;

        public SelectorAndObserveOn(
                Func1<? super Observable<T>, ? extends Observable<R>> selector,
                Scheduler scheduler) {
            this.selector = selector;
            this.scheduler = scheduler;
        }

        @Override
        public Observable<R> call(Observable<T> t) {
            final Observable<R> selected = selector.call(t);
            return selected.observeOn(scheduler);
        }
    }

    /**
     * Wraps a notification handler into a function over a stream of Notifications: the
     * returned function maps every Notification to the result of its {@code getThrowable()}
     * and passes the resulting Observable to the handler, returning whatever the handler returns.
     * @param notificationHandler the handler to call with the stream of Throwables
     * @return the Func1 instance
     */
    public static Func1<Observable<? extends Notification<?>>, Observable<?>> createRetryDematerializer(
            Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler) {
        final Func1<Observable<? extends Notification<?>>, Observable<?>> dematerializer =
                new RetryNotificationDematerializer(notificationHandler);
        return dematerializer;
    }

    /**
     * Function that maps each element of a Notification stream to its Throwable using
     * {@code ERROR_EXTRACTOR} and forwards the mapped stream to the wrapped handler.
     */
    static final class RetryNotificationDematerializer implements Func1<Observable<? extends Notification<?>>, Observable<?>> {
        /** The handler that receives the stream of extracted Throwables. */
        final Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler;

        public RetryNotificationDematerializer(
                Func1<? super Observable<? extends Throwable>, ? extends Observable<?>> notificationHandler) {
            this.notificationHandler = notificationHandler;
        }

        @Override
        public Observable<?> call(Observable<? extends Notification<?>> notifications) {
            final Observable<Throwable> errors = notifications.map(ERROR_EXTRACTOR);
            return notificationHandler.call(errors);
        }
    }

    /**
     * Function that returns whatever {@code getThrowable()} reports for the given Notification.
     */
    static final class NotificationErrorExtractor implements Func1<Notification<?>, Throwable> {
        @Override
        public Throwable call(Notification<?> t) {
            final Throwable error = t.getThrowable();
            return error;
        }
    }

    /**
     * Creates a Func0 that, each time it is called, returns the ConnectableObservable
     * obtained from {@code source.replay()}.
     * @param <T> the input value type
     * @param source the source on which the supplier function calls replay
     * @return the new Func0 instance
     */
    public static <T> Func0<ConnectableObservable<T>> createReplaySupplier(final Observable<T> source) {
        final Func0<ConnectableObservable<T>> supplier = new ReplaySupplierNoParams<T>(source);
        return supplier;
    }

    /**
     * Supplier that calls the parameterless {@code replay()} on its source every time
     * it is invoked.
     * @param <T> the value type of the source
     */
    static final class ReplaySupplierNoParams<T> implements Func0<ConnectableObservable<T>> {
        /** The Observable on which replay() is called. */
        private final Observable<T> source;

        ReplaySupplierNoParams(Observable<T> source) {
            this.source = source;
        }

        @Override
        public ConnectableObservable<T> call() {
            final ConnectableObservable<T> replayed = this.source.replay();
            return replayed;
        }
    }
    /**
     * Creates a Func0 that, each time it is called, returns the ConnectableObservable
     * obtained from {@code source.replay(bufferSize)}.
     * @param <T> the input value type
     * @param source the source on which the supplier function calls replay
     * @param bufferSize
     *            the buffer size that limits the number of items the connectable observable can replay
     * @return the new Func0 instance
     */
    public static <T> Func0<ConnectableObservable<T>> createReplaySupplier(final Observable<T> source,
            final int bufferSize) {
        final Func0<ConnectableObservable<T>> supplier = new ReplaySupplierBuffer<T>(source, bufferSize);
        return supplier;
    }

    /**
     * Supplier that calls {@code replay(bufferSize)} on its source every time it is invoked.
     * @param <T> the value type of the source
     */
    static final class ReplaySupplierBuffer<T> implements Func0<ConnectableObservable<T>> {
        /** The Observable on which replay is called. */
        private final Observable<T> source;
        /** The buffer size argument passed to replay. */
        private final int bufferSize;

        ReplaySupplierBuffer(Observable<T> source, int bufferSize) {
            this.bufferSize = bufferSize;
            this.source = source;
        }

        @Override
        public ConnectableObservable<T> call() {
            final ConnectableObservable<T> replayed = this.source.replay(this.bufferSize);
            return replayed;
        }
    }

    /**
     * Creates a Func0 that, each time it is called, returns the ConnectableObservable
     * obtained from {@code source.replay(time, unit, scheduler)}.
     * @param <T> the input value type
     * @param source the source on which the supplier function calls replay
     * @param time
     *            the duration of the window in which the replayed items must have been emitted
     * @param unit
     *            the time unit of {@code time}
     * @param scheduler the scheduler to use for timing information
     * @return the new Func0 instance
     */
    public static <T> Func0<ConnectableObservable<T>> createReplaySupplier(
            final Observable<T> source,
            final long time,
            final TimeUnit unit,
            final Scheduler scheduler) {
        final Func0<ConnectableObservable<T>> supplier =
                new ReplaySupplierBufferTime<T>(source, time, unit, scheduler);
        return supplier;
    }

    /**
     * Supplier that calls {@code replay(time, unit, scheduler)} on its source every time
     * it is invoked. Despite the "Buffer" in its name, this class holds no buffer size;
     * the replay is parameterized by time only.
     * @param <T> the value type of the source
     */
    static final class ReplaySupplierBufferTime<T> implements Func0<ConnectableObservable<T>> {
        /** The Observable on which replay is called. */
        private final Observable<T> source;
        /** The time argument passed to replay. */
        private final long time;
        /** The unit of {@code time}. */
        private final TimeUnit unit;
        /** The scheduler argument passed to replay. */
        private final Scheduler scheduler;

        ReplaySupplierBufferTime(Observable<T> source, long time, TimeUnit unit, Scheduler scheduler) {
            this.source = source;
            this.time = time;
            this.unit = unit;
            this.scheduler = scheduler;
        }

        @Override
        public ConnectableObservable<T> call() {
            final ConnectableObservable<T> replayed = this.source.replay(this.time, this.unit, this.scheduler);
            return replayed;
        }
    }

    /**
     * Creates a Func0 that, each time it is called, returns the ConnectableObservable
     * obtained from {@code source.replay(bufferSize, time, unit, scheduler)}.
     * @param <T> the input value type
     * @param source the source on which the supplier function calls replay
     * @param bufferSize
     *            the buffer size that limits the number of items the connectable observable can replay
     * @param time
     *            the duration of the window in which the replayed items must have been emitted
     * @param unit
     *            the time unit of {@code time}
     * @param scheduler the scheduler to use for timing information
     * @return the new Func0 instance
     */
    public static <T> Func0<ConnectableObservable<T>> createReplaySupplier(
            final Observable<T> source,
            final int bufferSize,
            final long time,
            final TimeUnit unit,
            final Scheduler scheduler) {
        final Func0<ConnectableObservable<T>> supplier =
                new ReplaySupplierTime<T>(source, bufferSize, time, unit, scheduler);
        return supplier;
    }

    /**
     * Supplier that calls {@code replay(bufferSize, time, unit, scheduler)} on its source
     * every time it is invoked. Note that, although the name mentions only time, this
     * variant passes both a buffer size and the timing arguments.
     * @param <T> the value type of the source
     */
    static final class ReplaySupplierTime<T> implements Func0<ConnectableObservable<T>> {
        /** The Observable on which replay is called. */
        private final Observable<T> source;
        /** The buffer size argument passed to replay. */
        private final int bufferSize;
        /** The time argument passed to replay. */
        private final long time;
        /** The unit of {@code time}. */
        private final TimeUnit unit;
        /** The scheduler argument passed to replay. */
        private final Scheduler scheduler;

        ReplaySupplierTime(Observable<T> source, int bufferSize, long time, TimeUnit unit,
                           Scheduler scheduler) {
            this.source = source;
            this.bufferSize = bufferSize;
            this.time = time;
            this.unit = unit;
            this.scheduler = scheduler;
        }

        @Override
        public ConnectableObservable<T> call() {
            final ConnectableObservable<T> replayed =
                    this.source.replay(this.bufferSize, this.time, this.unit, this.scheduler);
            return replayed;
        }
    }

    /**
     * Adapts a two-argument collector action into a Func2 that invokes the collector with
     * both of its parameters and then returns the first (R) parameter.
     * @param <T> the input value type
     * @param <R> the result value type
     * @param collector the collector action to call
     * @return the new Func2 instance
     */
    public static <T, R> Func2<R, T, R> createCollectorCaller(Action2<R, ? super T> collector) {
        final Func2<R, T, R> caller = new CollectorCaller<T, R>(collector);
        return caller;
    }

    /**
     * Func2 that invokes the wrapped collector action with the state and the value, then
     * hands back the very same state reference it was given.
     * @param <T> the input value type
     * @param <R> the state/result type
     */
    static final class CollectorCaller<T, R> implements Func2<R, T, R> {
        /** The action invoked with (state, value) on every call. */
        final Action2<R, ? super T> collector;

        public CollectorCaller(Action2<R, ? super T> collector) {
            this.collector = collector;
        }

        @Override
        public R call(R state, T value) {
            // The action returns nothing; the state reference itself is the result.
            this.collector.call(state, value);
            return state;
        }
    }

    /**
     * Action that never completes normally: it throws an OnErrorNotImplementedException
     * constructed from the Throwable it receives.
     */
    static final class ErrorNotImplementedAction implements Action1<Throwable> {
        @Override
        public void call(Throwable t) {
            final OnErrorNotImplementedException failure = new OnErrorNotImplementedException(t);
            throw failure;
        }
    }

}