Skip to content

Commit 828db38

Browse files
authored
1.x: create+subscribeOn avoid same-pool deadlock (#5091)
1 parent 47e6c67 commit 828db38

File tree

4 files changed

+199
-58
lines changed

4 files changed

+199
-58
lines changed

src/main/java/rx/Observable.java

Lines changed: 43 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10333,11 +10333,15 @@ static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T
1033310333
/**
1033410334
* Asynchronously subscribes Observers to this Observable on the specified {@link Scheduler}.
1033510335
* <p>
10336+
* If there is a {@link #create(Action1, rx.Emitter.BackpressureMode)} type source up in the
10337+
* chain, it is recommended to use {@code subscribeOn(scheduler, false)} instead
10338+
* to avoid same-pool deadlock because requests pile up behind a eager/blocking emitter.
10339+
* <p>
1033610340
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/subscribeOn.png" alt="">
1033710341
* <dl>
1033810342
* <dt><b>Backpressure:</b></dt>
10339-
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Observable}'s backpressure
10340-
* behavior.</dd>
10343+
* <dd>The operator doesn't interfere with backpressure amount which is determined by the source {@code Observable}'s backpressure
10344+
* behavior. However, the upstream is requested from the given scheduler thread.</dd>
1034110345
* <dt><b>Scheduler:</b></dt>
1034210346
* <dd>you specify which {@link Scheduler} this operator will use</dd>
1034310347
* </dl>
@@ -10349,12 +10353,48 @@ static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T
1034910353
* @see <a href="http://reactivex.io/documentation/operators/subscribeon.html">ReactiveX operators documentation: SubscribeOn</a>
1035010354
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
1035110355
* @see #observeOn
10356+
* @see #subscribeOn(Scheduler, boolean)
1035210357
*/
1035310358
public final Observable<T> subscribeOn(Scheduler scheduler) {
10359+
return subscribeOn(scheduler, !(this.onSubscribe instanceof OnSubscribeCreate));
10360+
}
10361+
10362+
/**
10363+
* Asynchronously subscribes Observers to this Observable on the specified {@link Scheduler} and
10364+
* optionally reroutes requests from other threads to the same {@link Scheduler} thread.
10365+
* <p>
10366+
* If there is a {@link #create(Action1, rx.Emitter.BackpressureMode)} type source up in the
10367+
* chain, it is recommended to have {@code requestOn} false to avoid same-pool deadlock
10368+
* because requests pile up behind a eager/blocking emitter.
10369+
* <p>
10370+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/subscribeOn.png" alt="">
10371+
* <dl>
10372+
* <dt><b>Backpressure:</b></dt>
10373+
* <dd>The operator doesn't interfere with backpressure amount which is determined by the source {@code Observable}'s backpressure
10374+
* behavior. However, the upstream is requested from the given scheduler if requestOn is true.</dd>
10375+
* <dt><b>Scheduler:</b></dt>
10376+
* <dd>you specify which {@link Scheduler} this operator will use</dd>
10377+
* </dl>
10378+
*
10379+
* @param scheduler
10380+
* the {@link Scheduler} to perform subscription actions on
10381+
* @param requestOn if true, requests are rerouted to the given Scheduler as well (strong pipelining)
10382+
* if false, requests coming from any thread are simply forwarded to
10383+
* the upstream on the same thread (weak pipelining)
10384+
* @return the source Observable modified so that its subscriptions happen on the
10385+
* specified {@link Scheduler}
10386+
* @see <a href="http://reactivex.io/documentation/operators/subscribeon.html">ReactiveX operators documentation: SubscribeOn</a>
10387+
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
10388+
* @see #observeOn
10389+
* @see #subscribeOn(Scheduler)
10390+
* @since 1.2.7 - experimental
10391+
*/
10392+
@Experimental
10393+
public final Observable<T> subscribeOn(Scheduler scheduler, boolean requestOn) {
1035410394
if (this instanceof ScalarSynchronousObservable) {
1035510395
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
1035610396
}
10357-
return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler));
10397+
return unsafeCreate(new OperatorSubscribeOn<T>(this, scheduler, requestOn));
1035810398
}
1035910399

