ReactiveX/RxJava

View on GitHub
src/main/java/rx/internal/operators/OnSubscribeConcatMap.java

Summary

Maintainability
D
2 days
Test Coverage
/**
 * Copyright 2016 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package rx.internal.operators;

import java.util.Queue;
import java.util.concurrent.atomic.*;

import rx.*;
import rx.Observable.OnSubscribe;
import rx.exceptions.*;
import rx.functions.Func1;
import rx.internal.producers.ProducerArbiter;
import rx.internal.util.*;
import rx.internal.util.atomic.SpscAtomicArrayQueue;
import rx.internal.util.unsafe.*;
import rx.observers.SerializedSubscriber;
import rx.plugins.RxJavaHooks;
import rx.subscriptions.SerialSubscription;

/**
 * Maps a source sequence into Observables and concatenates them in order, subscribing
 * to one at a time.
 * @param <T> the source value type
 * @param <R> the output value type
 *
 * @since 1.1.2
 */
public final class OnSubscribeConcatMap<T, R> implements OnSubscribe<R> {
    /** The main (upstream) sequence; subscribed to in {@code call} with a fresh ConcatMapSubscriber. */
    final Observable<? extends T> source;

    /** Function invoked for each main value to obtain the inner Observable to concatenate. */
    final Func1<? super T, ? extends Observable<? extends R>> mapper;

    /**
     * Passed on to each ConcatMapSubscriber, which uses it both as the capacity
     * argument of its queue and as its initial request amount towards the main source.
     */
    final int prefetch;

    /**
     * How to handle errors from the main and inner Observables.
     * See the constants below.
     */
    final int delayErrorMode;

    /** Whenever any Observable fires an error, terminate with that error immediately. */
    public static final int IMMEDIATE = 0;

    /** Whenever the main fires an error, wait until the inner terminates. */
    public static final int BOUNDARY = 1;

    /** Delay all errors to the very end. */
    public static final int END = 2;

    /**
     * Captures the operator's configuration; nothing is subscribed to or validated here.
     *
     * @param source the main sequence whose values will be mapped
     * @param mapper the function producing an inner Observable for each main value
     * @param prefetch the amount handed to each ConcatMapSubscriber for its queue and initial request
     * @param delayErrorMode one of {@link #IMMEDIATE}, {@link #BOUNDARY} or {@link #END}
     */
    public OnSubscribeConcatMap(
            Observable<? extends T> source,
            Func1<? super T, ? extends Observable<? extends R>> mapper,
            int prefetch,
            int delayErrorMode) {
        this.delayErrorMode = delayErrorMode;
        this.prefetch = prefetch;
        this.mapper = mapper;
        this.source = source;
    }

    /**
     * Connects a new downstream subscriber to the main source through a
     * {@code ConcatMapSubscriber}.
     * <p>
     * The coordinator and its inner subscription holder are both registered with
     * {@code child}, and downstream requests are forwarded to the coordinator's
     * {@code requestMore}. The main source is subscribed to only if {@code child}
     * is still subscribed at that point.
     *
     * @param child the downstream subscriber
     */
    @Override
    public void call(Subscriber<? super R> child) {
        // Only IMMEDIATE mode wraps the downstream in a SerializedSubscriber;
        // in that mode onError/innerError call the downstream directly instead of via drain().
        Subscriber<? super R> downstream = child;
        if (delayErrorMode == IMMEDIATE) {
            downstream = new SerializedSubscriber<R>(child);
        }

        final ConcatMapSubscriber<T, R> coordinator =
                new ConcatMapSubscriber<T, R>(downstream, mapper, prefetch, delayErrorMode);

        Producer requestForwarder = new Producer() {
            @Override
            public void request(long n) {
                coordinator.requestMore(n);
            }
        };

        child.add(coordinator);
        child.add(coordinator.inner);
        child.setProducer(requestForwarder);

        if (child.isUnsubscribed()) {
            return;
        }
        source.unsafeSubscribe(coordinator);
    }

    static final class ConcatMapSubscriber<T, R> extends Subscriber<T> {
        /** The downstream subscriber that receives inner values and the terminal event. */
        final Subscriber<? super R> actual;

        /** Maps each queued main value to the inner Observable that is run next. */
        final Func1<? super T, ? extends Observable<? extends R>> mapper;

