ReactiveX/RxJava

View on GitHub
src/main/java/rx/internal/operators/OperatorBufferWithTime.java

Summary

Maintainability
B
5 hrs
Test Coverage
/**
 * Copyright 2014 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package rx.internal.operators;

import java.util.*;
import java.util.concurrent.TimeUnit;

import rx.*;
import rx.Observable;
import rx.Observable.Operator;
import rx.Scheduler.Worker;
import rx.exceptions.Exceptions;
import rx.functions.Action0;
import rx.observers.SerializedSubscriber;

/**
 * Operator that gathers the values of a source {@link Observable} into {@link List} chunks and
 * hands each chunk to the downstream {@link Subscriber}.
 * <p>
 * A chunk is emitted once it has been open for {@code timespan}, or earlier if it reaches
 * {@code count} items. How often chunks are opened is governed by {@code timeshift}:
 * </p>
 * <ul>
 * <li>if {@code timespan == timeshift}, a single chunk is open at any time and it is swapped for a
 * fresh one on every period (see {@code ExactSubscriber});</li>
 * <li>otherwise a new chunk is opened every {@code timeshift} and closed {@code timespan} later, so
 * chunks may <strong>overlap or leave gaps</strong> (see {@code InexactSubscriber}).</li>
 * </ul>
 * <p>
 * When the source completes, every chunk that is still open is emitted before the completion is
 * forwarded. When the source fails, the open chunks are discarded and only the error is forwarded.
 * </p>
 *
 * @param <T> the buffered value type
 */
public final class OperatorBufferWithTime<T> implements Operator<List<T>, T> {
    /** How long a chunk stays open, expressed in {@link #unit}. */
    final long timespan;
    /** Period between the opening of consecutive chunks, expressed in {@link #unit}. */
    final long timeshift;
    /** Time unit shared by {@link #timespan} and {@link #timeshift}. */
    final TimeUnit unit;
    /** Size at which a chunk is emitted without waiting for its timer. */
    final int count;
    /** Source of the worker that runs the timed tasks. */
    final Scheduler scheduler;

    /**
     * Creates the operator; the arguments are stored as given.
     *
     * @param timespan
     *            how long each chunk collects values before it is emitted
     * @param timeshift
     *            the period at which new chunks are started
     * @param unit
     *            the {@link TimeUnit} in which both {@code timespan} and {@code timeshift} are expressed
     * @param count
     *            the maximum size of a chunk; a chunk that reaches this size is emitted right away
     * @param scheduler
     *            the {@link Scheduler} whose worker runs the timed tasks
     */
    public OperatorBufferWithTime(long timespan, long timeshift, TimeUnit unit, int count, Scheduler scheduler) {
        this.timespan = timespan;
        this.timeshift = timeshift;
        this.unit = unit;
        this.count = count;
        this.scheduler = scheduler;
    }

    /**
     * Wraps the downstream subscriber in a {@link SerializedSubscriber}, selects the subscriber
     * implementation that matches the timing parameters and starts its timed tasks on a newly
     * created worker.
     *
     * @param child
     *            the downstream subscriber that receives the chunks
     * @return the subscriber that consumes the source values
     */
    @Override
    public Subscriber<? super T> call(final Subscriber<? super List<T>> child) {
        final Worker worker = scheduler.createWorker();
        // Chunks are pushed downstream both from the source callbacks and from timed tasks,
        // so every downstream call goes through a serializing wrapper.
        final SerializedSubscriber<List<T>> serializedChild = new SerializedSubscriber<List<T>>(child);

        if (timespan != timeshift) {
            final InexactSubscriber inexact = new InexactSubscriber(serializedChild, worker);
            inexact.add(worker);
            child.add(inexact);
            // The first chunk is opened immediately; the periodic task opens the following ones.
            inexact.startNewChunk();
            inexact.scheduleChunk();
            return inexact;
        }

        final ExactSubscriber exact = new ExactSubscriber(serializedChild, worker);
        exact.add(worker);
        child.add(exact);
        exact.scheduleExact();
        return exact;
    }

    /** Subscriber used when {@code timespan} and {@code timeshift} differ, so several chunks may be open at once. */
    final class InexactSubscriber extends Subscriber<T> {
        final Subscriber<? super List<T>> child;
        final Worker inner;
        /** The chunks that are currently open. Guarded by this. */
        final List<List<T>> chunks;
        /** Set when a terminal event has been accepted. Guarded by this. */
        boolean done;

        public InexactSubscriber(Subscriber<? super List<T>> child, Worker inner) {
            this.child = child;
            this.inner = inner;
            this.chunks = new LinkedList<List<T>>();
        }

        /**
         * Appends the value to every open chunk and emits the chunks that thereby reached
         * {@code count} items. Emission happens after the lock has been released.
         */
        @Override
        public void onNext(T t) {
            List<List<T>> filled = null;
            synchronized (this) {
                if (done) {
                    return;
                }
                for (Iterator<List<T>> open = chunks.iterator(); open.hasNext();) {
                    final List<T> current = open.next();
                    current.add(t);
                    if (current.size() != count) {
                        continue;
                    }
                    // Full chunk: close it now; its timed task will no longer find it.
                    open.remove();
                    if (filled == null) {
                        filled = new LinkedList<List<T>>();
                    }
                    filled.add(current);
                }
            }
            if (filled == null) {
                return;
            }
            for (List<T> full : filled) {
                child.onNext(full);
            }
        }

