ReactiveX/RxJava

View on GitHub
src/main/java/rx/internal/util/IndexedRingBuffer.java

Summary

Maintainability
C
1 day
Test Coverage
/**
 * Copyright 2014 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package rx.internal.util;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;

import rx.Subscription;
import rx.functions.Func1;

/**
 * Add/Remove without object allocation (after initial construction).
 * <p>
 * This is meant for hundreds or single-digit thousands of elements that need
 * to be rapidly added and randomly or sequentially removed while avoiding object allocation.
 * <p>
 * On Intel Core i7, 2.3Mhz, Mac Java 8:
 * <p>
 * - adds per second single-threaded => ~32,598,500 for 100
 * - adds per second single-threaded => ~23,200,000 for 10,000
 * - adds + removes per second single-threaded => 15,562,100 for 100
 * - adds + removes per second single-threaded => 8,760,000 for 10,000
 *
 * <pre> {@code
 * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
 * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   263571.721     9856.994    ops/s
 * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5     1763.417      211.998    ops/s
 * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   139850.115    17143.705    ops/s
 * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5      809.982       72.931    ops/s
 * } </pre>
 *
 * @param <E>
 */
public final class IndexedRingBuffer<E> implements Subscription {
    private final ElementSection<E> elements = new ElementSection<E>();
    private final IndexSection removed = new IndexSection();
    /* package for unit testing */final AtomicInteger index = new AtomicInteger();
    /* package for unit testing */final AtomicInteger removedIndex = new AtomicInteger();

    /* package for unit testing */static final int SIZE;

    // default size of ring buffer
    /**
     * Set at 256 ... Android defaults far smaller which likely will never hit the use cases that require the higher buffers.
     * <p>
     * The 10000 size test represents something that should be a rare use case (merging 10000 concurrent Observables for example)
     *
     * <pre> {@code
     * ./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*IndexedRingBufferPerf.*'
     *
     * 1024
     *
     * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   269292.006     6013.347    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5     2217.103      163.396    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   139349.608     9397.232    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5     1045.323       30.991    ops/s
     *
     * 512
     *
     * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   270919.870     5381.793    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5     1724.436       42.287    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   141478.813     3696.030    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5      719.447       75.629    ops/s
     *
     *
     * 256
     *
     * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   272042.605     7954.982    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5     1101.329       23.566    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   140479.804     6389.060    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5      397.306       24.222    ops/s
     *
     * 128
     *
     * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   263065.312    11168.941    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5      581.708       17.397    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   138051.488     4618.935    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5      176.873       35.669    ops/s
     *
     * 32
     *
     * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   250737.473    17260.148    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5      144.725       26.284    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   118832.832     9082.658    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5       32.133        8.048    ops/s
     *
     * 8
     *
     * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5   209192.847    25558.124    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5       26.520        3.100    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5   100200.463     1854.259    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5        8.456        2.114    ops/s
     *
     * 2
     *
     * Benchmark                                              (size)   Mode   Samples        Score  Score error    Units
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd            100  thrpt         5    96549.208     4427.239    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAdd          10000  thrpt         5        6.637        2.025    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove      100  thrpt         5    34553.169     4904.197    ops/s
     * r.i.IndexedRingBufferPerf.indexedRingBufferAddRemove    10000  thrpt         5        2.159        0.700    ops/s
     * } </pre>
     *
     * Impact of IndexedRingBuffer size on merge
     *
     * <pre> {@code
     * ./gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*OperatorMergePerf.*'
     *
     * 512
     *
     * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
     * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5282500.038   530541.761    ops/s
     * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    49327.272     6382.189    ops/s
     * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       53.025        4.724    ops/s
     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    97395.148     2489.303    ops/s
     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.723        1.479    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4534067.250   116321.725    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   458561.098    27652.081    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    43267.381     2648.107    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5581051.672   144191.849    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       50.643        4.354    ops/s
     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    76437.644      959.748    ops/s
     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     2965.306      272.928    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5026522.098   364196.255    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    34926.819      938.612    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       33.342        1.701    ops/s
     *
     *
     * 128
     *
     * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
     * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5144891.776   271990.561    ops/s
     * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    53580.161     2370.204    ops/s
     * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       53.265        2.236    ops/s
     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    96634.426     1417.430    ops/s
     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.648        0.255    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4601280.220    53157.938    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   463394.568    58612.882    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    50503.565     2394.168    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5490315.842   228654.817    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       50.661        3.385    ops/s
     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    74716.169     7413.642    ops/s
     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     3009.476      277.075    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  4953313.642   307512.126    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    35335.579     2368.377    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       37.450        0.655    ops/s
     *
     * 32
     *
     * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
     * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  4975957.497   365423.694    ops/s
     * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    52141.226     5056.658    ops/s
     * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       53.663        2.671    ops/s
     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    96507.893     1833.371    ops/s
     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.850        0.782    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4557128.302   118516.934    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   339005.037    10594.737    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    50781.535     6071.787    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5604920.068   209285.840    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       50.413        7.496    ops/s
     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    76098.942      558.187    ops/s
     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     2988.137      193.255    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5177255.256   150253.086    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    34772.490      909.967    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       34.847        0.606    ops/s
     *
     * 8
     *
     * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
     * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5027331.903   337986.410    ops/s
     * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    51746.540     3585.450    ops/s
     * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       52.682        4.026    ops/s
     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    96805.587     2868.112    ops/s
     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.598        0.290    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4390912.630   300687.310    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   458615.731    56125.958    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    49033.105     6132.936    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5090614.100   649439.778    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       48.548        3.586    ops/s
     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    72285.482    16820.952    ops/s
     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     2981.576      316.140    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  4993609.293   267975.397    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    33228.972     1554.924    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       32.994        3.615    ops/s
     *
     *
     * 2
     *
     * Benchmark                                          (size)   Mode   Samples        Score  Score error    Units
     * r.o.OperatorMergePerf.merge1SyncStreamOfN               1  thrpt         5  5103812.234   939461.192    ops/s
     * r.o.OperatorMergePerf.merge1SyncStreamOfN            1000  thrpt         5    51491.116     3790.056    ops/s
     * r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000  thrpt         5       54.043        2.340    ops/s
     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1  thrpt         5    96575.834    13416.541    ops/s
     * r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000  thrpt         5        4.740        0.047    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1  thrpt         5  4435909.832   899133.671    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100  thrpt         5   392382.445    59814.783    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000  thrpt         5    50429.258     7489.849    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1  thrpt         5  5637321.803   161838.195    ops/s
     * r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000  thrpt         5       51.065        2.138    ops/s
     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1  thrpt         5    76366.764     2631.710    ops/s
     * r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000  thrpt         5     2978.302      296.418    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1  thrpt         5  5280829.290  1602542.493    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000  thrpt         5    35070.518     3565.672    ops/s
     * r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000  thrpt         5       34.501        0.991    ops/s
     *
     * } </pre>
     */
    static {
        int defaultSize = 128;

        // lower default for Android (https://github.com/ReactiveX/RxJava/issues/1820)
        if (PlatformDependent.isAndroid()) {
            defaultSize = 8;
        }

        // possible system property for overriding
        String sizeFromProperty = System.getProperty("rx.indexed-ring-buffer.size"); // also see RxRingBuffer
        if (sizeFromProperty != null) {
            try {
                defaultSize = Integer.parseInt(sizeFromProperty);
            } catch (NumberFormatException e) {
                System.err.println("Failed to set 'rx.indexed-ring-buffer.size' with value " + sizeFromProperty + " => " + e.getMessage()); // NOPMD
            }
        }

        SIZE = defaultSize;
    }

