Skip to content

Commit 18d6f3e

Browse files
Merge pull request #1829 from benjchristensen/1546-window
Fix Window by Count Unsubscribe Behavior
2 parents fddc923 + 71b4e7a commit 18d6f3e

File tree

6 files changed

+878
-570
lines changed

6 files changed

+878
-570
lines changed

src/main/java/rx/internal/operators/OperatorWindowWithSize.java

Lines changed: 74 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,12 @@
1919
import java.util.Iterator;
2020
import java.util.LinkedList;
2121
import java.util.List;
22+
2223
import rx.Observable;
2324
import rx.Observable.Operator;
25+
import rx.Subscription;
26+
import rx.functions.Action0;
27+
import rx.subscriptions.Subscriptions;
2428
import rx.Observer;
2529
import rx.Subscriber;
2630

@@ -56,11 +60,29 @@ public Subscriber<? super T> call(Subscriber<? super Observable<T>> child) {
5660
final class ExactSubscriber extends Subscriber<T> {
5761
final Subscriber<? super Observable<T>> child;
5862
int count;
59-
Observer<T> consumer;
60-
Observable<T> producer;
63+
BufferUntilSubscriber<T> window;
64+
Subscription parentSubscription = this;
6165
public ExactSubscriber(Subscriber<? super Observable<T>> child) {
62-
super(child);
66+
/**
67+
* See https://github.com/ReactiveX/RxJava/issues/1546
68+
* We cannot compose through a Subscription because unsubscribing
69+
* applies to the outer, not the inner.
70+
*/
6371
this.child = child;
72+
/*
73+
* Add unsubscribe hook to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself)
74+
*/
75+
child.add(Subscriptions.create(new Action0() {
76+
77+
@Override
78+
public void call() {
79+
// if no window we unsubscribe up otherwise wait until window ends
80+
if(window == null) {
81+
parentSubscription.unsubscribe();
82+
}
83+
}
84+
85+
}));
6486
}
6587

6688
@Override
@@ -71,45 +93,66 @@ public void onStart() {
7193

7294
@Override
7395
public void onNext(T t) {
74-
if (count++ % size == 0) {
75-
if (consumer != null) {
76-
consumer.onCompleted();
96+
if (window == null) {
97+
window = BufferUntilSubscriber.create();
98+
child.onNext(window);
99+
}
100+
window.onNext(t);
101+
if (++count % size == 0) {
102+
window.onCompleted();
103+
window = null;
104+
if (child.isUnsubscribed()) {
105+
parentSubscription.unsubscribe();
106+
return;
77107
}
78-
createNewWindow();
79-
child.onNext(producer);
80108
}
81-
consumer.onNext(t);
82109
}
83110

84111
@Override
85112
public void onError(Throwable e) {
86-
if (consumer != null) {
87-
consumer.onError(e);
113+
if (window != null) {
114+
window.onError(e);
88115
}
89116
child.onError(e);
90117
}
91118

92119
@Override
93120
public void onCompleted() {
94-
if (consumer != null) {
95-
consumer.onCompleted();
121+
if (window != null) {
122+
window.onCompleted();
96123
}
97124
child.onCompleted();
98125
}
99-
void createNewWindow() {
100-
final BufferUntilSubscriber<T> bus = BufferUntilSubscriber.create();
101-
consumer = bus;
102-
producer = bus;
103-
}
104126
}
127+
105128
/** Subscriber with inexact, possibly overlapping or skipping windows. */
106129
final class InexactSubscriber extends Subscriber<T> {
107130
final Subscriber<? super Observable<T>> child;
108131
int count;
109-
final List<CountedSubject<T>> chunks;
132+
final List<CountedSubject<T>> chunks = new LinkedList<CountedSubject<T>>();
133+
Subscription parentSubscription = this;
134+
110135
public InexactSubscriber(Subscriber<? super Observable<T>> child) {
136+
/**
137+
* See https://github.com/ReactiveX/RxJava/issues/1546
138+
* We cannot compose through a Subscription because unsubscribing
139+
* applies to the outer, not the inner.
140+
*/
111141
this.child = child;
112-
this.chunks = new LinkedList<CountedSubject<T>>();
142+
/*
143+
* Add unsubscribe hook to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself)
144+
*/
145+
child.add(Subscriptions.create(new Action0() {
146+
147+
@Override
148+
public void call() {
149+
// if no window we unsubscribe up otherwise wait until window ends
150+
if (chunks == null || chunks.size() == 0) {
151+
parentSubscription.unsubscribe();
152+
}
153+
}
154+
155+
}));
113156
}
114157

115158
@Override
@@ -121,10 +164,13 @@ public void onStart() {
121164
@Override
122165
public void onNext(T t) {
123166
if (count++ % skip == 0) {
124-
CountedSubject<T> cs = createCountedSubject();
125-
chunks.add(cs);
126-
child.onNext(cs.producer);
167+
if (!child.isUnsubscribed()) {
168+
CountedSubject<T> cs = createCountedSubject();
169+
chunks.add(cs);
170+
child.onNext(cs.producer);
171+
}
127172
}
173+
128174
Iterator<CountedSubject<T>> it = chunks.iterator();
129175
while (it.hasNext()) {
130176
CountedSubject<T> cs = it.next();
@@ -134,6 +180,10 @@ public void onNext(T t) {
134180
cs.consumer.onCompleted();
135181
}
136182
}
183+
if (chunks.size() == 0 && child.isUnsubscribed()) {
184+
parentSubscription.unsubscribe();
185+
return;
186+
}
137187
}
138188

139189
@Override
@@ -155,6 +205,7 @@ public void onCompleted() {
155205
}
156206
child.onCompleted();
157207
}
208+
158209
CountedSubject<T> createCountedSubject() {
159210
final BufferUntilSubscriber<T> bus = BufferUntilSubscriber.create();
160211
return new CountedSubject<T>(bus, bus);

0 commit comments

Comments
 (0)