Skip to content

Commit f8dcc93

Browse files
committed
Merge pull request #3682 from akarnokd/ObserveOnCleanup1xV2
1.x: fix observeOn resource handling, add delayError capability
2 parents 31c57e1 + 2367f90 commit f8dcc93

File tree

6 files changed

+240
-141
lines changed

6 files changed

+240
-141
lines changed

src/main/java/rx/Observable.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5999,7 +5999,9 @@ public final Observable<T> mergeWith(Observable<? extends T> t1) {
59995999

60006000
/**
60016001
* Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler},
6002-
* asynchronously with an unbounded buffer.
6002+
* asynchronously with a bounded buffer.
6003+
* <p>Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly
6004+
* asynchronous. If strict event ordering is required, consider using the {@link #observeOn(Scheduler, boolean)} overload.
60036005
* <p>
60046006
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
60056007
* <dl>
@@ -6014,12 +6016,43 @@ public final Observable<T> mergeWith(Observable<? extends T> t1) {
60146016
* @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
60156017
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
60166018
* @see #subscribeOn
6019+
* @see #observeOn(Scheduler, boolean)
60176020
*/
60186021
public final Observable<T> observeOn(Scheduler scheduler) {
60196022
if (this instanceof ScalarSynchronousObservable) {
60206023
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
60216024
}
6022-
return lift(new OperatorObserveOn<T>(scheduler));
6025+
return lift(new OperatorObserveOn<T>(scheduler, false));
6026+
}
6027+
6028+
/**
6029+
* Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler},
6030+
* asynchronously with a bounded buffer and optionally delays onError notifications.
6031+
* <p>
6032+
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
6033+
* <dl>
6034+
* <dt><b>Scheduler:</b></dt>
6035+
* <dd>you specify which {@link Scheduler} this operator will use</dd>
6036+
* </dl>
6037+
*
6038+
* @param scheduler
6039+
* the {@link Scheduler} to notify {@link Observer}s on
6040+
* @param delayError
6041+
* indicates if the onError notification may not cut ahead of onNext notification on the other side of the
6042+
* scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received
6043+
* from upstream
6044+
* @return the source Observable modified so that its {@link Observer}s are notified on the specified
6045+
* {@link Scheduler}
6046+
* @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
6047+
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
6048+
* @see #subscribeOn
6049+
* @see #observeOn(Scheduler)
6050+
*/
6051+
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
6052+
if (this instanceof ScalarSynchronousObservable) {
6053+
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
6054+
}
6055+
return lift(new OperatorObserveOn<T>(scheduler, delayError));
60236056
}
60246057

60256058
/**

src/main/java/rx/Single.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1368,7 +1368,9 @@ public final Single<T> observeOn(Scheduler scheduler) {
13681368
if (this instanceof ScalarSynchronousSingle) {
13691369
return ((ScalarSynchronousSingle<T>)this).scalarScheduleOn(scheduler);
13701370
}
1371-
return lift(new OperatorObserveOn<T>(scheduler));
1371+
// Note that since Single emits onSuccess xor onError,
1372+
// there is no cut-ahead possible like with regular Observable sequences.
1373+
return lift(new OperatorObserveOn<T>(scheduler, false));
13721374
}
13731375

13741376
/**

0 commit comments

Comments
 (0)