Skip to content

Commit db50d3d

Browse files
committed
1.x: make just() support backpressure
1 parent c63fa2e commit db50d3d

File tree

10 files changed

+675
-234
lines changed

10 files changed

+675
-234
lines changed

src/main/java/rx/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8304,7 +8304,7 @@ public final Observable<T> subscribeOn(Scheduler scheduler) {
83048304
if (this instanceof ScalarSynchronousObservable) {
83058305
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
83068306
}
8307-
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
8307+
return create(new OperatorSubscribeOn<T>(this, scheduler));
83088308
}
83098309

83108310
/**

src/main/java/rx/Single.java

Lines changed: 42 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -12,38 +12,16 @@
1212
*/
1313
package rx;
1414

15-
import java.util.concurrent.Callable;
16-
import java.util.concurrent.Future;
17-
import java.util.concurrent.TimeUnit;
18-
import java.util.concurrent.TimeoutException;
15+
import java.util.concurrent.*;
1916

2017
import rx.Observable.Operator;
2118
import rx.annotations.Experimental;
22-
import rx.exceptions.Exceptions;
23-
import rx.exceptions.OnErrorNotImplementedException;
24-
import rx.functions.Action1;
25-
import rx.functions.Func1;
26-
import rx.functions.Func2;
27-
import rx.functions.Func3;
28-
import rx.functions.Func4;
29-
import rx.functions.Func5;
30-
import rx.functions.Func6;
31-
import rx.functions.Func7;
32-
import rx.functions.Func8;
33-
import rx.functions.Func9;
34-
import rx.internal.operators.OnSubscribeToObservableFuture;
35-
import rx.internal.operators.OperatorDelay;
36-
import rx.internal.operators.OperatorDoOnEach;
37-
import rx.internal.operators.OperatorMap;
38-
import rx.internal.operators.OperatorObserveOn;
39-
import rx.internal.operators.OperatorOnErrorReturn;
40-
import rx.internal.operators.OperatorSubscribeOn;
41-
import rx.internal.operators.OperatorTimeout;
42-
import rx.internal.operators.OperatorZip;
19+
import rx.exceptions.*;
20+
import rx.functions.*;
21+
import rx.internal.operators.*;
4322
import rx.internal.producers.SingleDelayedProducer;
4423
import rx.observers.SafeSubscriber;
45-
import rx.plugins.RxJavaObservableExecutionHook;
46-
import rx.plugins.RxJavaPlugins;
24+
import rx.plugins.*;
4725
import rx.schedulers.Schedulers;
4826
import rx.subscriptions.Subscriptions;
4927

