ReactiveX/RxJava

View on GitHub
src/main/java/rx/internal/operators/BackpressureUtils.java

Summary

Maintainability
C
1 day
Test Coverage
/**
 * Copyright 2015 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package rx.internal.operators;

import java.util.Queue;
import java.util.concurrent.atomic.*;

import rx.Subscriber;
import rx.functions.Func1;
import rx.internal.util.UtilityFunctions;

/**
 * Utility functions for use with backpressure.
 *
 */
public final class BackpressureUtils {
    /**
     * Masks the most significant bit, i.e., 0x8000_0000_0000_0000L.
     */
    static final long COMPLETED_MASK = Long.MIN_VALUE;
    /**
     * Masks the request amount bits, i.e., 0x7FFF_FFFF_FFFF_FFFF.
     */
    static final long REQUESTED_MASK = Long.MAX_VALUE;

    /** Utility class, no instances. */
    private BackpressureUtils() {
        throw new IllegalStateException("No instances!");
    }

    /**
     * Adds {@code n} (not validated) to {@code requested} and returns the value prior to addition once the
     * addition is successful (uses CAS semantics). If overflows then sets
     * {@code requested} field to {@code Long.MAX_VALUE}.
     *
     * @param requested
     *            atomic long that should be updated
     * @param n
     *            the number of requests to add to the requested count, positive (not validated)
     * @return requested value just prior to successful addition
     */
    public static long getAndAddRequest(AtomicLong requested, long n) {
        // add n to field but check for overflow
        while (true) {
            long current = requested.get();
            long next = addCap(current, n);
            if (requested.compareAndSet(current, next)) {
                return current;
            }
        }
    }

    /**
     * Multiplies two positive longs and caps the result at Long.MAX_VALUE.
     * @param a the first value
     * @param b the second value
     * @return the capped product of a and b
     */
    public static long multiplyCap(long a, long b) {
        long u = a * b;
        if (((a | b) >>> 31) != 0) {
            if (b != 0L && (u / b != a)) {
                u = Long.MAX_VALUE;
            }
        }
        return u;
    }

    /**
     * Adds two positive longs and caps the result at Long.MAX_VALUE.
     * @param a the first value
     * @param b the second value
     * @return the capped sum of a and b
     */
    public static long addCap(long a, long b) {
        long u = a + b;
        if (u < 0L) {
            u = Long.MAX_VALUE;
        }
        return u;
    }

