Skip to content

Commit 7a4d633

Browse files
authored
2.x: Use XCompletable in Completable and Single (#4042)
1 parent 596f77f commit 7a4d633

14 files changed

+311
-252
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 68 additions & 53 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Single.java

Lines changed: 128 additions & 145 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/internal/operators/completable/CompletableOnSubscribeConcat.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,10 @@
2626
import io.reactivex.plugins.RxJavaPlugins;
2727

2828
public final class CompletableOnSubscribeConcat implements CompletableConsumable {
29-
final Flowable<? extends Completable> sources;
29+
final Publisher<? extends CompletableConsumable> sources;
3030
final int prefetch;
3131

32-
public CompletableOnSubscribeConcat(Flowable<? extends Completable> sources, int prefetch) {
32+
public CompletableOnSubscribeConcat(Publisher<? extends CompletableConsumable> sources, int prefetch) {
3333
this.sources = sources;
3434
this.prefetch = prefetch;
3535
}
@@ -42,14 +42,14 @@ public void subscribe(CompletableSubscriber s) {
4242

4343
static final class CompletableConcatSubscriber
4444
extends AtomicInteger
45-
implements Subscriber<Completable>, Disposable {
45+
implements Subscriber<CompletableConsumable>, Disposable {
4646
/** */
4747
private static final long serialVersionUID = 7412667182931235013L;
4848
final CompletableSubscriber actual;
4949
final int prefetch;
5050
final SerialResource<Disposable> sr;
5151

52-
final SpscArrayQueue<Completable> queue;
52+
final SpscArrayQueue<CompletableConsumable> queue;
5353

5454
Subscription s;
5555

@@ -62,7 +62,7 @@ static final class CompletableConcatSubscriber
6262
public CompletableConcatSubscriber(CompletableSubscriber actual, int prefetch) {
6363
this.actual = actual;
6464
this.prefetch = prefetch;
65-
this.queue = new SpscArrayQueue<Completable>(prefetch);
65+
this.queue = new SpscArrayQueue<CompletableConsumable>(prefetch);
6666
this.sr = new SerialResource<Disposable>(Disposables.consumeAndDispose());
6767
this.inner = new ConcatInnerSubscriber();
6868
}
@@ -78,7 +78,7 @@ public void onSubscribe(Subscription s) {
7878
}
7979

8080
@Override
81-
public void onNext(Completable t) {
81+
public void onNext(CompletableConsumable t) {
8282
if (!queue.offer(t)) {
8383
onError(new MissingBackpressureException());
8484
return;
@@ -130,7 +130,7 @@ public void dispose() {
130130

131131
void next() {
132132
boolean d = done;
133-
Completable c = queue.poll();
133+
CompletableConsumable c = queue.poll();
134134
if (c == null) {
135135
if (d) {
136136
if (once.compareAndSet(false, true)) {

src/main/java/io/reactivex/internal/operators/completable/CompletableOnSubscribeConcatArray.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@
1919
import io.reactivex.disposables.*;
2020

2121
public final class CompletableOnSubscribeConcatArray implements CompletableConsumable {
22-
final Completable[] sources;
22+
final CompletableConsumable[] sources;
2323

24-
public CompletableOnSubscribeConcatArray(Completable[] sources) {
24+
public CompletableOnSubscribeConcatArray(CompletableConsumable[] sources) {
2525
this.sources = sources;
2626
}
2727

@@ -37,13 +37,13 @@ static final class ConcatInnerSubscriber extends AtomicInteger implements Comple
3737
private static final long serialVersionUID = -7965400327305809232L;
3838

3939
final CompletableSubscriber actual;
40-
final Completable[] sources;
40+
final CompletableConsumable[] sources;
4141

4242
int index;
4343

4444
final SerialDisposable sd;
4545

46-
public ConcatInnerSubscriber(CompletableSubscriber actual, Completable[] sources) {
46+
public ConcatInnerSubscriber(CompletableSubscriber actual, CompletableConsumable[] sources) {
4747
this.actual = actual;
4848
this.sources = sources;
4949
this.sd = new SerialDisposable();
@@ -73,7 +73,7 @@ void next() {
7373
return;
7474
}
7575

76-
Completable[] a = sources;
76+
CompletableConsumable[] a = sources;
7777
do {
7878
if (sd.isDisposed()) {
7979
return;

src/main/java/io/reactivex/internal/operators/completable/CompletableOnSubscribeConcatIterable.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,16 @@
2121
import io.reactivex.internal.disposables.EmptyDisposable;
2222

2323
public final class CompletableOnSubscribeConcatIterable implements CompletableConsumable {
24-
final Iterable<? extends Completable> sources;
24+
final Iterable<? extends CompletableConsumable> sources;
2525

26-
public CompletableOnSubscribeConcatIterable(Iterable<? extends Completable> sources) {
26+
public CompletableOnSubscribeConcatIterable(Iterable<? extends CompletableConsumable> sources) {
2727
this.sources = sources;
2828
}
2929

3030
@Override
3131
public void subscribe(CompletableSubscriber s) {
3232

33-
Iterator<? extends Completable> it;
33+
Iterator<? extends CompletableConsumable> it;
3434

3535
try {
3636
it = sources.iterator();
@@ -56,13 +56,13 @@ static final class ConcatInnerSubscriber extends AtomicInteger implements Comple
5656
private static final long serialVersionUID = -7965400327305809232L;
5757

5858
final CompletableSubscriber actual;
59-
final Iterator<? extends Completable> sources;
59+
final Iterator<? extends CompletableConsumable> sources;
6060

6161
int index;
6262

6363
final SerialDisposable sd;
6464

65-
public ConcatInnerSubscriber(CompletableSubscriber actual, Iterator<? extends Completable> sources) {
65+
public ConcatInnerSubscriber(CompletableSubscriber actual, Iterator<? extends CompletableConsumable> sources) {
6666
this.actual = actual;
6767
this.sources = sources;
6868
this.sd = new SerialDisposable();
@@ -92,7 +92,7 @@ void next() {
9292
return;
9393
}
9494

95-
Iterator<? extends Completable> a = sources;
95+
Iterator<? extends CompletableConsumable> a = sources;
9696
do {
9797
if (sd.isDisposed()) {
9898
return;
@@ -111,7 +111,7 @@ void next() {
111111
return;
112112
}
113113

114-
Completable c;
114+
CompletableConsumable c;
115115

116116
try {
117117
c = a.next();

src/main/java/io/reactivex/internal/operators/completable/CompletableOnSubscribeMerge.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@
2727
import io.reactivex.plugins.RxJavaPlugins;
2828

2929
public final class CompletableOnSubscribeMerge implements CompletableConsumable {
30-
final Flowable<? extends Completable> source;
30+
final Publisher<? extends CompletableConsumable> source;
3131
final int maxConcurrency;
3232
final boolean delayErrors;
3333

34-
public CompletableOnSubscribeMerge(Flowable<? extends Completable> source, int maxConcurrency, boolean delayErrors) {
34+
public CompletableOnSubscribeMerge(Publisher<? extends CompletableConsumable> source, int maxConcurrency, boolean delayErrors) {
3535
this.source = source;
3636
this.maxConcurrency = maxConcurrency;
3737
this.delayErrors = delayErrors;
@@ -45,7 +45,7 @@ public void subscribe(CompletableSubscriber s) {
4545

4646
static final class CompletableMergeSubscriber
4747
extends AtomicInteger
48-
implements Subscriber<Completable>, Disposable {
48+
implements Subscriber<CompletableConsumable>, Disposable {
4949
/** */
5050
private static final long serialVersionUID = -2108443387387077490L;
5151

@@ -106,7 +106,7 @@ Queue<Throwable> getOrCreateErrors() {
106106
}
107107

108108
@Override
109-
public void onNext(Completable t) {
109+
public void onNext(CompletableConsumable t) {
110110
if (done) {
111111
return;
112112
}

src/main/java/io/reactivex/internal/operators/completable/CompletableOnSubscribeMergeArray.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@
2020
import io.reactivex.plugins.RxJavaPlugins;
2121

2222
public final class CompletableOnSubscribeMergeArray implements CompletableConsumable {
23-
final Completable[] sources;
23+
final CompletableConsumable[] sources;
2424

25-
public CompletableOnSubscribeMergeArray(Completable[] sources) {
25+
public CompletableOnSubscribeMergeArray(CompletableConsumable[] sources) {
2626
this.sources = sources;
2727
}
2828

@@ -34,7 +34,7 @@ public void subscribe(final CompletableSubscriber s) {
3434

3535
s.onSubscribe(set);
3636

37-
for (Completable c : sources) {
37+
for (CompletableConsumable c : sources) {
3838
if (set.isDisposed()) {
3939
return;
4040
}

src/main/java/io/reactivex/internal/operators/completable/CompletableOnSubscribeMergeDelayErrorArray.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import io.reactivex.disposables.*;
2222

2323
public final class CompletableOnSubscribeMergeDelayErrorArray implements CompletableConsumable {
24-
final Completable[] sources;
24+
final CompletableConsumable[] sources;
2525

26-
public CompletableOnSubscribeMergeDelayErrorArray(Completable[] sources) {
26+
public CompletableOnSubscribeMergeDelayErrorArray(CompletableConsumable[] sources) {
2727
this.sources = sources;
2828
}
2929

@@ -36,7 +36,7 @@ public void subscribe(final CompletableSubscriber s) {
3636

3737
s.onSubscribe(set);
3838

39-
for (Completable c : sources) {
39+
for (CompletableConsumable c : sources) {
4040
if (set.isDisposed()) {
4141
return;
4242
}

src/main/java/io/reactivex/internal/operators/completable/CompletableOnSubscribeMergeDelayErrorIterable.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import io.reactivex.internal.queue.MpscLinkedQueue;
2222

2323
public final class CompletableOnSubscribeMergeDelayErrorIterable implements CompletableConsumable {
24-
final Iterable<? extends Completable> sources;
24+
final Iterable<? extends CompletableConsumable> sources;
2525

26-
public CompletableOnSubscribeMergeDelayErrorIterable(Iterable<? extends Completable> sources) {
26+
public CompletableOnSubscribeMergeDelayErrorIterable(Iterable<? extends CompletableConsumable> sources) {
2727
this.sources = sources;
2828
}
2929

@@ -36,7 +36,7 @@ public void subscribe(final CompletableSubscriber s) {
3636

3737
s.onSubscribe(set);
3838

39-
Iterator<? extends Completable> iterator;
39+
Iterator<? extends CompletableConsumable> iterator;
4040

4141
try {
4242
iterator = sources.iterator();
@@ -78,7 +78,7 @@ public void subscribe(final CompletableSubscriber s) {
7878
return;
7979
}
8080

81-
Completable c;
81+
CompletableConsumable c;
8282

8383
try {
8484
c = iterator.next();

src/main/java/io/reactivex/internal/operators/completable/CompletableOnSubscribeMergeIterable.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
import io.reactivex.plugins.RxJavaPlugins;
2222

2323
public final class CompletableOnSubscribeMergeIterable implements CompletableConsumable {
24-
final Iterable<? extends Completable> sources;
24+
final Iterable<? extends CompletableConsumable> sources;
2525

26-
public CompletableOnSubscribeMergeIterable(Iterable<? extends Completable> sources) {
26+
public CompletableOnSubscribeMergeIterable(Iterable<? extends CompletableConsumable> sources) {
2727
this.sources = sources;
2828
}
2929

@@ -35,7 +35,7 @@ public void subscribe(final CompletableSubscriber s) {
3535

3636
s.onSubscribe(set);
3737

38-
Iterator<? extends Completable> iterator;
38+
Iterator<? extends CompletableConsumable> iterator;
3939

4040
try {
4141
iterator = sources.iterator();
@@ -75,7 +75,7 @@ public void subscribe(final CompletableSubscriber s) {
7575
return;
7676
}
7777

78-
Completable c;
78+
CompletableConsumable c;
7979

8080
try {
8181
c = iterator.next();

src/main/java/io/reactivex/internal/operators/completable/CompletableOnSubscribeTimeout.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@
2222

2323
public final class CompletableOnSubscribeTimeout implements CompletableConsumable {
2424

25-
final Completable source;
25+
final CompletableConsumable source;
2626
final long timeout;
2727
final TimeUnit unit;
2828
final Scheduler scheduler;
29-
final Completable other;
29+
final CompletableConsumable other;
3030

31-
public CompletableOnSubscribeTimeout(Completable source, long timeout,
32-
TimeUnit unit, Scheduler scheduler, Completable other) {
31+
public CompletableOnSubscribeTimeout(CompletableConsumable source, long timeout,
32+
TimeUnit unit, Scheduler scheduler, CompletableConsumable other) {
3333
this.source = source;
3434
this.timeout = timeout;
3535
this.unit = unit;

src/main/java/io/reactivex/internal/operators/single/SingleOperatorFlatMap.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,30 +14,32 @@
1414
package io.reactivex.internal.operators.single;
1515

1616
import io.reactivex.*;
17-
import io.reactivex.Single.*;
1817
import io.reactivex.disposables.*;
1918
import io.reactivex.functions.Function;
2019

21-
public final class SingleOperatorFlatMap<T, R> implements SingleOperator<R, T> {
22-
final Function<? super T, ? extends Single<? extends R>> mapper;
20+
public final class SingleOperatorFlatMap<T, R> extends Single<R> {
21+
final SingleConsumable<? extends T> source;
22+
23+
final Function<? super T, ? extends SingleConsumable<? extends R>> mapper;
2324

24-
public SingleOperatorFlatMap(Function<? super T, ? extends Single<? extends R>> mapper) {
25+
public SingleOperatorFlatMap(SingleConsumable<? extends T> source, Function<? super T, ? extends SingleConsumable<? extends R>> mapper) {
2526
this.mapper = mapper;
27+
this.source = source;
2628
}
2729

2830
@Override
29-
public SingleSubscriber<? super T> apply(SingleSubscriber<? super R> t) {
30-
return new SingleFlatMapCallback<T, R>(t, mapper);
31+
protected void subscribeActual(SingleSubscriber<? super R> subscriber) {
32+
source.subscribe(new SingleFlatMapCallback<T, R>(subscriber, mapper));
3133
}
3234

3335
static final class SingleFlatMapCallback<T, R> implements SingleSubscriber<T> {
3436
final SingleSubscriber<? super R> actual;
35-
final Function<? super T, ? extends Single<? extends R>> mapper;
37+
final Function<? super T, ? extends SingleConsumable<? extends R>> mapper;
3638

3739
final MultipleAssignmentDisposable mad;
3840

3941
public SingleFlatMapCallback(SingleSubscriber<? super R> actual,
40-
Function<? super T, ? extends Single<? extends R>> mapper) {
42+
Function<? super T, ? extends SingleConsumable<? extends R>> mapper) {
4143
this.actual = actual;
4244
this.mapper = mapper;
4345
this.mad = new MultipleAssignmentDisposable();
@@ -50,7 +52,7 @@ public void onSubscribe(Disposable d) {
5052

5153
@Override
5254
public void onSuccess(T value) {
53-
Single<? extends R> o;
55+
SingleConsumable<? extends R> o;
5456

5557
try {
5658
o = mapper.apply(value);

0 commit comments

Comments
 (0)