Skip to content

Commit 3382fe7

Browse files
committed
2.x: factor out consumer types, add XConsumable, update ops
1 parent 5e2c21d commit 3382fe7

File tree

164 files changed

+819
-813
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

164 files changed

+819
-813
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 61 additions & 92 deletions
Large diffs are not rendered by default.
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex;
14+
15+
/**
16+
* Represents a basic {@link Completable} source base interface,
17+
* consumable via an {@link CompletableSubscriber}.
18+
* <p>
19+
* This class also serves the base type for custom operators wrapped into
20+
* Completable via {@link Completable#create(CompletableConsumable)}.
21+
*
22+
* @since 2.0
23+
*/
24+
public interface CompletableConsumable {
25+
26+
void subscribe(CompletableSubscriber cs);
27+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package io.reactivex;
2+
3+
import io.reactivex.disposables.Disposable;
4+
5+
/**
6+
* Represents the subscription API callbacks when subscribing to a Completable instance.
7+
*/
8+
public interface CompletableSubscriber {
9+
/**
10+
* Called once the deferred computation completes normally.
11+
*/
12+
void onComplete();
13+
14+
/**
15+
* Called once if the deferred computation 'throws' an exception.
16+
* @param e the exception, not null.
17+
*/
18+
void onError(Throwable e);
19+
20+
/**
21+
* Called once by the Completable to set a Disposable on this instance which
22+
* then can be used to cancel the subscription at any time.
23+
* @param d the Disposable instance to call dispose on for cancellation, not null
24+
*/
25+
void onSubscribe(Disposable d);
26+
}

src/main/java/io/reactivex/Observable.java

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import org.reactivestreams.*;
2121

22-
import io.reactivex.Single.*;
2322
import io.reactivex.annotations.*;
2423
import io.reactivex.disposables.*;
2524
import io.reactivex.functions.*;
@@ -38,12 +37,8 @@
3837
* Observable for delivering a sequence of values without backpressure.
3938
* @param <T>
4039
*/
41-
public abstract class Observable<T> {
40+
public abstract class Observable<T> implements ObservableConsumable<T> {
4241

43-
public interface NbpOnSubscribe<T> extends Consumer<Observer<? super T>> {
44-
45-
}
46-
4742
public interface NbpOperator<Downstream, Upstream> extends Function<Observer<? super Downstream>, Observer<? super Upstream>> {
4843

4944
}
@@ -53,18 +48,18 @@ public interface NbpTransformer<Upstream, Downstream> extends Function<Observabl
5348
}
5449

5550
/** An empty observable instance as there is no need to instantiate this more than once. */
56-
static final Observable<Object> EMPTY = create(new NbpOnSubscribe<Object>() {
51+
static final Observable<Object> EMPTY = create(new ObservableConsumable<Object>() {
5752
@Override
58-
public void accept(Observer<? super Object> s) {
53+
public void subscribe(Observer<? super Object> s) {
5954
s.onSubscribe(EmptyDisposable.INSTANCE);
6055
s.onComplete();
6156
}
6257
});
6358

6459
/** A never NbpObservable instance as there is no need to instantiate this more than once. */
65-
static final Observable<Object> NEVER = create(new NbpOnSubscribe<Object>() {
60+
static final Observable<Object> NEVER = create(new ObservableConsumable<Object>() {
6661
@Override
67-
public void accept(Observer<? super Object> s) {
62+
public void subscribe(Observer<? super Object> s) {
6863
s.onSubscribe(EmptyDisposable.INSTANCE);
6964
}
7065
});
@@ -355,7 +350,7 @@ public static <T> Observable<T> concatArray(Observable<? extends T>... sources)
355350
return fromArray(sources).concatMap((Function)Functions.identity());
356351
}
357352

358-
public static <T> Observable<T> create(NbpOnSubscribe<T> onSubscribe) {
353+
public static <T> Observable<T> create(ObservableConsumable<T> onSubscribe) {
359354
Objects.requireNonNull(onSubscribe, "onSubscribe is null");
360355
// TODO plugin wrapper
361356
return new ObservableWrapper<T>(onSubscribe);
@@ -449,9 +444,9 @@ public static <T> Observable<T> fromIterable(Iterable<? extends T> source) {
449444

450445
public static <T> Observable<T> fromPublisher(final Publisher<? extends T> publisher) {
451446
Objects.requireNonNull(publisher, "publisher is null");
452-
return create(new NbpOnSubscribe<T>() {
447+
return create(new ObservableConsumable<T>() {
453448
@Override
454-
public void accept(final Observer<? super T> s) {
449+
public void subscribe(final Observer<? super T> s) {
455450
publisher.subscribe(new Subscriber<T>() {
456451

457452
@Override
@@ -871,9 +866,9 @@ public static Observable<Integer> range(final int start, final int count) {
871866
if ((long)start + (count - 1) > Integer.MAX_VALUE) {
872867
throw new IllegalArgumentException("Integer overflow");
873868
}
874-
return create(new NbpOnSubscribe<Integer>() {
869+
return create(new ObservableConsumable<Integer>() {
875870
@Override
876-
public void accept(Observer<? super Integer> s) {
871+
public void subscribe(Observer<? super Integer> s) {
877872
BooleanDisposable d = new BooleanDisposable();
878873
s.onSubscribe(d);
879874

@@ -1110,9 +1105,9 @@ public final Observable<Boolean> any(Predicate<? super T> predicate) {
11101105

11111106
@SchedulerSupport(SchedulerKind.NONE)
11121107
public final Observable<T> asObservable() {
1113-
return create(new NbpOnSubscribe<T>() {
1108+
return create(new ObservableConsumable<T>() {
11141109
@Override
1115-
public void accept(Observer<? super T> s) {
1110+
public void subscribe(Observer<? super T> s) {
11161111
Observable.this.subscribe(s);
11171112
}
11181113
});
@@ -2628,6 +2623,7 @@ public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super T
26282623
return ls;
26292624
}
26302625

2626+
@Override
26312627
public final void subscribe(Observer<? super T> observer) {
26322628
Objects.requireNonNull(observer, "observer is null");
26332629

@@ -3154,9 +3150,9 @@ public void request(long n) {
31543150

31553151
@SchedulerSupport(SchedulerKind.NONE)
31563152
public final Single<T> toSingle() {
3157-
return Single.create(new SingleOnSubscribe<T>() {
3153+
return Single.create(new SingleConsumable<T>() {
31583154
@Override
3159-
public void accept(final SingleSubscriber<? super T> s) {
3155+
public void subscribe(final SingleSubscriber<? super T> s) {
31603156
Observable.this.subscribe(new Observer<T>() {
31613157
T last;
31623158
@Override
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex;
14+
15+
/**
16+
* Represents a basic, non-backpressured {@link Observable} source base interface,
17+
* consumable via an {@link Observer}.
18+
* <p>
19+
* This class also serves the base type for custom operators wrapped into
20+
* Observable via {@link Observable#create(ObservableConsumable)}.
21+
*
22+
* @param <T> the element type
23+
* @since 2.0
24+
*/
25+
public interface ObservableConsumable<T> {
26+
27+
void subscribe(Observer<? super T> observer);
28+
}

0 commit comments

Comments
 (0)