/**
 * Copyright 2014 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package rx.internal.util.atomic;

import java.util.*;
import java.util.concurrent.atomic.*;

import rx.internal.util.unsafe.Pow2;

/*
 * The code was inspired by the similarly named JCTools class.
 */

/**
 * A single-producer single-consumer array-backed queue which can allocate new arrays in case the consumer is slower
 * than the producer.
 *
 * @param <T> the element type, not null
 */
public final class SpscLinkedArrayQueue<T> implements Queue<T> {
    static final int MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096);
    final AtomicLong producerIndex;
    int producerLookAheadStep;
    long producerLookAhead;
    int producerMask;
    AtomicReferenceArray<Object> producerBuffer;
    int consumerMask;
    AtomicReferenceArray<Object> consumerBuffer;
    final AtomicLong consumerIndex;

    private static final Object HAS_NEXT = new Object();

    public SpscLinkedArrayQueue(final int bufferSize) {
        int p2capacity = Pow2.roundToPowerOfTwo(bufferSize);
        int mask = p2capacity - 1;
        AtomicReferenceArray<Object> buffer = new AtomicReferenceArray<Object>(p2capacity + 1);
        producerBuffer = buffer;
        producerMask = mask;
        consumerBuffer = buffer;
        consumerMask = mask;
        producerLookAhead = mask - 1; // we know it's all empty to start with
        producerIndex = new AtomicLong();
        consumerIndex = new AtomicLong();

     * {@inheritDoc}
     * <p>
     * This implementation is correct for single producer thread use only.
    public boolean offer(final T e) {
        // local load of field to avoid repeated loads after volatile reads
        final AtomicReferenceArray<Object> buffer = producerBuffer;
        final long index = lpProducerIndex();
        final int mask = producerMask;
        final int offset = calcWrappedOffset(index, mask);
        if (index < producerLookAhead) {
            return writeToQueue(buffer, e, index, offset);
        } else {
            final int lookAheadStep = producerLookAheadStep;
            // go around the buffer or resize if full (unless we hit max capacity)
            int lookAheadElementOffset = calcWrappedOffset(index + lookAheadStep, mask);
            if (null == lvElement(buffer, lookAheadElementOffset)) { // LoadLoad
                producerLookAhead = index + lookAheadStep - 1; // joy, there's plenty of room
                return writeToQueue(buffer, e, index, offset);
            } else if (null == lvElement(buffer, calcWrappedOffset(index + 1, mask))) { // buffer is not full
                return writeToQueue(buffer, e, index, offset);
            } else {
                resize(buffer, index, offset, e, mask); // add a buffer and link old to new
                return true;

    private boolean writeToQueue(final AtomicReferenceArray<Object> buffer, final T e, final long index, final int offset) {
        soElement(buffer, offset, e);// StoreStore
        soProducerIndex(index + 1);// this ensures atomic write of long on 32bit platforms
        return true;

    private void resize(final AtomicReferenceArray<Object> oldBuffer, final long currIndex, final int offset, final T e,
            final long mask) {
        final int capacity = oldBuffer.length();
        final AtomicReferenceArray<Object> newBuffer = new AtomicReferenceArray<Object>(capacity);
        producerBuffer = newBuffer;
        producerLookAhead = currIndex + mask - 1;
        soElement(newBuffer, offset, e);// StoreStore
        soNext(oldBuffer, newBuffer);
        soElement(oldBuffer, offset, HAS_NEXT); // new buffer is visible after element is
                                                                 // inserted
        soProducerIndex(currIndex + 1);// this ensures correctness on 32bit platforms

    private void soNext(AtomicReferenceArray<Object> curr, AtomicReferenceArray<Object> next) {
        soElement(curr, calcDirectOffset(curr.length() - 1), next);
    private AtomicReferenceArray<Object> lvNext(AtomicReferenceArray<Object> curr) {
        return (AtomicReferenceArray<Object>)lvElement(curr, calcDirectOffset(curr.length() - 1));
     * {@inheritDoc}
     * <p>
     * This implementation is correct for single consumer thread use only.
    public T poll() {
        // local load of field to avoid repeated loads after volatile reads
        final AtomicReferenceArray<Object> buffer = consumerBuffer;
        final long index = lpConsumerIndex();
        final int mask = consumerMask;
        final int offset = calcWrappedOffset(index, mask);
        final Object e = lvElement(buffer, offset);// LoadLoad
        boolean isNextBuffer = e == HAS_NEXT;
        if (null != e && !isNextBuffer) {
            soElement(buffer, offset, null);// StoreStore
            soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
            return (T) e;
        } else if (isNextBuffer) {
            return newBufferPoll(lvNext(buffer), index, mask);

        return null;

    private T newBufferPoll(AtomicReferenceArray<Object> nextBuffer, final long index, final int mask) {
        consumerBuffer = nextBuffer;
        final int offsetInNew = calcWrappedOffset(index, mask);
        final T n = (T) lvElement(nextBuffer, offsetInNew);// LoadLoad
        if (null == n) {
            return null;
        } else {
            soElement(nextBuffer, offsetInNew, null);// StoreStore
            soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms
            return n;

     * {@inheritDoc}
     * <p>
     * This implementation is correct for single consumer thread use only.
    public T peek() {
        final AtomicReferenceArray<Object> buffer = consumerBuffer;
        final long index = lpConsumerIndex();
        final int mask = consumerMask;
        final int offset = calcWrappedOffset(index, mask);
        final Object e = lvElement(buffer, offset);// LoadLoad
        if (e == HAS_NEXT) {
            return newBufferPeek(lvNext(buffer), index, mask);

        return (T) e;

    public void clear() {
        while (poll() != null || !isEmpty()) { } // NOPMD

    private T newBufferPeek(AtomicReferenceArray<Object> nextBuffer, final long index, final int mask) {
        consumerBuffer = nextBuffer;
        final int offsetInNew = calcWrappedOffset(index, mask);
        return (T) lvElement(nextBuffer, offsetInNew);// LoadLoad

    public int size() {
         * It is possible for a thread to be interrupted or reschedule between the read of the producer and
         * consumer indices, therefore protection is required to ensure size is within valid range. In the
         * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer
         * index BEFORE the producer index.
        long after = lvConsumerIndex();
        while (true) {
            final long before = after;
            final long currentProducerIndex = lvProducerIndex();
            after = lvConsumerIndex();
            if (before == after) {
                return (int) (currentProducerIndex - after);

    public boolean isEmpty() {
        return lvProducerIndex() == lvConsumerIndex();

    private void adjustLookAheadStep(int capacity) {
        producerLookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP);

    private long lvProducerIndex() {
        return producerIndex.get();

    private long lvConsumerIndex() {
        return consumerIndex.get();

    private long lpProducerIndex() {
        return producerIndex.get();

    private long lpConsumerIndex() {
        return consumerIndex.get();

    private void soProducerIndex(long v) {

    private void soConsumerIndex(long v) {

    private static int calcWrappedOffset(long index, int mask) {
        return calcDirectOffset((int)index & mask);
    private static int calcDirectOffset(int index) {
        return index;
    private static void soElement(AtomicReferenceArray<Object> buffer, int offset, Object e) {
        buffer.lazySet(offset, e);

    private static <E> Object lvElement(AtomicReferenceArray<Object> buffer, int offset) {
        return buffer.get(offset);

    public Iterator<T> iterator() {
        throw new UnsupportedOperationException();

    public boolean contains(Object o) {
        throw new UnsupportedOperationException();

    public Object[] toArray() {
        throw new UnsupportedOperationException();

    public <E> E[] toArray(E[] a) {
        throw new UnsupportedOperationException();

    public boolean remove(Object o) {
        throw new UnsupportedOperationException();

    public boolean containsAll(Collection<?> c) {
        throw new UnsupportedOperationException();

    public boolean addAll(Collection<? extends T> c) {
        throw new UnsupportedOperationException();

    public boolean removeAll(Collection<?> c) {
        throw new UnsupportedOperationException();

    public boolean retainAll(Collection<?> c) {
        throw new UnsupportedOperationException();

    public boolean add(T e) {
        throw new UnsupportedOperationException();

    public T remove() {
        throw new UnsupportedOperationException();

    public T element() {
        throw new UnsupportedOperationException();

     * Atomically offer two elements.
     * <p>Don't use the regular offer() with this at all!
     * @param first the first value
     * @param second the second value
     * @return always true
    public boolean offer(T first, T second) {
        final AtomicReferenceArray<Object> buffer = producerBuffer;
        final long p = lvProducerIndex();
        final int m = producerMask;

        int pi = calcWrappedOffset(p + 2, m);

        if (null == lvElement(buffer, pi)) {
            pi = calcWrappedOffset(p, m);
            soElement(buffer, pi + 1, second);
            soElement(buffer, pi, first);
            soProducerIndex(p + 2);
        } else {
            final int capacity = buffer.length();
            final AtomicReferenceArray<Object> newBuffer = new AtomicReferenceArray<Object>(capacity);
            producerBuffer = newBuffer;

            pi = calcWrappedOffset(p, m);
            soElement(newBuffer, pi + 1, second);// StoreStore
            soElement(newBuffer, pi, first);
            soNext(buffer, newBuffer);

            soElement(buffer, pi, HAS_NEXT); // new buffer is visible after element is

            soProducerIndex(p + 2);// this ensures correctness on 32bit platforms

        return true;