Skip to content

2.x: cleanup, enhancements 8/23-1 #4410

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 23, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions src/main/java/io/reactivex/Emitter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Copyright 2016 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex;

/**
* Base interface for emitting signals in a push-fashion in various generator-like source
* operators (create, generate).
*
* @param <T> the value type emitted
*/
public interface Emitter<T> {

/**
* Signal a normal value.
* @param value the value to signal, not null
*/
void onNext(T value);

/**
* Signal a Throwable exception.
* @param error the Throwable to signal, not null
*/
void onError(Throwable error);

/**
* Signal a completion.
*/
void onComplete();
}
25 changes: 12 additions & 13 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2049,7 +2049,7 @@ public static <T> Flowable<T> fromPublisher(final Publisher<? extends T> publish
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> generate(final Consumer<Subscriber<T>> generator) {
public static <T> Flowable<T> generate(final Consumer<Emitter<T>> generator) {
ObjectHelper.requireNonNull(generator, "generator is null");
return generate(Functions.nullSupplier(),
FlowableInternalHelper.<T, Object>simpleGenerator(generator),
Expand Down Expand Up @@ -2077,7 +2077,7 @@ public static <T> Flowable<T> generate(final Consumer<Subscriber<T>> generator)
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, S> Flowable<T> generate(Callable<S> initialState, final BiConsumer<S, Subscriber<T>> generator) {
public static <T, S> Flowable<T> generate(Callable<S> initialState, final BiConsumer<S, Emitter<T>> generator) {
ObjectHelper.requireNonNull(generator, "generator is null");
return generate(initialState, FlowableInternalHelper.<T, S>simpleBiGenerator(generator),
Functions.emptyConsumer());
Expand Down Expand Up @@ -2106,7 +2106,7 @@ public static <T, S> Flowable<T> generate(Callable<S> initialState, final BiCons
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, S> Flowable<T> generate(Callable<S> initialState, final BiConsumer<S, Subscriber<T>> generator,
public static <T, S> Flowable<T> generate(Callable<S> initialState, final BiConsumer<S, Emitter<T>> generator,
Consumer<? super S> disposeState) {
ObjectHelper.requireNonNull(generator, "generator is null");
return generate(initialState, FlowableInternalHelper.<T, S>simpleBiGenerator(generator),
Expand Down Expand Up @@ -2135,7 +2135,7 @@ public static <T, S> Flowable<T> generate(Callable<S> initialState, final BiCons
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, S> Flowable<T> generate(Callable<S> initialState, BiFunction<S, Subscriber<T>, S> generator) {
public static <T, S> Flowable<T> generate(Callable<S> initialState, BiFunction<S, Emitter<T>, S> generator) {
return generate(initialState, generator, Functions.emptyConsumer());
}

Expand Down Expand Up @@ -2163,7 +2163,7 @@ public static <T, S> Flowable<T> generate(Callable<S> initialState, BiFunction<S
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, S> Flowable<T> generate(Callable<S> initialState, BiFunction<S, Subscriber<T>, S> generator, Consumer<? super S> disposeState) {
public static <T, S> Flowable<T> generate(Callable<S> initialState, BiFunction<S, Emitter<T>, S> generator, Consumer<? super S> disposeState) {
ObjectHelper.requireNonNull(initialState, "initialState is null");
ObjectHelper.requireNonNull(generator, "generator is null");
ObjectHelper.requireNonNull(disposeState, "disposeState is null");
Expand Down Expand Up @@ -15446,22 +15446,21 @@ public final TestSubscriber<T> test(long initialRequest) { // NoPMD
}

/**
* Creates a TestSubscriber with the given initial request amount, fusion mode
* and optionally in cancelled state, then subscribes it to this Flowable.
* Creates a TestSubscriber with the given initial request amount,
* optionally cancels it before the subscription and subscribes
* it to this Flowable.
* @param initialRequest the initial request amount, positive
* @param fusionMode the requested fusion mode, see {@link QueueSubscription} constants.
* @param cancelled if true, the TestSubscriber will be cancelled before subscribing to this
* Flowable.
* @param cancel should the TestSubscriber be cancelled before the subscription?
* @return the new TestSubscriber instance
* @since 2.0
*/
public final TestSubscriber<T> test(long initialRequest, int fusionMode, boolean cancelled) { // NoPMD
public final TestSubscriber<T> test(long initialRequest, boolean cancel) { // NoPMD
TestSubscriber<T> ts = new TestSubscriber<T>(initialRequest);
ts.setInitialFusionMode(fusionMode);
if (cancelled) {
if (cancel) {
ts.cancel();
}
subscribe(ts);
return ts;
}

}
19 changes: 1 addition & 18 deletions src/main/java/io/reactivex/FlowableEmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,8 @@
*
* @param <T> the value type to emit
*/
public interface FlowableEmitter<T> {
public interface FlowableEmitter<T> extends Emitter<T> {

/**
* Signal a value.
* @param t the value, not null
*/
void onNext(T t);

/**
* Signal an exception.
* @param t the exception, not null
*/
void onError(Throwable t);

/**
* Signal the completion.
*/
void onComplete();

/**
* Sets a Disposable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
Expand Down
29 changes: 5 additions & 24 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1782,7 +1782,7 @@ public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)
* @return the new Observable instance
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> generate(final Consumer<Observer<T>> generator) {
public static <T> Observable<T> generate(final Consumer<Emitter<T>> generator) {
ObjectHelper.requireNonNull(generator, "generator is null");
return generate(Functions.<Object>nullSupplier(),
ObservableInternalHelper.simpleGenerator(generator), Functions.<Object>emptyConsumer());
Expand All @@ -1806,7 +1806,7 @@ public static <T> Observable<T> generate(final Consumer<Observer<T>> generator)
* @return the new Observable instance
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, S> Observable<T> generate(Callable<S> initialState, final BiConsumer<S, Observer<T>> generator) {
public static <T, S> Observable<T> generate(Callable<S> initialState, final BiConsumer<S, Emitter<T>> generator) {
ObjectHelper.requireNonNull(generator, "generator is null");
return generate(initialState, ObservableInternalHelper.simpleBiGenerator(generator), Functions.emptyConsumer());
}
Expand All @@ -1833,7 +1833,7 @@ public static <T, S> Observable<T> generate(Callable<S> initialState, final BiCo
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, S> Observable<T> generate(
final Callable<S> initialState,
final BiConsumer<S, Observer<T>> generator,
final BiConsumer<S, Emitter<T>> generator,
Consumer<? super S> disposeState) {
ObjectHelper.requireNonNull(generator, "generator is null");
return generate(initialState, ObservableInternalHelper.simpleBiGenerator(generator), disposeState);
Expand All @@ -1858,7 +1858,7 @@ public static <T, S> Observable<T> generate(
* @return the new Observable instance
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, S> Observable<T> generate(Callable<S> initialState, BiFunction<S, Observer<T>, S> generator) {
public static <T, S> Observable<T> generate(Callable<S> initialState, BiFunction<S, Emitter<T>, S> generator) {
return generate(initialState, generator, Functions.emptyConsumer());
}

Expand All @@ -1883,7 +1883,7 @@ public static <T, S> Observable<T> generate(Callable<S> initialState, BiFunction
* @return the new Observable instance
*/
@SchedulerSupport(SchedulerSupport.NONE)
public static <T, S> Observable<T> generate(Callable<S> initialState, BiFunction<S, Observer<T>, S> generator,
public static <T, S> Observable<T> generate(Callable<S> initialState, BiFunction<S, Emitter<T>, S> generator,
Consumer<? super S> disposeState) {
ObjectHelper.requireNonNull(initialState, "initialState is null");
ObjectHelper.requireNonNull(generator, "generator is null");
Expand Down Expand Up @@ -13220,23 +13220,4 @@ public final TestObserver<T> test() { // NoPMD
subscribe(ts);
return ts;
}

/**
* Creates a TestObserver with the given fusion mode
* and optionally in cancelled state, then subscribes it to this Observable.
* @param fusionMode the requested fusion mode, see {@link QueueDisposable} constants.
* @param cancelled if true, the TestSubscriber will be cancelled before subscribing to this
* Observable.
* @return the new TestObserver instance
* @since 2.0
*/
public final TestObserver<T> test(int fusionMode, boolean cancelled) { // NoPMD
TestObserver<T> ts = new TestObserver<T>();
ts.setInitialFusionMode(fusionMode);
if (cancelled) {
ts.dispose();
}
subscribe(ts);
return ts;
}
}
19 changes: 1 addition & 18 deletions src/main/java/io/reactivex/ObservableEmitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,8 @@
*
* @param <T> the value type to emit
*/
public interface ObservableEmitter<T> {
public interface ObservableEmitter<T> extends Emitter<T> {

/**
* Signal a value.
* @param t the value, not null
*/
void onNext(T t);

/**
* Signal an exception.
* @param t the exception, not null
*/
void onError(Throwable t);

/**
* Signal the completion.
*/
void onComplete();

/**
* Sets a Disposable on this emitter; any previous Disposable
* or Cancellation will be unsubscribed/cancelled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import org.reactivestreams.*;

import io.reactivex.Flowable;
import io.reactivex.*;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.*;
import io.reactivex.internal.subscriptions.*;
Expand All @@ -27,10 +27,10 @@

public final class FlowableGenerate<T, S> extends Flowable<T> {
final Callable<S> stateSupplier;
final BiFunction<S, Subscriber<T>, S> generator;
final BiFunction<S, Emitter<T>, S> generator;
final Consumer<? super S> disposeState;

public FlowableGenerate(Callable<S> stateSupplier, BiFunction<S, Subscriber<T>, S> generator,
public FlowableGenerate(Callable<S> stateSupplier, BiFunction<S, Emitter<T>, S> generator,
Consumer<? super S> disposeState) {
this.stateSupplier = stateSupplier;
this.generator = generator;
Expand All @@ -54,12 +54,12 @@ public void subscribeActual(Subscriber<? super T> s) {

static final class GeneratorSubscription<T, S>
extends AtomicLong
implements Subscriber<T>, Subscription {
implements Emitter<T>, Subscription {
/** */
private static final long serialVersionUID = 7565982551505011832L;

final Subscriber<? super T> actual;
final BiFunction<S, ? super Subscriber<T>, S> generator;
final BiFunction<S, ? super Emitter<T>, S> generator;
final Consumer<? super S> disposeState;

S state;
Expand All @@ -69,7 +69,7 @@ static final class GeneratorSubscription<T, S>
boolean terminate;

public GeneratorSubscription(Subscriber<? super T> actual,
BiFunction<S, ? super Subscriber<T>, S> generator,
BiFunction<S, ? super Emitter<T>, S> generator,
Consumer<? super S> disposeState, S initialState) {
this.actual = actual;
this.generator = generator;
Expand All @@ -90,7 +90,7 @@ public void request(long n) {

S s = state;

final BiFunction<S, ? super Subscriber<T>, S> f = generator;
final BiFunction<S, ? super Emitter<T>, S> f = generator;

for (;;) {
if (cancelled) {
Expand Down Expand Up @@ -171,11 +171,6 @@ public void cancel() {
}
}

@Override
public void onSubscribe(Subscription s) {
throw new IllegalStateException("Should not call onSubscribe in the generator!");
}

@Override
public void onNext(T t) {
if (t == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,39 +28,39 @@
public enum FlowableInternalHelper {
;

static final class SimpleGenerator<T, S> implements BiFunction<S, Subscriber<T>, S> {
final Consumer<Subscriber<T>> consumer;
static final class SimpleGenerator<T, S> implements BiFunction<S, Emitter<T>, S> {
final Consumer<Emitter<T>> consumer;

public SimpleGenerator(Consumer<Subscriber<T>> consumer) {
public SimpleGenerator(Consumer<Emitter<T>> consumer) {
this.consumer = consumer;
}

@Override
public S apply(S t1, Subscriber<T> t2) throws Exception {
public S apply(S t1, Emitter<T> t2) throws Exception {
consumer.accept(t2);
return t1;
}
}

public static <T, S> BiFunction<S, Subscriber<T>, S> simpleGenerator(Consumer<Subscriber<T>> consumer) {
public static <T, S> BiFunction<S, Emitter<T>, S> simpleGenerator(Consumer<Emitter<T>> consumer) {
return new SimpleGenerator<T, S>(consumer);
}

static final class SimpleBiGenerator<T, S> implements BiFunction<S, Subscriber<T>, S> {
final BiConsumer<S, Subscriber<T>> consumer;
static final class SimpleBiGenerator<T, S> implements BiFunction<S, Emitter<T>, S> {
final BiConsumer<S, Emitter<T>> consumer;

public SimpleBiGenerator(BiConsumer<S, Subscriber<T>> consumer) {
public SimpleBiGenerator(BiConsumer<S, Emitter<T>> consumer) {
this.consumer = consumer;
}

@Override
public S apply(S t1, Subscriber<T> t2) throws Exception {
public S apply(S t1, Emitter<T> t2) throws Exception {
consumer.accept(t1, t2);
return t1;
}
}

public static <T, S> BiFunction<S, Subscriber<T>, S> simpleBiGenerator(BiConsumer<S, Subscriber<T>> consumer) {
public static <T, S> BiFunction<S, Emitter<T>, S> simpleBiGenerator(BiConsumer<S, Emitter<T>> consumer) {
return new SimpleBiGenerator<T, S>(consumer);
}

Expand Down
Loading