ReactiveX/RxJava

View on GitHub
src/main/java/rx/internal/operators/OperatorWindowWithStartEndObservable.java

Summary

Maintainability
B
4 hrs
Test Coverage
/**
 * Copyright 2014 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package rx.internal.operators;

import java.util.*;

import rx.Observable;
import rx.Observable.Operator;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Func1;
import rx.observers.*;
import rx.subjects.UnicastSubject;
import rx.subscriptions.CompositeSubscription;

/**
 * Creates potentially overlapping windows of the source items where each window is
 * started by a value emitted by an observable and closed when an associated Observable emits
 * a value or completes.
 * <p>
 * The outer sequence terminates when either the source or the window-opening Observable
 * terminates. An error from the source, from the window-opening Observable, from any
 * window-closing Observable or from the closing selector is delivered to every open window
 * and then to the child subscriber.
 *
 * @param <T> the value type
 * @param <U> the type of the window opening event
 * @param <V> the type of the window closing event
 */
public final class OperatorWindowWithStartEndObservable<T, U, V> implements Operator<Observable<T>, T> {
    /** Emits the items that each open a new window. */
    final Observable<? extends U> windowOpenings;
    /** Maps an opening item to the Observable whose first item or completion closes that window. */
    final Func1<? super U, ? extends Observable<? extends V>> windowClosingSelector;

    /**
     * Constructs the operator.
     *
     * @param windowOpenings the Observable whose items each start a new window
     * @param windowClosingSelector called with the opening item; the returned Observable closes
     *            the window when it emits an item or completes
     */
    public OperatorWindowWithStartEndObservable(Observable<? extends U> windowOpenings,
            Func1<? super U, ? extends Observable<? extends V>> windowClosingSelector) {
        this.windowOpenings = windowOpenings;
        this.windowClosingSelector = windowClosingSelector;
    }

