Skip to content

2.x: Use XConsumable in Completable and Single #4042

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 68 additions & 53 deletions src/main/java/io/reactivex/Completable.java

Large diffs are not rendered by default.

273 changes: 128 additions & 145 deletions src/main/java/io/reactivex/Single.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
import io.reactivex.plugins.RxJavaPlugins;

public final class CompletableOnSubscribeConcat implements CompletableConsumable {
final Flowable<? extends Completable> sources;
final Publisher<? extends CompletableConsumable> sources;
final int prefetch;

public CompletableOnSubscribeConcat(Flowable<? extends Completable> sources, int prefetch) {
public CompletableOnSubscribeConcat(Publisher<? extends CompletableConsumable> sources, int prefetch) {
this.sources = sources;
this.prefetch = prefetch;
}
Expand All @@ -42,14 +42,14 @@ public void subscribe(CompletableSubscriber s) {

static final class CompletableConcatSubscriber
extends AtomicInteger
implements Subscriber<Completable>, Disposable {
implements Subscriber<CompletableConsumable>, Disposable {
/** */
private static final long serialVersionUID = 7412667182931235013L;
final CompletableSubscriber actual;
final int prefetch;
final SerialResource<Disposable> sr;

final SpscArrayQueue<Completable> queue;
final SpscArrayQueue<CompletableConsumable> queue;

Subscription s;

Expand All @@ -62,7 +62,7 @@ static final class CompletableConcatSubscriber
public CompletableConcatSubscriber(CompletableSubscriber actual, int prefetch) {
this.actual = actual;
this.prefetch = prefetch;
this.queue = new SpscArrayQueue<Completable>(prefetch);
this.queue = new SpscArrayQueue<CompletableConsumable>(prefetch);
this.sr = new SerialResource<Disposable>(Disposables.consumeAndDispose());
this.inner = new ConcatInnerSubscriber();
}
Expand All @@ -78,7 +78,7 @@ public void onSubscribe(Subscription s) {
}

@Override
public void onNext(Completable t) {
public void onNext(CompletableConsumable t) {
if (!queue.offer(t)) {
onError(new MissingBackpressureException());
return;
Expand Down Expand Up @@ -130,7 +130,7 @@ public void dispose() {

void next() {
boolean d = done;
Completable c = queue.poll();
CompletableConsumable c = queue.poll();
if (c == null) {
if (d) {
if (once.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import io.reactivex.disposables.*;

public final class CompletableOnSubscribeConcatArray implements CompletableConsumable {
final Completable[] sources;
final CompletableConsumable[] sources;

public CompletableOnSubscribeConcatArray(Completable[] sources) {
public CompletableOnSubscribeConcatArray(CompletableConsumable[] sources) {
this.sources = sources;
}

Expand All @@ -37,13 +37,13 @@ static final class ConcatInnerSubscriber extends AtomicInteger implements Comple
private static final long serialVersionUID = -7965400327305809232L;

final CompletableSubscriber actual;
final Completable[] sources;
final CompletableConsumable[] sources;

int index;

final SerialDisposable sd;

public ConcatInnerSubscriber(CompletableSubscriber actual, Completable[] sources) {
public ConcatInnerSubscriber(CompletableSubscriber actual, CompletableConsumable[] sources) {
this.actual = actual;
this.sources = sources;
this.sd = new SerialDisposable();
Expand Down Expand Up @@ -73,7 +73,7 @@ void next() {
return;
}

Completable[] a = sources;
CompletableConsumable[] a = sources;
do {
if (sd.isDisposed()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@
import io.reactivex.internal.disposables.EmptyDisposable;

public final class CompletableOnSubscribeConcatIterable implements CompletableConsumable {
final Iterable<? extends Completable> sources;
final Iterable<? extends CompletableConsumable> sources;

public CompletableOnSubscribeConcatIterable(Iterable<? extends Completable> sources) {
public CompletableOnSubscribeConcatIterable(Iterable<? extends CompletableConsumable> sources) {
this.sources = sources;
}

@Override
public void subscribe(CompletableSubscriber s) {

Iterator<? extends Completable> it;
Iterator<? extends CompletableConsumable> it;

try {
it = sources.iterator();
Expand All @@ -56,13 +56,13 @@ static final class ConcatInnerSubscriber extends AtomicInteger implements Comple
private static final long serialVersionUID = -7965400327305809232L;

final CompletableSubscriber actual;
final Iterator<? extends Completable> sources;
final Iterator<? extends CompletableConsumable> sources;

int index;

final SerialDisposable sd;

public ConcatInnerSubscriber(CompletableSubscriber actual, Iterator<? extends Completable> sources) {
public ConcatInnerSubscriber(CompletableSubscriber actual, Iterator<? extends CompletableConsumable> sources) {
this.actual = actual;
this.sources = sources;
this.sd = new SerialDisposable();
Expand Down Expand Up @@ -92,7 +92,7 @@ void next() {
return;
}

Iterator<? extends Completable> a = sources;
Iterator<? extends CompletableConsumable> a = sources;
do {
if (sd.isDisposed()) {
return;
Expand All @@ -111,7 +111,7 @@ void next() {
return;
}

Completable c;
CompletableConsumable c;

try {
c = a.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@
import io.reactivex.plugins.RxJavaPlugins;

public final class CompletableOnSubscribeMerge implements CompletableConsumable {
final Flowable<? extends Completable> source;
final Publisher<? extends CompletableConsumable> source;
final int maxConcurrency;
final boolean delayErrors;

public CompletableOnSubscribeMerge(Flowable<? extends Completable> source, int maxConcurrency, boolean delayErrors) {
public CompletableOnSubscribeMerge(Publisher<? extends CompletableConsumable> source, int maxConcurrency, boolean delayErrors) {
this.source = source;
this.maxConcurrency = maxConcurrency;
this.delayErrors = delayErrors;
Expand All @@ -45,7 +45,7 @@ public void subscribe(CompletableSubscriber s) {

static final class CompletableMergeSubscriber
extends AtomicInteger
implements Subscriber<Completable>, Disposable {
implements Subscriber<CompletableConsumable>, Disposable {
/** */
private static final long serialVersionUID = -2108443387387077490L;

Expand Down Expand Up @@ -106,7 +106,7 @@ Queue<Throwable> getOrCreateErrors() {
}

@Override
public void onNext(Completable t) {
public void onNext(CompletableConsumable t) {
if (done) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import io.reactivex.plugins.RxJavaPlugins;

public final class CompletableOnSubscribeMergeArray implements CompletableConsumable {
final Completable[] sources;
final CompletableConsumable[] sources;

public CompletableOnSubscribeMergeArray(Completable[] sources) {
public CompletableOnSubscribeMergeArray(CompletableConsumable[] sources) {
this.sources = sources;
}

Expand All @@ -34,7 +34,7 @@ public void subscribe(final CompletableSubscriber s) {

s.onSubscribe(set);

for (Completable c : sources) {
for (CompletableConsumable c : sources) {
if (set.isDisposed()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import io.reactivex.disposables.*;

public final class CompletableOnSubscribeMergeDelayErrorArray implements CompletableConsumable {
final Completable[] sources;
final CompletableConsumable[] sources;

public CompletableOnSubscribeMergeDelayErrorArray(Completable[] sources) {
public CompletableOnSubscribeMergeDelayErrorArray(CompletableConsumable[] sources) {
this.sources = sources;
}

Expand All @@ -36,7 +36,7 @@ public void subscribe(final CompletableSubscriber s) {

s.onSubscribe(set);

for (Completable c : sources) {
for (CompletableConsumable c : sources) {
if (set.isDisposed()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import io.reactivex.internal.queue.MpscLinkedQueue;

public final class CompletableOnSubscribeMergeDelayErrorIterable implements CompletableConsumable {
final Iterable<? extends Completable> sources;
final Iterable<? extends CompletableConsumable> sources;

public CompletableOnSubscribeMergeDelayErrorIterable(Iterable<? extends Completable> sources) {
public CompletableOnSubscribeMergeDelayErrorIterable(Iterable<? extends CompletableConsumable> sources) {
this.sources = sources;
}

Expand All @@ -36,7 +36,7 @@ public void subscribe(final CompletableSubscriber s) {

s.onSubscribe(set);

Iterator<? extends Completable> iterator;
Iterator<? extends CompletableConsumable> iterator;

try {
iterator = sources.iterator();
Expand Down Expand Up @@ -78,7 +78,7 @@ public void subscribe(final CompletableSubscriber s) {
return;
}

Completable c;
CompletableConsumable c;

try {
c = iterator.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import io.reactivex.plugins.RxJavaPlugins;

public final class CompletableOnSubscribeMergeIterable implements CompletableConsumable {
final Iterable<? extends Completable> sources;
final Iterable<? extends CompletableConsumable> sources;

public CompletableOnSubscribeMergeIterable(Iterable<? extends Completable> sources) {
public CompletableOnSubscribeMergeIterable(Iterable<? extends CompletableConsumable> sources) {
this.sources = sources;
}

Expand All @@ -35,7 +35,7 @@ public void subscribe(final CompletableSubscriber s) {

s.onSubscribe(set);

Iterator<? extends Completable> iterator;
Iterator<? extends CompletableConsumable> iterator;

try {
iterator = sources.iterator();
Expand Down Expand Up @@ -75,7 +75,7 @@ public void subscribe(final CompletableSubscriber s) {
return;
}

Completable c;
CompletableConsumable c;

try {
c = iterator.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@

public final class CompletableOnSubscribeTimeout implements CompletableConsumable {

final Completable source;
final CompletableConsumable source;
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
final Completable other;
final CompletableConsumable other;

public CompletableOnSubscribeTimeout(Completable source, long timeout,
TimeUnit unit, Scheduler scheduler, Completable other) {
public CompletableOnSubscribeTimeout(CompletableConsumable source, long timeout,
TimeUnit unit, Scheduler scheduler, CompletableConsumable other) {
this.source = source;
this.timeout = timeout;
this.unit = unit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,32 @@
package io.reactivex.internal.operators.single;

import io.reactivex.*;
import io.reactivex.Single.*;
import io.reactivex.disposables.*;
import io.reactivex.functions.Function;

public final class SingleOperatorFlatMap<T, R> implements SingleOperator<R, T> {
final Function<? super T, ? extends Single<? extends R>> mapper;
public final class SingleOperatorFlatMap<T, R> extends Single<R> {
final SingleConsumable<? extends T> source;

final Function<? super T, ? extends SingleConsumable<? extends R>> mapper;

public SingleOperatorFlatMap(Function<? super T, ? extends Single<? extends R>> mapper) {
public SingleOperatorFlatMap(SingleConsumable<? extends T> source, Function<? super T, ? extends SingleConsumable<? extends R>> mapper) {
this.mapper = mapper;
this.source = source;
}

@Override
public SingleSubscriber<? super T> apply(SingleSubscriber<? super R> t) {
return new SingleFlatMapCallback<T, R>(t, mapper);
protected void subscribeActual(SingleSubscriber<? super R> subscriber) {
source.subscribe(new SingleFlatMapCallback<T, R>(subscriber, mapper));
}

static final class SingleFlatMapCallback<T, R> implements SingleSubscriber<T> {
final SingleSubscriber<? super R> actual;
final Function<? super T, ? extends Single<? extends R>> mapper;
final Function<? super T, ? extends SingleConsumable<? extends R>> mapper;

final MultipleAssignmentDisposable mad;

public SingleFlatMapCallback(SingleSubscriber<? super R> actual,
Function<? super T, ? extends Single<? extends R>> mapper) {
Function<? super T, ? extends SingleConsumable<? extends R>> mapper) {
this.actual = actual;
this.mapper = mapper;
this.mad = new MultipleAssignmentDisposable();
Expand All @@ -50,7 +52,7 @@ public void onSubscribe(Disposable d) {

@Override
public void onSuccess(T value) {
Single<? extends R> o;
SingleConsumable<? extends R> o;

try {
o = mapper.apply(value);
Expand Down
Loading