Skip to content

Commit 19ceb76

Browse files
Merge pull request #3039 from akarnokd/WindowFuncFix
Window with Observable: fixed unsubscription and behavior
2 parents 89782ee + 39d3bd9 commit 19ceb76

File tree

6 files changed

+624
-89
lines changed

6 files changed

+624
-89
lines changed

src/main/java/rx/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9059,7 +9059,7 @@ public final <U, R> Observable<R> withLatestFrom(Observable<? extends U> other,
90599059
* @see <a href="http://reactivex.io/documentation/operators/window.html">ReactiveX operators documentation: Window</a>
90609060
*/
90619061
public final <TClosing> Observable<Observable<T>> window(Func0<? extends Observable<? extends TClosing>> closingSelector) {
9062-
return lift(new OperatorWindowWithObservable<T, TClosing>(closingSelector));
9062+
return lift(new OperatorWindowWithObservableFactory<T, TClosing>(closingSelector));
90639063
}
90649064

90659065
/**

src/main/java/rx/internal/operators/OperatorWindowWithObservable.java

Lines changed: 9 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,13 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.ArrayList;
19-
import java.util.Collections;
20-
import java.util.List;
21-
import rx.Observable;
18+
import java.util.*;
19+
20+
import rx.*;
2221
import rx.Observable.Operator;
22+
import rx.Observable;
2323
import rx.Observer;
24-
import rx.Subscriber;
25-
import rx.functions.Func0;
2624
import rx.observers.SerializedSubscriber;
27-
import rx.observers.Subscribers;
2825

2926
/**
3027
* Creates non-overlapping windows of items where each window is terminated by
@@ -34,36 +31,21 @@
3431
* @param <U> the boundary value type
3532
*/
3633
public final class OperatorWindowWithObservable<T, U> implements Operator<Observable<T>, T> {
37-
final Func0<? extends Observable<? extends U>> otherFactory;
34+
final Observable<U> other;
3835

39-
public OperatorWindowWithObservable(Func0<? extends Observable<? extends U>> otherFactory) {
40-
this.otherFactory = otherFactory;
41-
}
4236
public OperatorWindowWithObservable(final Observable<U> other) {
43-
this.otherFactory = new Func0<Observable<U>>() {
44-
45-
@Override
46-
public Observable<U> call() {
47-
return other;
48-
}
49-
50-
};
37+
this.other = other;
5138
}
5239

5340
@Override
5441
public Subscriber<? super T> call(Subscriber<? super Observable<T>> child) {
5542

56-
Observable<? extends U> other;
57-
try {
58-
other = otherFactory.call();
59-
} catch (Throwable e) {
60-
child.onError(e);
61-
return Subscribers.empty();
62-
}
63-
6443
SourceSubscriber<T> sub = new SourceSubscriber<T>(child);
6544
BoundarySubscriber<T, U> bs = new BoundarySubscriber<T, U>(child, sub);
6645

46+
child.add(sub);
47+
child.add(bs);
48+
6749
sub.replaceWindow();
6850

6951
other.unsafeSubscribe(bs);
@@ -88,7 +70,6 @@ static final class SourceSubscriber<T> extends Subscriber<T> {
8870
List<Object> queue;
8971

9072
public SourceSubscriber(Subscriber<? super Observable<T>> child) {
91-
super(child);
9273
this.child = new SerializedSubscriber<Observable<T>>(child);
9374
this.guard = new Object();
9475
}
@@ -288,7 +269,6 @@ void error(Throwable e) {
288269
static final class BoundarySubscriber<T, U> extends Subscriber<U> {
289270
final SourceSubscriber<T> sub;
290271
public BoundarySubscriber(Subscriber<?> child, SourceSubscriber<T> sub) {
291-
super(child);
292272
this.sub = sub;
293273
}
294274

0 commit comments

Comments
 (0)