Skip to content

Commit bc06175

Browse files
authored
1.x: reduce stack depth with switchIfEmpty (#5125)
* 1.x: reduce stack depth with switchIfEmpty * Use source as the indication of first/second phase
1 parent 5468972 commit bc06175

File tree

3 files changed

+69
-14
lines changed

3 files changed

+69
-14
lines changed

src/main/java/rx/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5481,7 +5481,7 @@ public final Observable<T> switchIfEmpty(Observable<? extends T> alternate) {
54815481
if (alternate == null) {
54825482
throw new NullPointerException("alternate is null");
54835483
}
5484-
return lift(new OperatorSwitchIfEmpty<T>(alternate));
5484+
return unsafeCreate(new OnSubscribeSwitchIfEmpty<T>(this, alternate));
54855485
}
54865486

54875487
/**

src/main/java/rx/internal/operators/OperatorSwitchIfEmpty.java renamed to src/main/java/rx/internal/operators/OnSubscribeSwitchIfEmpty.java

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package rx.internal.operators;
1717

1818

19+
import java.util.concurrent.atomic.AtomicInteger;
20+
1921
import rx.*;
2022
import rx.internal.producers.ProducerArbiter;
2123
import rx.subscriptions.SerialSubscription;
@@ -26,22 +28,28 @@
2628
* empty, the results of the given Observable will be emitted.
2729
* @param <T> the value type
2830
*/
29-
public final class OperatorSwitchIfEmpty<T> implements Observable.Operator<T, T> {
30-
private final Observable<? extends T> alternate;
31+
public final class OnSubscribeSwitchIfEmpty<T> implements Observable.OnSubscribe<T> {
32+
33+
final Observable<? extends T> source;
3134

32-
public OperatorSwitchIfEmpty(Observable<? extends T> alternate) {
35+
final Observable<? extends T> alternate;
36+
37+
public OnSubscribeSwitchIfEmpty(Observable<? extends T> source, Observable<? extends T> alternate) {
38+
this.source = source;
3339
this.alternate = alternate;
3440
}
3541

3642
@Override
37-
public Subscriber<? super T> call(Subscriber<? super T> child) {
43+
public void call(Subscriber<? super T> child) {
3844
final SerialSubscription serial = new SerialSubscription();
3945
ProducerArbiter arbiter = new ProducerArbiter();
4046
final ParentSubscriber<T> parent = new ParentSubscriber<T>(child, serial, arbiter, alternate);
47+
4148
serial.set(parent);
4249
child.add(serial);
4350
child.setProducer(arbiter);
44-
return parent;
51+
52+
parent.subscribe(source);
4553
}
4654

4755
static final class ParentSubscriber<T> extends Subscriber<T> {
@@ -52,11 +60,15 @@ static final class ParentSubscriber<T> extends Subscriber<T> {
5260
private final ProducerArbiter arbiter;
5361
private final Observable<? extends T> alternate;
5462

63+
final AtomicInteger wip;
64+
volatile boolean active;
65+
5566
ParentSubscriber(Subscriber<? super T> child, final SerialSubscription serial, ProducerArbiter arbiter, Observable<? extends T> alternate) {
5667
this.child = child;
5768
this.serial = serial;
5869
this.arbiter = arbiter;
5970
this.alternate = alternate;
71+
this.wip = new AtomicInteger();
6072
}
6173

6274
@Override
@@ -69,14 +81,33 @@ public void onCompleted() {
6981
if (!empty) {
7082
child.onCompleted();
7183
} else if (!child.isUnsubscribed()) {
72-
subscribeToAlternate();
84+
active = false;
85+
subscribe(null);
7386
}
7487
}
7588

76-
private void subscribeToAlternate() {
77-
AlternateSubscriber<T> as = new AlternateSubscriber<T>(child, arbiter);
78-
serial.set(as);
79-
alternate.unsafeSubscribe(as);
89+
void subscribe(Observable<? extends T> source) {
90+
if (wip.getAndIncrement() == 0) {
91+
do {
92+
if (child.isUnsubscribed()) {
93+
break;
94+
}
95+
96+
if (!active) {
97+
if (source == null) {
98+
AlternateSubscriber<T> as = new AlternateSubscriber<T>(child, arbiter);
99+
serial.set(as);
100+
active = true;
101+
alternate.unsafeSubscribe(as);
102+
} else {
103+
active = true;
104+
source.unsafeSubscribe(this);
105+
source = null;
106+
}
107+
}
108+
109+
} while (wip.decrementAndGet() != 0);
110+
}
80111
}
81112

82113
@Override

src/test/java/rx/internal/operators/OperatorSwitchIfEmptyTest.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,15 @@
1717

1818
import static org.junit.Assert.*;
1919

20-
import java.util.*;
20+
import java.util.Arrays;
2121
import java.util.concurrent.TimeUnit;
2222
import java.util.concurrent.atomic.AtomicBoolean;
2323

2424
import org.junit.Test;
2525

2626
import rx.*;
27-
import rx.Observable;
2827
import rx.Observable.OnSubscribe;
29-
import rx.functions.Action0;
28+
import rx.functions.*;
3029
import rx.observers.TestSubscriber;
3130
import rx.schedulers.Schedulers;
3231
import rx.subscriptions.Subscriptions;
@@ -212,4 +211,29 @@ public void call() {
212211
public void testAlternateNull() {
213212
Observable.just(1).switchIfEmpty(null);
214213
}
214+
215+
Observable<StackTraceElement[]> recursiveSwitch(final int level) {
216+
if (level == 100) {
217+
return Observable.just(Thread.currentThread().getStackTrace());
218+
}
219+
return Observable.<StackTraceElement[]>empty().switchIfEmpty(Observable.defer(new Func0<Observable<StackTraceElement[]>>() {
220+
@Override
221+
public Observable<StackTraceElement[]> call() {
222+
return recursiveSwitch(level + 1);
223+
}
224+
}));
225+
}
226+
227+
@Test
228+
public void stackDepth() {
229+
StackTraceElement[] trace = recursiveSwitch(0)
230+
.toBlocking().last();
231+
232+
if (trace.length > 1000 || trace.length < 100) {
233+
for (StackTraceElement ste : trace) {
234+
System.out.println(ste);
235+
}
236+
fail("Stack too deep: " + trace.length);
237+
}
238+
}
215239
}

0 commit comments

Comments
 (0)