Skip to content

Commit 0aa8e92

Browse files
committed
Merge pull request #2972 from akarnokd/WindowUnsubscribeToComplete
Fixed window(time) to work properly with unsubscription, added
2 parents b86bbf6 + c7e6cf9 commit 0aa8e92

File tree

5 files changed

+209
-64
lines changed

5 files changed

+209
-64
lines changed

src/main/java/rx/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9017,7 +9017,7 @@ public final Observable<Observable<T>> window(int count) {
90179017
* <img width="640" height="365" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/window4.png" alt="">
90189018
* <dl>
90199019
* <dt><b>Backpressure Support:</b></dt>
9020-
* <dd>The operator has limited backpressure support. If {@code count} == {@code skip}, the operator honors backpressure on its outer subscriber, ignores backpressure in its inner Observables
9020+
* <dd>The operator honors backpressure on its outer subscriber, ignores backpressure in its inner Observables
90219021
* but each of them will emit at most {@code count} elements.</dd>
90229022
* <dt><b>Scheduler:</b></dt>
90239023
* <dd>This version of {@code window} does not operate by default on a particular {@link Scheduler}.</dd>

src/main/java/rx/internal/operators/OperatorWindowWithSize.java

Lines changed: 43 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -48,17 +48,20 @@ public OperatorWindowWithSize(int size, int skip) {
4848
@Override
4949
public Subscriber<? super T> call(Subscriber<? super Observable<T>> child) {
5050
if (skip == size) {
51-
return new ExactSubscriber(child);
51+
ExactSubscriber e = new ExactSubscriber(child);
52+
e.init();
53+
return e;
5254
}
53-
return new InexactSubscriber(child);
55+
InexactSubscriber ie = new InexactSubscriber(child);
56+
ie.init();
57+
return ie;
5458
}
5559
/** Subscriber with exact, non-overlapping window bounds. */
5660
final class ExactSubscriber extends Subscriber<T> {
5761
final Subscriber<? super Observable<T>> child;
5862
int count;
5963
BufferUntilSubscriber<T> window;
6064
volatile boolean noWindow = true;
61-
final Subscription parentSubscription = this;
6265
public ExactSubscriber(Subscriber<? super Observable<T>> child) {
6366
/**
6467
* See https://github.com/ReactiveX/RxJava/issues/1546
@@ -69,13 +72,15 @@ public ExactSubscriber(Subscriber<? super Observable<T>> child) {
6972
/*
7073
* Add unsubscribe hook to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself)
7174
*/
75+
}
76+
void init() {
7277
child.add(Subscriptions.create(new Action0() {
7378

7479
@Override
7580
public void call() {
7681
// if no window we unsubscribe up otherwise wait until window ends
7782
if (noWindow) {
78-
parentSubscription.unsubscribe();
83+
unsubscribe();
7984
}
8085
}
8186

@@ -111,7 +116,7 @@ public void onNext(T t) {
111116
window = null;
112117
noWindow = true;
113118
if (child.isUnsubscribed()) {
114-
parentSubscription.unsubscribe();
119+
unsubscribe();
115120
return;
116121
}
117122
}
@@ -139,7 +144,7 @@ final class InexactSubscriber extends Subscriber<T> {
139144
final Subscriber<? super Observable<T>> child;
140145
int count;
141146
final List<CountedSubject<T>> chunks = new LinkedList<CountedSubject<T>>();
142-
final Subscription parentSubscription = this;
147+
volatile boolean noWindow = true;
143148

144149
public InexactSubscriber(Subscriber<? super Observable<T>> child) {
145150
/**
@@ -148,6 +153,9 @@ public InexactSubscriber(Subscriber<? super Observable<T>> child) {
148153
* applies to the outer, not the inner.
149154
*/
150155
this.child = child;
156+
}
157+
158+
void init() {
151159
/*
152160
* Add unsubscribe hook to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself)
153161
*/
@@ -156,24 +164,38 @@ public InexactSubscriber(Subscriber<? super Observable<T>> child) {
156164
@Override
157165
public void call() {
158166
// if no window we unsubscribe up otherwise wait until window ends
159-
if (chunks == null || chunks.size() == 0) {
160-
parentSubscription.unsubscribe();
167+
if (noWindow) {
168+
unsubscribe();
161169
}
162170
}
163171

164172
}));
165-
}
166-
167-
@Override
168-
public void onStart() {
169-
// no backpressure as we are controlling data flow by window size
170-
request(Long.MAX_VALUE);
173+
174+
child.setProducer(new Producer() {
175+
@Override
176+
public void request(long n) {
177+
if (n > 0) {
178+
long u = n * size;
179+
if (((u >>> 31) != 0) && (u / n != size)) {
180+
u = Long.MAX_VALUE;
181+
}
182+
requestMore(u);
183+
}
184+
}
185+
});
171186
}
172187

188+
void requestMore(long n) {
189+
request(n);
190+
}
191+
173192
@Override
174193
public void onNext(T t) {
175194
if (count++ % skip == 0) {
176195
if (!child.isUnsubscribed()) {
196+
if (chunks.isEmpty()) {
197+
noWindow = false;
198+
}
177199
CountedSubject<T> cs = createCountedSubject();
178200
chunks.add(cs);
179201
child.onNext(cs.producer);
@@ -189,16 +211,19 @@ public void onNext(T t) {
189211
cs.consumer.onCompleted();
190212
}
191213
}
192-
if (chunks.size() == 0 && child.isUnsubscribed()) {
193-
parentSubscription.unsubscribe();
194-
return;
214+
if (chunks.isEmpty()) {
215+
noWindow = true;
216+
if (child.isUnsubscribed()) {
217+
unsubscribe();
218+
}
195219
}
196220
}
197221

198222
@Override
199223
public void onError(Throwable e) {
200224
List<CountedSubject<T>> list = new ArrayList<CountedSubject<T>>(chunks);
201225
chunks.clear();
226+
noWindow = true;
202227
for (CountedSubject<T> cs : list) {
203228
cs.consumer.onError(e);
204229
}
@@ -209,6 +234,7 @@ public void onError(Throwable e) {
209234
public void onCompleted() {
210235
List<CountedSubject<T>> list = new ArrayList<CountedSubject<T>>(chunks);
211236
chunks.clear();
237+
noWindow = true;
212238
for (CountedSubject<T> cs : list) {
213239
cs.consumer.onCompleted();
214240
}

src/main/java/rx/internal/operators/OperatorWindowWithTime.java

Lines changed: 62 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,17 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.ArrayList;
19-
import java.util.Collections;
20-
import java.util.Iterator;
21-
import java.util.LinkedList;
22-
import java.util.List;
18+
import java.util.*;
2319
import java.util.concurrent.TimeUnit;
24-
import rx.Observable;
20+
21+
import rx.*;
2522
import rx.Observable.Operator;
26-
import rx.Observer;
27-
import rx.Scheduler;
2823
import rx.Scheduler.Worker;
29-
import rx.Subscriber;
24+
import rx.Observable;
25+
import rx.Observer;
3026
import rx.functions.Action0;
31-
import rx.observers.SerializedObserver;
32-
import rx.observers.SerializedSubscriber;
27+
import rx.observers.*;
28+
import rx.subscriptions.Subscriptions;
3329

3430
/**
3531
* Creates windows of values into the source sequence with timed window creation, length and size bounds.
@@ -62,15 +58,16 @@ public OperatorWindowWithTime(long timespan, long timeshift, TimeUnit unit, int
6258
@Override
6359
public Subscriber<? super T> call(Subscriber<? super Observable<T>> child) {
6460
Worker worker = scheduler.createWorker();
65-
child.add(worker);
6661

6762
if (timespan == timeshift) {
6863
ExactSubscriber s = new ExactSubscriber(child, worker);
64+
s.add(worker);
6965
s.scheduleExact();
7066
return s;
7167
}
7268

7369
InexactSubscriber s = new InexactSubscriber(child, worker);
70+
s.add(worker);
7471
s.startNewChunk();
7572
s.scheduleChunk();
7673
return s;
@@ -118,11 +115,19 @@ final class ExactSubscriber extends Subscriber<T> {
118115
volatile State<T> state;
119116

120117
public ExactSubscriber(Subscriber<? super Observable<T>> child, Worker worker) {
121-
super(child);
122118
this.child = new SerializedSubscriber<Observable<T>>(child);
123119
this.worker = worker;
124120
this.guard = new Object();
125121
this.state = State.empty();
122+
child.add(Subscriptions.create(new Action0() {
123+
@Override
124+
public void call() {
125+
// if there is no active window, unsubscribe the upstream
126+
if (state.consumer == null) {
127+
unsubscribe();
128+
}
129+
}
130+
}));
126131
}
127132

128133
@Override
@@ -132,7 +137,6 @@ public void onStart() {
132137

133138
@Override
134139
public void onNext(T t) {
135-
List<Object> localQueue;
136140
synchronized (guard) {
137141
if (emitting) {
138142
if (queue == null) {
@@ -141,29 +145,29 @@ public void onNext(T t) {
141145
queue.add(t);
142146
return;
143147
}
144-
localQueue = queue;
145-
queue = null;
146148
emitting = true;
147149
}
148-
boolean once = true;
149150
boolean skipFinal = false;
150151
try {
151-
do {
152-
drain(localQueue);
153-
if (once) {
154-
once = false;
155-
emitValue(t);
156-
}
152+
if (!emitValue(t)) {
153+
return;
154+
}
155+
156+
for (;;) {
157+
List<Object> localQueue;
157158
synchronized (guard) {
158159
localQueue = queue;
159-
queue = null;
160160
if (localQueue == null) {
161161
emitting = false;
162162
skipFinal = true;
163163
return;
164164
}
165+
queue = null;
166+
}
167+
if (!drain(localQueue)) {
168+
return;
165169
}
166-
} while (!child.isUnsubscribed());
170+
}
167171
} finally {
168172
if (!skipFinal) {
169173
synchronized (guard) {
@@ -172,13 +176,15 @@ public void onNext(T t) {
172176
}
173177
}
174178
}
175-
void drain(List<Object> queue) {
179+
boolean drain(List<Object> queue) {
176180
if (queue == null) {
177-
return;
181+
return true;
178182
}
179183
for (Object o : queue) {
180184
if (o == NEXT_SUBJECT) {
181-
replaceSubject();
185+
if (!replaceSubject()) {
186+
return false;
187+
}
182188
} else
183189
if (nl.isError(o)) {
184190
error(nl.getError(o));
@@ -190,23 +196,35 @@ void drain(List<Object> queue) {
190196
} else {
191197
@SuppressWarnings("unchecked")
192198
T t = (T)o;
193-
emitValue(t);
199+
if (!emitValue(t)) {
200+
return false;
201+
}
194202
}
195203
}
204+
return true;
196205
}
197-
void replaceSubject() {
206+
boolean replaceSubject() {
198207
Observer<T> s = state.consumer;
199208
if (s != null) {
200209
s.onCompleted();
201210
}
211+
// if child has unsubscribed, unsubscribe upstream instead of opening a new window
212+
if (child.isUnsubscribed()) {
213+
state = state.clear();
214+
unsubscribe();
215+
return false;
216+
}
202217
BufferUntilSubscriber<T> bus = BufferUntilSubscriber.create();
203218
state = state.create(bus, bus);
204219
child.onNext(bus);
220+
return true;
205221
}
206-
void emitValue(T t) {
222+
boolean emitValue(T t) {
207223
State<T> s = state;
208224
if (s.consumer == null) {
209-
replaceSubject();
225+
if (!replaceSubject()) {
226+
return false;
227+
}
210228
s = state;
211229
}
212230
s.consumer.onNext(t);
@@ -217,6 +235,7 @@ void emitValue(T t) {
217235
s = s.next();
218236
}
219237
state = s;
238+
return true;
220239
}
221240

222241
@Override
@@ -285,7 +304,6 @@ public void call() {
285304
}, 0, timespan, unit);
286305
}
287306
void nextWindow() {
288-
List<Object> localQueue;
289307
synchronized (guard) {
290308
if (emitting) {
291309
if (queue == null) {
@@ -294,29 +312,29 @@ void nextWindow() {
294312
queue.add(NEXT_SUBJECT);
295313
return;
296314
}
297-
localQueue = queue;
298-
queue = null;
299315
emitting = true;
300316
}
301-
boolean once = true;
302317
boolean skipFinal = false;
303318
try {
304-
do {
305-
drain(localQueue);
306-
if (once) {
307-
once = false;
308-
replaceSubject();
309-
}
319+
if (!replaceSubject()) {
320+
return;
321+
}
322+
for (;;) {
323+
List<Object> localQueue;
310324
synchronized (guard) {
311325
localQueue = queue;
312-
queue = null;
313326
if (localQueue == null) {
314327
emitting = false;
315328
skipFinal = true;
316329
return;
317330
}
331+
queue = null;
318332
}
319-
} while (!child.isUnsubscribed());
333+
334+
if (!drain(localQueue)) {
335+
return;
336+
}
337+
}
320338
} finally {
321339
if (!skipFinal) {
322340
synchronized (guard) {

0 commit comments

Comments
 (0)