src/main/java/rx/internal/observers/AssertableSubscriberObservable.java
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.observers;
import java.util.List;
import java.util.concurrent.TimeUnit;
import rx.Producer;
import rx.Subscriber;
import rx.functions.Action0;
import rx.observers.TestSubscriber;
import rx.observers.AssertableSubscriber;
/**
* A {@code AssertableSubscriber} is a variety of {@link Subscriber} that you can use
* for unit testing, to perform assertions or inspect received events.
* AssertableSubscriber is a duplicate of TestSubscriber but supports method chaining
* where possible.
*
* @param <T>
* the value type
* @since 1.3
*/
public class AssertableSubscriberObservable<T> extends Subscriber<T> implements AssertableSubscriber<T> {
private final TestSubscriber<T> ts;
public AssertableSubscriberObservable(TestSubscriber<T> ts) {
this.ts = ts;
}
public static <T> AssertableSubscriberObservable<T> create(long initialRequest) {
TestSubscriber<T> t1 = new TestSubscriber<T>(initialRequest);
AssertableSubscriberObservable<T> t2 = new AssertableSubscriberObservable<T>(t1);
t2.add(t1);
return t2;
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#onStart()
*/
@Override
public void onStart() {
ts.onStart();
}
@Override
public void onCompleted() {
ts.onCompleted();
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#setProducer(rx.Producer)
*/
@Override
public void setProducer(Producer p) {
ts.setProducer(p);
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#getCompletions()
*/
@Override
public final int getCompletions() {
return ts.getCompletions();
}
@Override
public void onError(Throwable e) {
ts.onError(e);
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#getOnErrorEvents()
*/
@Override
public List<Throwable> getOnErrorEvents() {
return ts.getOnErrorEvents();
}
@Override
public void onNext(T t) {
ts.onNext(t);
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#getValueCount()
*/
@Override
public final int getValueCount() {
return ts.getValueCount();
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#requestMore(long)
*/
@Override
public AssertableSubscriber<T> requestMore(long n) {
ts.requestMore(n);
return this;
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#getOnNextEvents()
*/
@Override
public List<T> getOnNextEvents() {
return ts.getOnNextEvents();
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#assertReceivedOnNext(java.util.List)
*/
@Override
public AssertableSubscriber<T> assertReceivedOnNext(List<T> items) {
ts.assertReceivedOnNext(items);
return this;
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#awaitValueCount(int, long, java.util.concurrent.TimeUnit)
*/
@Override
public final AssertableSubscriber<T> awaitValueCount(int expected, long timeout, TimeUnit unit) {
if (!ts.awaitValueCount(expected, timeout, unit)) {
throw new AssertionError("Did not receive enough values in time. Expected: " + expected + ", Actual: " + ts.getValueCount());
}
return this;
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#assertTerminalEvent()
*/
@Override
public AssertableSubscriber<T> assertTerminalEvent() {
ts.assertTerminalEvent();
return this;
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#assertUnsubscribed()
*/
@Override
public AssertableSubscriber<T> assertUnsubscribed() {
ts.assertUnsubscribed();
return this;
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#assertNoErrors()
*/
@Override
public AssertableSubscriber<T> assertNoErrors() {
ts.assertNoErrors();
return this;
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#awaitTerminalEvent()
*/
@Override
public AssertableSubscriber<T> awaitTerminalEvent() {
ts.awaitTerminalEvent();
return this;
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#awaitTerminalEvent(long, java.util.concurrent.TimeUnit)
*/
@Override
public AssertableSubscriber<T> awaitTerminalEvent(long timeout, TimeUnit unit) {
ts.awaitTerminalEvent(timeout, unit);
return this;
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#awaitTerminalEventAndUnsubscribeOnTimeout(long, java.util.concurrent.TimeUnit)
*/
@Override
public AssertableSubscriber<T> awaitTerminalEventAndUnsubscribeOnTimeout(long timeout,
TimeUnit unit) {
ts.awaitTerminalEventAndUnsubscribeOnTimeout(timeout, unit);
return this;
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#getLastSeenThread()
*/
@Override
public Thread getLastSeenThread() {
return ts.getLastSeenThread();
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#assertCompleted()
*/
@Override
public AssertableSubscriber<T> assertCompleted() {
ts.assertCompleted();
return this;
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#assertNotCompleted()
*/
@Override
public AssertableSubscriber<T> assertNotCompleted() {
ts.assertNotCompleted();
return this;
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#assertError(java.lang.Class)
*/
@Override
public AssertableSubscriber<T> assertError(Class<? extends Throwable> clazz) {
ts.assertError(clazz);
return this;
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#assertError(java.lang.Throwable)
*/
@Override
public AssertableSubscriber<T> assertError(Throwable throwable) {
ts.assertError(throwable);
return this;
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#assertNoTerminalEvent()
*/
@Override
public AssertableSubscriber<T> assertNoTerminalEvent() {
ts.assertNoTerminalEvent();
return this;
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#assertNoValues()
*/
@Override
public AssertableSubscriber<T> assertNoValues() {
ts.assertNoValues();
return this;
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#assertValueCount(int)
*/
@Override
public AssertableSubscriber<T> assertValueCount(int count) {
ts.assertValueCount(count);
return this;
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#assertValues(T)
*/
@Override
public AssertableSubscriber<T> assertValues(T... values) {
ts.assertValues(values);
return this;
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#assertValue(T)
*/
@Override
public AssertableSubscriber<T> assertValue(T value) {
ts.assertValue(value);
return this;
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#assertValuesAndClear(T, T)
*/
@Override
public final AssertableSubscriber<T> assertValuesAndClear(T expectedFirstValue,
T... expectedRestValues) {
ts.assertValuesAndClear(expectedFirstValue, expectedRestValues);
return this;
}
/* (non-Javadoc)
* @see rx.observers.AssertableSubscriber#perform(rx.functions.Action0)
*/
@Override
public final AssertableSubscriber<T> perform(Action0 action) {
action.call();
return this;
}
@Override
public String toString() {
return ts.toString();
}
@Override
public final AssertableSubscriber<T> assertResult(T... values) {
ts.assertValues(values);
ts.assertNoErrors();
ts.assertCompleted();
return this;
}
@Override
public final AssertableSubscriber<T> assertFailure(Class<? extends Throwable> errorClass, T... values) {
ts.assertValues(values);
ts.assertError(errorClass);
ts.assertNotCompleted();
return this;
}
@Override
public final AssertableSubscriber<T> assertFailureAndMessage(Class<? extends Throwable> errorClass, String message,
T... values) {
ts.assertValues(values);
ts.assertError(errorClass);
ts.assertNotCompleted();
String actualMessage = ts.getOnErrorEvents().get(0).getMessage();
if (!(actualMessage == message || (message != null && message.equals(actualMessage)))) {
throw new AssertionError("Error message differs. Expected: \'" + message + "\', Received: \'" + actualMessage + "\'");
}
return this;
}
}