        /** One of the IMMEDIATE, BOUNDARY or END constants of the enclosing class. */
        final int delayErrorMode;

        /**
         * Receives the downstream request amounts, the producer of the current inner
         * source and the count of items each inner source has produced.
         */
        final ProducerArbiter arbiter;

        /** Holds main values (wrapped via NotificationLite.next) until the drain loop polls them. */
        final Queue<Object> queue;

        /** Work-in-progress counter: only the drain() call that raises it from 0 runs the loop. */
        final AtomicInteger wip;

        /** Container for errors added via ExceptionsUtils.addThrowable and taken via terminate. */
        final AtomicReference<Throwable> error;

        /** Holds the subscriber of the currently running (non-scalar) inner source. */
        final SerialSubscription inner;

        /** Set to true once the main source has signalled onError or onCompleted. */
        volatile boolean done;

        /** True while an inner source has been started and has not yet reported termination. */
        volatile boolean active;

        /**
         * Sets up the coordinator state and requests the first {@code prefetch}
         * items from the main source.
         *
         * @param actual the downstream subscriber
         * @param mapper the function producing an inner Observable for each main value
         * @param prefetch capacity argument of the queue and the initial request amount
         * @param delayErrorMode one of IMMEDIATE, BOUNDARY or END
         */
        public ConcatMapSubscriber(Subscriber<? super R> actual,
                Func1<? super T, ? extends Observable<? extends R>> mapper, int prefetch, int delayErrorMode) {
            this.actual = actual;
            this.mapper = mapper;
            this.delayErrorMode = delayErrorMode;

            // Pick the Unsafe-based queue when available, the atomic-array one otherwise.
            if (UnsafeAccess.isUnsafeAvailable()) {
                this.queue = new SpscArrayQueue<Object>(prefetch);
            } else {
                this.queue = new SpscAtomicArrayQueue<Object>(prefetch);
            }

            this.arbiter = new ProducerArbiter();
            this.wip = new AtomicInteger();
            this.error = new AtomicReference<Throwable>();
            this.inner = new SerialSubscription();

            // Must stay last: all state is in place before the main source is asked for items.
            this.request(prefetch);
        }

        /**
         * Enqueues a main value and triggers the drain loop. If the queue rejects the
         * value, this subscriber is unsubscribed and a MissingBackpressureException
         * is routed through {@link #onError(Throwable)}.
         *
         * @param item the value emitted by the main source
         */
        @Override
        public void onNext(T item) {
            if (queue.offer(NotificationLite.next(item))) {
                drain();
                return;
            }
            unsubscribe();
            onError(new MissingBackpressureException());
        }

        /**
         * Handles an error from the main source.
         * <p>
         * If the error cannot be added to the error container it is sent to the plugin
         * hook. Otherwise the main source is marked done; in IMMEDIATE mode the
         * accumulated error is delivered downstream right away and the inner
         * subscription is cancelled, in the other modes the drain loop decides when
         * to deliver it.
         *
         * @param mainError the error signalled by the main source
         */
        @Override
        public void onError(Throwable mainError) {
            if (!ExceptionsUtils.addThrowable(error, mainError)) {
                pluginError(mainError);
                return;
            }

            done = true;

            if (delayErrorMode != IMMEDIATE) {
                drain();
                return;
            }

            Throwable collected = ExceptionsUtils.terminate(error);
            if (!ExceptionsUtils.isTerminated(collected)) {
                actual.onError(collected);
            }
            inner.unsubscribe();
        }

        /**
         * Marks the main source as finished and lets the drain loop decide whether
         * the whole sequence can terminate.
         */
        @Override
        public void onCompleted() {
            this.done = true;
            this.drain();
        }

        /**
         * Forwards a downstream request to the arbiter.
         *
         * @param n the requested amount; zero is ignored
         * @throws IllegalArgumentException if {@code n} is negative
         */
        void requestMore(long n) {
            if (n < 0) {
                throw new IllegalArgumentException("n >= 0 required but it was " + n);
            }
            if (n > 0) {
                arbiter.request(n);
            }
        }

        /**
         * Relays a value of the current inner source to the downstream subscriber.
         *
         * @param item the value emitted by the inner source
         */
        void innerNext(R item) {
            this.actual.onNext(item);
        }