        /** Drops all open chunks, forwards the error downstream and unsubscribes. */
        @Override
        public void onError(Throwable e) {
            final boolean proceed;
            synchronized (this) {
                proceed = !done;
                if (proceed) {
                    done = true;
                    chunks.clear();
                }
            }
            if (!proceed) {
                return;
            }
            child.onError(e);
            unsubscribe();
        }

        /**
         * Emits every chunk that is still open, then forwards the completion downstream and
         * unsubscribes. A failure while emitting is passed to
         * {@link Exceptions#throwOrReport(Throwable, rx.Observer)} with the child, and the
         * completion is then not forwarded.
         */
        @Override
        public void onCompleted() {
            try {
                final List<List<T>> pending;
                synchronized (this) {
                    if (done) {
                        return;
                    }
                    done = true;
                    // Snapshot the open chunks so they can be emitted outside the lock.
                    pending = new LinkedList<List<T>>(chunks);
                    chunks.clear();
                }
                for (List<T> open : pending) {
                    child.onNext(open);
                }
            } catch (Throwable ex) {
                Exceptions.throwOrReport(ex, child);
                return;
            }
            child.onCompleted();
            unsubscribe();
        }

        /** Schedules the periodic task that opens a new chunk every {@code timeshift}. */
        void scheduleChunk() {
            final Action0 openNext = new Action0() {
                @Override
                public void call() {
                    startNewChunk();
                }
            };
            inner.schedulePeriodically(openNext, timeshift, timeshift, unit);
        }

        /**
         * Opens a new chunk and schedules its emission {@code timespan} later; does nothing once
         * this subscriber is done.
         */
        void startNewChunk() {
            final List<T> fresh = new ArrayList<T>();
            synchronized (this) {
                if (done) {
                    return;
                }
                chunks.add(fresh);
            }
            final Action0 close = new Action0() {
                @Override
                public void call() {
                    emitChunk(fresh);
                }
            };
            inner.schedule(close, timespan, unit);
        }

        /**
         * Closes the given chunk and emits it, provided it is still among the open chunks.
         *
         * @param chunkToEmit
         *            the chunk whose time is up
         */
        void emitChunk(List<T> chunkToEmit) {
            boolean removed = false;
            synchronized (this) {
                if (done) {
                    return;
                }
                // Match by identity, not equals(): distinct chunks with equal contents must not be confused.
                for (Iterator<List<T>> open = chunks.iterator(); !removed && open.hasNext();) {
                    if (open.next() == chunkToEmit) {
                        open.remove();
                        removed = true;
                    }
                }
            }
            if (!removed) {
                // Already emitted because it reached the size limit.
                return;
            }
            try {
                child.onNext(chunkToEmit);
            } catch (Throwable ex) {
                Exceptions.throwOrReport(ex, this);
            }
        }
    }

    /** Subscriber used when {@code timespan} equals {@code timeshift}, so exactly one chunk is open at a time. */
    final class ExactSubscriber extends Subscriber<T> {
        final Subscriber<? super List<T>> child;
        final Worker inner;
        /** The chunk currently being filled; {@code null} after a terminal event. Guarded by this. */
        List<T> chunk;
        /** Set when a terminal event has been accepted. Guarded by this. */
        boolean done;

        public ExactSubscriber(Subscriber<? super List<T>> child, Worker inner) {
            this.child = child;
            this.inner = inner;
            this.chunk = new ArrayList<T>();
        }

        /**
         * Appends the value to the current chunk; if the chunk thereby reaches {@code count} items
         * it is replaced by a fresh one and emitted after the lock has been released.
         */
        @Override
        public void onNext(T t) {
            final List<T> full;
            synchronized (this) {
                if (done) {
                    return;
                }
                chunk.add(t);
                if (chunk.size() != count) {
                    return;
                }
                full = chunk;
                chunk = new ArrayList<T>();
            }
            child.onNext(full);
        }

        /** Drops the current chunk, forwards the error downstream and unsubscribes. */
        @Override
        public void onError(Throwable e) {
            final boolean proceed;
            synchronized (this) {
                proceed = !done;
                if (proceed) {
                    done = true;
                    chunk = null;
                }
            }
            if (!proceed) {
                return;
            }
            child.onError(e);
            unsubscribe();
        }

        /**
         * Unsubscribes the worker, emits the current chunk (even if it is empty), then forwards the
         * completion downstream and unsubscribes. A failure along the way is passed to
         * {@link Exceptions#throwOrReport(Throwable, rx.Observer)} with the child, and the
         * completion is then not forwarded.
         */
        @Override
        public void onCompleted() {
            try {
                inner.unsubscribe();
                final List<T> last;
                synchronized (this) {
                    if (done) {
                        return;
                    }
                    done = true;
                    last = chunk;
                    chunk = null;
                }
                child.onNext(last);
            } catch (Throwable ex) {
                Exceptions.throwOrReport(ex, child);
                return;
            }
            child.onCompleted();
            unsubscribe();
        }

        /** Schedules the periodic task that swaps and emits the current chunk every {@code timespan}. */
        void scheduleExact() {
            final Action0 tick = new Action0() {
                @Override
                public void call() {
                    emit();
                }
            };
            inner.schedulePeriodically(tick, timespan, timespan, unit);
        }

        /**
         * Replaces the current chunk with a fresh one and emits the old chunk, whether or not it
         * holds any items; does nothing once this subscriber is done.
         */
        void emit() {
            final List<T> closed;
            synchronized (this) {
                if (done) {
                    return;
                }
                closed = chunk;
                chunk = new ArrayList<T>();
            }
            try {
                child.onNext(closed);
            } catch (Throwable ex) {
                Exceptions.throwOrReport(ex, this);
            }
        }
    }
}