Skip to content

Commit 4017e26

Browse files
authored
2.x: make internal naming consistent, refactor some classes 9/14-2 (#4554)
1 parent 6ff866a commit 4017e26

File tree

170 files changed

+1438
-1082
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

170 files changed

+1438
-1082
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -828,7 +828,7 @@ public final Completable andThen(CompletableSource next) {
828828
*/
829829
@SchedulerSupport(SchedulerSupport.NONE)
830830
public final void blockingAwait() {
831-
BlockingObserver<Void> observer = new BlockingObserver<Void>();
831+
BlockingMultiObserver<Void> observer = new BlockingMultiObserver<Void>();
832832
subscribe(observer);
833833
observer.blockingGet();
834834
}
@@ -848,7 +848,7 @@ public final void blockingAwait() {
848848
*/
849849
@SchedulerSupport(SchedulerSupport.NONE)
850850
public final boolean blockingAwait(long timeout, TimeUnit unit) {
851-
BlockingObserver<Void> observer = new BlockingObserver<Void>();
851+
BlockingMultiObserver<Void> observer = new BlockingMultiObserver<Void>();
852852
subscribe(observer);
853853
return observer.blockingAwait(timeout, unit);
854854
}
@@ -865,7 +865,7 @@ public final boolean blockingAwait(long timeout, TimeUnit unit) {
865865
*/
866866
@SchedulerSupport(SchedulerSupport.NONE)
867867
public final Throwable blockingGet() {
868-
BlockingObserver<Void> observer = new BlockingObserver<Void>();
868+
BlockingMultiObserver<Void> observer = new BlockingMultiObserver<Void>();
869869
subscribe(observer);
870870
return observer.blockingGetError();
871871
}
@@ -886,7 +886,7 @@ public final Throwable blockingGet() {
886886
@SchedulerSupport(SchedulerSupport.NONE)
887887
public final Throwable blockingGet(long timeout, TimeUnit unit) {
888888
ObjectHelper.requireNonNull(unit, "unit is null");
889-
BlockingObserver<Void> observer = new BlockingObserver<Void>();
889+
BlockingMultiObserver<Void> observer = new BlockingMultiObserver<Void>();
890890
subscribe(observer);
891891
return observer.blockingGetError(timeout, unit);
892892
}

src/main/java/io/reactivex/Flowable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import io.reactivex.internal.operators.observable.ObservableFromPublisher;
3131
import io.reactivex.internal.operators.single.*;
3232
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
33-
import io.reactivex.internal.subscribers.flowable.*;
33+
import io.reactivex.internal.subscribers.*;
3434
import io.reactivex.internal.util.*;
3535
import io.reactivex.plugins.RxJavaPlugins;
3636
import io.reactivex.schedulers.*;

src/main/java/io/reactivex/Maybe.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import io.reactivex.exceptions.Exceptions;
2323
import io.reactivex.functions.*;
2424
import io.reactivex.internal.functions.*;
25-
import io.reactivex.internal.observers.BlockingObserver;
25+
import io.reactivex.internal.observers.BlockingMultiObserver;
2626
import io.reactivex.internal.operators.flowable.*;
2727
import io.reactivex.internal.operators.maybe.*;
2828
import io.reactivex.internal.util.*;
@@ -1888,7 +1888,7 @@ public final Maybe<T> ambWith(MaybeSource<? extends T> other) {
18881888
*/
18891889
@SchedulerSupport(SchedulerSupport.NONE)
18901890
public final T blockingGet() {
1891-
BlockingObserver<T> observer = new BlockingObserver<T>();
1891+
BlockingMultiObserver<T> observer = new BlockingMultiObserver<T>();
18921892
subscribe(observer);
18931893
return observer.blockingGet();
18941894
}
@@ -1906,7 +1906,7 @@ public final T blockingGet() {
19061906
@SchedulerSupport(SchedulerSupport.NONE)
19071907
public final T blockingGet(T defaultValue) {
19081908
ObjectHelper.requireNonNull(defaultValue, "defaultValue is null");
1909-
BlockingObserver<T> observer = new BlockingObserver<T>();
1909+
BlockingMultiObserver<T> observer = new BlockingMultiObserver<T>();
19101910
subscribe(observer);
19111911
return observer.blockingGet(defaultValue);
19121912
}
@@ -2770,7 +2770,7 @@ public final <R> R to(Function<? super Maybe<T>, R> convert) {
27702770
}
27712771

27722772
/**
2773-
* Converts this Maybe into an Completable instance composing cancellation
2773+
* Converts this Maybe into a Completable instance composing cancellation
27742774
* through and dropping a success value if emitted.
27752775
* <dl>
27762776
* <dt><b>Scheduler:</b></dt>

src/main/java/io/reactivex/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,20 @@
1616
import java.util.*;
1717
import java.util.concurrent.*;
1818

19-
import org.reactivestreams.*;
19+
import org.reactivestreams.Publisher;
2020

2121
import io.reactivex.annotations.*;
2222
import io.reactivex.disposables.Disposable;
2323
import io.reactivex.exceptions.Exceptions;
2424
import io.reactivex.functions.*;
2525
import io.reactivex.internal.functions.*;
2626
import io.reactivex.internal.fuseable.ScalarCallable;
27+
import io.reactivex.internal.observers.*;
2728
import io.reactivex.internal.operators.completable.CompletableFromObservable;
2829
import io.reactivex.internal.operators.flowable.FlowableFromObservable;
2930
import io.reactivex.internal.operators.maybe.MaybeFromObservable;
3031
import io.reactivex.internal.operators.observable.*;
3132
import io.reactivex.internal.operators.single.SingleFromObservable;
32-
import io.reactivex.internal.subscribers.observable.*;
3333
import io.reactivex.internal.util.*;
3434
import io.reactivex.observables.*;
3535
import io.reactivex.observers.*;
@@ -9939,7 +9939,7 @@ public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super T
99399939
@SchedulerSupport(SchedulerSupport.NONE)
99409940
@Override
99419941
public final void subscribe(Observer<? super T> observer) {
9942-
ObjectHelper.requireNonNull(observer, "s is null");
9942+
ObjectHelper.requireNonNull(observer, "observer is null");
99439943
try {
99449944
observer = RxJavaPlugins.onSubscribe(this, observer);
99459945

src/main/java/io/reactivex/Single.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1907,7 +1907,7 @@ public final Completable flatMapCompletable(final Function<? super T, ? extends
19071907
*/
19081908
@SchedulerSupport(SchedulerSupport.NONE)
19091909
public final T blockingGet() {
1910-
BlockingObserver<T> observer = new BlockingObserver<T>();
1910+
BlockingMultiObserver<T> observer = new BlockingMultiObserver<T>();
19111911
subscribe(observer);
19121912
return observer.blockingGet();
19131913
}

src/main/java/io/reactivex/internal/disposables/ObserverFullArbiter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ void disposeResource() {
6565
}
6666
}
6767

68-
public boolean setSubscription(Disposable s) {
68+
public boolean setDisposable(Disposable s) {
6969
if (cancelled) {
7070
return false;
7171
}

src/main/java/io/reactivex/internal/fuseable/ConditionalSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import org.reactivestreams.Subscriber;
1717

1818
/**
19-
* An Subscriber with an additional onNextIf(T) method that
19+
* A Subscriber with an additional onNextIf(T) method that
2020
* tells the caller the specified value has been accepted or
2121
* not.
2222
*

src/main/java/io/reactivex/internal/subscribers/observable/BasicFuseableObserver.java renamed to src/main/java/io/reactivex/internal/observers/BasicFuseableObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* the License for the specific language governing permissions and limitations under the License.
1212
*/
1313

14-
package io.reactivex.internal.subscribers.observable;
14+
package io.reactivex.internal.observers;
1515

1616
import io.reactivex.Observer;
1717
import io.reactivex.disposables.Disposable;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* the License for the specific language governing permissions and limitations under the License.
1212
*/
1313

14-
package io.reactivex.internal.subscribers.observable;
14+
package io.reactivex.internal.observers;
1515

1616
import java.util.concurrent.atomic.AtomicInteger;
1717

src/main/java/io/reactivex/internal/subscribers/observable/BasicQueueDisposable.java renamed to src/main/java/io/reactivex/internal/observers/BasicQueueDisposable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* the License for the specific language governing permissions and limitations under the License.
1212
*/
1313

14-
package io.reactivex.internal.subscribers.observable;
14+
package io.reactivex.internal.observers;
1515

1616
import io.reactivex.internal.fuseable.QueueDisposable;
1717

src/main/java/io/reactivex/internal/subscribers/observable/BlockingBaseObserver.java renamed to src/main/java/io/reactivex/internal/observers/BlockingBaseObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
1111
* the License for the specific language governing permissions and limitations under the License.
1212
*/
13-
package io.reactivex.internal.subscribers.observable;
13+
package io.reactivex.internal.observers;
1414

1515
import java.util.concurrent.CountDownLatch;
1616

src/main/java/io/reactivex/internal/subscribers/observable/BlockingFirstObserver.java renamed to src/main/java/io/reactivex/internal/observers/BlockingFirstObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* the License for the specific language governing permissions and limitations under the License.
1212
*/
1313

14-
package io.reactivex.internal.subscribers.observable;
14+
package io.reactivex.internal.observers;
1515

1616
/**
1717
* Blocks until the upstream signals its first value or completes.

src/main/java/io/reactivex/internal/subscribers/observable/BlockingLastObserver.java renamed to src/main/java/io/reactivex/internal/observers/BlockingLastObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* the License for the specific language governing permissions and limitations under the License.
1212
*/
1313

14-
package io.reactivex.internal.subscribers.observable;
14+
package io.reactivex.internal.observers;
1515

1616
/**
1717
* Blocks until the upstream signals its last value or completes.
Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.observers;
15+
16+
import java.util.concurrent.*;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.internal.util.ExceptionHelper;
21+
22+
/**
23+
* A combined Observer that awaits the success or error signal via a CountDownLatch.
24+
* @param <T> the value type
25+
*/
26+
public final class BlockingMultiObserver<T>
27+
extends CountDownLatch
28+
implements SingleObserver<T>, CompletableObserver, MaybeObserver<T> {
29+
30+
T value;
31+
Throwable error;
32+
33+
Disposable d;
34+
35+
volatile boolean cancelled;
36+
37+
public BlockingMultiObserver() {
38+
super(1);
39+
}
40+
41+
void dispose() {
42+
cancelled = true;
43+
Disposable d = this.d;
44+
if (d != null) {
45+
d.dispose();
46+
}
47+
}
48+
49+
@Override
50+
public void onSubscribe(Disposable d) {
51+
this.d = d;
52+
if (cancelled) {
53+
d.dispose();
54+
}
55+
}
56+
57+
@Override
58+
public void onSuccess(T value) {
59+
this.value = value;
60+
countDown();
61+
}
62+
63+
@Override
64+
public void onError(Throwable e) {
65+
error = e;
66+
countDown();
67+
}
68+
69+
@Override
70+
public void onComplete() {
71+
countDown();
72+
}
73+
74+
/**
75+
* Block until the latch is counted down then rethrow any exception received (wrapped if checked)
76+
* or return the received value (null if none).
77+
* @return the value received or null if no value received
78+
*/
79+
public T blockingGet() {
80+
if (getCount() != 0) {
81+
try {
82+
await();
83+
} catch (InterruptedException ex) {
84+
dispose();
85+
throw ExceptionHelper.wrapOrThrow(ex);
86+
}
87+
}
88+
Throwable ex = error;
89+
if (ex != null) {
90+
throw ExceptionHelper.wrapOrThrow(ex);
91+
}
92+
return value;
93+
}
94+
95+
/**
96+
* Block until the latch is counted down then rethrow any exception received (wrapped if checked)
97+
* or return the received value (the defaultValue if none).
98+
* @param defaultValue the default value to return if no value was received
99+
* @return the value received or defaultValue if no value received
100+
*/
101+
public T blockingGet(T defaultValue) {
102+
if (getCount() != 0) {
103+
try {
104+
await();
105+
} catch (InterruptedException ex) {
106+
dispose();
107+
throw ExceptionHelper.wrapOrThrow(ex);
108+
}
109+
}
110+
Throwable ex = error;
111+
if (ex != null) {
112+
throw ExceptionHelper.wrapOrThrow(ex);
113+
}
114+
T v = value;
115+
return v != null ? v : defaultValue;
116+
}
117+
118+
/**
119+
* Block until the latch is counted down and return the error received or null if no
120+
* error happened.
121+
* @return the error received or null
122+
*/
123+
public Throwable blockingGetError() {
124+
if (getCount() != 0) {
125+
try {
126+
await();
127+
} catch (InterruptedException ex) {
128+
dispose();
129+
return ex;
130+
}
131+
}
132+
return error;
133+
}
134+
135+
/**
136+
* Block until the latch is counted down and return the error received or
137+
* when the wait is interrupted or times out, null otherwise.
138+
* @param timeout the timeout value
139+
* @param unit the time unit
140+
* @return the error received or null
141+
*/
142+
public Throwable blockingGetError(long timeout, TimeUnit unit) {
143+
if (getCount() != 0) {
144+
try {
145+
if (!await(timeout, unit)) {
146+
dispose();
147+
throw ExceptionHelper.wrapOrThrow(new TimeoutException());
148+
}
149+
} catch (InterruptedException ex) {
150+
dispose();
151+
throw ExceptionHelper.wrapOrThrow(ex);
152+
}
153+
}
154+
return error;
155+
}
156+
157+
/**
158+
* Block until the observer terminates and return true; return false if
159+
* the wait times out.
160+
* @param timeout the timeout value
161+
* @param unit the time unit
162+
* @return true if the observer terminated in time, false otherwise
163+
*/
164+
public boolean blockingAwait(long timeout, TimeUnit unit) {
165+
if (getCount() != 0) {
166+
try {
167+
if (!await(timeout, unit)) {
168+
dispose();
169+
return false;
170+
}
171+
} catch (InterruptedException ex) {
172+
dispose();
173+
throw ExceptionHelper.wrapOrThrow(ex);
174+
}
175+
}
176+
Throwable ex = error;
177+
if (ex != null) {
178+
throw ExceptionHelper.wrapOrThrow(ex);
179+
}
180+
return true;
181+
}
182+
}

0 commit comments

Comments
 (0)