Skip to content

Commit 85e0ea5

Browse files
authored
2.x: BehaviorProcessor & Subject terminate-subscribe race (#5281)
* 2.x: BehaviorProcessor & Subject terminate-subscribe race * Use assertFailure.
1 parent 8a78c74 commit 85e0ea5

File tree

4 files changed

+131
-20
lines changed

4 files changed

+131
-20
lines changed

src/main/java/io/reactivex/processors/BehaviorProcessor.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public final class BehaviorProcessor<T> extends FlowableProcessor<T> {
8787

8888
final AtomicReference<Object> value;
8989

90-
boolean done;
90+
final AtomicReference<Throwable> terminalEvent;
9191

9292
long index;
9393

@@ -131,6 +131,7 @@ public static <T> BehaviorProcessor<T> createDefault(T defaultValue) {
131131
this.readLock = lock.readLock();
132132
this.writeLock = lock.writeLock();
133133
this.subscribers = new AtomicReference<BehaviorSubscription<T>[]>(EMPTY);
134+
this.terminalEvent = new AtomicReference<Throwable>();
134135
}
135136

136137
/**
@@ -155,18 +156,18 @@ protected void subscribeActual(Subscriber<? super T> s) {
155156
bs.emitFirst();
156157
}
157158
} else {
158-
Object o = value.get();
159-
if (NotificationLite.isComplete(o)) {
159+
Throwable ex = terminalEvent.get();
160+
if (ex == ExceptionHelper.TERMINATED) {
160161
s.onComplete();
161162
} else {
162-
s.onError(NotificationLite.getError(o));
163+
s.onError(ex);
163164
}
164165
}
165166
}
166167

167168
@Override
168169
public void onSubscribe(Subscription s) {
169-
if (done) {
170+
if (terminalEvent.get() != null) {
170171
s.cancel();
171172
return;
172173
}
@@ -179,7 +180,7 @@ public void onNext(T t) {
179180
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
180181
return;
181182
}
182-
if (done) {
183+
if (terminalEvent.get() != null) {
183184
return;
184185
}
185186
Object o = NotificationLite.next(t);
@@ -194,11 +195,10 @@ public void onError(Throwable t) {
194195
if (t == null) {
195196
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
196197
}
197-
if (done) {
198+
if (!terminalEvent.compareAndSet(null, t)) {
198199
RxJavaPlugins.onError(t);
199200
return;
200201
}
201-
done = true;
202202
Object o = NotificationLite.error(t);
203203
for (BehaviorSubscription<T> bs : terminate(o)) {
204204
bs.emitNext(o, index);
@@ -207,10 +207,9 @@ public void onError(Throwable t) {
207207

208208
@Override
209209
public void onComplete() {
210-
if (done) {
210+
if (!terminalEvent.compareAndSet(null, ExceptionHelper.TERMINATED)) {
211211
return;
212212
}
213-
done = true;
214213
Object o = NotificationLite.complete();
215214
for (BehaviorSubscription<T> bs : terminate(o)) {
216215
bs.emitNext(o, index); // relaxed read okay since this is the only mutator thread

src/main/java/io/reactivex/subjects/BehaviorSubject.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public final class BehaviorSubject<T> extends Subject<T> {
8686
final Lock readLock;
8787
final Lock writeLock;
8888

89-
boolean done;
89+
final AtomicReference<Throwable> terminalEvent;
9090

9191
long index;
9292

@@ -129,6 +129,7 @@ public static <T> BehaviorSubject<T> createDefault(T defaultValue) {
129129
this.writeLock = lock.writeLock();
130130
this.subscribers = new AtomicReference<BehaviorDisposable<T>[]>(EMPTY);
131131
this.value = new AtomicReference<Object>();
132+
this.terminalEvent = new AtomicReference<Throwable>();
132133
}
133134

134135
/**
@@ -153,18 +154,18 @@ protected void subscribeActual(Observer<? super T> observer) {
153154
bs.emitFirst();
154155
}
155156
} else {
156-
Object o = value.get();
157-
if (NotificationLite.isComplete(o)) {
157+
Throwable ex = terminalEvent.get();
158+
if (ex == ExceptionHelper.TERMINATED) {
158159
observer.onComplete();
159160
} else {
160-
observer.onError(NotificationLite.getError(o));
161+
observer.onError(ex);
161162
}
162163
}
163164
}
164165

165166
@Override
166167
public void onSubscribe(Disposable s) {
167-
if (done) {
168+
if (terminalEvent.get() != null) {
168169
s.dispose();
169170
}
170171
}
@@ -175,7 +176,7 @@ public void onNext(T t) {
175176
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
176177
return;
177178
}
178-
if (done) {
179+
if (terminalEvent.get() != null) {
179180
return;
180181
}
181182
Object o = NotificationLite.next(t);
@@ -190,11 +191,10 @@ public void onError(Throwable t) {
190191
if (t == null) {
191192
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
192193
}
193-
if (done) {
194+
if (!terminalEvent.compareAndSet(null, t)) {
194195
RxJavaPlugins.onError(t);
195196
return;
196197
}
197-
done = true;
198198
Object o = NotificationLite.error(t);
199199
for (BehaviorDisposable<T> bs : terminate(o)) {
200200
bs.emitNext(o, index);
@@ -203,10 +203,9 @@ public void onError(Throwable t) {
203203

204204
@Override
205205
public void onComplete() {
206-
if (done) {
206+
if (!terminalEvent.compareAndSet(null, ExceptionHelper.TERMINATED)) {
207207
return;
208208
}
209-
done = true;
210209
Object o = NotificationLite.complete();
211210
for (BehaviorDisposable<T> bs : terminate(o)) {
212211
bs.emitNext(o, index); // relaxed read okay since this is the only mutator thread

src/test/java/io/reactivex/processors/BehaviorProcessorTest.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -756,4 +756,60 @@ public void run() {
756756
.awaitDone(5, TimeUnit.SECONDS)
757757
.assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
758758
}
759+
760+
@Test
761+
public void completeSubscribeRace() throws Exception {
762+
for (int i = 0; i < 1000; i++) {
763+
final BehaviorProcessor<Object> p = BehaviorProcessor.create();
764+
765+
final TestSubscriber<Object> ts = new TestSubscriber<Object>();
766+
767+
Runnable r1 = new Runnable() {
768+
@Override
769+
public void run() {
770+
p.subscribe(ts);
771+
}
772+
};
773+
774+
Runnable r2 = new Runnable() {
775+
@Override
776+
public void run() {
777+
p.onComplete();
778+
}
779+
};
780+
781+
TestHelper.race(r1, r2);
782+
783+
ts.assertResult();
784+
}
785+
}
786+
787+
@Test
788+
public void errorSubscribeRace() throws Exception {
789+
for (int i = 0; i < 1000; i++) {
790+
final BehaviorProcessor<Object> p = BehaviorProcessor.create();
791+
792+
final TestSubscriber<Object> ts = new TestSubscriber<Object>();
793+
794+
final TestException ex = new TestException();
795+
796+
Runnable r1 = new Runnable() {
797+
@Override
798+
public void run() {
799+
p.subscribe(ts);
800+
}
801+
};
802+
803+
Runnable r2 = new Runnable() {
804+
@Override
805+
public void run() {
806+
p.onError(ex);
807+
}
808+
};
809+
810+
TestHelper.race(r1, r2);
811+
812+
ts.assertFailure(TestException.class);
813+
}
814+
}
759815
}

src/test/java/io/reactivex/subjects/BehaviorSubjectTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -769,4 +769,61 @@ public void onComplete() {
769769
}
770770
});
771771
}
772+
773+
774+
@Test
775+
public void completeSubscribeRace() throws Exception {
776+
for (int i = 0; i < 1000; i++) {
777+
final BehaviorSubject<Object> p = BehaviorSubject.create();
778+
779+
final TestObserver<Object> ts = new TestObserver<Object>();
780+
781+
Runnable r1 = new Runnable() {
782+
@Override
783+
public void run() {
784+
p.subscribe(ts);
785+
}
786+
};
787+
788+
Runnable r2 = new Runnable() {
789+
@Override
790+
public void run() {
791+
p.onComplete();
792+
}
793+
};
794+
795+
TestHelper.race(r1, r2);
796+
797+
ts.assertResult();
798+
}
799+
}
800+
801+
@Test
802+
public void errorSubscribeRace() throws Exception {
803+
for (int i = 0; i < 1000; i++) {
804+
final BehaviorSubject<Object> p = BehaviorSubject.create();
805+
806+
final TestObserver<Object> ts = new TestObserver<Object>();
807+
808+
final TestException ex = new TestException();
809+
810+
Runnable r1 = new Runnable() {
811+
@Override
812+
public void run() {
813+
p.subscribe(ts);
814+
}
815+
};
816+
817+
Runnable r2 = new Runnable() {
818+
@Override
819+
public void run() {
820+
p.onError(ex);
821+
}
822+
};
823+
824+
TestHelper.race(r1, r2);
825+
826+
ts.assertFailure(TestException.class);
827+
}
828+
}
772829
}

0 commit comments

Comments
 (0)