Skip to content

Commit 77a4f63

Browse files
Fix Thread Safety for Unsubscribe
I'm using a separate variable so the volatile is only touching during window start/end and unsubscribe rather than every onNext.
1 parent 399cbf9 commit 77a4f63

File tree

1 file changed

+6
-3
lines changed

1 file changed

+6
-3
lines changed

src/main/java/rx/internal/operators/OperatorWindowWithSize.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ final class ExactSubscriber extends Subscriber<T> {
6161
final Subscriber<? super Observable<T>> child;
6262
int count;
6363
BufferUntilSubscriber<T> window;
64-
Subscription parentSubscription = this;
64+
volatile boolean noWindow = true;
65+
final Subscription parentSubscription = this;
6566
public ExactSubscriber(Subscriber<? super Observable<T>> child) {
6667
/**
6768
* See https://github.com/ReactiveX/RxJava/issues/1546
@@ -77,7 +78,7 @@ public ExactSubscriber(Subscriber<? super Observable<T>> child) {
7778
@Override
7879
public void call() {
7980
// if no window we unsubscribe up otherwise wait until window ends
80-
if(window == null) {
81+
if(noWindow) {
8182
parentSubscription.unsubscribe();
8283
}
8384
}
@@ -94,13 +95,15 @@ public void onStart() {
9495
@Override
9596
public void onNext(T t) {
9697
if (window == null) {
98+
noWindow = false;
9799
window = BufferUntilSubscriber.create();
98100
child.onNext(window);
99101
}
100102
window.onNext(t);
101103
if (++count % size == 0) {
102104
window.onCompleted();
103105
window = null;
106+
noWindow = true;
104107
if (child.isUnsubscribed()) {
105108
parentSubscription.unsubscribe();
106109
return;
@@ -130,7 +133,7 @@ final class InexactSubscriber extends Subscriber<T> {
130133
final Subscriber<? super Observable<T>> child;
131134
int count;
132135
final List<CountedSubject<T>> chunks = new LinkedList<CountedSubject<T>>();
133-
Subscription parentSubscription = this;
136+
final Subscription parentSubscription = this;
134137

135138
public InexactSubscriber(Subscriber<? super Observable<T>> child) {
136139
/**

0 commit comments

Comments
 (0)