        /**
         * Handles an error from the current inner source.
         * <p>
         * An error that cannot be added to the error container goes to the plugin hook.
         * In IMMEDIATE mode the accumulated error is delivered downstream and this
         * subscriber is unsubscribed. In the other modes the inner source's emission
         * count is reported to the arbiter, the inner is marked inactive and the drain
         * loop is resumed.
         *
         * @param innerError the error signalled by the inner source
         * @param emitted the number of items the inner source emitted before failing
         */
        void innerError(Throwable innerError, long emitted) {
            if (!ExceptionsUtils.addThrowable(error, innerError)) {
                pluginError(innerError);
                return;
            }

            if (delayErrorMode == IMMEDIATE) {
                Throwable collected = ExceptionsUtils.terminate(error);
                if (!ExceptionsUtils.isTerminated(collected)) {
                    actual.onError(collected);
                }
                unsubscribe();
                return;
            }

            if (emitted != 0L) {
                arbiter.produced(emitted);
            }
            active = false;
            drain();
        }

        /**
         * Called when the current inner source completes: reports its emission count
         * to the arbiter (if any), marks the inner as inactive and resumes draining.
         *
         * @param emitted the number of items the finished inner source emitted
         */
        void innerCompleted(long emitted) {
            final boolean anythingEmitted = emitted != 0L;
            if (anythingEmitted) {
                arbiter.produced(emitted);
            }
            this.active = false;
            this.drain();
        }

        /**
         * Hands an error that could not be added to the error container over to
         * {@code RxJavaHooks.onError}.
         *
         * @param undeliverable the error to report
         */
        void pluginError(Throwable undeliverable) {
            RxJavaHooks.onError(undeliverable);
        }

        /**
         * Drain loop: while no inner source is active, polls the next main value,
         * maps it to an inner Observable and starts it, or terminates the downstream
         * once the main source is done and the queue is empty.
         * <p>
         * Only the call that raises {@code wip} from 0 runs the loop; any other call
         * just increments the counter, which makes the running loop go around again.
         * Every {@code return} inside the loop leaves {@code wip} non-zero, so after
         * such an exit later calls to this method return immediately.
         */
        void drain() {
            if (wip.getAndIncrement() != 0) {
                return;
            }

            // Local copy of the final field for use inside the loop.
            final int delayErrorMode = this.delayErrorMode;

            for (;;) {
                if (actual.isUnsubscribed()) {
                    return;
                }

                // Nothing to do while an inner source is still running; fall through
                // to the wip bookkeeping at the bottom of the loop.
                if (!active) {
                    // BOUNDARY mode: any recorded error is delivered as soon as no
                    // inner source is active, before the next main value is taken.
                    if (delayErrorMode == BOUNDARY) {
                        if (error.get() != null) {
                            Throwable ex = ExceptionsUtils.terminate(error);
                            if (!ExceptionsUtils.isTerminated(ex)) {
                                actual.onError(ex);
                            }
                            return;
                        }
                    }

                    // Read done BEFORE polling so that "done and empty" is only
                    // concluded when the flag was already set when the queue was checked.
                    boolean mainDone = done;
                    Object v = queue.poll();
                    boolean empty = v == null;

                    // Main finished and nothing left to map: emit the terminal event,
                    // which is onCompleted if no error was recorded.
                    if (mainDone && empty) {
                        Throwable ex = ExceptionsUtils.terminate(error);
                        if (ex == null) {
                            actual.onCompleted();
                        } else
                        if (!ExceptionsUtils.isTerminated(ex)) {
                            actual.onError(ex);
                        }
                        return;
                    }

                    if (!empty) {

                        Observable<? extends R> source;

                        try {
                            source = mapper.call(NotificationLite.<T>getValue(v));
                        } catch (Throwable mapperError) {
                            // Fatal errors are rethrown by throwIfFatal; anything else
                            // terminates the sequence through drainError.
                            Exceptions.throwIfFatal(mapperError);
                            drainError(mapperError);
                            return;
                        }

                        if (source == null) {
                            drainError(new NullPointerException("The source returned by the mapper was null"));
                            return;
                        }

                        // Identity comparison: a source that IS the Observable.empty()
                        // instance is skipped without subscribing to it.
                        if (source != Observable.empty()) {

                            if (source instanceof ScalarSynchronousObservable) {
                                // Scalar source: no subscription is made; its single value
                                // is wrapped in a producer that emits it when requested
                                // and then reports completion with a count of 1.
                                ScalarSynchronousObservable<? extends R> scalarSource = (ScalarSynchronousObservable<? extends R>) source;

                                active = true;

                                arbiter.setProducer(new ConcatMapInnerScalarProducer<T, R>(scalarSource.get(), this));
                            } else {
                                ConcatMapInnerSubscriber<T, R> innerSubscriber = new ConcatMapInnerSubscriber<T, R>(this);
                                inner.set(innerSubscriber);

                                // Subscribe only if the new subscriber is still subscribed
                                // after being placed into the SerialSubscription
                                // (presumably it is not when `inner` was already
                                // unsubscribed — confirm against SerialSubscription.set).
                                if (!innerSubscriber.isUnsubscribed()) {
                                    active = true;

                                    source.unsafeSubscribe(innerSubscriber);
                                } else {
                                    return;
                                }
                            }
                            // Ask the main source for one more item in place of the one just consumed.
                            request(1);
                        } else {
                            // Skipped empty source: still ask for one more main item and
                            // go straight to the next iteration without touching wip.
                            request(1);
                            continue;
                        }
                    }
                }
                // Leave the loop only when no drain() call arrived while this pass was running.
                if (wip.decrementAndGet() == 0) {
                    break;
                }
            }
        }

