src/main/java/rx/internal/operators/OnSubscribeGroupJoin.java
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;
import java.util.*;
import rx.*;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.exceptions.Exceptions;
import rx.functions.*;
import rx.observers.*;
import rx.subjects.*;
import rx.subscriptions.*;
/**
* Correlates two sequences when they overlap and groups the results.
*
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244235.aspx">MSDN: Observable.GroupJoin</a>
* @param <T1> the left value type
* @param <T2> the right value type
* @param <D1> the value type of the left duration
* @param <D2> the value type of the right duration
* @param <R> the result value type
*/
public final class OnSubscribeGroupJoin<T1, T2, D1, D2, R> implements OnSubscribe<R> {
/** Source of the left values; each left value opens a window (subscribed before {@code right}). */
final Observable<T1> left;
/** Source of the right values that are delivered into the open windows. */
final Observable<T2> right;
/** Maps a left value to the Observable whose first event (item or completion) closes that value's window. */
final Func1<? super T1, ? extends Observable<D1>> leftDuration;
/** Maps a right value to the Observable whose first event (item or completion) stops that value from being replayed to new windows. */
final Func1<? super T2, ? extends Observable<D2>> rightDuration;
/** Combines a left value with its window of right values into the item emitted downstream. */
final Func2<? super T1, ? super Observable<T2>, ? extends R> resultSelector;
/**
 * Stores the two sources and the three callbacks; this constructor performs
 * assignments only and subscribes to nothing.
 *
 * @param left the source whose values open windows
 * @param right the source whose values are delivered into the windows
 * @param leftDuration produces, per left value, the Observable that closes its window
 * @param rightDuration produces, per right value, the Observable that expires it
 * @param resultSelector combines a left value and its window into a result item
 */
public OnSubscribeGroupJoin(
        Observable<T1> left,
        Observable<T2> right,
        Func1<? super T1, ? extends Observable<D1>> leftDuration,
        Func1<? super T2, ? extends Observable<D2>> rightDuration,
        Func2<? super T1, ? super Observable<T2>, ? extends R> resultSelector) {
    // Callbacks first, then the sources; the assignments are independent of each other.
    this.resultSelector = resultSelector;
    this.rightDuration = rightDuration;
    this.leftDuration = leftDuration;
    this.right = right;
    this.left = left;
}
/**
 * Sets up one join per subscribing child: wraps the child in a
 * {@code SerializedSubscriber}, registers the manager with the child and
 * then starts the manager.
 *
 * @param child the downstream subscriber
 */
@Override
public void call(Subscriber<? super R> child) {
    final Subscriber<R> serializedChild = new SerializedSubscriber<R>(child);
    final ResultManager manager = new ResultManager(serializedChild);
    // Register with the child before any source is subscribed.
    child.add(manager);
    manager.init();
}
/** Manages sub-observers and subscriptions. */
final class ResultManager extends HashMap<Integer, Observer<T2>>implements Subscription {
// ResultManager extends HashMap so that the map of open left windows
// (left id -> window observer) is the manager instance itself; leftMap()
// returns `this` viewed as that map.
/** Serialization id; needed only because the HashMap superclass is Serializable. */
private static final long serialVersionUID = -3035156013812425335L;
/** Reference-counting wrapper around {@code group}; unsubscribe() and isUnsubscribed() delegate to it. */
final RefCountSubscription cancel;
/** The downstream subscriber that receives results and terminal events. */
final Subscriber<? super R> subscriber;
/** Holds the left and right source observers and every duration observer. */
final CompositeSubscription group;
/** Id to assign to the next left value. Guarded by this. */
int leftIds;
/** Id to assign to the next right value. Guarded by this. */
int rightIds;
/** Right values that have not expired yet, keyed by their id. Guarded by this. */
final Map<Integer, T2> rightMap = new HashMap<Integer, T2>(); // NOPMD
/** Set once the left source has completed. Guarded by this. */
boolean leftDone;
/** Set once the right source has completed. Guarded by this. */
boolean rightDone;
public ResultManager(Subscriber<? super R> subscriber) {
super();
this.subscriber = subscriber;
this.group = new CompositeSubscription();
this.cancel = new RefCountSubscription(group);
}
/**
 * Creates the observers for both sources, registers them in {@code group}
 * and then subscribes to the left source followed by the right source.
 */
public void init() {
    final LeftObserver leftObserver = new LeftObserver();
    final RightObserver rightObserver = new RightObserver();

    // Both observers are tracked before either source can start emitting.
    group.add(leftObserver);
    group.add(rightObserver);

    left.unsafeSubscribe(leftObserver);
    right.unsafeSubscribe(rightObserver);
}
/** Forwards the unsubscription request to the reference-counted {@code cancel}. */
@Override
public void unsubscribe() {
    final RefCountSubscription handle = this.cancel;
    handle.unsubscribe();
}
/**
 * Reports the state of the reference-counted {@code cancel}.
 *
 * @return whatever {@code cancel.isUnsubscribed()} returns
 */
@Override
public boolean isUnsubscribed() {
    final boolean cancelled = this.cancel.isUnsubscribed();
    return cancelled;
}
/**
 * Gives access to the map of open windows, which is this manager itself
 * through its HashMap superclass.
 *
 * @return this instance, typed as the left-id to window-observer map
 */
Map<Integer, Observer<T2>> leftMap() {
    final Map<Integer, Observer<T2>> self = ResultManager.this;
    return self;
}
/**
 * Sends the error to every open window and to the main subscriber, empties
 * both maps and finally unsubscribes {@code cancel}.
 *
 * @param e the exception to propagate
 */
void errorAll(Throwable e) {
    final List<Observer<T2>> openWindows = new ArrayList<Observer<T2>>();
    synchronized (this) {
        // Take the snapshot and wipe the shared state under the lock...
        openWindows.addAll(leftMap().values());
        leftMap().clear();
        rightMap.clear();
    }
    // ...and call out to the observers only after the lock has been released.
    for (Observer<T2> window : openWindows) {
        window.onError(e);
    }
    subscriber.onError(e);
    cancel.unsubscribe();
}
/**
 * Sends the error to the main subscriber only. Both maps are emptied, so the
 * window observers that were registered are discarded without receiving a
 * notification from this method; afterwards {@code cancel} is unsubscribed.
 *
 * @param e the exception to propagate
 */
void errorMain(Throwable e) {
    synchronized (this) {
        // The two clears are independent; both happen under the same lock.
        rightMap.clear();
        leftMap().clear();
    }
    subscriber.onError(e);
    cancel.unsubscribe();
}
/**
 * Completes the given windows, then the main subscriber, and unsubscribes
 * {@code cancel}. A {@code null} argument means there is nothing to finish
 * yet and the method does nothing.
 *
 * @param list the window observers to complete, or {@code null} to do nothing
 */
void complete(List<Observer<T2>> list) {
    if (list == null) {
        return;
    }
    for (Observer<T2> window : list) {
        window.onCompleted();
    }
    subscriber.onCompleted();
    cancel.unsubscribe();
}
/**
 * Observes the left source. Every left value opens a new window of right
 * values; completion is coordinated with the right source through the
 * {@code leftDone}/{@code rightDone} flags, and errors terminate everything.
 */
final class LeftObserver extends Subscriber<T1> {
/**
 * Opens a window for {@code args}, subscribes to its duration, emits the
 * selector result downstream and then replays the currently stored right
 * values into the new window.
 *
 * @param args the left value
 */
@Override
public void onNext(T1 args) {
try {
int id;
// The raw subject backs the window handed to the selector; the serialized
// wrapper is what gets registered and what right values are pushed into.
Subject<T2, T2> subj = PublishSubject.create();
Observer<T2> subjSerial = new SerializedObserver<T2>(subj);
// Allocate the id and register the window under the lock, so that right
// values arriving from now on are also delivered to it.
synchronized (ResultManager.this) {
id = leftIds++;
leftMap().put(id, subjSerial);
}
// Window subscribers go through WindowObservableFunc, which takes a
// reference from `cancel` for each subscription.
Observable<T2> window = Observable.unsafeCreate(new WindowObservableFunc<T2>(subj, cancel));
Observable<D1> duration = leftDuration.call(args);
// Track the duration observer in `group` before subscribing it.
Subscriber<D1> d1 = new LeftDurationObserver(id);
group.add(d1);
duration.unsafeSubscribe(d1);
R result = resultSelector.call(args, window);
// Copy the right values under the lock; they are replayed after the lock
// has been released.
List<T2> rightMapValues;
synchronized (ResultManager.this) {
rightMapValues = new ArrayList<T2>(rightMap.values());
}
// Emit the result first, then feed the snapshot into the window.
subscriber.onNext(result);
for (T2 t2 : rightMapValues) {
subjSerial.onNext(t2);
}
} catch (Throwable t) {
// NOTE(review): Exceptions.throwOrReport is expected to rethrow fatal errors
// and otherwise deliver t to this.onError, i.e. errorAll - confirm against
// rx.exceptions.Exceptions.
Exceptions.throwOrReport(t, this);
}
}
/**
 * Marks the left side as done. Only when the right side is done as well are
 * the open windows collected and both maps cleared; otherwise {@code list}
 * stays {@code null} and {@code complete} does nothing.
 */
@Override
public void onCompleted() {
List<Observer<T2>> list = null;
synchronized (ResultManager.this) {
leftDone = true;
if (rightDone) {
list = new ArrayList<Observer<T2>>(leftMap().values());
leftMap().clear();
rightMap.clear();
}
}
complete(list);
}
/**
 * Forwards the error to every open window and to the main subscriber.
 *
 * @param e the error signalled by the left source
 */
@Override
public void onError(Throwable e) {
errorAll(e);
}
}
/**
 * Observes the right source. Every right value is stored until its duration
 * fires and is pushed into all windows that are open at that moment;
 * completion is coordinated with the left source through the
 * {@code leftDone}/{@code rightDone} flags, and errors terminate everything.
 */
final class RightObserver extends Subscriber<T2> {
/**
 * Stores {@code args}, subscribes to its duration and delivers the value to
 * every window that is currently open.
 *
 * @param args the right value
 */
@Override
public void onNext(T2 args) {
try {
int id;
// Allocate the id and store the value under the lock, so that windows
// opened from now on get it replayed.
synchronized (ResultManager.this) {
id = rightIds++;
rightMap.put(id, args);
}
Observable<D2> duration = rightDuration.call(args);
// Track the duration observer in `group` before subscribing it.
Subscriber<D2> d2 = new RightDurationObserver(id);
group.add(d2);
duration.unsafeSubscribe(d2);
// Copy the open windows under the lock; they are notified after the lock
// has been released.
List<Observer<T2>> list;
synchronized (ResultManager.this) {
list = new ArrayList<Observer<T2>>(leftMap().values());
}
for (Observer<T2> o : list) {
o.onNext(args);
}
} catch (Throwable t) {
// NOTE(review): Exceptions.throwOrReport is expected to rethrow fatal errors
// and otherwise deliver t to this.onError, i.e. errorAll - confirm against
// rx.exceptions.Exceptions.
Exceptions.throwOrReport(t, this);
}
}
/**
 * Marks the right side as done. Only when the left side is done as well are
 * the open windows collected and both maps cleared; otherwise {@code list}
 * stays {@code null} and {@code complete} does nothing.
 */
@Override
public void onCompleted() {
List<Observer<T2>> list = null;
synchronized (ResultManager.this) {
rightDone = true;
if (leftDone) {
list = new ArrayList<Observer<T2>>(leftMap().values());
leftMap().clear();
rightMap.clear();
}
}
complete(list);
}
/**
 * Forwards the error to every open window and to the main subscriber.
 *
 * @param e the error signalled by the right source
 */
@Override
public void onError(Throwable e) {
errorAll(e);
}
}
/**
 * Watches the duration Observable of one left value. The first item or the
 * completion, whichever arrives first, closes the window registered under
 * {@code id}; an error is reported to the main subscriber only.
 */
final class LeftDurationObserver extends Subscriber<D1> {
    /** Key of the window in the left map. */
    final int id;
    /** True until the window has been closed by this observer. */
    boolean once = true;