    /**
     * Wires the child subscriber to the source and to the window-opening Observable.
     *
     * @param child the downstream subscriber that receives the windows
     * @return the subscriber that consumes the source items
     */
    @Override
    public Subscriber<? super T> call(Subscriber<? super Observable<T>> child) {
        // Every subscription created by this operator is tracked here and tied to the child.
        CompositeSubscription composite = new CompositeSubscription();
        child.add(composite);

        final SourceSubscriber sub = new SourceSubscriber(child, composite);

        // Consumes the window-opening Observable and forwards its signals to the source subscriber.
        Subscriber<U> open = new Subscriber<U>() {

            @Override
            public void onStart() {
                request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(U t) {
                sub.beginWindow(t);
            }

            @Override
            public void onError(Throwable e) {
                sub.onError(e);
            }

            @Override
            public void onCompleted() {
                // Completion of the openings completes the whole output, not just future windows.
                sub.onCompleted();
            }
        };

        composite.add(sub);
        composite.add(open);

        windowOpenings.unsafeSubscribe(open);

        return sub;
    }

    /**
     * Serialized access to the subject: pairs the emitting side of a window, wrapped in a
     * {@link SerializedObserver}, with the side handed to the child as the window Observable.
     *
     * @param <T> the value type of the window
     */
    static final class SerializedSubject<T> {
        /** Receives the window's items and terminal event through a serializing wrapper. */
        final Observer<T> consumer;
        /** The Observable emitted downstream as the window. */
        final Observable<T> producer;

        /**
         * @param consumer the observer side of the window; wrapped in a {@link SerializedObserver}
         * @param producer the observable side of the window
         */
        public SerializedSubject(Observer<T> consumer, Observable<T> producer) {
            this.consumer = new SerializedObserver<T>(consumer);
            this.producer = producer;
        }

    }

    /**
     * Subscriber of the source sequence; it owns the set of open windows and fans each source
     * item out to all of them.
     */
    final class SourceSubscriber extends Subscriber<T> {
        /** The downstream subscriber, wrapped in a {@link SerializedSubscriber}. */
        final Subscriber<? super Observable<T>> child;
        /** Tracks this subscriber, the openings subscriber and every closing subscriber. */
        final CompositeSubscription composite;
        /** Lock protecting {@link #chunks} and {@link #done}. */
        final Object guard;
        /** The currently open windows. Guarded by guard. */
        final List<SerializedSubject<T>> chunks;
        /** Set once a terminal event has been processed. Guarded by guard. */
        boolean done;

        /**
         * @param child the downstream subscriber that receives the windows
         * @param composite the container unsubscribed when this subscriber terminates
         */
        public SourceSubscriber(Subscriber<? super Observable<T>> child, CompositeSubscription composite) {
            this.child = new SerializedSubscriber<Observable<T>>(child);
            this.guard = new Object();
            // Snapshot-copied on every source item and never used as a deque, so an
            // array-backed list is the better fit.
            this.chunks = new ArrayList<SerializedSubject<T>>();
            this.composite = composite;
        }

        @Override
        public void onStart() {
            request(Long.MAX_VALUE);
        }

        /**
         * Delivers a source item to every window that is open at the time of the call.
         *
         * @param t the source item
         */
        @Override
        public void onNext(T t) {
            List<SerializedSubject<T>> snapshot;
            synchronized (guard) {
                if (done) {
                    return;
                }
                // Copy under the lock so the windows are called without holding it.
                snapshot = new ArrayList<SerializedSubject<T>>(chunks);
            }
            for (SerializedSubject<T> window : snapshot) {
                window.consumer.onNext(t);
            }
        }

        /**
         * Terminates every open window and then the child with the given error.
         *
         * @param e the error to deliver
         */
        @Override
        public void onError(Throwable e) {
            try {
                List<SerializedSubject<T>> open = terminateAndDrain();
                if (open == null) {
                    // Already terminated; the finally block still unsubscribes the composite.
                    return;
                }
                for (SerializedSubject<T> window : open) {
                    window.consumer.onError(e);
                }
                child.onError(e);
            } finally {
                composite.unsubscribe();
            }
        }

        /** Completes every open window and then the child. */
        @Override
        public void onCompleted() {
            try {
                List<SerializedSubject<T>> open = terminateAndDrain();
                if (open == null) {
                    // Already terminated; the finally block still unsubscribes the composite.
                    return;
                }
                for (SerializedSubject<T> window : open) {
                    window.consumer.onCompleted();
                }
                child.onCompleted();
            } finally {
                composite.unsubscribe();
            }
        }

        /**
         * Marks this subscriber as terminated and hands back the windows that were still open.
         *
         * @return a snapshot of the open windows, or {@code null} if a terminal event was
         *         already processed
         */
        private List<SerializedSubject<T>> terminateAndDrain() {
            synchronized (guard) {
                if (done) {
                    return null;
                }
                done = true;
                List<SerializedSubject<T>> open = new ArrayList<SerializedSubject<T>>(chunks);
                chunks.clear();
                return open;
            }
        }

        /**
         * Opens a new window, emits it to the child and subscribes to its closing Observable.
         *
         * @param token the item from the window-opening Observable, passed to the closing selector
         */
        void beginWindow(U token) {
            final SerializedSubject<T> s = createSerializedSubject();
            synchronized (guard) {
                if (done) {
                    return;
                }
                chunks.add(s);
            }
            child.onNext(s.producer);

            Observable<? extends V> end;
            try {
                end = windowClosingSelector.call(token);
            } catch (Throwable e) {
                onError(e);
                return;
            }
            if (end == null) {
                // A null closing Observable would otherwise throw from the subscribe call below,
                // after the closing subscriber was registered, and escape through the openings
                // subscriber instead of terminating the windows and the child.
                onError(new NullPointerException("The window closing selector returned a null Observable"));
                return;
            }

            Subscriber<V> v = new Subscriber<V>() {
                /** True until the window has been closed by this subscriber. */
                boolean once = true;
                @Override
                public void onNext(V t) {
                    // The first item closes the window, same as completion.
                    onCompleted();
                }

                @Override
                public void onError(Throwable e) {
                    SourceSubscriber.this.onError(e);
                }

                @Override
                public void onCompleted() {
                    // onNext delegates here, so a later real completion must not close twice.
                    if (once) {
                        once = false;
                        endWindow(s);
                        composite.remove(this);
                    }
                }

            };
            composite.add(v);

            end.unsafeSubscribe(v);
        }

        /**
         * Closes the given window if it is still open and this subscriber has not terminated.
         *
         * @param window the window to complete
         */
        void endWindow(SerializedSubject<T> window) {
            boolean terminate;
            synchronized (guard) {
                if (done) {
                    return;
                }
                // SerializedSubject does not override equals, so this removes by identity.
                terminate = chunks.remove(window);
            }
            if (terminate) {
                window.consumer.onCompleted();
            }
        }

        /**
         * Creates the subject backing a new window.
         *
         * @return a new {@link SerializedSubject} whose two sides are the same {@link UnicastSubject}
         */
        SerializedSubject<T> createSerializedSubject() {
            UnicastSubject<T> bus = UnicastSubject.create();
            return new SerializedSubject<T>(bus, bus);
        }
    }
}