    public static <T> IndexedRingBuffer<T> getInstance() {
        return new IndexedRingBuffer<T>();
    }

    /**
     * This resets the arrays, nulls out references and returns it to the pool.
     * This extra CPU cost is far smaller than the object allocation cost of not pooling.
     */
    public void releaseToPool() {
        // need to clear all elements so we don't leak memory
        int maxIndex = index.get();
        int realIndex = 0;
        ElementSection<E> section = elements;
        outer: while (section != null) {
            for (int i = 0; i < SIZE; i++, realIndex++) {
                if (realIndex >= maxIndex) {
                    break outer;
                }
                // we can use lazySet here because we are nulling things out and not accessing them again
                // (relative on Mac Intel i7) lazySet gets us ~30m vs ~26m ops/second in the JMH test (100 adds per release)
                section.array.set(i, null);
            }
            section = section.next.get();
        }

        index.set(0);
        removedIndex.set(0);
    }

    @Override
    public void unsubscribe() {
        releaseToPool();
    }

    IndexedRingBuffer() {
        // nothing to do
    }

    /**
     * Add an element and return the index where it was added to allow removal.
     *
     * @param e the element to add
     * @return the index where the element was added
     */
    public int add(E e) {
        int i = getIndexForAdd();
        if (i < SIZE) {
            // fast-path when we are in the first section
            elements.array.set(i, e);
            return i;
        } else {
            int sectionIndex = i % SIZE;
            getElementSection(i).array.set(sectionIndex, e);
            return i;
        }
    }

    public E remove(int index) {
        E e;
        if (index < SIZE) {
            // fast-path when we are in the first section
            e = elements.array.getAndSet(index, null);
        } else {
            int sectionIndex = index % SIZE;
            e = getElementSection(index).array.getAndSet(sectionIndex, null);
        }
        pushRemovedIndex(index);
        return e;
    }

    private IndexSection getIndexSection(int index) {
        // short-cut the normal case
        if (index < SIZE) {
            return removed;
        }

        // if we have passed the first array we get more complicated and do recursive chaining
        int numSections = index / SIZE;
        IndexSection a = removed;
        for (int i = 0; i < numSections; i++) {
            a = a.getNext();
        }
        return a;
    }

    private ElementSection<E> getElementSection(int index) {
        // short-cut the normal case
        if (index < SIZE) {
            return elements;
        }

        // if we have passed the first array we get more complicated and do recursive chaining
        int numSections = index / SIZE;
        ElementSection<E> a = elements;
        for (int i = 0; i < numSections; i++) {
            a = a.getNext();
        }
        return a;
    }

