Skip to content

Commit f0ff512

Browse files
Fixes Early Unsubscribe for Non-Overlapping Window
#1546
1 parent 2b727b0 commit f0ff512

File tree

1 file changed

+40
-18
lines changed

1 file changed

+40
-18
lines changed

src/main/java/rx/internal/operators/OperatorWindowWithSize.java

Lines changed: 40 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,12 @@
1919
import java.util.Iterator;
2020
import java.util.LinkedList;
2121
import java.util.List;
22+
2223
import rx.Observable;
2324
import rx.Observable.Operator;
25+
import rx.Subscription;
26+
import rx.functions.Action0;
27+
import rx.subscriptions.Subscriptions;
2428
import rx.Observer;
2529
import rx.Subscriber;
2630

@@ -56,11 +60,29 @@ public Subscriber<? super T> call(Subscriber<? super Observable<T>> child) {
5660
final class ExactSubscriber extends Subscriber<T> {
5761
final Subscriber<? super Observable<T>> child;
5862
int count;
59-
Observer<T> consumer;
60-
Observable<T> producer;
63+
BufferUntilSubscriber<T> window;
64+
Subscription parentSubscription = this;
6165
public ExactSubscriber(Subscriber<? super Observable<T>> child) {
62-
super(child);
66+
/**
67+
* See https://github.com/ReactiveX/RxJava/issues/1546
68+
* We cannot compose through a Subscription because unsubscribing
69+
* applies to the outer, not the inner.
70+
*/
6371
this.child = child;
72+
/*
73+
* Add unsubscribe hook to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself)
74+
*/
75+
child.add(Subscriptions.create(new Action0() {
76+
77+
@Override
78+
public void call() {
79+
// if no window we unsubscribe up otherwise wait until window ends
80+
if(window == null) {
81+
parentSubscription.unsubscribe();
82+
}
83+
}
84+
85+
}));
6486
}
6587

6688
@Override
@@ -71,36 +93,36 @@ public void onStart() {
7193

7294
@Override
7395
public void onNext(T t) {
74-
if (count++ % size == 0) {
75-
if (consumer != null) {
76-
consumer.onCompleted();
96+
if (window == null) {
97+
window = BufferUntilSubscriber.create();
98+
child.onNext(window);
99+
}
100+
window.onNext(t);
101+
if (++count % size == 0) {
102+
window.onCompleted();
103+
window = null;
104+
if (child.isUnsubscribed()) {
105+
parentSubscription.unsubscribe();
106+
return;
77107
}
78-
createNewWindow();
79-
child.onNext(producer);
80108
}
81-
consumer.onNext(t);
82109
}
83110

84111
@Override
85112
public void onError(Throwable e) {
86-
if (consumer != null) {
87-
consumer.onError(e);
113+
if (window != null) {
114+
window.onError(e);
88115
}
89116
child.onError(e);
90117
}
91118

92119
@Override
93120
public void onCompleted() {
94-
if (consumer != null) {
95-
consumer.onCompleted();
121+
if (window != null) {
122+
window.onCompleted();
96123
}
97124
child.onCompleted();
98125
}
99-
void createNewWindow() {
100-
final BufferUntilSubscriber<T> bus = BufferUntilSubscriber.create();
101-
consumer = bus;
102-
producer = bus;
103-
}
104126
}
105127
/** Subscriber with inexact, possibly overlapping or skipping windows. */
106128
final class InexactSubscriber extends Subscriber<T> {

0 commit comments

Comments
 (0)