ReactiveX/RxJava

View on GitHub
src/main/java/rx/observers/SerializedObserver.java

Summary

Maintainability
C
1 day
Test Coverage
/**
 * Copyright 2014 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package rx.observers;

import rx.Observer;
import rx.exceptions.*;
import rx.internal.operators.NotificationLite;

/**
 * Enforces single-threaded, serialized, ordered execution of {@link #onNext}, {@link #onCompleted}, and
 * {@link #onError}.
 * <p>
 * When multiple threads are emitting and/or notifying they will be serialized by:
 * </p><ul>
 * <li>Allowing only one thread at a time to emit</li>
 * <li>Adding notifications to a queue if another thread is already emitting</li>
 * <li>Not holding any locks or blocking any threads while emitting</li>
 * </ul>
 *
 * @param <T>
 *          the type of items expected to be observed by the {@code Observer}
 */
public class SerializedObserver<T> implements Observer<T> {
    private final Observer<? super T> actual;

    private boolean emitting;
    /** Set to true if a terminal event was received. */
    private volatile boolean terminated;
    /** If not null, it indicates more work. */
    private FastList queue;

    static final class FastList {
        Object[] array;
        int size;

        public void add(Object o) {
            int s = size;
            Object[] a = array;
            if (a == null) {
                a = new Object[16];
                array = a;
            } else if (s == a.length) {
                Object[] array2 = new Object[s + (s >> 2)];
                System.arraycopy(a, 0, array2, 0, s);
                a = array2;
                array = a;
            }
            a[s] = o;
            size = s + 1;
        }
    }

    public SerializedObserver(Observer<? super T> s) {
        this.actual = s;
    }

    @Override
    public void onNext(T t) {
        if (terminated) {
            return;
        }
        synchronized (this) {
            if (terminated) {
                return;
            }
            if (emitting) {
                FastList list = queue;
                if (list == null) {
                    list = new FastList();
                    queue = list;
                }
                list.add(NotificationLite.next(t));
                return;
            }
            emitting = true;
        }
        try {
            actual.onNext(t);
        } catch (Throwable e) {
            terminated = true;
            Exceptions.throwOrReport(e, actual, t);
            return;
        }
        for (;;) {
            FastList list;
            synchronized (this) {
                list = queue;
                if (list == null) {
                    emitting = false;
                    return;
                }
                queue = null;
            }
            for (Object o : list.array) {
                if (o == null) {
                    break;
                }
                try {
                    if (NotificationLite.accept(actual, o)) {
                        terminated = true;
                        return;
                    }
                } catch (Throwable e) {
                    terminated = true;
                    Exceptions.throwIfFatal(e);
                    actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
                    return;
                }
            }
        }
    }

    @Override
    public void onError(final Throwable e) {
        Exceptions.throwIfFatal(e);
        if (terminated) {
            return;
        }
        synchronized (this) {
            if (terminated) {
                return;
            }
            terminated = true;
            if (emitting) {
                /*
                 * FIXME: generally, errors jump the queue but this wasn't true
                 * for SerializedObserver and may break existing expectations.
                 */
                FastList list = queue;
                if (list == null) {
                    list = new FastList();
                    queue = list;
                }
                list.add(NotificationLite.error(e));
                return;
            }
            emitting = true;
        }
        actual.onError(e);
    }

    @Override
    public void onCompleted() {
        if (terminated) {
            return;
        }
        synchronized (this) {
            if (terminated) {
                return;
            }
            terminated = true;
            if (emitting) {
                FastList list = queue;
                if (list == null) {
                    list = new FastList();
                    queue = list;
                }
                list.add(NotificationLite.completed());
                return;
            }
            emitting = true;
        }
        actual.onCompleted();
    }
}