Skip to content

Commit 1f0a116

Browse files
authored
2.x: reduce blockingX overhead, move internal observers to common pkg (#4491)
1 parent 10f727f commit 1f0a116

22 files changed

+333
-350
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121
import io.reactivex.exceptions.Exceptions;
2222
import io.reactivex.functions.*;
2323
import io.reactivex.internal.functions.*;
24+
import io.reactivex.internal.observers.*;
2425
import io.reactivex.internal.operators.completable.*;
2526
import io.reactivex.internal.operators.flowable.FlowableDelaySubscriptionOther;
2627
import io.reactivex.internal.operators.maybe.MaybeFromCompletable;
2728
import io.reactivex.internal.operators.observable.ObservableDelaySubscriptionOther;
2829
import io.reactivex.internal.operators.single.SingleDelayWithCompletable;
29-
import io.reactivex.internal.subscribers.completable.*;
3030
import io.reactivex.internal.util.ExceptionHelper;
3131
import io.reactivex.plugins.RxJavaPlugins;
3232
import io.reactivex.schedulers.Schedulers;
@@ -795,7 +795,9 @@ public final Completable andThen(CompletableSource next) {
795795
*/
796796
@SchedulerSupport(SchedulerSupport.NONE)
797797
public final void blockingAwait() {
798-
CompletableAwait.await(this);
798+
BlockingObserver<Void> observer = new BlockingObserver<Void>();
799+
subscribe(observer);
800+
observer.blockingGet();
799801
}
800802

801803
/**
@@ -813,7 +815,9 @@ public final void blockingAwait() {
813815
*/
814816
@SchedulerSupport(SchedulerSupport.NONE)
815817
public final boolean blockingAwait(long timeout, TimeUnit unit) {
816-
return CompletableAwait.await(this, timeout, unit);
818+
BlockingObserver<Void> observer = new BlockingObserver<Void>();
819+
subscribe(observer);
820+
return observer.blockingAwait(timeout, unit);
817821
}
818822

819823
/**
@@ -828,7 +832,9 @@ public final boolean blockingAwait(long timeout, TimeUnit unit) {
828832
*/
829833
@SchedulerSupport(SchedulerSupport.NONE)
830834
public final Throwable blockingGet() {
831-
return CompletableAwait.get(this);
835+
BlockingObserver<Void> observer = new BlockingObserver<Void>();
836+
subscribe(observer);
837+
return observer.blockingGetError();
832838
}
833839

834840
/**
@@ -842,7 +848,10 @@ public final Throwable blockingGet() {
842848
*/
843849
@SchedulerSupport(SchedulerSupport.NONE)
844850
public final Throwable blockingGet(long timeout, TimeUnit unit) {
845-
return CompletableAwait.get(this, timeout, unit);
851+
ObjectHelper.requireNonNull(unit, "unit is null");
852+
BlockingObserver<Void> observer = new BlockingObserver<Void>();
853+
subscribe(observer);
854+
return observer.blockingGetError(timeout, unit);
846855
}
847856

848857
/**

src/main/java/io/reactivex/Maybe.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.reactivex.exceptions.Exceptions;
2323
import io.reactivex.functions.*;
2424
import io.reactivex.internal.functions.*;
25+
import io.reactivex.internal.observers.BlockingObserver;
2526
import io.reactivex.internal.operators.flowable.*;
2627
import io.reactivex.internal.operators.maybe.*;
2728
import io.reactivex.internal.util.*;
@@ -710,7 +711,9 @@ public static <T> Maybe<T> wrap(MaybeSource<T> source) {
710711
* @return the success value
711712
*/
712713
public T blockingGet() {
713-
return MaybeAwait.get(this, null);
714+
BlockingObserver<T> observer = new BlockingObserver<T>();
715+
subscribe(observer);
716+
return observer.blockingGet();
714717
}
715718

716719
/**
@@ -725,7 +728,9 @@ public T blockingGet() {
725728
*/
726729
public T blockingGet(T defaultValue) {
727730
ObjectHelper.requireNonNull(defaultValue, "defaultValue is null");
728-
return MaybeAwait.get(this, defaultValue);
731+
BlockingObserver<T> observer = new BlockingObserver<T>();
732+
subscribe(observer);
733+
return observer.blockingGet(defaultValue);
729734
}
730735

731736
/**

src/main/java/io/reactivex/Single.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@
2323
import io.reactivex.functions.*;
2424
import io.reactivex.internal.functions.*;
2525
import io.reactivex.internal.fuseable.FuseToFlowable;
26+
import io.reactivex.internal.observers.*;
2627
import io.reactivex.internal.operators.completable.*;
2728
import io.reactivex.internal.operators.flowable.*;
2829
import io.reactivex.internal.operators.maybe.*;
2930
import io.reactivex.internal.operators.observable.ObservableConcatMap;
3031
import io.reactivex.internal.operators.single.*;
31-
import io.reactivex.internal.subscribers.single.*;
3232
import io.reactivex.internal.util.*;
3333
import io.reactivex.plugins.RxJavaPlugins;
3434
import io.reactivex.schedulers.Schedulers;
@@ -1775,7 +1775,9 @@ public final Completable flatMapCompletable(final Function<? super T, ? extends
17751775
* @return the success value
17761776
*/
17771777
public final T blockingGet() {
1778-
return SingleAwait.get(this);
1778+
BlockingObserver<T> observer = new BlockingObserver<T>();
1779+
subscribe(observer);
1780+
return observer.blockingGet();
17791781
}
17801782

17811783
/**

src/main/java/io/reactivex/internal/subscribers/single/BiConsumerSingleObserver.java renamed to src/main/java/io/reactivex/internal/observers/BiConsumerSingleObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* the License for the specific language governing permissions and limitations under the License.
1212
*/
1313

14-
package io.reactivex.internal.subscribers.single;
14+
package io.reactivex.internal.observers;
1515

1616
import java.util.concurrent.atomic.AtomicReference;
1717

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.observers;
15+
16+
import java.util.concurrent.*;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.internal.util.ExceptionHelper;
21+
22+
/**
23+
* A combined Observer that awaits the success or error signal via a CountDownLatch.
24+
* @param <T> the value type
25+
*/
26+
public final class BlockingObserver<T>
27+
extends CountDownLatch
28+
implements SingleObserver<T>, CompletableObserver, MaybeObserver<T> {
29+
30+
T value;
31+
Throwable error;
32+
33+
Disposable d;
34+
35+
volatile boolean cancelled;
36+
37+
public BlockingObserver() {
38+
super(1);
39+
}
40+
41+
void dispose() {
42+
cancelled = true;
43+
Disposable d = this.d;
44+
if (d != null) {
45+
d.dispose();
46+
}
47+
}
48+
49+
@Override
50+
public void onSubscribe(Disposable d) {
51+
this.d = d;
52+
if (cancelled) {
53+
d.dispose();
54+
}
55+
}
56+
57+
@Override
58+
public void onSuccess(T value) {
59+
this.value = value;
60+
countDown();
61+
}
62+
63+
@Override
64+
public void onError(Throwable e) {
65+
error = e;
66+
countDown();
67+
}
68+
69+
@Override
70+
public void onComplete() {
71+
countDown();
72+
}
73+
74+
/**
75+
* Block until the latch is counted down then rethrow any exception received (wrapped if checked)
76+
* or return the received value (null if none).
77+
* @return the value received or null if no value received
78+
*/
79+
public T blockingGet() {
80+
if (getCount() != 0) {
81+
try {
82+
await();
83+
} catch (InterruptedException ex) {
84+
dispose();
85+
throw ExceptionHelper.wrapOrThrow(ex);
86+
}
87+
}
88+
Throwable ex = error;
89+
if (ex != null) {
90+
throw ExceptionHelper.wrapOrThrow(ex);
91+
}
92+
return value;
93+
}
94+
95+
/**
96+
* Block until a the latch is counted down and return the value received, otherwise
97+
* rethrow the received exception or rethrow the InterruptedException or TimeoutException
98+
* (wrapped).
99+
* @param timeout the timeout value
100+
* @param unit the time unit
101+
* @return the value received or null if no value received
102+
*/
103+
public T blockingGet(long timeout, TimeUnit unit) {
104+
if (getCount() != 0) {
105+
try {
106+
if (!await(timeout, unit)) {
107+
dispose();
108+
throw ExceptionHelper.wrapOrThrow(new TimeoutException());
109+
}
110+
} catch (InterruptedException ex) {
111+
dispose();
112+
throw ExceptionHelper.wrapOrThrow(ex);
113+
}
114+
}
115+
Throwable ex = error;
116+
if (ex != null) {
117+
throw ExceptionHelper.wrapOrThrow(ex);
118+
}
119+
return value;
120+
}
121+
122+
/**
123+
* Block until the latch is counted down then rethrow any exception received (wrapped if checked)
124+
* or return the received value (the defaultValue if none).
125+
* @param defaultValue the default value to return if no value was received
126+
* @return the value received or defaultValue if no value received
127+
*/
128+
public T blockingGet(T defaultValue) {
129+
if (getCount() != 0) {
130+
try {
131+
await();
132+
} catch (InterruptedException ex) {
133+
dispose();
134+
throw ExceptionHelper.wrapOrThrow(ex);
135+
}
136+
}
137+
Throwable ex = error;
138+
if (ex != null) {
139+
throw ExceptionHelper.wrapOrThrow(ex);
140+
}
141+
T v = value;
142+
return v != null ? v : defaultValue;
143+
}
144+
145+
/**
146+
* Block until the latch is counted down and return the error received or null if no
147+
* error happened.
148+
* @return the error received or null
149+
*/
150+
public Throwable blockingGetError() {
151+
if (getCount() != 0) {
152+
try {
153+
await();
154+
} catch (InterruptedException ex) {
155+
dispose();
156+
return ex;
157+
}
158+
}
159+
return error;
160+
}
161+
162+
/**
163+
* Block until the latch is counted down and return the error received or
164+
* when the wait is interrupted or times out, null otherwise.
165+
* @param timeout the timeout value
166+
* @param unit the time unit
167+
* @return the error received or null
168+
*/
169+
public Throwable blockingGetError(long timeout, TimeUnit unit) {
170+
if (getCount() != 0) {
171+
try {
172+
if (!await(timeout, unit)) {
173+
dispose();
174+
throw ExceptionHelper.wrapOrThrow(new TimeoutException());
175+
}
176+
} catch (InterruptedException ex) {
177+
dispose();
178+
throw ExceptionHelper.wrapOrThrow(ex);
179+
}
180+
}
181+
return error;
182+
}
183+
184+
/**
185+
* Block until the observer terminates and return true; return false if
186+
* the wait times out.
187+
* @param timeout the timeout value
188+
* @param unit the time unit
189+
* @return true if the observer terminated in time, false otherwise
190+
*/
191+
public boolean blockingAwait(long timeout, TimeUnit unit) {
192+
if (getCount() != 0) {
193+
try {
194+
if (!await(timeout, unit)) {
195+
dispose();
196+
return false;
197+
}
198+
} catch (InterruptedException ex) {
199+
dispose();
200+
throw ExceptionHelper.wrapOrThrow(ex);
201+
}
202+
}
203+
Throwable ex = error;
204+
if (ex != null) {
205+
throw ExceptionHelper.wrapOrThrow(ex);
206+
}
207+
return true;
208+
}
209+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* the License for the specific language governing permissions and limitations under the License.
1212
*/
1313

14-
package io.reactivex.internal.subscribers.completable;
14+
package io.reactivex.internal.observers;
1515

1616
import java.util.concurrent.atomic.AtomicReference;
1717

src/main/java/io/reactivex/internal/subscribers/single/ConsumerSingleObserver.java renamed to src/main/java/io/reactivex/internal/observers/ConsumerSingleObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* the License for the specific language governing permissions and limitations under the License.
1212
*/
1313

14-
package io.reactivex.internal.subscribers.single;
14+
package io.reactivex.internal.observers;
1515

1616
import java.util.concurrent.atomic.AtomicReference;
1717

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* the License for the specific language governing permissions and limitations under the License.
1212
*/
1313

14-
package io.reactivex.internal.subscribers.completable;
14+
package io.reactivex.internal.observers;
1515

1616
import java.util.concurrent.atomic.AtomicReference;
1717

src/main/java/io/reactivex/internal/subscribers/single/FutureSingleObserver.java renamed to src/main/java/io/reactivex/internal/observers/FutureSingleObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* the License for the specific language governing permissions and limitations under the License.
1212
*/
1313

14-
package io.reactivex.internal.subscribers.single;
14+
package io.reactivex.internal.observers;
1515

1616
import java.util.concurrent.*;
1717
import java.util.concurrent.atomic.AtomicReference;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* the License for the specific language governing permissions and limitations under the License.
1212
*/
1313

14-
package io.reactivex.internal.subscribers.completable;
14+
package io.reactivex.internal.observers;
1515

1616
import org.reactivestreams.*;
1717

0 commit comments

Comments
 (0)