ReactiveX/RxJava

View on GitHub
src/main/java/rx/internal/operators/OperatorWithLatestFromMany.java

Summary

Maintainability
A
3 hrs
Test Coverage
/**
 * Copyright 2014 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package rx.internal.operators;

import java.util.Arrays;
import java.util.concurrent.atomic.*;

import rx.*;
import rx.Observable.OnSubscribe;
import rx.exceptions.Exceptions;
import rx.functions.FuncN;
import rx.observers.SerializedSubscriber;
import rx.plugins.RxJavaHooks;

public final class OperatorWithLatestFromMany<T, R> implements OnSubscribe<R> {
    final Observable<T> main;

    final Observable<?>[] others;

    final Iterable<Observable<?>> othersIterable;

    final FuncN<R> combiner;

    public OperatorWithLatestFromMany(Observable<T> main, Observable<?>[] others, Iterable<Observable<?>> othersIterable, FuncN<R> combiner) {
        this.main = main;
        this.others = others;
        this.othersIterable = othersIterable;
        this.combiner = combiner;
    }

    @Override
    public void call(Subscriber<? super R> t) {
        SerializedSubscriber<R> serial = new SerializedSubscriber<R>(t);


        Observable<?>[] sources;
        int n = 0;

        if (others != null) {
            sources = others;
            n = sources.length;
        } else {
            sources = new Observable[8];
            for (Observable<?> o : othersIterable) {
                if (n == sources.length) {
                    sources = Arrays.copyOf(sources, n + (n >> 2));
                }
                sources[n++] = o;
            }
        }

        WithLatestMainSubscriber<T, R> parent = new WithLatestMainSubscriber<T, R>(t, combiner, n);

        serial.add(parent);


        for (int i = 0; i < n; i++) {
            if (serial.isUnsubscribed()) {
                return;
            }

            WithLatestOtherSubscriber inner = new WithLatestOtherSubscriber(parent, i + 1);
            parent.add(inner);

            Observable<?> o = sources[i];
            o.unsafeSubscribe(inner);
        }

        main.unsafeSubscribe(parent);
    }

    static final class WithLatestMainSubscriber<T, R> extends Subscriber<T> {
        final Subscriber<? super R> actual;

        final FuncN<R> combiner;

        final AtomicReferenceArray<Object> current;

        static final Object EMPTY = new Object();

        final AtomicInteger ready;

        boolean done;

        public WithLatestMainSubscriber(Subscriber<? super R> actual, FuncN<R> combiner, int n) {
            this.actual = actual;
            this.combiner = combiner;

            AtomicReferenceArray<Object> array = new AtomicReferenceArray<Object>(n + 1);
            for (int i = 0; i <= n; i++) {
                array.lazySet(i, EMPTY);
            }
            this.current = array;

            this.ready = new AtomicInteger(n);
            this.request(0);
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (ready.get() == 0) {

                AtomicReferenceArray<Object> array = current;
                int n = array.length();
                array.lazySet(0, t);

                Object[] copy = new Object[array.length()];
                for (int i = 0; i < n; i++) {
                    copy[i] = array.get(i);
                }

                R result;

                try {
                    result = combiner.call(copy);
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    onError(ex);
                    return;
                }

                actual.onNext(result);
            } else {
                request(1);
            }
        }

        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;
            unsubscribe();
            actual.onError(e);
        }

        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            done = true;
            unsubscribe();
            actual.onCompleted();
        }

        @Override
        public void setProducer(Producer p) {
            super.setProducer(p);
            actual.setProducer(p);
        }

        void innerNext(int index, Object o) {
            Object last = current.getAndSet(index, o);
            if (last == EMPTY) {
                ready.decrementAndGet();
            }
        }

        void innerError(int index, Throwable e) {
            onError(e);
        }

        void innerComplete(int index) {
            if (current.get(index) == EMPTY) {
                onCompleted();
            }
        }
    }

    static final class WithLatestOtherSubscriber extends Subscriber<Object> {
        final WithLatestMainSubscriber<?, ?> parent;

        final int index;

        public WithLatestOtherSubscriber(WithLatestMainSubscriber<?, ?> parent, int index) {
            this.parent = parent;
            this.index = index;
        }

        @Override
        public void onNext(Object t) {
            parent.innerNext(index, t);
        }

        @Override
        public void onError(Throwable e) {
            parent.innerError(index, e);
        }

        @Override
        public void onCompleted() {
            parent.innerComplete(index);
        }
    }
}