Skip to content

Commit 8a62afb

Browse files
authored
2.x: add subscribeOn overload to avoid same-pool deadlock with create (#5386)
1 parent a43265f commit 8a62afb

File tree

3 files changed

+94
-4
lines changed

3 files changed

+94
-4
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13045,6 +13045,44 @@ public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
1304513045
/**
1304613046
* Asynchronously subscribes Subscribers to this Publisher on the specified {@link Scheduler}.
1304713047
* <p>
13048+
* If there is a {@link #create(FlowableOnSubscribe, BackpressureStrategy)} type source up in the
13049+
* chain, it is recommended to use {@code subscribeOn(scheduler, false)} instead
13050+
* to avoid same-pool deadlock because requests may pile up behind a eager/blocking emitter.
13051+
* <p>
13052+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/subscribeOn.png" alt="">
13053+
* <dl>
13054+
* <dt><b>Backpressure:</b></dt>
13055+
* <dd>The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s backpressure
13056+
* behavior.</dd>
13057+
* <dt><b>Scheduler:</b></dt>
13058+
* <dd>You specify which {@link Scheduler} this operator will use</dd>
13059+
* </dl>
13060+
*
13061+
* @param scheduler
13062+
* the {@link Scheduler} to perform subscription actions on
13063+
* @return the source Publisher modified so that its subscriptions happen on the
13064+
* specified {@link Scheduler}
13065+
* @see <a href="http://reactivex.io/documentation/operators/subscribeon.html">ReactiveX operators documentation: SubscribeOn</a>
13066+
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
13067+
* @see #observeOn
13068+
* @see #subscribeOn(Scheduler, boolean)
13069+
*/
13070+
@CheckReturnValue
13071+
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
13072+
@SchedulerSupport(SchedulerSupport.CUSTOM)
13073+
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) {
13074+
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
13075+
return subscribeOn(scheduler, !(this instanceof FlowableCreate));
13076+
}
13077+
13078+
/**
13079+
* Asynchronously subscribes Subscribers to this Publisher on the specified {@link Scheduler}
13080+
* optionally reroutes requests from other threads to the same {@link Scheduler} thread.
13081+
* <p>
13082+
* If there is a {@link #create(FlowableOnSubscribe, BackpressureStrategy)} type source up in the
13083+
* chain, it is recommended to have {@code requestOn} false to avoid same-pool deadlock
13084+
* because requests may pile up behind a eager/blocking emitter.
13085+
* <p>
1304813086
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/subscribeOn.png" alt="">
1304913087
* <dl>
1305013088
* <dt><b>Backpressure:</b></dt>
@@ -13056,18 +13094,23 @@ public final <E extends Subscriber<? super T>> E subscribeWith(E subscriber) {
1305613094
*
1305713095
* @param scheduler
1305813096
* the {@link Scheduler} to perform subscription actions on
13097+
* @param requestOn if true, requests are rerouted to the given Scheduler as well (strong pipelining)
13098+
* if false, requests coming from any thread are simply forwarded to
13099+
* the upstream on the same thread (weak pipelining)
1305913100
* @return the source Publisher modified so that its subscriptions happen on the
1306013101
* specified {@link Scheduler}
1306113102
* @see <a href="http://reactivex.io/documentation/operators/subscribeon.html">ReactiveX operators documentation: SubscribeOn</a>
1306213103
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
1306313104
* @see #observeOn
13105+
* @since 2.1.1 - experimental
1306413106
*/
1306513107
@CheckReturnValue
1306613108
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
1306713109
@SchedulerSupport(SchedulerSupport.CUSTOM)
13068-
public final Flowable<T> subscribeOn(Scheduler scheduler) {
13110+
@Experimental
13111+
public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
1306913112
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
13070-
return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, this instanceof FlowableCreate));
13113+
return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, requestOn));
1307113114
}
1307213115

1307313116
/**

src/main/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOn.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,13 @@ static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
6565

6666
Publisher<T> source;
6767

68-
SubscribeOnSubscriber(Subscriber<? super T> actual, Scheduler.Worker worker, Publisher<T> source, boolean nonScheduledRequests) {
68+
SubscribeOnSubscriber(Subscriber<? super T> actual, Scheduler.Worker worker, Publisher<T> source, boolean requestOn) {
6969
this.actual = actual;
7070
this.worker = worker;
7171
this.source = source;
7272
this.s = new AtomicReference<Subscription>();
7373
this.requested = new AtomicLong();
74-
this.nonScheduledRequests = nonScheduledRequests;
74+
this.nonScheduledRequests = !requestOn;
7575
}
7676

7777
@Override

src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,4 +372,51 @@ public void subscribe(FlowableEmitter<Object> s) throws Exception {
372372
.assertNoErrors()
373373
.assertComplete();
374374
}
375+
376+
@Test
377+
public void nonScheduledRequestsNotSubsequentSubscribeOn() {
378+
TestSubscriber<Object> ts = Flowable.create(new FlowableOnSubscribe<Object>() {
379+
@Override
380+
public void subscribe(FlowableEmitter<Object> s) throws Exception {
381+
for (int i = 1; i < 1001; i++) {
382+
s.onNext(i);
383+
Thread.sleep(1);
384+
}
385+
s.onComplete();
386+
}
387+
}, BackpressureStrategy.DROP)
388+
.map(Functions.identity())
389+
.subscribeOn(Schedulers.single(), false)
390+
.observeOn(Schedulers.computation())
391+
.test()
392+
.awaitDone(5, TimeUnit.SECONDS)
393+
.assertNoErrors()
394+
.assertComplete();
395+
396+
int c = ts.valueCount();
397+
398+
assertTrue("" + c, c > Flowable.bufferSize());
399+
}
400+
401+
@Test
402+
public void scheduledRequestsNotSubsequentSubscribeOn() {
403+
Flowable.create(new FlowableOnSubscribe<Object>() {
404+
@Override
405+
public void subscribe(FlowableEmitter<Object> s) throws Exception {
406+
for (int i = 1; i < 1001; i++) {
407+
s.onNext(i);
408+
Thread.sleep(1);
409+
}
410+
s.onComplete();
411+
}
412+
}, BackpressureStrategy.DROP)
413+
.map(Functions.identity())
414+
.subscribeOn(Schedulers.single(), true)
415+
.observeOn(Schedulers.computation())
416+
.test()
417+
.awaitDone(5, TimeUnit.SECONDS)
418+
.assertValueCount(Flowable.bufferSize())
419+
.assertNoErrors()
420+
.assertComplete();
421+
}
375422
}

0 commit comments

Comments
 (0)