    /**
     * Signals the completion of the main sequence and switches to post-completion replay mode.
     *
     * <p>
     * Don't modify the queue after calling this method!
     *
     * <p>
     * Post-completion backpressure handles the case when a source produces values based on
     * requests when it is active but more values are available even after its completion.
     * In this case, the onCompleted() can't just emit the contents of the queue but has to
     * coordinate with the requested amounts. This requires two distinct modes: active and
     * completed. In active mode, requests flow through and the queue is not accessed but
     * in completed mode, requests no-longer reach the upstream but help in draining the queue.
     * <p>
     * The algorithm utilizes the most significant bit (bit 63) of a long value (AtomicLong) since
     * request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't
     * allowed.
     *
     * @param <T> the value type to emit
     * @param requested the holder of current requested amount
     * @param queue the queue holding values to be emitted after completion
     * @param actual the subscriber to receive the values
     */
    public static <T> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<? super T> actual) {
        postCompleteDone(requested, queue, actual, UtilityFunctions.<T>identity());
    }

    /**
     * Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests.
     *
     * <p>
     * Post-completion backpressure handles the case when a source produces values based on
     * requests when it is active but more values are available even after its completion.
     * In this case, the onCompleted() can't just emit the contents of the queue but has to
     * coordinate with the requested amounts. This requires two distinct modes: active and
     * completed. In active mode, requests flow through and the queue is not accessed but
     * in completed mode, requests no-longer reach the upstream but help in draining the queue.
     *
     * @param <T> the value type to emit
     * @param requested the holder of current requested amount
     * @param n the value requested;
     * @param queue the queue holding values to be emitted after completion
     * @param actual the subscriber to receive the values
     * @return true if in the active mode and the request amount of n can be relayed to upstream, false if
     * in the post-completed mode and the queue is draining.
     */
    public static <T> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<? super T> actual) {
        return postCompleteRequest(requested, n, queue, actual, UtilityFunctions.<T>identity());
    }

    /**
     * Signals the completion of the main sequence and switches to post-completion replay mode
     * and allows exit transformation on the queued values.
     *
     * <p>
     * Don't modify the queue after calling this method!
     *
     * <p>
     * Post-completion backpressure handles the case when a source produces values based on
     * requests when it is active but more values are available even after its completion.
     * In this case, the onCompleted() can't just emit the contents of the queue but has to
     * coordinate with the requested amounts. This requires two distinct modes: active and
     * completed. In active mode, requests flow through and the queue is not accessed but
     * in completed mode, requests no-longer reach the upstream but help in draining the queue.
     * <p>
     * The algorithm utilizes the most significant bit (bit 63) of a long value (AtomicLong) since
     * request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't
     * allowed.
     *
     * @param <T> the value type in the queue
     * @param <R> the value type to emit
     * @param requested the holder of current requested amount
     * @param queue the queue holding values to be emitted after completion
     * @param actual the subscriber to receive the values
     * @param exitTransform the transformation to apply on the dequeued value to get the value to be emitted
     */
    public static <T, R> void postCompleteDone(AtomicLong requested, Queue<T> queue, Subscriber<? super R> actual, Func1<? super T, ? extends R> exitTransform) {
        for (;;) {
            long r = requested.get();

            // switch to completed mode only once
            if ((r & COMPLETED_MASK) != 0L) {
                return;
            }

            //
            long u = r | COMPLETED_MASK;

            if (requested.compareAndSet(r, u)) {
                // if we successfully switched to post-complete mode and there
                // are requests available start draining the queue
                if (r != 0L) {
                    // if the switch happened when there was outstanding requests, start draining
                    postCompleteDrain(requested, queue, actual, exitTransform);
                }
                return;
            }
        }
    }

    /**
     * Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests
     * and allows exit transformation on the queued values.
     *
     * <p>
     * Post-completion backpressure handles the case when a source produces values based on
     * requests when it is active but more values are available even after its completion.
     * In this case, the onCompleted() can't just emit the contents of the queue but has to
     * coordinate with the requested amounts. This requires two distinct modes: active and
     * completed. In active mode, requests flow through and the queue is not accessed but
     * in completed mode, requests no-longer reach the upstream but help in draining the queue.
     *
     * @param <T> the value type in the queue
     * @param <R> the value type to emit
     * @param requested the holder of current requested amount
     * @param n the value requested;
     * @param queue the queue holding values to be emitted after completion
     * @param actual the subscriber to receive the values
     * @param exitTransform the transformation to apply on the dequeued value to get the value to be emitted
     * @return true if in the active mode and the request amount of n can be relayed to upstream, false if
     * in the post-completed mode and the queue is draining.
     */
    public static <T, R> boolean postCompleteRequest(AtomicLong requested, long n, Queue<T> queue, Subscriber<? super R> actual, Func1<? super T, ? extends R> exitTransform) {
        if (n < 0L) {
            throw new IllegalArgumentException("n >= 0 required but it was " + n);
        }
        if (n == 0) {
            return (requested.get() & COMPLETED_MASK) == 0;
        }

        for (;;) {
            long r = requested.get();

            // mask of the completed flag
            long c = r & COMPLETED_MASK;
            // mask of the requested amount
            long u = r & REQUESTED_MASK;

            // add the current requested amount and the new requested amount
            // cap at Long.MAX_VALUE;
            long v = addCap(u, n);

            // restore the completed flag
            v |= c;

            if (requested.compareAndSet(r, v)) {
                // if there was no outstanding request before and in
                // the post-completed state, start draining
                if (r == COMPLETED_MASK) {
                    postCompleteDrain(requested, queue, actual, exitTransform);
                    return false;
                }
                // returns true for active mode and false if the completed flag was set
                return c == 0L;
            }
        }
    }

    /**
     * Drains the queue based on the outstanding requests in post-completed mode (only!)
     * and allows exit transformation on the queued values.
     *
     * @param <T> the value type in the queue
     * @param <R> the value type to emit
     * @param requested the holder of current requested amount
     * @param queue the queue holding values to be emitted after completion
     * @param subscriber the subscriber to receive the values
     * @param exitTransform the transformation to apply on the dequeued value to get the value to be emitted
     */
    static <T, R> void postCompleteDrain(AtomicLong requested, Queue<T> queue, Subscriber<? super R> subscriber, Func1<? super T, ? extends R> exitTransform) {

        long r = requested.get();

        // Run on a fast-path if the downstream is unbounded
        if (r == Long.MAX_VALUE) {
            for (;;) {
                if (subscriber.isUnsubscribed()) {
                    return;
                }

                T v = queue.poll();

                if (v == null) {
                    subscriber.onCompleted();
                    return;
                }

                subscriber.onNext(exitTransform.call(v));
            }
        }
        /*
         * Since we are supposed to be in the post-complete state,
         * requested will have its top bit set.
         * To allow direct comparison, we start with an emission value which has also
         * this flag set, then increment it as usual.
         * Since COMPLETED_MASK is essentially Long.MIN_VALUE,
         * there won't be any overflow or sign flip.
         */
        long e = COMPLETED_MASK;

        for (;;) {

            /*
             * This is an improved queue-drain algorithm with a specialization
             * in which we know the queue won't change anymore (i.e., done is always true
             * when looking at the classical algorithm and there is no error).
             *
             * Note that we don't check for cancellation or emptiness upfront for two reasons:
             * 1) if e != r, the loop will do this and we quit appropriately
             * 2) if e == r, then either there was no outstanding requests or we emitted the requested amount
             *    and the execution simply falls to the e == r check below which checks for emptiness anyway.
             */

            while (e != r) {
                if (subscriber.isUnsubscribed()) {
                    return;
                }

                T v = queue.poll();

                if (v == null) {
                    subscriber.onCompleted();
                    return;
                }

                subscriber.onNext(exitTransform.call(v));

                e++;
            }

            /*
             * If the emission count reaches the requested amount the same time the queue becomes empty
             * this will make sure the subscriber is completed immediately instead of on the next request.
             * This is also true if there are no outstanding requests (this the while loop doesn't run)
             * and the queue is empty from the start.
             */
            if (e == r) {
                if (subscriber.isUnsubscribed()) {
                    return;
                }
                if (queue.isEmpty()) {
                    subscriber.onCompleted();
                    return;
                }
            }

            /*
             * Fast flow: see if more requests have arrived in the meantime.
             * This avoids an atomic add (~40 cycles) and resumes the emission immediately.
             */
            r = requested.get();

            if (r == e) {
                /*
                 * Atomically decrement the requested amount by the emission amount.
                 * We can't use the full emission value because of the completed flag,
                 * however, due to two's complement representation, the flag on requested
                 * is preserved.
                 */
                r = requested.addAndGet(-(e & REQUESTED_MASK));
                // The requested amount actually reached zero, quit
                if (r == COMPLETED_MASK) {
                    return;
                }
                // reset the emission count
                e = COMPLETED_MASK;
            }
        }
    }

    /**
     * Atomically subtracts a value from the requested amount unless it's at Long.MAX_VALUE.
     * @param requested the requested amount holder
     * @param n the value to subtract from the requested amount, has to be positive (not verified)
     * @return the new requested amount
     * @throws IllegalStateException if n is greater than the current requested amount, which
     * indicates a bug in the request accounting logic
     */
    public static long produced(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) {
                return Long.MAX_VALUE;
            }
            long next = current - n;
            if (next < 0L) {
                throw new IllegalStateException("More produced than requested: " + next);
            }
            if (requested.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    /**
     * Validates the requested amount and returns true if it is positive.
     * @param n the requested amount
     * @return true if n is positive
     * @throws IllegalArgumentException if n is negative
     */
    public static boolean validate(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("n >= 0 required but it was " + n);
        }
        return n != 0L;
    }
}