    public LeftDurationObserver(int id) {
        this.id = id;
    }

    /** Closes the window at most once and removes this observer from {@code group}. */
    @Override
    public void onCompleted() {
        if (!once) {
            return;
        }
        once = false;

        final Observer<T2> window;
        synchronized (ResultManager.this) {
            window = leftMap().remove(id);
        }
        // The entry may already be gone if the maps were cleared in the meantime.
        if (window != null) {
            window.onCompleted();
        }
        group.remove(this);
    }

    /** Routes the failure to {@code errorMain}. */
    @Override
    public void onError(Throwable e) {
        errorMain(e);
    }

    /** An emitted item has the same effect as completion. */
    @Override
    public void onNext(D1 args) {
        onCompleted();
    }
}
/**
 * Watches the duration Observable of one right value. The first item or the
 * completion, whichever arrives first, removes the value stored under
 * {@code id}; an error is reported to the main subscriber only.
 */
final class RightDurationObserver extends Subscriber<D2> {
    /** Key of the value in the right map. */
    final int id;
    /** True until the value has been removed by this observer. */
    boolean once = true;

    public RightDurationObserver(int id) {
        this.id = id;
    }

    /** Removes the right value at most once and removes this observer from {@code group}. */
    @Override
    public void onCompleted() {
        if (!once) {
            return;
        }
        once = false;

        synchronized (ResultManager.this) {
            rightMap.remove(id);
        }
        group.remove(this);
    }

