Skip to content

Commit 71b4e7a

Browse files
Fixes Early Unsubscribe for Overlapping Window
- #1546 - This also fixes the fact that the overlapping window overload was not propagating unsubscribe before. A new unit test caught that.
1 parent f0ff512 commit 71b4e7a

File tree

2 files changed

+77
-5
lines changed

2 files changed

+77
-5
lines changed

src/main/java/rx/internal/operators/OperatorWindowWithSize.java

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,14 +124,35 @@ public void onCompleted() {
124124
child.onCompleted();
125125
}
126126
}
127+
127128
/** Subscriber with inexact, possibly overlapping or skipping windows. */
128129
final class InexactSubscriber extends Subscriber<T> {
129130
final Subscriber<? super Observable<T>> child;
130131
int count;
131-
final List<CountedSubject<T>> chunks;
132+
final List<CountedSubject<T>> chunks = new LinkedList<CountedSubject<T>>();
133+
Subscription parentSubscription = this;
134+
132135
public InexactSubscriber(Subscriber<? super Observable<T>> child) {
136+
/**
137+
* See https://github.com/ReactiveX/RxJava/issues/1546
138+
* We cannot compose through a Subscription because unsubscribing
139+
* applies to the outer, not the inner.
140+
*/
133141
this.child = child;
134-
this.chunks = new LinkedList<CountedSubject<T>>();
142+
/*
143+
* Add unsubscribe hook to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself)
144+
*/
145+
child.add(Subscriptions.create(new Action0() {
146+
147+
@Override
148+
public void call() {
149+
// if no window we unsubscribe up otherwise wait until window ends
150+
if (chunks == null || chunks.size() == 0) {
151+
parentSubscription.unsubscribe();
152+
}
153+
}
154+
155+
}));
135156
}
136157

137158
@Override
@@ -143,10 +164,13 @@ public void onStart() {
143164
@Override
144165
public void onNext(T t) {
145166
if (count++ % skip == 0) {
146-
CountedSubject<T> cs = createCountedSubject();
147-
chunks.add(cs);
148-
child.onNext(cs.producer);
167+
if (!child.isUnsubscribed()) {
168+
CountedSubject<T> cs = createCountedSubject();
169+
chunks.add(cs);
170+
child.onNext(cs.producer);
171+
}
149172
}
173+
150174
Iterator<CountedSubject<T>> it = chunks.iterator();
151175
while (it.hasNext()) {
152176
CountedSubject<T> cs = it.next();
@@ -156,6 +180,10 @@ public void onNext(T t) {
156180
cs.consumer.onCompleted();
157181
}
158182
}
183+
if (chunks.size() == 0 && child.isUnsubscribed()) {
184+
parentSubscription.unsubscribe();
185+
return;
186+
}
159187
}
160188

161189
@Override
@@ -177,6 +205,7 @@ public void onCompleted() {
177205
}
178206
child.onCompleted();
179207
}
208+
180209
CountedSubject<T> createCountedSubject() {
181210
final BufferUntilSubscriber<T> bus = BufferUntilSubscriber.create();
182211
return new CountedSubject<T>(bus, bus);

src/test/java/rx/internal/operators/OperatorWindowWithSizeTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,49 @@ public void call(Integer t1) {
148148
assertTrue(count.get() < 100000);
149149
}
150150

151+
@Test
152+
public void testWindowUnsubscribeOverlapping() {
153+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
154+
final AtomicInteger count = new AtomicInteger();
155+
Observable.merge(Observable.range(1, 10000).doOnNext(new Action1<Integer>() {
156+
157+
@Override
158+
public void call(Integer t1) {
159+
count.incrementAndGet();
160+
}
161+
162+
}).window(5, 4).take(2)).subscribe(ts);
163+
ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
164+
ts.assertTerminalEvent();
165+
// System.out.println(ts.getOnNextEvents());
166+
ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 5, 6, 7, 8, 9));
167+
assertEquals(9, count.get());
168+
}
169+
170+
@Test
171+
public void testWindowUnsubscribeOverlappingAsyncSource() {
172+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
173+
final AtomicInteger count = new AtomicInteger();
174+
Observable.merge(Observable.range(1, 100000)
175+
.doOnNext(new Action1<Integer>() {
176+
177+
@Override
178+
public void call(Integer t1) {
179+
count.incrementAndGet();
180+
}
181+
182+
})
183+
.observeOn(Schedulers.computation())
184+
.window(5, 4)
185+
.take(2))
186+
.subscribe(ts);
187+
ts.awaitTerminalEvent(500, TimeUnit.MILLISECONDS);
188+
ts.assertTerminalEvent();
189+
ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 5, 6, 7, 8, 9));
190+
// make sure we don't emit all values ... the unsubscribe should propagate
191+
assertTrue(count.get() < 100000);
192+
}
193+
151194
private List<String> list(String... args) {
152195
List<String> list = new ArrayList<String>();
153196
for (String arg : args) {

0 commit comments

Comments
 (0)