        /**
         * Terminates the sequence because of a failure raised inside the drain loop
         * (a throwing mapper or a null inner source).
         * <p>
         * This subscriber is unsubscribed first; the error is then added to the error
         * container and the accumulated error is delivered downstream, or, if it can
         * no longer be added, sent to the plugin hook.
         *
         * @param mapperError the failure to report
         */
        void drainError(Throwable mapperError) {
            unsubscribe();

            if (!ExceptionsUtils.addThrowable(error, mapperError)) {
                pluginError(mapperError);
                return;
            }

            Throwable collected = ExceptionsUtils.terminate(error);
            if (!ExceptionsUtils.isTerminated(collected)) {
                actual.onError(collected);
            }
        }
    }

    /**
     * Subscriber for one mapped inner Observable. It counts the items it receives
     * and forwards every signal, together with that count on termination, to the
     * owning {@code ConcatMapSubscriber}.
     *
     * @param <T> the main value type of the owning subscriber
     * @param <R> the value type of the inner source
     */
    static final class ConcatMapInnerSubscriber<T, R> extends Subscriber<R> {
        /** The coordinator that receives all signals of this inner source. */
        final ConcatMapSubscriber<T, R> parent;

        /** Number of items received from the inner source so far. */
        long produced;

        public ConcatMapInnerSubscriber(ConcatMapSubscriber<T, R> parent) {
            this.parent = parent;
        }

        /** Hands the inner source's producer to the owner's arbiter instead of storing it here. */
        @Override
        public void setProducer(Producer p) {
            final ConcatMapSubscriber<T, R> owner = parent;
            owner.arbiter.setProducer(p);
        }

        /** Counts the item, then relays it to the owner. */
        @Override
        public void onNext(R item) {
            produced += 1L;
            parent.innerNext(item);
        }

        /** Reports the failure and the number of items received before it. */
        @Override
        public void onError(Throwable failure) {
            final long emitted = produced;
            parent.innerError(failure, emitted);
        }

        /** Reports completion and the number of items received. */
        @Override
        public void onCompleted() {
            final long emitted = produced;
            parent.innerCompleted(emitted);
        }
    }

    /**
     * Producer standing in for a scalar inner source: on the first positive request
     * it emits its single value through the owning {@code ConcatMapSubscriber} and
     * then reports completion with a produced count of 1.
     *
     * @param <T> the main value type of the owning subscriber
     * @param <R> the type of the single value
     */
    static final class ConcatMapInnerScalarProducer<T, R> implements Producer {
        /** The single value to emit. */
        final R value;

        /** The coordinator that receives the value and the completion signal. */
        final ConcatMapSubscriber<T, R> parent;

        /** Becomes true once the value has been emitted; later requests are ignored. */
        boolean once;

        public ConcatMapInnerScalarProducer(R value, ConcatMapSubscriber<T, R> parent) {
            this.parent = parent;
            this.value = value;
        }

        @Override
        public void request(long n) {
            // Ignore repeated requests and non-positive amounts.
            if (once || n <= 0L) {
                return;
            }
            once = true;
            parent.innerNext(value);
            parent.innerCompleted(1);
        }
    }
}