    /** Routes the failure to {@code errorMain}. */
    @Override
    public void onError(Throwable e) {
        errorMain(e);
    }

    /** An emitted item has the same effect as completion. */
    @Override
    public void onNext(D2 args) {
        onCompleted();
    }
}
}
/**
 * OnSubscribe implementation behind each window Observable.
 * Every subscription obtains its own reference from the shared
 * {@code RefCountSubscription} and releases it when the window terminates.
 *
 * @param <T> the value type of the window
 */
final static class WindowObservableFunc<T> implements OnSubscribe<T> {
    /** Source of the per-subscriber references. */
    final RefCountSubscription refCount;
    /** The Observable whose events are relayed to window subscribers. */
    final Observable<T> underlying;

    public WindowObservableFunc(Observable<T> underlying, RefCountSubscription refCount) {
        this.underlying = underlying;
        this.refCount = refCount;
    }

    /**
     * Takes a reference, attaches it to a relaying subscriber and subscribes
     * that subscriber to the underlying Observable.
     *
     * @param t1 the subscriber of the window
     */
    @Override
    public void call(Subscriber<? super T> t1) {
        final Subscription token = refCount.get();
        final WindowSubscriber relay = new WindowSubscriber(t1, token);
        relay.add(token);
        underlying.unsafeSubscribe(relay);
    }

    /** Relays every event to the window subscriber and releases the reference on termination. */
    final class WindowSubscriber extends Subscriber<T> {
        /** Receiver of the relayed events. */
        final Subscriber<? super T> subscriber;
        /** Reference released after a terminal event has been relayed. */
        private final Subscription ref;

        public WindowSubscriber(Subscriber<? super T> subscriber, Subscription ref) {
            super(subscriber);
            this.ref = ref;
            this.subscriber = subscriber;
        }

        @Override
        public void onNext(T value) {
            subscriber.onNext(value);
        }

        @Override
        public void onError(Throwable error) {
            // Notify first, release the reference afterwards.
            subscriber.onError(error);
            ref.unsubscribe();
        }

        @Override
        public void onCompleted() {
            // Notify first, release the reference afterwards.
            subscriber.onCompleted();
            ref.unsubscribe();
        }
    }
}
}