Skip to content

Commit bffcee7

Browse files
authored
2.x: cleanup, enhancements 8/23-1 (#4410)
1 parent da88837 commit bffcee7

26 files changed

+767
-321
lines changed
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
package io.reactivex;
14+
15+
/**
16+
* Base interface for emitting signals in a push-fashion in various generator-like source
17+
* operators (create, generate).
18+
*
19+
* @param <T> the value type emitted
20+
*/
21+
public interface Emitter<T> {
22+
23+
/**
24+
* Signal a normal value.
25+
* @param value the value to signal, not null
26+
*/
27+
void onNext(T value);
28+
29+
/**
30+
* Signal a Throwable exception.
31+
* @param error the Throwable to signal, not null
32+
*/
33+
void onError(Throwable error);
34+
35+
/**
36+
* Signal a completion.
37+
*/
38+
void onComplete();
39+
}

src/main/java/io/reactivex/Flowable.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2049,7 +2049,7 @@ public static <T> Flowable<T> fromPublisher(final Publisher<? extends T> publish
20492049
*/
20502050
@BackpressureSupport(BackpressureKind.FULL)
20512051
@SchedulerSupport(SchedulerSupport.NONE)
2052-
public static <T> Flowable<T> generate(final Consumer<Subscriber<T>> generator) {
2052+
public static <T> Flowable<T> generate(final Consumer<Emitter<T>> generator) {
20532053
ObjectHelper.requireNonNull(generator, "generator is null");
20542054
return generate(Functions.nullSupplier(),
20552055
FlowableInternalHelper.<T, Object>simpleGenerator(generator),
@@ -2077,7 +2077,7 @@ public static <T> Flowable<T> generate(final Consumer<Subscriber<T>> generator)
20772077
*/
20782078
@BackpressureSupport(BackpressureKind.FULL)
20792079
@SchedulerSupport(SchedulerSupport.NONE)
2080-
public static <T, S> Flowable<T> generate(Callable<S> initialState, final BiConsumer<S, Subscriber<T>> generator) {
2080+
public static <T, S> Flowable<T> generate(Callable<S> initialState, final BiConsumer<S, Emitter<T>> generator) {
20812081
ObjectHelper.requireNonNull(generator, "generator is null");
20822082
return generate(initialState, FlowableInternalHelper.<T, S>simpleBiGenerator(generator),
20832083
Functions.emptyConsumer());
@@ -2106,7 +2106,7 @@ public static <T, S> Flowable<T> generate(Callable<S> initialState, final BiCons
21062106
*/
21072107
@BackpressureSupport(BackpressureKind.FULL)
21082108
@SchedulerSupport(SchedulerSupport.NONE)
2109-
public static <T, S> Flowable<T> generate(Callable<S> initialState, final BiConsumer<S, Subscriber<T>> generator,
2109+
public static <T, S> Flowable<T> generate(Callable<S> initialState, final BiConsumer<S, Emitter<T>> generator,
21102110
Consumer<? super S> disposeState) {
21112111
ObjectHelper.requireNonNull(generator, "generator is null");
21122112
return generate(initialState, FlowableInternalHelper.<T, S>simpleBiGenerator(generator),
@@ -2135,7 +2135,7 @@ public static <T, S> Flowable<T> generate(Callable<S> initialState, final BiCons
21352135
*/
21362136
@BackpressureSupport(BackpressureKind.FULL)
21372137
@SchedulerSupport(SchedulerSupport.NONE)
2138-
public static <T, S> Flowable<T> generate(Callable<S> initialState, BiFunction<S, Subscriber<T>, S> generator) {
2138+
public static <T, S> Flowable<T> generate(Callable<S> initialState, BiFunction<S, Emitter<T>, S> generator) {
21392139
return generate(initialState, generator, Functions.emptyConsumer());
21402140
}
21412141

@@ -2163,7 +2163,7 @@ public static <T, S> Flowable<T> generate(Callable<S> initialState, BiFunction<S
21632163
*/
21642164
@BackpressureSupport(BackpressureKind.FULL)
21652165
@SchedulerSupport(SchedulerSupport.NONE)
2166-
public static <T, S> Flowable<T> generate(Callable<S> initialState, BiFunction<S, Subscriber<T>, S> generator, Consumer<? super S> disposeState) {
2166+
public static <T, S> Flowable<T> generate(Callable<S> initialState, BiFunction<S, Emitter<T>, S> generator, Consumer<? super S> disposeState) {
21672167
ObjectHelper.requireNonNull(initialState, "initialState is null");
21682168
ObjectHelper.requireNonNull(generator, "generator is null");
21692169
ObjectHelper.requireNonNull(disposeState, "disposeState is null");
@@ -15446,22 +15446,21 @@ public final TestSubscriber<T> test(long initialRequest) { // NoPMD
1544615446
}
1544715447

1544815448
/**
15449-
* Creates a TestSubscriber with the given initial request amount, fusion mode
15450-
* and optionally in cancelled state, then subscribes it to this Flowable.
15449+
* Creates a TestSubscriber with the given initial request amount,
15450+
* optionally cancels it before the subscription and subscribes
15451+
* it to this Flowable.
1545115452
* @param initialRequest the initial request amount, positive
15452-
* @param fusionMode the requested fusion mode, see {@link QueueSubscription} constants.
15453-
* @param cancelled if true, the TestSubscriber will be cancelled before subscribing to this
15454-
* Flowable.
15453+
* @param cancel should the TestSubscriber be cancelled before the subscription?
1545515454
* @return the new TestSubscriber instance
1545615455
* @since 2.0
1545715456
*/
15458-
public final TestSubscriber<T> test(long initialRequest, int fusionMode, boolean cancelled) { // NoPMD
15457+
public final TestSubscriber<T> test(long initialRequest, boolean cancel) { // NoPMD
1545915458
TestSubscriber<T> ts = new TestSubscriber<T>(initialRequest);
15460-
ts.setInitialFusionMode(fusionMode);
15461-
if (cancelled) {
15459+
if (cancel) {
1546215460
ts.cancel();
1546315461
}
1546415462
subscribe(ts);
1546515463
return ts;
1546615464
}
15465+
1546715466
}

src/main/java/io/reactivex/FlowableEmitter.java

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,25 +28,8 @@
2828
*
2929
* @param <T> the value type to emit
3030
*/
31-
public interface FlowableEmitter<T> {
31+
public interface FlowableEmitter<T> extends Emitter<T> {
3232

33-
/**
34-
* Signal a value.
35-
* @param t the value, not null
36-
*/
37-
void onNext(T t);
38-
39-
/**
40-
* Signal an exception.
41-
* @param t the exception, not null
42-
*/
43-
void onError(Throwable t);
44-
45-
/**
46-
* Signal the completion.
47-
*/
48-
void onComplete();
49-
5033
/**
5134
* Sets a Disposable on this emitter; any previous Disposable
5235
* or Cancellation will be unsubscribed/cancelled.

src/main/java/io/reactivex/Observable.java

Lines changed: 5 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1782,7 +1782,7 @@ public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)
17821782
* @return the new Observable instance
17831783
*/
17841784
@SchedulerSupport(SchedulerSupport.NONE)
1785-
public static <T> Observable<T> generate(final Consumer<Observer<T>> generator) {
1785+
public static <T> Observable<T> generate(final Consumer<Emitter<T>> generator) {
17861786
ObjectHelper.requireNonNull(generator, "generator is null");
17871787
return generate(Functions.<Object>nullSupplier(),
17881788
ObservableInternalHelper.simpleGenerator(generator), Functions.<Object>emptyConsumer());
@@ -1806,7 +1806,7 @@ public static <T> Observable<T> generate(final Consumer<Observer<T>> generator)
18061806
* @return the new Observable instance
18071807
*/
18081808
@SchedulerSupport(SchedulerSupport.NONE)
1809-
public static <T, S> Observable<T> generate(Callable<S> initialState, final BiConsumer<S, Observer<T>> generator) {
1809+
public static <T, S> Observable<T> generate(Callable<S> initialState, final BiConsumer<S, Emitter<T>> generator) {
18101810
ObjectHelper.requireNonNull(generator, "generator is null");
18111811
return generate(initialState, ObservableInternalHelper.simpleBiGenerator(generator), Functions.emptyConsumer());
18121812
}
@@ -1833,7 +1833,7 @@ public static <T, S> Observable<T> generate(Callable<S> initialState, final BiCo
18331833
@SchedulerSupport(SchedulerSupport.NONE)
18341834
public static <T, S> Observable<T> generate(
18351835
final Callable<S> initialState,
1836-
final BiConsumer<S, Observer<T>> generator,
1836+
final BiConsumer<S, Emitter<T>> generator,
18371837
Consumer<? super S> disposeState) {
18381838
ObjectHelper.requireNonNull(generator, "generator is null");
18391839
return generate(initialState, ObservableInternalHelper.simpleBiGenerator(generator), disposeState);
@@ -1858,7 +1858,7 @@ public static <T, S> Observable<T> generate(
18581858
* @return the new Observable instance
18591859
*/
18601860
@SchedulerSupport(SchedulerSupport.NONE)
1861-
public static <T, S> Observable<T> generate(Callable<S> initialState, BiFunction<S, Observer<T>, S> generator) {
1861+
public static <T, S> Observable<T> generate(Callable<S> initialState, BiFunction<S, Emitter<T>, S> generator) {
18621862
return generate(initialState, generator, Functions.emptyConsumer());
18631863
}
18641864

@@ -1883,7 +1883,7 @@ public static <T, S> Observable<T> generate(Callable<S> initialState, BiFunction
18831883
* @return the new Observable instance
18841884
*/
18851885
@SchedulerSupport(SchedulerSupport.NONE)
1886-
public static <T, S> Observable<T> generate(Callable<S> initialState, BiFunction<S, Observer<T>, S> generator,
1886+
public static <T, S> Observable<T> generate(Callable<S> initialState, BiFunction<S, Emitter<T>, S> generator,
18871887
Consumer<? super S> disposeState) {
18881888
ObjectHelper.requireNonNull(initialState, "initialState is null");
18891889
ObjectHelper.requireNonNull(generator, "generator is null");
@@ -13220,23 +13220,4 @@ public final TestObserver<T> test() { // NoPMD
1322013220
subscribe(ts);
1322113221
return ts;
1322213222
}
13223-
13224-
/**
13225-
* Creates a TestObserver with the given fusion mode
13226-
* and optionally in cancelled state, then subscribes it to this Observable.
13227-
* @param fusionMode the requested fusion mode, see {@link QueueDisposable} constants.
13228-
* @param cancelled if true, the TestSubscriber will be cancelled before subscribing to this
13229-
* Observable.
13230-
* @return the new TestObserver instance
13231-
* @since 2.0
13232-
*/
13233-
public final TestObserver<T> test(int fusionMode, boolean cancelled) { // NoPMD
13234-
TestObserver<T> ts = new TestObserver<T>();
13235-
ts.setInitialFusionMode(fusionMode);
13236-
if (cancelled) {
13237-
ts.dispose();
13238-
}
13239-
subscribe(ts);
13240-
return ts;
13241-
}
1324213223
}

src/main/java/io/reactivex/ObservableEmitter.java

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,25 +27,8 @@
2727
*
2828
* @param <T> the value type to emit
2929
*/
30-
public interface ObservableEmitter<T> {
30+
public interface ObservableEmitter<T> extends Emitter<T> {
3131

32-
/**
33-
* Signal a value.
34-
* @param t the value, not null
35-
*/
36-
void onNext(T t);
37-
38-
/**
39-
* Signal an exception.
40-
* @param t the exception, not null
41-
*/
42-
void onError(Throwable t);
43-
44-
/**
45-
* Signal the completion.
46-
*/
47-
void onComplete();
48-
4932
/**
5033
* Sets a Disposable on this emitter; any previous Disposable
5134
* or Cancellation will be unsubscribed/cancelled.

src/main/java/io/reactivex/internal/operators/flowable/FlowableGenerate.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import org.reactivestreams.*;
2020

21-
import io.reactivex.Flowable;
21+
import io.reactivex.*;
2222
import io.reactivex.exceptions.Exceptions;
2323
import io.reactivex.functions.*;
2424
import io.reactivex.internal.subscriptions.*;
@@ -27,10 +27,10 @@
2727

2828
public final class FlowableGenerate<T, S> extends Flowable<T> {
2929
final Callable<S> stateSupplier;
30-
final BiFunction<S, Subscriber<T>, S> generator;
30+
final BiFunction<S, Emitter<T>, S> generator;
3131
final Consumer<? super S> disposeState;
3232

33-
public FlowableGenerate(Callable<S> stateSupplier, BiFunction<S, Subscriber<T>, S> generator,
33+
public FlowableGenerate(Callable<S> stateSupplier, BiFunction<S, Emitter<T>, S> generator,
3434
Consumer<? super S> disposeState) {
3535
this.stateSupplier = stateSupplier;
3636
this.generator = generator;
@@ -54,12 +54,12 @@ public void subscribeActual(Subscriber<? super T> s) {
5454

5555
static final class GeneratorSubscription<T, S>
5656
extends AtomicLong
57-
implements Subscriber<T>, Subscription {
57+
implements Emitter<T>, Subscription {
5858
/** */
5959
private static final long serialVersionUID = 7565982551505011832L;
6060

6161
final Subscriber<? super T> actual;
62-
final BiFunction<S, ? super Subscriber<T>, S> generator;
62+
final BiFunction<S, ? super Emitter<T>, S> generator;
6363
final Consumer<? super S> disposeState;
6464

6565
S state;
@@ -69,7 +69,7 @@ static final class GeneratorSubscription<T, S>
6969
boolean terminate;
7070

7171
public GeneratorSubscription(Subscriber<? super T> actual,
72-
BiFunction<S, ? super Subscriber<T>, S> generator,
72+
BiFunction<S, ? super Emitter<T>, S> generator,
7373
Consumer<? super S> disposeState, S initialState) {
7474
this.actual = actual;
7575
this.generator = generator;
@@ -90,7 +90,7 @@ public void request(long n) {
9090

9191
S s = state;
9292

93-
final BiFunction<S, ? super Subscriber<T>, S> f = generator;
93+
final BiFunction<S, ? super Emitter<T>, S> f = generator;
9494

9595
for (;;) {
9696
if (cancelled) {
@@ -171,11 +171,6 @@ public void cancel() {
171171
}
172172
}
173173

174-
@Override
175-
public void onSubscribe(Subscription s) {
176-
throw new IllegalStateException("Should not call onSubscribe in the generator!");
177-
}
178-
179174
@Override
180175
public void onNext(T t) {
181176
if (t == null) {

src/main/java/io/reactivex/internal/operators/flowable/FlowableInternalHelper.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,39 +28,39 @@
2828
public enum FlowableInternalHelper {
2929
;
3030

31-
static final class SimpleGenerator<T, S> implements BiFunction<S, Subscriber<T>, S> {
32-
final Consumer<Subscriber<T>> consumer;
31+
static final class SimpleGenerator<T, S> implements BiFunction<S, Emitter<T>, S> {
32+
final Consumer<Emitter<T>> consumer;
3333

34-
public SimpleGenerator(Consumer<Subscriber<T>> consumer) {
34+
public SimpleGenerator(Consumer<Emitter<T>> consumer) {
3535
this.consumer = consumer;
3636
}
3737

3838
@Override
39-
public S apply(S t1, Subscriber<T> t2) throws Exception {
39+
public S apply(S t1, Emitter<T> t2) throws Exception {
4040
consumer.accept(t2);
4141
return t1;
4242
}
4343
}
4444

45-
public static <T, S> BiFunction<S, Subscriber<T>, S> simpleGenerator(Consumer<Subscriber<T>> consumer) {
45+
public static <T, S> BiFunction<S, Emitter<T>, S> simpleGenerator(Consumer<Emitter<T>> consumer) {
4646
return new SimpleGenerator<T, S>(consumer);
4747
}
4848

49-
static final class SimpleBiGenerator<T, S> implements BiFunction<S, Subscriber<T>, S> {
50-
final BiConsumer<S, Subscriber<T>> consumer;
49+
static final class SimpleBiGenerator<T, S> implements BiFunction<S, Emitter<T>, S> {
50+
final BiConsumer<S, Emitter<T>> consumer;
5151

52-
public SimpleBiGenerator(BiConsumer<S, Subscriber<T>> consumer) {
52+
public SimpleBiGenerator(BiConsumer<S, Emitter<T>> consumer) {
5353
this.consumer = consumer;
5454
}
5555

5656
@Override
57-
public S apply(S t1, Subscriber<T> t2) throws Exception {
57+
public S apply(S t1, Emitter<T> t2) throws Exception {
5858
consumer.accept(t1, t2);
5959
return t1;
6060
}
6161
}
6262

63-
public static <T, S> BiFunction<S, Subscriber<T>, S> simpleBiGenerator(BiConsumer<S, Subscriber<T>> consumer) {
63+
public static <T, S> BiFunction<S, Emitter<T>, S> simpleBiGenerator(BiConsumer<S, Emitter<T>> consumer) {
6464
return new SimpleBiGenerator<T, S>(consumer);
6565
}
6666

0 commit comments

Comments
 (0)