Skip to content

Commit eb0c28e

Browse files
committed
2.x: first step switching to the reduced-allocation architecture
1 parent 12490fd commit eb0c28e

36 files changed

+292
-217
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
*
3838
* The class follows a similar event pattern as Reactive-Streams: onSubscribe (onError|onComplete)?
3939
*/
40-
public class Completable {
40+
public abstract class Completable {
4141
/**
4242
* Callback used for building deferred computations that takes a CompletableSubscriber.
4343
*/
@@ -386,7 +386,7 @@ public static Completable create(CompletableOnSubscribe onSubscribe) {
386386
try {
387387
// TODO plugin wrapping onSubscribe
388388

389-
return new Completable(onSubscribe);
389+
return new CompletableWrapper(onSubscribe);
390390
} catch (NullPointerException ex) {
391391
throw ex;
392392
} catch (Throwable ex) {
@@ -986,18 +986,6 @@ public void dispose() {
986986
});
987987
}
988988

989-
/** The actual subscription action. */
990-
private final CompletableOnSubscribe onSubscribe;
991-
992-
/**
993-
* Constructs a Completable instance with the given onSubscribe callback.
994-
* @param onSubscribe the callback that will receive CompletableSubscribers when they subscribe,
995-
* not null (not verified)
996-
*/
997-
protected Completable(CompletableOnSubscribe onSubscribe) {
998-
this.onSubscribe = onSubscribe;
999-
}
1000-
1001989
/**
1002990
* Returns a Completable that emits the a terminated event of either this Completable
1003991
* or the other Completable whichever fires first.
@@ -1929,14 +1917,16 @@ public final void subscribe(CompletableSubscriber s) {
19291917
try {
19301918
// TODO plugin wrapping the subscriber
19311919

1932-
onSubscribe.accept(s);
1920+
subscribeActual(s);
19331921
} catch (NullPointerException ex) {
19341922
throw ex;
19351923
} catch (Throwable ex) {
19361924
RxJavaPlugins.onError(ex);
19371925
throw toNpe(ex);
19381926
}
19391927
}
1928+
1929+
protected abstract void subscribeActual(CompletableSubscriber s);
19401930

19411931
/**
19421932
* Subscribes to this Completable and calls back either the onError or onComplete functions.

src/main/java/io/reactivex/Flowable.java

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import io.reactivex.schedulers.*;
3333
import io.reactivex.subscribers.*;
3434

35-
public class Flowable<T> implements Publisher<T> {
35+
public abstract class Flowable<T> implements Publisher<T> {
3636
/**
3737
* Interface to map/wrap a downstream subscriber to an upstream subscriber.
3838
*
@@ -385,7 +385,7 @@ public static <T> Flowable<T> concatArray(Publisher<? extends T>... sources) {
385385
public static <T> Flowable<T> create(Publisher<T> onSubscribe) {
386386
Objects.requireNonNull(onSubscribe, "onSubscribe is null");
387387
onSubscribe = RxJavaPlugins.onCreate(onSubscribe);
388-
return new Flowable<T>(onSubscribe);
388+
return fromPublisher(onSubscribe);
389389
}
390390

391391
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@@ -497,12 +497,7 @@ public static <T> Flowable<T> fromPublisher(final Publisher<? extends T> publish
497497
}
498498
Objects.requireNonNull(publisher, "publisher is null");
499499

500-
return create(new Publisher<T>() {
501-
@Override
502-
public void subscribe(Subscriber<? super T> s) {
503-
publisher.subscribe(s);
504-
}
505-
});
500+
return new FlowableWrapper<T>(publisher);
506501
}
507502

508503
@BackpressureSupport(BackpressureKind.FULL)
@@ -1160,12 +1155,6 @@ public static <T, R> Flowable<R> zipIterable(Function<? super Object[], ? extend
11601155
return create(new PublisherZip<T, R>(null, sources, zipper, bufferSize, delayError));
11611156
}
11621157

1163-
final Publisher<T> onSubscribe;
1164-
1165-
protected Flowable(Publisher<T> onSubscribe) {
1166-
this.onSubscribe = onSubscribe;
1167-
}
1168-
11691158
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
11701159
@SchedulerSupport(SchedulerKind.NONE)
11711160
public final Flowable<Boolean> all(Predicate<? super T> predicate) {
@@ -2162,7 +2151,7 @@ public final Flowable<T> last(T defaultValue) {
21622151
public final <R> Flowable<R> lift(Operator<? extends R, ? super T> lifter) {
21632152
Objects.requireNonNull(lifter, "lifter is null");
21642153
// using onSubscribe so the fusing has access to the underlying raw Publisher
2165-
return create(new PublisherLift<R, T>(onSubscribe, lifter));
2154+
return create(new PublisherLift<R, T>(this, lifter));
21662155
}
21672156

21682157
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@@ -2939,18 +2928,14 @@ public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super T
29392928
@Override
29402929
public final void subscribe(Subscriber<? super T> s) {
29412930
Objects.requireNonNull(s, "s is null");
2942-
subscribeActual(s);
2943-
}
2944-
2945-
private void subscribeActual(Subscriber<? super T> s) {
29462931
try {
29472932
s = RxJavaPlugins.onSubscribe(s);
29482933

29492934
if (s == null) {
29502935
throw new NullPointerException("Plugin returned null Subscriber");
29512936
}
29522937

2953-
onSubscribe.subscribe(s);
2938+
subscribeActual(s);
29542939
} catch (NullPointerException e) {
29552940
throw e;
29562941
} catch (Throwable e) {
@@ -2965,6 +2950,8 @@ private void subscribeActual(Subscriber<? super T> s) {
29652950
}
29662951
}
29672952

2953+
protected abstract void subscribeActual(Subscriber<? super T> s);
2954+
29682955
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
29692956
@SchedulerSupport(SchedulerKind.CUSTOM)
29702957
public final Flowable<T> subscribeOn(Scheduler scheduler) {

src/main/java/io/reactivex/Observable.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
* Observable for delivering a sequence of values without backpressure.
3939
* @param <T>
4040
*/
41-
public class Observable<T> {
41+
public abstract class Observable<T> {
4242

4343
public interface NbpOnSubscribe<T> extends Consumer<Observer<? super T>> {
4444

@@ -358,7 +358,7 @@ public static <T> Observable<T> concatArray(Observable<? extends T>... sources)
358358
public static <T> Observable<T> create(NbpOnSubscribe<T> onSubscribe) {
359359
Objects.requireNonNull(onSubscribe, "onSubscribe is null");
360360
// TODO plugin wrapper
361-
return new Observable<T>(onSubscribe);
361+
return new ObservableWrapper<T>(onSubscribe);
362362
}
363363

364364
@SchedulerSupport(SchedulerKind.NONE)
@@ -1089,12 +1089,6 @@ public static <T, R> Observable<R> zipIterable(Function<? super Object[], ? exte
10891089
}
10901090

10911091

1092-
protected final NbpOnSubscribe<T> onSubscribe;
1093-
1094-
protected Observable(NbpOnSubscribe<T> onSubscribe) {
1095-
this.onSubscribe = onSubscribe;
1096-
}
1097-
10981092
@SchedulerSupport(SchedulerKind.NONE)
10991093
public final Observable<Boolean> all(Predicate<? super T> predicate) {
11001094
Objects.requireNonNull(predicate, "predicate is null");
@@ -2634,10 +2628,15 @@ public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super T
26342628
return ls;
26352629
}
26362630

2637-
public final void subscribe(Observer<? super T> subscriber) {
2638-
Objects.requireNonNull(subscriber, "subscriber is null");
2639-
onSubscribe.accept(subscriber);
2631+
public final void subscribe(Observer<? super T> observer) {
2632+
Objects.requireNonNull(observer, "observer is null");
2633+
2634+
// TODO plugin wrappings
2635+
2636+
subscribeActual(observer);
26402637
}
2638+
2639+
protected abstract void subscribeActual(Observer<? super T> observer);
26412640

26422641
@SchedulerSupport(SchedulerKind.CUSTOM)
26432642
public final Observable<T> subscribeOn(Scheduler scheduler) {

src/main/java/io/reactivex/Single.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
*
3737
* @param <T> the value type
3838
*/
39-
public class Single<T> {
39+
public abstract class Single<T> {
4040

4141
public interface SingleOnSubscribe<T> extends Consumer<SingleSubscriber<? super T>> {
4242

@@ -356,7 +356,7 @@ public static <T> Flowable<T> concat(
356356
public static <T> Single<T> create(SingleOnSubscribe<T> onSubscribe) {
357357
Objects.requireNonNull(onSubscribe, "onSubscribe is null");
358358
// TODO plugin wrapper
359-
return new Single<T>(onSubscribe);
359+
return new SingleWrapper<T>(onSubscribe);
360360
}
361361

362362
public static <T> Single<T> defer(final Supplier<? extends Single<? extends T>> singleSupplier) {
@@ -1003,12 +1003,6 @@ public static <T, R> Single<R> zipArray(Function<? super Object[], ? extends R>
10031003
return Flowable.zipArray(zipper, false, 1, sourcePublishers).toSingle();
10041004
}
10051005

1006-
protected final SingleOnSubscribe<T> onSubscribe;
1007-
1008-
protected Single(SingleOnSubscribe<T> onSubscribe) {
1009-
this.onSubscribe = onSubscribe;
1010-
}
1011-
10121006
@SuppressWarnings("unchecked")
10131007
public final Single<T> ambWith(Single<? extends T> other) {
10141008
Objects.requireNonNull(other, "other is null");
@@ -1374,7 +1368,7 @@ public void accept(SingleSubscriber<? super R> s) {
13741368
throw new NullPointerException("The onLift returned a null subscriber");
13751369
}
13761370
// TODO plugin wrapper
1377-
onSubscribe.accept(sr);
1371+
subscribe(sr);
13781372
} catch (NullPointerException ex) {
13791373
throw ex;
13801374
} catch (Throwable ex) {
@@ -1690,9 +1684,11 @@ public void onSuccess(T value) {
16901684
public final void subscribe(SingleSubscriber<? super T> subscriber) {
16911685
Objects.requireNonNull(subscriber, "subscriber is null");
16921686
// TODO plugin wrapper
1693-
onSubscribe.accept(subscriber);
1687+
subscribeActual(subscriber);
16941688
}
16951689

1690+
protected abstract void subscribeActual(SingleSubscriber<? super T> subscriber);
1691+
16961692
public final void subscribe(Subscriber<? super T> s) {
16971693
toFlowable().subscribe(s);
16981694
}

src/main/java/io/reactivex/flowables/ConnectableFlowable.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,6 @@
3636
*/
3737
public abstract class ConnectableFlowable<T> extends Flowable<T> {
3838

39-
protected ConnectableFlowable(Publisher<T> onSubscribe) {
40-
super(onSubscribe);
41-
}
42-
4339
/**
4440
* Instructs the {@code ConnectableObservable} to begin emitting the items from its underlying
4541
* {@link Flowable} to its {@link Subscriber}s.

src/main/java/io/reactivex/flowables/GroupedFlowable.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,11 @@
1212
*/
1313
package io.reactivex.flowables;
1414

15-
import org.reactivestreams.Publisher;
16-
1715
import io.reactivex.Flowable;
1816

19-
public class GroupedFlowable<K, T> extends Flowable<T> {
17+
public abstract class GroupedFlowable<K, T> extends Flowable<T> {
2018
final K key;
21-
protected GroupedFlowable(Publisher<T> onSubscribe, K key) {
22-
super(onSubscribe);
19+
protected GroupedFlowable(K key) {
2320
this.key = key;
2421
}
2522

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.reactivex.internal.operators.completable;
2+
3+
import io.reactivex.Completable;
4+
5+
public final class CompletableWrapper extends Completable {
6+
7+
final CompletableOnSubscribe onSubscribe;
8+
9+
public CompletableWrapper(CompletableOnSubscribe onSubscribe) {
10+
this.onSubscribe = onSubscribe;
11+
}
12+
13+
@Override
14+
protected void subscribeActual(CompletableSubscriber s) {
15+
onSubscribe.accept(s);
16+
}
17+
}

src/main/java/io/reactivex/internal/operators/flowable/CachedObservable.java

Lines changed: 20 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public final class CachedObservable<T> extends Flowable<T> {
3333
/** The cache and replay state. */
3434
private CacheState<T> state;
3535

36+
private final AtomicBoolean once;
3637
/**
3738
* Creates a cached Observable with a default capacity hint of 16.
3839
* @param <T> the value type
@@ -55,21 +56,35 @@ public static <T> CachedObservable<T> from(Flowable<? extends T> source, int cap
5556
throw new IllegalArgumentException("capacityHint > 0 required");
5657
}
5758
CacheState<T> state = new CacheState<T>(source, capacityHint);
58-
CachedSubscribe<T> onSubscribe = new CachedSubscribe<T>(state);
59-
return new CachedObservable<T>(onSubscribe, state);
59+
return new CachedObservable<T>(state);
6060
}
6161

6262
/**
6363
* Private constructor because state needs to be shared between the Observable body and
6464
* the onSubscribe function.
65-
* @param onSubscribe
6665
* @param state
6766
*/
68-
private CachedObservable(Publisher<T> onSubscribe, CacheState<T> state) {
69-
super(onSubscribe);
67+
private CachedObservable(CacheState<T> state) {
7068
this.state = state;
69+
this.once = new AtomicBoolean();
7170
}
7271

72+
@Override
73+
protected void subscribeActual(Subscriber<? super T> t) {
74+
// we can connect first because we replay everything anyway
75+
ReplaySubscription<T> rp = new ReplaySubscription<T>(t, state);
76+
state.addProducer(rp);
77+
78+
t.onSubscribe(rp);
79+
80+
// we ensure a single connection here to save an instance field of AtomicBoolean in state.
81+
if (!once.get() && once.compareAndSet(false, true)) {
82+
state.connect();
83+
}
84+
85+
// no need to call rp.replay() here because the very first request will trigger it anyway
86+
}
87+
7388
/**
7489
* Check if this cached observable is connected to its source.
7590
* @return true if already connected
@@ -222,35 +237,6 @@ void dispatch() {
222237
}
223238
}
224239

225-
/**
226-
* Manages the subscription of child subscribers by setting up a replay producer and
227-
* performs auto-connection of the very first subscription.
228-
* @param <T> the value type emitted
229-
*/
230-
static final class CachedSubscribe<T> extends AtomicBoolean implements Publisher<T> {
231-
/** */
232-
private static final long serialVersionUID = -2817751667698696782L;
233-
final CacheState<T> state;
234-
public CachedSubscribe(CacheState<T> state) {
235-
this.state = state;
236-
}
237-
@Override
238-
public void subscribe(Subscriber<? super T> t) {
239-
// we can connect first because we replay everything anyway
240-
ReplaySubscription<T> rp = new ReplaySubscription<T>(t, state);
241-
state.addProducer(rp);
242-
243-
t.onSubscribe(rp);
244-
245-
// we ensure a single connection here to save an instance field of AtomicBoolean in state.
246-
if (!get() && compareAndSet(false, true)) {
247-
state.connect();
248-
}
249-
250-
// no need to call rp.replay() here because the very first request will trigger it anyway
251-
}
252-
}
253-
254240
/**
255241
* Keeps track of the current request amount and the replay position for a child Subscriber.
256242
*
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.reactivex.internal.operators.flowable;
2+
3+
import org.reactivestreams.*;
4+
5+
import io.reactivex.Flowable;
6+
7+
public final class FlowableWrapper<T> extends Flowable<T> {
8+
final Publisher<? extends T> publisher;
9+
10+
public FlowableWrapper(Publisher<? extends T> publisher) {
11+
this.publisher = publisher;
12+
}
13+
14+
@Override
15+
protected void subscribeActual(Subscriber<? super T> s) {
16+
publisher.subscribe(s);
17+
}
18+
}

0 commit comments

Comments
 (0)