@@ -1678,8 +1656,43 @@ public void onNext(T t) {
16781656
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
16791657
* @see #observeOn
16801658
*/
1681-
public final Single<T> subscribeOn(Scheduler scheduler) {
1682-
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
1659+
public final Single<T> subscribeOn(final Scheduler scheduler) {
1660+
return create(new OnSubscribe<T>() {
1661+
@Override
1662+
public void call(final SingleSubscriber<? super T> t) {
1663+
final Scheduler.Worker w = scheduler.createWorker();
1664+
t.add(w);
1665+
1666+
w.schedule(new Action0() {
1667+
@Override
1668+
public void call() {
1669+
SingleSubscriber<T> ssub = new SingleSubscriber<T>() {
1670+
@Override
1671+
public void onSuccess(T value) {
1672+
try {
1673+
t.onSuccess(value);
1674+
} finally {
1675+
w.unsubscribe();
1676+
}
1677+
}
1678+
1679+
@Override
1680+
public void onError(Throwable error) {
1681+
try {
1682+
t.onError(error);
1683+
} finally {
1684+
w.unsubscribe();
1685+
}
1686+
}
1687+
};
1688+
1689+
t.add(ssub);
1690+
1691+
Single.this.subscribe(ssub);
1692+
}
1693+
});
1694+
}
1695+
});
16831696
}
16841697

16851698
/**

src/main/java/rx/internal/operators/OperatorSubscribeOn.java

Lines changed: 56 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -15,96 +15,84 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import rx.Observable;
19-
import rx.Observable.Operator;
20-
import rx.Producer;
21-
import rx.Scheduler;
18+
import rx.*;
19+
import rx.Observable.OnSubscribe;
2220
import rx.Scheduler.Worker;
23-
import rx.Subscriber;
2421
import rx.functions.Action0;
2522

2623
/**
2724
* Subscribes Observers on the specified {@code Scheduler}.
2825
* <p>
2926
* <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/subscribeOn.png" alt="">
27+
*
28+
* @param <T> the value type of the actual source
3029
*/
31-
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {
30+
public class OperatorSubscribeOn<T> implements OnSubscribe<T> {
3231

33-
private final Scheduler scheduler;
32+
final Scheduler scheduler;
33+
final Observable<T> source;
3434

35-
public OperatorSubscribeOn(Scheduler scheduler) {
35+
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
3636
this.scheduler = scheduler;
37+
this.source = source;
3738
}
3839

3940
@Override
40-
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
41+
public void call(final Subscriber<? super T> subscriber) {
4142
final Worker inner = scheduler.createWorker();
4243
subscriber.add(inner);
43-
return new Subscriber<Observable<T>>(subscriber) {
44-
45-
@Override
46-
public void onCompleted() {
47-
// ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
48-
}
49-
50-
@Override
51-
public void onError(Throwable e) {
52-
subscriber.onError(e);
53-
}
54-
44+
45+
inner.schedule(new Action0() {
5546
@Override
56-
public void onNext(final Observable<T> o) {
57-
inner.schedule(new Action0() {
58-
47+
public void call() {
48+
final Thread t = Thread.currentThread();
49+
50+
Subscriber<T> s = new Subscriber<T>(subscriber) {
5951
@Override
60-
public void call() {
61-
final Thread t = Thread.currentThread();
62-
o.unsafeSubscribe(new Subscriber<T>(subscriber) {
63-
64-
@Override
65-
public void onCompleted() {
66-
subscriber.onCompleted();
67-
}
68-
69-
@Override
70-
public void onError(Throwable e) {
71-
subscriber.onError(e);
72-
}
73-
74-
@Override
75-
public void onNext(T t) {
76-
subscriber.onNext(t);
77-
}
78-
52+
public void onNext(T t) {
53+
subscriber.onNext(t);
54+
}
55+
56+
@Override
57+
public void onError(Throwable e) {
58+
try {
59+
subscriber.onError(e);
60+
} finally {
61+
inner.unsubscribe();
62+
}
63+
}
64+
65+
@Override
66+
public void onCompleted() {
67+
try {
68+
subscriber.onCompleted();
69+
} finally {
70+
inner.unsubscribe();
71+
}
72+
}
73+
74+
@Override
75+
public void setProducer(final Producer p) {
76+
subscriber.setProducer(new Producer() {
7977
@Override
80-
public void setProducer(final Producer producer) {
81-
subscriber.setProducer(new Producer() {
82-
83-
@Override
84-
public void request(final long n) {
85-
if (Thread.currentThread() == t) {
86-
// don't schedule if we're already on the thread (primarily for first setProducer call)
87-
// see unit test 'testSetProducerSynchronousRequest' for more context on this
88-
producer.request(n);
89-
} else {
90-
inner.schedule(new Action0() {
91-
92-
@Override
93-
public void call() {
94-
producer.request(n);
95-
}
96-
});
78+
public void request(final long n) {
79+
if (t == Thread.currentThread()) {
80+
p.request(n);
81+
} else {
82+
inner.schedule(new Action0() {
83+
@Override
84+
public void call() {
85+
p.request(n);
9786
}
98-
}
99-
100-
});
87+
});
88+
}
10189
}
102-
10390
});
10491
}
105-
});
92+
};
93+
94+
source.unsafeSubscribe(s);
10695
}
107-
108-
};
96+
});
10997
}
110-
}
98+
}

0 commit comments

Comments
 (0)