src/main/java/rx/internal/operators/OperatorOnBackpressureLatest.java
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.internal.operators;
import java.util.concurrent.atomic.*;
import rx.*;
import rx.Observable.Operator;
/**
* An operator which drops all but the last received value in case the downstream
* doesn't request more.
* @param <T> the value type
*/
public final class OperatorOnBackpressureLatest<T> implements Operator<T, T> {
/** Holds a singleton instance initialized on class-loading. */
static final class Holder {
static final OperatorOnBackpressureLatest<Object> INSTANCE = new OperatorOnBackpressureLatest<Object>();
}
/**
* Returns a singleton instance of the OnBackpressureLatest operator since it is stateless.
* @param <T> the value type
* @return the single instanceof OperatorOnBackpressureLatest
*/
@SuppressWarnings("unchecked")
public static <T> OperatorOnBackpressureLatest<T> instance() {
return (OperatorOnBackpressureLatest<T>)Holder.INSTANCE;
}
@Override
public Subscriber<? super T> call(Subscriber<? super T> child) {
final LatestEmitter<T> producer = new LatestEmitter<T>(child);
LatestSubscriber<T> parent = new LatestSubscriber<T>(producer);
producer.parent = parent;
child.add(parent);
child.add(producer);
child.setProducer(producer);
return parent;
}
/**
* A terminable producer which emits the latest items on request.
* @param <T>
*/
static final class LatestEmitter<T> extends AtomicLong implements Producer, Subscription, Observer<T> {
/** */
private static final long serialVersionUID = -1364393685005146274L;
final Subscriber<? super T> child;
LatestSubscriber<? super T> parent;
final AtomicReference<Object> value;
/** Written before done, read after done. */
Throwable terminal;
volatile boolean done;
/** Guarded by this. */
boolean emitting;
/** Guarded by this. */
boolean missed;
static final Object EMPTY = new Object();
static final long NOT_REQUESTED = Long.MIN_VALUE / 2;
public LatestEmitter(Subscriber<? super T> child) {
this.child = child;
this.value = new AtomicReference<Object>(EMPTY);
this.lazySet(NOT_REQUESTED); // not
}
@Override
public void request(long n) {
if (n >= 0) {
for (;;) {
long r = get();
if (r == Long.MIN_VALUE) {
return;
}
long u;
if (r == NOT_REQUESTED) {
u = n;
} else {
u = r + n;
if (u < 0) {
u = Long.MAX_VALUE;
}
}
if (compareAndSet(r, u)) {
if (r == NOT_REQUESTED) {
parent.requestMore(Long.MAX_VALUE);
}
emit();
return;
}
}
}
}
long produced(long n) {
for (;;) {
long r = get();
if (r < 0) {
return r;
}
long u = r - n;
if (compareAndSet(r, u)) {
return u;
}
}
}
@Override
public boolean isUnsubscribed() {
return get() == Long.MIN_VALUE;
}
@Override
public void unsubscribe() {
if (get() >= 0) {
getAndSet(Long.MIN_VALUE);
}
}
@Override
public void onNext(T t) {
value.lazySet(t); // emit's synchronized block does a full release
emit();
}
@Override
public void onError(Throwable e) {
terminal = e;
done = true;
emit();
}
@Override
public void onCompleted() {
done = true;
emit();
}
void emit() {
synchronized (this) {
if (emitting) {
missed = true;
return;
}
emitting = true;
missed = false;
}
boolean skipFinal = false;
try {
for (;;) {
long r = get();
if (r == Long.MIN_VALUE) {
skipFinal = true;
break;
}
Object v = value.get();
if (r > 0 && v != EMPTY) {
@SuppressWarnings("unchecked")
T v2 = (T)v;
child.onNext(v2);
value.compareAndSet(v, EMPTY);
produced(1);
v = EMPTY;
}
if (v == EMPTY && done) {
Throwable e = terminal;
if (e != null) {
child.onError(e);
} else {
child.onCompleted();
}
}
synchronized (this) {
if (!missed) {
emitting = false;
skipFinal = true;
break;
}
missed = false;
}
}
} finally {
if (!skipFinal) {
synchronized (this) {
emitting = false;
}
}
}
}
}
static final class LatestSubscriber<T> extends Subscriber<T> {
private final LatestEmitter<T> producer;
LatestSubscriber(LatestEmitter<T> producer) {
this.producer = producer;
}
@Override
public void onStart() {
// don't run until the child actually requested to avoid synchronous problems
request(0);
}
@Override
public void onNext(T t) {
producer.onNext(t);
}
@Override
public void onError(Throwable e) {
producer.onError(e);
}
@Override
public void onCompleted() {
producer.onCompleted();
}
void requestMore(long n) {
request(n);
}
}
}