1036010400
/**

src/main/java/rx/internal/operators/OperatorSubscribeOn.java

Lines changed: 69 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -31,68 +31,92 @@ public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
3131

3232
final Scheduler scheduler;
3333
final Observable<T> source;
34+
final boolean requestOn;
3435

35-
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
36+
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler, boolean requestOn) {
3637
this.scheduler = scheduler;
3738
this.source = source;
39+
this.requestOn = requestOn;
3840
}
3941

4042
@Override
4143
public void call(final Subscriber<? super T> subscriber) {
4244
final Worker inner = scheduler.createWorker();
45+
46+
SubscribeOnSubscriber<T> parent = new SubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
47+
subscriber.add(parent);
4348
subscriber.add(inner);
4449

45-
inner.schedule(new Action0() {
46-
@Override
47-
public void call() {
48-
final Thread t = Thread.currentThread();
50+
inner.schedule(parent);
51+
}
4952

50-
Subscriber<T> s = new Subscriber<T>(subscriber) {
51-
@Override
52-
public void onNext(T t) {
53-
subscriber.onNext(t);
54-
}
53+
static final class SubscribeOnSubscriber<T> extends Subscriber<T> implements Action0 {
5554

56-
@Override
57-
public void onError(Throwable e) {
58-
try {
59-
subscriber.onError(e);
60-
} finally {
61-
inner.unsubscribe();
62-
}
63-
}
55+
final Subscriber<? super T> actual;
6456

65-
@Override
66-
public void onCompleted() {
67-
try {
68-
subscriber.onCompleted();
69-
} finally {
70-
inner.unsubscribe();
71-
}
72-
}
57+
final boolean requestOn;
58+
59+
final Worker worker;
60+
61+
Observable<T> source;
62+
63+
Thread t;
64+
65+
SubscribeOnSubscriber(Subscriber<? super T> actual, boolean requestOn, Worker worker, Observable<T> source) {
66+
this.actual = actual;
67+
this.requestOn = requestOn;
68+
this.worker = worker;
69+
this.source = source;
70+
}
7371

74-
@Override
75-
public void setProducer(final Producer p) {
76-
subscriber.setProducer(new Producer() {
72+
@Override
73+
public void onNext(T t) {
74+
actual.onNext(t);
75+
}
76+
77+
@Override
78+
public void onError(Throwable e) {
79+
try {
80+
actual.onError(e);
81+
} finally {
82+
worker.unsubscribe();
83+
}
84+
}
85+
86+
@Override
87+
public void onCompleted() {
88+
try {
89+
actual.onCompleted();
90+
} finally {
91+
worker.unsubscribe();
92+
}
93+
}
94+
95+
@Override
96+
public void call() {
97+
Observable<T> src = source;
98+
source = null;
99+
t = Thread.currentThread();
100+
src.unsafeSubscribe(this);
101+
}
102+
103+
@Override
104+
public void setProducer(final Producer p) {
105+
actual.setProducer(new Producer() {
106+
@Override
107+
public void request(final long n) {
108+
if (t == Thread.currentThread() || !requestOn) {
109+
p.request(n);
110+
} else {
111+
worker.schedule(new Action0() {
77112
@Override
78-
public void request(final long n) {
79-
if (t == Thread.currentThread()) {
80-
p.request(n);
81-
} else {
82-
inner.schedule(new Action0() {
83-
@Override
84-
public void call() {
85-
p.request(n);
86-
}
87-
});
88-
}
113+
public void call() {
114+
p.request(n);
89115
}
90116
});
91117
}
92-
};
93-
94-
source.unsafeSubscribe(s);
95-
}
96-
});
118+
}
119+
});
120+
}
97121
}
98122
}

src/test/java/rx/internal/operators/OperatorGroupByTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2017,11 +2017,11 @@ public Map<Integer, Object> call(Action1<Integer> t) {
20172017
throw exception;
20182018
}};
20192019
}
2020-
2020+
20212021
@Test
20222022
public void outerConsumedInABoundedManner() {
20232023
final int[] counter = { 0 };
2024-
2024+
20252025
Observable.range(1, 10000)
20262026
.doOnRequest(new Action1<Long>() {
20272027
@Override

src/test/java/rx/internal/operators/OperatorSubscribeOnTest.java

Lines changed: 85 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,12 @@
2626

2727
import org.junit.Test;
2828

29-
import rx.Observable;
29+
import rx.*;
3030
import rx.Observable.OnSubscribe;
3131
import rx.Observable.Operator;
32-
import rx.Observer;
33-
import rx.Producer;
34-
import rx.Scheduler;
35-
import rx.Subscriber;
36-
import rx.Subscription;
37-
import rx.functions.Action0;
38-
import rx.observers.TestSubscriber;
32+
import rx.functions.*;
33+
import rx.internal.util.*;
34+
import rx.observers.*;
3935
import rx.schedulers.Schedulers;
4036

4137
public class OperatorSubscribeOnTest {
@@ -267,4 +263,85 @@ public void onNext(Integer t) {
267263
ts.assertNoErrors();
268264
}
269265

266+
@Test
267+
public void noSamepoolDeadlock() {
268+
final int n = 4 * RxRingBuffer.SIZE;
269+
270+
Observable.create(new Action1<Emitter<Object>>() {
271+
@Override
272+
public void call(Emitter<Object> e) {
273+
for (int i = 0; i < n; i++) {
274+
e.onNext(i);
275+
try {
276+
Thread.sleep(1);
277+
} catch (InterruptedException e1) {
278+
e1.printStackTrace();
279+
}
280+
}
281+
e.onCompleted();
282+
}
283+
}, Emitter.BackpressureMode.DROP)
284+
.map(UtilityFunctions.identity())
285+
.subscribeOn(Schedulers.io(), false)
286+
.observeOn(Schedulers.computation())
287+
.test()
288+
.awaitTerminalEvent(5, TimeUnit.SECONDS)
289+
.assertValueCount(n)
290+
.assertNoErrors()
291+
.assertCompleted();
292+
}
293+
294+
@Test
295+
public void noSamepoolDeadlockRequestOn() {
296+
final int n = 4 * RxRingBuffer.SIZE;
297+
298+
Observable.create(new Action1<Emitter<Object>>() {
299+
@Override
300+
public void call(Emitter<Object> e) {
301+
for (int i = 0; i < n; i++) {
302+
e.onNext(i);
303+
try {
304+
Thread.sleep(1);
305+
} catch (InterruptedException e1) {
306+
e1.printStackTrace();
307+
}
308+
}
309+
e.onCompleted();
310+
}
311+
}, Emitter.BackpressureMode.DROP)
312+
.subscribeOn(Schedulers.io())
313+
.observeOn(Schedulers.computation())
314+
.test()
315+
.awaitTerminalEvent(5, TimeUnit.SECONDS)
316+
.assertValueCount(n)
317+
.assertNoErrors()
318+
.assertCompleted();
319+
}
320+
321+
@Test
322+
public void noSamepoolDeadlockRequestOn2() {
323+
final int n = 4 * RxRingBuffer.SIZE;
324+
325+
Observable.create(new Action1<Emitter<Object>>() {
326+
@Override
327+
public void call(Emitter<Object> e) {
328+
for (int i = 0; i < n; i++) {
329+
e.onNext(i);
330+
try {
331+
Thread.sleep(1);
332+
} catch (InterruptedException e1) {
333+
e1.printStackTrace();
334+
}
335+
}
336+
e.onCompleted();
337+
}
338+
}, Emitter.BackpressureMode.DROP)
339+
.subscribeOn(Schedulers.io(), true)
340+
.observeOn(Schedulers.computation())
341+
.test()
342+
.awaitTerminalEvent(5, TimeUnit.SECONDS)
343+
.assertValueCount(RxRingBuffer.SIZE)
344+
.assertNoErrors()
345+
.assertCompleted();
346+
}
270347
}

0 commit comments

Comments
 (0)