src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java
/**
* Copyright 2015 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package rx.internal.operators;
import java.util.*;
import java.util.concurrent.atomic.*;
import rx.*;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.exceptions.CompositeException;
import rx.functions.FuncN;
import rx.internal.util.RxRingBuffer;
import rx.internal.util.atomic.SpscLinkedArrayQueue;
import rx.plugins.RxJavaHooks;
public final class OnSubscribeCombineLatest<T, R> implements OnSubscribe<R> {
final Observable<? extends T>[] sources;
final Iterable<? extends Observable<? extends T>> sourcesIterable;
final FuncN<? extends R> combiner;
final int bufferSize;
final boolean delayError;
public OnSubscribeCombineLatest(Iterable<? extends Observable<? extends T>> sourcesIterable,
FuncN<? extends R> combiner) {
this(null, sourcesIterable, combiner, RxRingBuffer.SIZE, false);
}
public OnSubscribeCombineLatest(Observable<? extends T>[] sources,
Iterable<? extends Observable<? extends T>> sourcesIterable,
FuncN<? extends R> combiner, int bufferSize,
boolean delayError) {
this.sources = sources;
this.sourcesIterable = sourcesIterable;
this.combiner = combiner;
this.bufferSize = bufferSize;
this.delayError = delayError;
}
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public void call(Subscriber<? super R> s) {
Observable<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
if (sourcesIterable instanceof List) {
// unchecked & raw: javac type inference problem otherwise
List list = (List)sourcesIterable;
sources = (Observable[])list.toArray(new Observable[list.size()]);
count = sources.length;
} else {
sources = new Observable[8];
for (Observable<? extends T> p : sourcesIterable) {
if (count == sources.length) {
Observable<? extends T>[] b = new Observable[count + (count >> 2)];
System.arraycopy(sources, 0, b, 0, count);
sources = b;
}
sources[count++] = p;
}
}
} else {
count = sources.length;
}
if (count == 0) {
s.onCompleted();
return;
}
LatestCoordinator<T, R> lc = new LatestCoordinator<T, R>(s, combiner, count, bufferSize, delayError);
lc.subscribe(sources);
}
static final class LatestCoordinator<T, R> extends AtomicInteger implements Producer, Subscription {
/** */
private static final long serialVersionUID = 8567835998786448817L;
final Subscriber<? super R> actual;
final FuncN<? extends R> combiner;
final CombinerSubscriber<T, R>[] subscribers;
final int bufferSize;
final Object[] latest;
final SpscLinkedArrayQueue<Object> queue;
final boolean delayError;
volatile boolean cancelled;
volatile boolean done;
final AtomicLong requested;
final AtomicReference<Throwable> error;
int active;
int complete;
/** Indicates the particular source hasn't emitted any value yet. */
static final Object MISSING = new Object();
@SuppressWarnings("unchecked")
public LatestCoordinator(Subscriber<? super R> actual,
FuncN<? extends R> combiner,
int count, int bufferSize, boolean delayError) {
this.actual = actual;
this.combiner = combiner;
this.bufferSize = bufferSize;
this.delayError = delayError;
this.latest = new Object[count];
Arrays.fill(latest, MISSING);
this.subscribers = new CombinerSubscriber[count];
this.queue = new SpscLinkedArrayQueue<Object>(bufferSize);
this.requested = new AtomicLong();
this.error = new AtomicReference<Throwable>();
}
@SuppressWarnings("unchecked")
public void subscribe(Observable<? extends T>[] sources) {
Subscriber<T>[] as = subscribers;
int len = as.length;
for (int i = 0; i < len; i++) {
as[i] = new CombinerSubscriber<T, R>(this, i);
}
lazySet(0); // release array contents
actual.add(this);
actual.setProducer(this);
for (int i = 0; i < len; i++) {
if (cancelled) {
return;
}
((Observable<T>)sources[i]).subscribe(as[i]);
}
}
@Override
public void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("n >= required but it was " + n);
}
if (n != 0) {
BackpressureUtils.getAndAddRequest(requested, n);
drain();
}
}
@Override
public void unsubscribe() {
if (!cancelled) {
cancelled = true;
if (getAndIncrement() == 0) {
cancel(queue);
}
}
}
@Override
public boolean isUnsubscribed() {
return cancelled;
}
void cancel(Queue<?> q) {
q.clear();
for (CombinerSubscriber<T, R> s : subscribers) {
s.unsubscribe();
}
}
/**
* Combine the given notification value from the indexth source with the existing known
* latest values.
* @param value the notification to combine, null indicates the source terminated normally
* @param index the index of the source subscriber
*/
void combine(Object value, int index) {
CombinerSubscriber<T, R> combinerSubscriber = subscribers[index];
int activeCount;
int completedCount;
int sourceCount;
boolean empty;
boolean allSourcesFinished;
synchronized (this) {
sourceCount = latest.length;
Object o = latest[index];
activeCount = active;
if (o == MISSING) {
active = ++activeCount;
}
completedCount = complete;
if (value == null) {
complete = ++completedCount;
} else {
latest[index] = NotificationLite.getValue(value);
}
allSourcesFinished = activeCount == sourceCount;
// see if either all sources completed
empty = completedCount == sourceCount
|| (value == null && o == MISSING); // or this source completed without any value
if (!empty) {
if (value != null && allSourcesFinished) {
queue.offer(combinerSubscriber, latest.clone());
} else
if (value == null && error.get() != null && (o == MISSING || !delayError)) {
done = true; // if this source completed without a value
}
} else {
done = true;
}
}
if (!allSourcesFinished && value != null) {
combinerSubscriber.requestMore(1);
return;
}
drain();
}
void drain() {
if (getAndIncrement() != 0) {
return;
}
final Queue<Object> q = queue;
final Subscriber<? super R> a = actual;
final boolean delayError = this.delayError;
final AtomicLong localRequested = this.requested;
int missed = 1;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a, q, delayError)) {
return;
}
long requestAmount = localRequested.get();
long emitted = 0L;
while (emitted != requestAmount) {
boolean d = done;
@SuppressWarnings("unchecked")
CombinerSubscriber<T, R> cs = (CombinerSubscriber<T, R>)q.peek();
boolean empty = cs == null;
if (checkTerminated(d, empty, a, q, delayError)) {
return;
}
if (empty) {
break;
}
q.poll();
Object[] array = (Object[])q.poll();
if (array == null) {
cancelled = true;
cancel(q);
a.onError(new IllegalStateException("Broken queue?! Sender received but not the array."));
return;
}
R v;
try {
v = combiner.call(array);
} catch (Throwable ex) {
cancelled = true;
cancel(q);
a.onError(ex);
return;
}
a.onNext(v);
cs.requestMore(1);
emitted++;
}
if (emitted != 0L && requestAmount != Long.MAX_VALUE) {
BackpressureUtils.produced(localRequested, emitted);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
boolean checkTerminated(boolean mainDone, boolean queueEmpty, Subscriber<?> childSubscriber, Queue<?> q, boolean delayError) {
if (cancelled) {
cancel(q);
return true;
}
if (mainDone) {
if (delayError) {
if (queueEmpty) {
Throwable e = error.get();
if (e != null) {
childSubscriber.onError(e);
} else {
childSubscriber.onCompleted();
}
return true;
}
} else {
Throwable e = error.get();
if (e != null) {
cancel(q);
childSubscriber.onError(e);
return true;
} else
if (queueEmpty) {
childSubscriber.onCompleted();
return true;
}
}
}
return false;
}
void onError(Throwable e) {
AtomicReference<Throwable> localError = this.error;
for (;;) {
Throwable curr = localError.get();
Throwable next;
if (curr != null) {
if (curr instanceof CompositeException) {
CompositeException ce = (CompositeException) curr;
List<Throwable> es = new ArrayList<Throwable>(ce.getExceptions());
es.add(e);
next = new CompositeException(es);
} else {
next = new CompositeException(Arrays.asList(curr, e));
}
} else {
next = e;
}
if (localError.compareAndSet(curr, next)) {
return;
}
}
}
}
static final class CombinerSubscriber<T, R> extends Subscriber<T> {
final LatestCoordinator<T, R> parent;
final int index;
boolean done;
public CombinerSubscriber(LatestCoordinator<T, R> parent, int index) {
this.parent = parent;
this.index = index;
request(parent.bufferSize);
}
@Override
public void onNext(T t) {
if (done) {
return;
}
parent.combine(NotificationLite.next(t), index);
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaHooks.onError(t);
return;
}
parent.onError(t);
done = true;
parent.combine(null, index);
}
@Override
public void onCompleted() {
if (done) {
return;
}
done = true;
parent.combine(null, index);
}
public void requestMore(long n) {
request(n);
}
}
}