src/main/java/rx/internal/operators/OnSubscribeJoin.java
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.internal.operators;
import java.util.*;
import rx.*;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.exceptions.Exceptions;
import rx.functions.*;
import rx.observers.SerializedSubscriber;
import rx.subscriptions.*;
/**
* Correlates the elements of two sequences based on overlapping durations.
*
* @param <TLeft> the left value type
* @param <TRight> the right value type
* @param <TLeftDuration> the left duration value type
* @param <TRightDuration> the right duration type
* @param <R> the result type
*/
public final class OnSubscribeJoin<TLeft, TRight, TLeftDuration, TRightDuration, R> implements OnSubscribe<R> {
final Observable<TLeft> left;
final Observable<TRight> right;
final Func1<TLeft, Observable<TLeftDuration>> leftDurationSelector;
final Func1<TRight, Observable<TRightDuration>> rightDurationSelector;
final Func2<TLeft, TRight, R> resultSelector;
public OnSubscribeJoin(
Observable<TLeft> left,
Observable<TRight> right,
Func1<TLeft, Observable<TLeftDuration>> leftDurationSelector,
Func1<TRight, Observable<TRightDuration>> rightDurationSelector,
Func2<TLeft, TRight, R> resultSelector) {
this.left = left;
this.right = right;
this.leftDurationSelector = leftDurationSelector;
this.rightDurationSelector = rightDurationSelector;
this.resultSelector = resultSelector;
}
@Override
public void call(Subscriber<? super R> t1) {
ResultSink result = new ResultSink(new SerializedSubscriber<R>(t1));
result.run();
}
/** Manage the left and right sources. */
final class ResultSink extends HashMap<Integer,TLeft> {
//HashMap aspect of `this` refers to the `leftMap`
private static final long serialVersionUID = 3491669543549085380L;
final CompositeSubscription group;
final Subscriber<? super R> subscriber;
/** Guarded by this. */
boolean leftDone;
/** Guarded by this. */
int leftId;
/** Guarded by this. */
boolean rightDone;
/** Guarded by this. */
int rightId;
/** Guarded by this. */
final Map<Integer, TRight> rightMap;
public ResultSink(Subscriber<? super R> subscriber) {
super();
this.subscriber = subscriber;
this.group = new CompositeSubscription();
//`leftMap` is `this`
this.rightMap = new HashMap<Integer, TRight>();
}
HashMap<Integer, TLeft> leftMap() {
return this;
}
public void run() {
subscriber.add(group);
Subscriber<TLeft> s1 = new LeftSubscriber();
Subscriber<TRight> s2 = new RightSubscriber();
group.add(s1);
group.add(s2);
left.unsafeSubscribe(s1);
right.unsafeSubscribe(s2);
}
/** Observes the left values. */
final class LeftSubscriber extends Subscriber<TLeft> {
protected void expire(int id, Subscription resource) {
boolean complete = false;
synchronized (ResultSink.this) {
if (leftMap().remove(id) != null && leftMap().isEmpty() && leftDone) {
complete = true;
}
}
if (complete) {
subscriber.onCompleted();
subscriber.unsubscribe();
} else {
group.remove(resource);
}
}
@Override
public void onNext(TLeft args) {
int id;
int highRightId;
synchronized (ResultSink.this) {
id = leftId++;
leftMap().put(id, args);
highRightId = rightId;
}
Observable<TLeftDuration> duration;
try {
duration = leftDurationSelector.call(args);
Subscriber<TLeftDuration> d1 = new LeftDurationSubscriber(id);
group.add(d1);
duration.unsafeSubscribe(d1);
List<TRight> rightValues = new ArrayList<TRight>();
synchronized (ResultSink.this) {
for (Map.Entry<Integer, TRight> entry : rightMap.entrySet()) {
if (entry.getKey() < highRightId) {
rightValues.add(entry.getValue());
}
}
}
for (TRight r : rightValues) {
R result = resultSelector.call(args, r);
subscriber.onNext(result);
}
} catch (Throwable t) {
Exceptions.throwOrReport(t, this);
}
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
subscriber.unsubscribe();
}
@Override
public void onCompleted() {
boolean complete = false;
synchronized (ResultSink.this) {
leftDone = true;
if (rightDone || leftMap().isEmpty()) {
complete = true;
}
}
if (complete) {
subscriber.onCompleted();
subscriber.unsubscribe();
} else {
group.remove(this);
}
}
/** Observes the left duration. */
final class LeftDurationSubscriber extends Subscriber<TLeftDuration> {
final int id;
boolean once = true;
public LeftDurationSubscriber(int id) {
this.id = id;
}
@Override
public void onNext(TLeftDuration args) {
onCompleted();
}
@Override
public void onError(Throwable e) {
LeftSubscriber.this.onError(e);
}
@Override
public void onCompleted() {
if (once) {
once = false;
expire(id, this);
}
}
}
}
/** Observes the right values. */
final class RightSubscriber extends Subscriber<TRight> {
void expire(int id, Subscription resource) {
boolean complete = false;
synchronized (ResultSink.this) {
if (rightMap.remove(id) != null && rightMap.isEmpty() && rightDone) {
complete = true;
}
}
if (complete) {
subscriber.onCompleted();
subscriber.unsubscribe();
} else {
group.remove(resource);
}
}
@Override
public void onNext(TRight args) {
int id;
int highLeftId;
synchronized (ResultSink.this) {
id = rightId++;
rightMap.put(id, args);
highLeftId = leftId;
}
SerialSubscription md = new SerialSubscription();
group.add(md);
Observable<TRightDuration> duration;
try {
duration = rightDurationSelector.call(args);
Subscriber<TRightDuration> d2 = new RightDurationSubscriber(id);
group.add(d2);
duration.unsafeSubscribe(d2);
List<TLeft> leftValues = new ArrayList<TLeft>();
synchronized (ResultSink.this) {
for (Map.Entry<Integer, TLeft> entry : leftMap().entrySet()) {
if (entry.getKey() < highLeftId) {
leftValues.add(entry.getValue());
}
}
}
for (TLeft lv : leftValues) {
R result = resultSelector.call(lv, args);
subscriber.onNext(result);
}
} catch (Throwable t) {
Exceptions.throwOrReport(t, this);
}
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
subscriber.unsubscribe();
}
@Override
public void onCompleted() {
boolean complete = false;
synchronized (ResultSink.this) {
rightDone = true;
if (leftDone || rightMap.isEmpty()) {
complete = true;
}
}
if (complete) {
subscriber.onCompleted();
subscriber.unsubscribe();
} else {
group.remove(this);
}
}
/** Observe the right duration. */
final class RightDurationSubscriber extends Subscriber<TRightDuration> {
final int id;
boolean once = true;
public RightDurationSubscriber(int id) {
this.id = id;
}
@Override
public void onNext(TRightDuration args) {
onCompleted();
}
@Override
public void onError(Throwable e) {
RightSubscriber.this.onError(e);
}
@Override
public void onCompleted() {
if (once) {
once = false;
expire(id, this);
}
}
}
}
}
}