Skip to content

Commit 6cd6efa

Browse files
committed
Window with Observable: fixed unsubscription and behavior
1 parent f44b2be commit 6cd6efa

File tree

4 files changed

+325
-82
lines changed

4 files changed

+325
-82
lines changed

src/main/java/rx/internal/operators/OperatorWindowWithObservable.java

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,15 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.ArrayList;
19-
import java.util.Collections;
20-
import java.util.List;
21-
import rx.Observable;
18+
import java.util.*;
19+
20+
import rx.*;
2221
import rx.Observable.Operator;
22+
import rx.Observable;
2323
import rx.Observer;
24-
import rx.Subscriber;
2524
import rx.functions.Func0;
2625
import rx.observers.SerializedSubscriber;
27-
import rx.observers.Subscribers;
26+
import rx.subscriptions.SerialSubscription;
2827

2928
/**
3029
* Creates non-overlapping windows of items where each window is terminated by
@@ -53,29 +52,20 @@ public Observable<U> call() {
5352
@Override
5453
public Subscriber<? super T> call(Subscriber<? super Observable<T>> child) {
5554

56-
Observable<? extends U> other;
57-
try {
58-
other = otherFactory.call();
59-
} catch (Throwable e) {
60-
child.onError(e);
61-
return Subscribers.empty();
62-
}
55+
SourceSubscriber<T, U> sub = new SourceSubscriber<T, U>(child, otherFactory);
6356

64-
SourceSubscriber<T> sub = new SourceSubscriber<T>(child);
65-
BoundarySubscriber<T, U> bs = new BoundarySubscriber<T, U>(child, sub);
57+
child.add(sub);
6658

6759
sub.replaceWindow();
6860

69-
other.unsafeSubscribe(bs);
70-
7161
return sub;
7262
}
7363
/** Indicate the current subject should complete and a new subject be emitted. */
7464
static final Object NEXT_SUBJECT = new Object();
7565
/** For error and completion indication. */
7666
static final NotificationLite<Object> nl = NotificationLite.instance();
7767
/** Observes the source. */
78-
static final class SourceSubscriber<T> extends Subscriber<T> {
68+
static final class SourceSubscriber<T, U> extends Subscriber<T> {
7969
final Subscriber<? super Observable<T>> child;
8070
final Object guard;
8171
/** Accessed from the serialized part. */
@@ -87,10 +77,17 @@ static final class SourceSubscriber<T> extends Subscriber<T> {
8777
/** Guarded by guard. */
8878
List<Object> queue;
8979

90-
public SourceSubscriber(Subscriber<? super Observable<T>> child) {
91-
super(child);
80+
final SerialSubscription ssub;
81+
82+
final Func0<? extends Observable<? extends U>> otherFactory;
83+
84+
public SourceSubscriber(Subscriber<? super Observable<T>> child,
85+
Func0<? extends Observable<? extends U>> otherFactory) {
9286
this.child = new SerializedSubscriber<Observable<T>>(child);
9387
this.guard = new Object();
88+
this.ssub = new SerialSubscription();
89+
this.otherFactory = otherFactory;
90+
this.add(ssub);
9491
}
9592

9693
@Override
@@ -176,6 +173,18 @@ void createNewWindow() {
176173
BufferUntilSubscriber<T> bus = BufferUntilSubscriber.create();
177174
consumer = bus;
178175
producer = bus;
176+
Observable<? extends U> other;
177+
try {
178+
other = otherFactory.call();
179+
} catch (Throwable e) {
180+
child.onError(e);
181+
unsubscribe();
182+
return;
183+
}
184+
185+
BoundarySubscriber<T, U> bs = new BoundarySubscriber<T, U>(child, this);
186+
ssub.set(bs);
187+
other.unsafeSubscribe(bs);
179188
}
180189
void emitValue(T t) {
181190
Observer<T> s = consumer;
@@ -286,9 +295,8 @@ void error(Throwable e) {
286295
}
287296
/** Observes the boundary. */
288297
static final class BoundarySubscriber<T, U> extends Subscriber<U> {
289-
final SourceSubscriber<T> sub;
290-
public BoundarySubscriber(Subscriber<?> child, SourceSubscriber<T> sub) {
291-
super(child);
298+
final SourceSubscriber<T, U> sub;
299+
public BoundarySubscriber(Subscriber<?> child, SourceSubscriber<T, U> sub) {
292300
this.sub = sub;
293301
}
294302

src/main/java/rx/internal/operators/OperatorWindowWithStartEndObservable.java

Lines changed: 46 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,14 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import java.util.ArrayList;
19-
import java.util.Iterator;
20-
import java.util.LinkedList;
21-
import java.util.List;
22-
import rx.Observable;
18+
import java.util.*;
19+
20+
import rx.*;
2321
import rx.Observable.Operator;
22+
import rx.Observable;
2423
import rx.Observer;
25-
import rx.Subscriber;
2624
import rx.functions.Func1;
27-
import rx.observers.SerializedObserver;
28-
import rx.observers.SerializedSubscriber;
25+
import rx.observers.*;
2926
import rx.subscriptions.CompositeSubscription;
3027

3128
/**
@@ -49,9 +46,12 @@ public OperatorWindowWithStartEndObservable(Observable<? extends U> windowOpenin
4946

5047
@Override
5148
public Subscriber<? super T> call(Subscriber<? super Observable<T>> child) {
52-
final SourceSubscriber sub = new SourceSubscriber(child);
49+
CompositeSubscription csub = new CompositeSubscription();
50+
child.add(csub);
51+
52+
final SourceSubscriber sub = new SourceSubscriber(child, csub);
5353

54-
Subscriber<U> open = new Subscriber<U>(child) {
54+
Subscriber<U> open = new Subscriber<U>() {
5555

5656
@Override
5757
public void onStart() {
@@ -73,7 +73,10 @@ public void onCompleted() {
7373
sub.onCompleted();
7474
}
7575
};
76-
76+
77+
csub.add(sub);
78+
csub.add(open);
79+
7780
windowOpenings.unsafeSubscribe(open);
7881

7982
return sub;
@@ -97,13 +100,11 @@ final class SourceSubscriber extends Subscriber<T> {
97100
final List<SerializedSubject<T>> chunks;
98101
/** Guarded by guard. */
99102
boolean done;
100-
public SourceSubscriber(Subscriber<? super Observable<T>> child) {
101-
super(child);
103+
public SourceSubscriber(Subscriber<? super Observable<T>> child, CompositeSubscription csub) {
102104
this.child = new SerializedSubscriber<Observable<T>>(child);
103105
this.guard = new Object();
104106
this.chunks = new LinkedList<SerializedSubject<T>>();
105-
this.csub = new CompositeSubscription();
106-
child.add(csub);
107+
this.csub = csub;
107108
}
108109

109110
@Override
@@ -127,36 +128,44 @@ public void onNext(T t) {
127128

128129
@Override
129130
public void onError(Throwable e) {
130-
List<SerializedSubject<T>> list;
131-
synchronized (guard) {
132-
if (done) {
133-
return;
131+
try {
132+
List<SerializedSubject<T>> list;
133+
synchronized (guard) {
134+
if (done) {
135+
return;
136+
}
137+
done = true;
138+
list = new ArrayList<SerializedSubject<T>>(chunks);
139+
chunks.clear();
134140
}
135-
done = true;
136-
list = new ArrayList<SerializedSubject<T>>(chunks);
137-
chunks.clear();
138-
}
139-
for (SerializedSubject<T> cs : list) {
140-
cs.consumer.onError(e);
141+
for (SerializedSubject<T> cs : list) {
142+
cs.consumer.onError(e);
143+
}
144+
child.onError(e);
145+
} finally {
146+
csub.unsubscribe();
141147
}
142-
child.onError(e);
143148
}
144149

145150
@Override
146151
public void onCompleted() {
147-
List<SerializedSubject<T>> list;
148-
synchronized (guard) {
149-
if (done) {
150-
return;
152+
try {
153+
List<SerializedSubject<T>> list;
154+
synchronized (guard) {
155+
if (done) {
156+
return;
157+
}
158+
done = true;
159+
list = new ArrayList<SerializedSubject<T>>(chunks);
160+
chunks.clear();
151161
}
152-
done = true;
153-
list = new ArrayList<SerializedSubject<T>>(chunks);
154-
chunks.clear();
155-
}
156-
for (SerializedSubject<T> cs : list) {
157-
cs.consumer.onCompleted();
162+
for (SerializedSubject<T> cs : list) {
163+
cs.consumer.onCompleted();
164+
}
165+
child.onCompleted();
166+
} finally {
167+
csub.unsubscribe();
158168
}
159-
child.onCompleted();
160169
}
161170

162171
void beginWindow(U token) {

0 commit comments

Comments
 (0)