    private synchronized int getIndexForAdd() { // NOPMD
        /*
         * Synchronized as I haven't yet figured out a way to do this in an atomic way that doesn't involve object allocation
         */
        int i;
        int ri = getIndexFromPreviouslyRemoved();
        if (ri >= 0) {
            if (ri < SIZE) {
                // fast-path when we are in the first section
                i = removed.getAndSet(ri, -1);
            } else {
                int sectionIndex = ri % SIZE;
                i = getIndexSection(ri).getAndSet(sectionIndex, -1);
            }
            if (i == index.get()) {
                // if it was the last index removed, when we pick it up again we want to increment
                index.getAndIncrement();
            }
        } else {
            i = index.getAndIncrement();
        }
        return i;
    }

    /**
     * Returns -1 if nothing, 0 or greater if the index should be used
     *
     * @return the index or -1 if none
     */
    private synchronized int getIndexFromPreviouslyRemoved() { // NOPMD
        /*
         * Synchronized as I haven't yet figured out a way to do this in an atomic way that doesn't involve object allocation
         */

        // loop because of CAS
        while (true) {
            int currentRi = removedIndex.get();
            if (currentRi > 0) {
                // claim it
                if (removedIndex.compareAndSet(currentRi, currentRi - 1)) {
                    return currentRi - 1;
                }
            } else {
                // do nothing
                return -1;
            }
        }
    }

    private synchronized void pushRemovedIndex(int elementIndex) { // NOPMD
        /*
         * Synchronized as I haven't yet figured out a way to do this in an atomic way that doesn't involve object allocation
         */

        int i = removedIndex.getAndIncrement();
        if (i < SIZE) {
            // fast-path when we are in the first section
            removed.set(i, elementIndex);
        } else {
            int sectionIndex = i % SIZE;
            getIndexSection(i).set(sectionIndex, elementIndex);
        }
    }

    @Override
    public boolean isUnsubscribed() {
        return false;
    }

    public int forEach(Func1<? super E, Boolean> action) {
        return forEach(action, 0);
    }

    /**
     * Loop through each element in the buffer and call a specific function.
     * @param action
     *            that processes each item and returns true if it wants to continue to the next
     * @param startIndex at which index the loop should start
     * @return int of next index to process, or last index seen if it exited early
     */
    public int forEach(Func1<? super E, Boolean> action, int startIndex) {
        int endedAt = forEach(action, startIndex, index.get());
        if (startIndex > 0 && endedAt == index.get()) {
            // start at the beginning again and go up to startIndex
            endedAt = forEach(action, 0, startIndex);
        } else if (endedAt == index.get()) {
            // start back at the beginning
            endedAt = 0;
        }
        return endedAt;
    }

    private int forEach(Func1<? super E, Boolean> action, int startIndex, int endIndex) {
        int lastIndex;
        int maxIndex = index.get();
        int realIndex = startIndex;
        ElementSection<E> section = elements;

        if (startIndex >= SIZE) {
            // move into the correct section
            section = getElementSection(startIndex);
            startIndex = startIndex % SIZE;
        }

        outer: while (section != null) {
            for (int i = startIndex; i < SIZE; i++, realIndex++) {
                if (realIndex >= maxIndex || realIndex >= endIndex) {
                    break outer;
                }
                E element = section.array.get(i);
                if (element == null) {
                    continue;
                }
                lastIndex = realIndex;
                boolean continueLoop = action.call(element);
                if (!continueLoop) {
                    return lastIndex;
                }
            }
            section = section.next.get();
            startIndex = 0; // reset to start for next section
        }

        // return the OutOfBounds index position if we processed all of them ... the one we should be less-than
        return realIndex;
    }

    static final class ElementSection<E> {
        final AtomicReferenceArray<E> array = new AtomicReferenceArray<E>(SIZE);
        final AtomicReference<ElementSection<E>> next = new AtomicReference<ElementSection<E>>();

        ElementSection<E> getNext() {
            if (next.get() != null) {
                return next.get();
            } else {
                ElementSection<E> newSection = new ElementSection<E>();
                if (next.compareAndSet(null, newSection)) {
                    // we won
                    return newSection;
                } else {
                    // we lost so get the value that won
                    return next.get();
                }
            }
        }
    }

    static class IndexSection {

        private final AtomicIntegerArray unsafeArray = new AtomicIntegerArray(SIZE);

        private final AtomicReference<IndexSection> _next = new AtomicReference<IndexSection>();

        public int getAndSet(int expected, int newValue) {
            return unsafeArray.getAndSet(expected, newValue);
        }

        public void set(int i, int elementIndex) {
            unsafeArray.set(i, elementIndex);
        }

        IndexSection getNext() {
            if (_next.get() != null) {
                return _next.get();
            } else {
                IndexSection newSection = new IndexSection();
                if (_next.compareAndSet(null, newSection)) {
                    // we won
                    return newSection;
                } else {
                    // we lost so get the value that won
                    return _next.get();
                }
            }
        }

    }

}