ReactiveX/RxJava

View on GitHub
src/main/java/rx/internal/util/BackpressureDrainManager.java

Summary

Maintainability
C
1 day
Test Coverage
/**
 * Copyright 2014 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package rx.internal.util;

import java.util.concurrent.atomic.AtomicLong;

import rx.Producer;

/**
 * Manages the producer-backpressure-consumer interplay by
 * matching up available elements with requested elements and/or
 * terminal events.
 * <p>History: 1.1.0 - experimental
 * @since 1.3
 */
public final class BackpressureDrainManager extends AtomicLong implements Producer {
    /** */
    private static final long serialVersionUID = 2826241102729529449L;
    /** Indicates if one is in emitting phase, guarded by this. */
    boolean emitting;
    /** Indicates a terminal state. */
    volatile boolean terminated;
    /** Indicates an error state, barrier is provided via terminated. */
    Throwable exception;
    /** The callbacks to manage the drain. */
    final BackpressureQueueCallback actual;

    /**
     * Interface representing the minimal callbacks required
     * to operate the drain part of a backpressure system.
     */
    public interface BackpressureQueueCallback {
        /**
         * Override this method to peek for the next element,
         * null meaning no next element available now.
         * <p>It will be called plain and while holding this object's monitor.
         * @return the next element or null if no next element available
         */
        Object peek();
        /**
         * Override this method to poll (consume) the next element,
         * null meaning no next element available now.
         * @return the next element or null if no next element available
         */
        Object poll();
        /**
         * Override this method to deliver an element to downstream.
         * The logic ensures that this happens only in the right conditions.
         * @param value the value to deliver, not null
         * @return true indicates that one should terminate the emission loop unconditionally
         * and not deliver any further elements or terminal events.
         */
        boolean accept(Object value);
        /**
         * Override this method to deliver a normal or exceptional
         * terminal event.
         * @param exception if not null, contains the terminal exception
         */
        void complete(Throwable exception);
    }

    /**
     * Constructs a backpressure drain manager with 0 requestedCount,
     * no terminal event and not emitting.
     * @param actual he queue callback to check for new element availability
     */
    public BackpressureDrainManager(BackpressureQueueCallback actual) {
        this.actual = actual;
    }
    /**
     * Checks if a terminal state has been reached.
     * @return true if a terminal state has been reached
     */
    public boolean isTerminated() {
        return terminated;
    }
    /**
     * Move into a terminal state.
     * Call drain() anytime after.
     */
    public void terminate() {
        terminated = true;
    }
    /**
     * Move into a terminal state with an exception.
     * Call drain() anytime after.
     * <p>Serialized access is expected with respect to
     * element emission.
     * @param error the exception to deliver
     */
    public void terminate(Throwable error) {
        if (!terminated) {
            exception = error;
            terminated = true;
        }
    }
    /**
     * Move into a terminal state and drain.
     */
    public void terminateAndDrain() {
        terminated = true;
        drain();
    }
    /**
     * Move into a terminal state with an exception and drain.
     * <p>Serialized access is expected with respect to
     * element emission.
     * @param error the exception to deliver
     */
    public void terminateAndDrain(Throwable error) {
        if (!terminated) {
            exception = error;
            terminated = true;
            drain();
        }
    }
    @Override
    public void request(long n) {
        if (n == 0) {
            return;
        }
        boolean mayDrain;
        long r;
        long u;
        do {
            r = get();
            mayDrain = r == 0;
            if (r == Long.MAX_VALUE) {
                break;
            }
            if (n == Long.MAX_VALUE) {
                u = n;
                mayDrain = true;
            } else {
                if (r > Long.MAX_VALUE - n) {
                    u = Long.MAX_VALUE;
                } else {
                    u = r + n;
                }
            }
        } while (!compareAndSet(r, u));
        // since we implement producer, we have to call drain
        // on a 0-n request transition
        if (mayDrain) {
            drain();
        }
    }
    /**
     * Try to drain the "queued" elements and terminal events
     * by considering the available and requested event counts.
     */
    public void drain() {
        boolean term;
        synchronized (this) {
            if (emitting) {
                return;
            }
            emitting = true;
            term = terminated;
        }
        long n = get();
        boolean skipFinal = false;
        try {
            BackpressureQueueCallback a = actual;
            while (true) {
                int emitted = 0;
                while (n > 0 || term) {
                    Object o;
                    if (term) {
                        o = a.peek();
                        if (o == null) {
                            skipFinal = true;
                            Throwable e = exception;
                            a.complete(e);
                            return;
                        }
                        if (n == 0) {
                            break;
                        }
                    }
                    o = a.poll();
                    if (o == null) {
                        break;
                    } else {
                        if (a.accept(o)) {
                            skipFinal = true;
                            return;
                        }
                        n--;
                        emitted++;
                    }
                }
                synchronized (this) {
                    term = terminated;
                    boolean more = a.peek() != null;
                    // if no backpressure below
                    if (get() == Long.MAX_VALUE) {
                        // no new data arrived since the last poll
                        if (!more && !term) {
                            skipFinal = true;
                            emitting = false;
                            return;
                        }
                        n = Long.MAX_VALUE;
                    } else {
                        n = addAndGet(-emitted);
                        if ((n == 0 || !more) && (!term || more)) {
                            skipFinal = true;
                            emitting = false;
                            return;
                        }
                    }
                }
            }
        } finally {
            if (!skipFinal) {
                synchronized (this) {
                    emitting = false;
                }
            }
        }

    }
}