Skip to content

Commit 23461ee

Browse files
authored
2.x: move blocking operators into the base classes (#4371)
1 parent a6bbf46 commit 23461ee

File tree

121 files changed

+2384
-1661
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

121 files changed

+2384
-1661
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 428 additions & 34 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Observable.java

Lines changed: 512 additions & 114 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/flowables/BlockingFlowable.java

Lines changed: 0 additions & 413 deletions
This file was deleted.
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import java.util.Iterator;
17+
18+
import org.reactivestreams.Publisher;
19+
20+
public final class BlockingFlowableIterable<T> implements Iterable<T> {
21+
final Publisher<? extends T> source;
22+
23+
final int bufferSize;
24+
25+
public BlockingFlowableIterable(Publisher<? extends T> source, int bufferSize) {
26+
this.source = source;
27+
this.bufferSize = bufferSize;
28+
}
29+
30+
@Override
31+
public Iterator<T> iterator() {
32+
BlockingFlowableIterator<T> it = new BlockingFlowableIterator<T>(bufferSize);
33+
source.subscribe(it);
34+
return it;
35+
}
36+
}
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import java.util.concurrent.*;
17+
18+
import org.reactivestreams.*;
19+
20+
import io.reactivex.exceptions.Exceptions;
21+
import io.reactivex.functions.*;
22+
import io.reactivex.internal.functions.Functions;
23+
import io.reactivex.internal.subscribers.flowable.*;
24+
import io.reactivex.internal.util.*;
25+
import io.reactivex.plugins.RxJavaPlugins;
26+
import io.reactivex.subscribers.DefaultSubscriber;
27+
28+
/**
29+
* Utility methods to consume a Publisher in a blocking manner with callbacks or Subscriber.
30+
*/
31+
public enum FlowableBlockingSubscribe {
32+
;
33+
34+
/**
35+
* Subscribes to the source and calls the Subscriber methods on the current thread.
36+
* <p>
37+
* @param o the source publisher
38+
* The unsubscription and backpressure is composed through.
39+
* @param subscriber the subscriber to forward events and calls to in the current thread
40+
* @param <T> the value type
41+
*/
42+
public static <T> void subscribe(Publisher<? extends T> o, Subscriber<? super T> subscriber) {
43+
final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
44+
45+
BlockingSubscriber<T> bs = new BlockingSubscriber<T>(queue);
46+
47+
o.subscribe(bs);
48+
49+
try {
50+
for (;;) {
51+
if (bs.isCancelled()) {
52+
break;
53+
}
54+
Object v = queue.poll();
55+
if (v == null) {
56+
if (bs.isCancelled()) {
57+
break;
58+
}
59+
v = queue.take();
60+
}
61+
if (bs.isCancelled()) {
62+
break;
63+
}
64+
if (o == BlockingSubscriber.TERMINATED) {
65+
break;
66+
}
67+
if (NotificationLite.acceptFull(o, subscriber)) {
68+
break;
69+
}
70+
}
71+
} catch (InterruptedException e) {
72+
Thread.currentThread().interrupt();
73+
subscriber.onError(e);
74+
} finally {
75+
bs.cancel();
76+
}
77+
}
78+
79+
/**
80+
* Runs the source observable to a terminal event, ignoring any values and rethrowing any exception.
81+
* @param o the source publisher
82+
* @param <T> the value type
83+
*/
84+
public static <T> void subscribe(Publisher<? extends T> o) {
85+
final CountDownLatch cdl = new CountDownLatch(1);
86+
final Throwable[] error = { null };
87+
LambdaSubscriber<T> ls = new LambdaSubscriber<T>(Functions.emptyConsumer(),
88+
new Consumer<Throwable>() {
89+
@Override
90+
public void accept(Throwable e) {
91+
error[0] = e;
92+
cdl.countDown();
93+
}
94+
}, new Action() {
95+
@Override
96+
public void run() {
97+
cdl.countDown();
98+
}
99+
}, new Consumer<Subscription>() {
100+
@Override
101+
public void accept(Subscription s) {
102+
s.request(Long.MAX_VALUE);
103+
}
104+
});
105+
106+
o.subscribe(ls);
107+
108+
BlockingHelper.awaitForComplete(cdl, ls);
109+
Throwable e = error[0];
110+
if (e != null) {
111+
Exceptions.propagate(e);
112+
}
113+
}
114+
115+
/**
116+
* Subscribes to the source and calls the given actions on the current thread.
117+
* @param o the source publisher
118+
* @param onNext the callback action for each source value
119+
* @param onError the callback action for an error event
120+
* @param onComplete the callback action for the completion event.
121+
* @param <T> the value type
122+
*/
123+
public static <T> void subscribe(Publisher<? extends T> o, final Consumer<? super T> onNext,
124+
final Consumer<? super Throwable> onError, final Action onComplete) {
125+
subscribe(o, new DefaultSubscriber<T>() {
126+
boolean done;
127+
@Override
128+
public void onNext(T t) {
129+
if (done) {
130+
return;
131+
}
132+
try {
133+
onNext.accept(t);
134+
} catch (Throwable ex) {
135+
Exceptions.throwIfFatal(ex);
136+
cancel();
137+
onError(ex);
138+
}
139+
}
140+
141+
@Override
142+
public void onError(Throwable e) {
143+
if (done) {
144+
RxJavaPlugins.onError(e);
145+
return;
146+
}
147+
done = true;
148+
try {
149+
onError.accept(e);
150+
} catch (Throwable ex) {
151+
Exceptions.throwIfFatal(ex);
152+
RxJavaPlugins.onError(ex);
153+
}
154+
}
155+
156+
@Override
157+
public void onComplete() {
158+
if (done) {
159+
return;
160+
}
161+
done = true;
162+
try {
163+
onComplete.run();
164+
} catch (Throwable ex) {
165+
Exceptions.throwIfFatal(ex);
166+
RxJavaPlugins.onError(ex);
167+
}
168+
}
169+
});
170+
}
171+
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import java.util.NoSuchElementException;
17+
import java.util.concurrent.*;
18+
import java.util.concurrent.atomic.AtomicReference;
19+
20+
import org.reactivestreams.*;
21+
22+
import io.reactivex.disposables.Disposables;
23+
import io.reactivex.internal.disposables.SequentialDisposable;
24+
25+
/**
26+
* Utility method to turn a Publisher into a Future.
27+
*/
28+
public enum FlowableToFuture {
29+
;
30+
31+
public static <T> Future<T> toFuture(Publisher<? extends T> o) {
32+
final CountDownLatch cdl = new CountDownLatch(1);
33+
final AtomicReference<T> value = new AtomicReference<T>();
34+
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
35+
final SequentialDisposable sd = new SequentialDisposable();
36+
37+
o.subscribe(new Subscriber<T>() {
38+
39+
@Override
40+
public void onSubscribe(Subscription d) {
41+
sd.replace(Disposables.from(d));
42+
d.request(Long.MAX_VALUE);
43+
}
44+
45+
@Override
46+
public void onNext(T v) {
47+
if (value.get() != null) {
48+
sd.dispose();
49+
onError(new IndexOutOfBoundsException("More than one element received"));
50+
return;
51+
}
52+
value.lazySet(v);
53+
}
54+
55+
@Override
56+
public void onError(Throwable e) {
57+
error.lazySet(e);
58+
cdl.countDown();
59+
}
60+
61+
@Override
62+
public void onComplete() {
63+
if (value.get() == null) {
64+
onError(new NoSuchElementException("The source is empty"));
65+
return;
66+
}
67+
cdl.countDown();
68+
}
69+
70+
});
71+
72+
return new Future<T>() {
73+
74+
@Override
75+
public boolean cancel(boolean mayInterruptIfRunning) {
76+
if (cdl.getCount() != 0) {
77+
sd.dispose();
78+
error.set(new CancellationException());
79+
cdl.countDown();
80+
return true;
81+
}
82+
return false;
83+
}
84+
85+
@Override
86+
public boolean isCancelled() {
87+
return sd.isDisposed();
88+
}
89+
90+
@Override
91+
public boolean isDone() {
92+
return cdl.getCount() == 0 && !sd.isDisposed();
93+
}
94+
95+
@Override
96+
public T get() throws InterruptedException, ExecutionException {
97+
if (cdl.getCount() != 0) {
98+
cdl.await();
99+
}
100+
Throwable e = error.get();
101+
if (e != null) {
102+
if (e instanceof CancellationException) {
103+
throw (CancellationException)e;
104+
}
105+
throw new ExecutionException(e);
106+
}
107+
return value.get();
108+
}
109+
110+
@Override
111+
public T get(long timeout, TimeUnit unit)
112+
throws InterruptedException, ExecutionException, TimeoutException {
113+
if (cdl.getCount() != 0) {
114+
if (!cdl.await(timeout, unit)) {
115+
throw new TimeoutException();
116+
}
117+
}
118+
Throwable e = error.get();
119+
if (e != null) {
120+
if (e instanceof CancellationException) {
121+
throw (CancellationException)e;
122+
}
123+
throw new ExecutionException(e);
124+
}
125+
return value.get();
126+
}
127+
128+
};
129+
}
130+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.observable;
15+
16+
import java.util.Iterator;
17+
18+
import io.reactivex.ObservableSource;
19+
20+
public final class BlockingObservableIterable<T> implements Iterable<T> {
21+
final ObservableSource<? extends T> source;
22+
23+
final int bufferSize;
24+
25+
public BlockingObservableIterable(ObservableSource<? extends T> source, int bufferSize) {
26+
this.source = source;
27+
this.bufferSize = bufferSize;
28+
}
29+
30+
@Override
31+
public Iterator<T> iterator() {
32+
BlockingObservableIterator<T> it = new BlockingObservableIterator<T>(bufferSize);
33+
source.subscribe(it);
34+
return it;
35+
}
36+
}

0 commit comments

Comments
 (0)