Skip to content

Commit 35c8da6

Browse files
authored
2.x: add assembly tracking, minor fixes and cleanup (#4417)
* 2.x: add assembly tracking, minor fixes and cleanup * Add missing header, add more time to test
1 parent 9fb4040 commit 35c8da6

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+1206
-1175
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 48 additions & 48 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Flowable.java

Lines changed: 185 additions & 183 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Observable.java

Lines changed: 222 additions & 258 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/Single.java

Lines changed: 45 additions & 45 deletions
Large diffs are not rendered by default.

src/main/java/io/reactivex/flowables/ConnectableFlowable.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.reactivex.functions.Consumer;
2121
import io.reactivex.internal.functions.Functions;
2222
import io.reactivex.internal.operators.flowable.*;
23+
import io.reactivex.plugins.RxJavaPlugins;
2324

2425
/**
2526
* A {@code ConnectableObservable} resembles an ordinary {@link Flowable}, except that it does not begin
@@ -75,7 +76,7 @@ public void accept(Disposable d) {
7576
* @see <a href="http://reactivex.io/documentation/operators/refcount.html">ReactiveX documentation: RefCount</a>
7677
*/
7778
public Flowable<T> refCount() {
78-
return new FlowableRefCount<T>(this);
79+
return RxJavaPlugins.onAssembly(new FlowableRefCount<T>(this));
7980
}
8081

8182
/**
@@ -119,8 +120,8 @@ public Flowable<T> autoConnect(int numberOfSubscribers) {
119120
public Flowable<T> autoConnect(int numberOfSubscribers, Consumer<? super Disposable> connection) {
120121
if (numberOfSubscribers <= 0) {
121122
this.connect(connection);
122-
return this;
123+
return RxJavaPlugins.onAssembly(this);
123124
}
124-
return new FlowableAutoConnect<T>(this, numberOfSubscribers, connection);
125+
return RxJavaPlugins.onAssembly(new FlowableAutoConnect<T>(this, numberOfSubscribers, connection));
125126
}
126127
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.reactivex.exceptions.Exceptions;
2323
import io.reactivex.internal.subscriptions.SubscriptionHelper;
2424
import io.reactivex.internal.util.*;
25+
import io.reactivex.plugins.RxJavaPlugins;
2526

2627
/**
2728
* An observable which auto-connects to another observable, caches the elements
@@ -35,28 +36,28 @@ public final class FlowableCache<T> extends AbstractFlowableWithUpstream<T, T> {
3536

3637
final AtomicBoolean once;
3738
/**
38-
* Creates a cached Observable with a default capacity hint of 16.
39+
* Creates a cached Flowable with a default capacity hint of 16.
3940
* @param <T> the value type
4041
* @param source the source Observable to cache
4142
* @return the CachedObservable instance
4243
*/
43-
public static <T> FlowableCache<T> from(Flowable<T> source) {
44+
public static <T> Flowable<T> from(Flowable<T> source) {
4445
return from(source, 16);
4546
}
4647

4748
/**
48-
* Creates a cached Observable with the given capacity hint.
49+
* Creates a cached Flowable with the given capacity hint.
4950
* @param <T> the value type
5051
* @param source the source Observable to cache
5152
* @param capacityHint the hint for the internal buffer size
5253
* @return the CachedObservable instance
5354
*/
54-
public static <T> FlowableCache<T> from(Flowable<T> source, int capacityHint) {
55+
public static <T> Flowable<T> from(Flowable<T> source, int capacityHint) {
5556
if (capacityHint < 1) {
5657
throw new IllegalArgumentException("capacityHint > 0 required");
5758
}
5859
CacheState<T> state = new CacheState<T>(source, capacityHint);
59-
return new FlowableCache<T>(source, state);
60+
return RxJavaPlugins.onAssembly(new FlowableCache<T>(source, state));
6061
}
6162

6263
/**

src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinct.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818

1919
import org.reactivestreams.*;
2020

21+
import io.reactivex.Flowable;
2122
import io.reactivex.exceptions.*;
2223
import io.reactivex.functions.*;
2324
import io.reactivex.internal.functions.*;
2425
import io.reactivex.internal.subscriptions.*;
26+
import io.reactivex.plugins.RxJavaPlugins;
2527

2628
public final class FlowableDistinct<T, K> extends AbstractFlowableWithUpstream<T, T> {
2729
final Function<? super T, K> keySelector;
@@ -33,7 +35,7 @@ public FlowableDistinct(Publisher<T> source, Function<? super T, K> keySelector,
3335
this.keySelector = keySelector;
3436
}
3537

36-
public static <T, K> FlowableDistinct<T, K> withCollection(Publisher<T> source, Function<? super T, K> keySelector, final Callable<? extends Collection<? super K>> collectionSupplier) {
38+
public static <T, K> Flowable<T> withCollection(Publisher<T> source, Function<? super T, K> keySelector, final Callable<? extends Collection<? super K>> collectionSupplier) {
3739
Callable<? extends Predicate<? super K>> p = new Callable<Predicate<K>>() {
3840
@Override
3941
public Predicate<K> call() throws Exception {
@@ -52,10 +54,10 @@ public boolean test(K t) {
5254
}
5355
};
5456

55-
return new FlowableDistinct<T, K>(source, keySelector, p);
57+
return RxJavaPlugins.onAssembly(new FlowableDistinct<T, K>(source, keySelector, p));
5658
}
5759

58-
public static <T> FlowableDistinct<T, T> untilChanged(Publisher<T> source) {
60+
public static <T> Flowable<T> untilChanged(Publisher<T> source) {
5961
Callable<? extends Predicate<? super T>> p = new Callable<Predicate<T>>() {
6062
Object last;
6163
@Override
@@ -75,10 +77,10 @@ public boolean test(T t) {
7577
};
7678
}
7779
};
78-
return new FlowableDistinct<T, T>(source, Functions.<T>identity(), p);
80+
return RxJavaPlugins.onAssembly(new FlowableDistinct<T, T>(source, Functions.<T>identity(), p));
7981
}
8082

81-
public static <T, K> FlowableDistinct<T, K> untilChanged(Publisher<T> source, Function<? super T, K> keySelector) {
83+
public static <T, K> Flowable<T> untilChanged(Publisher<T> source, Function<? super T, K> keySelector) {
8284
Callable<? extends Predicate<? super K>> p = new Callable<Predicate<K>>() {
8385
Object last;
8486
@Override
@@ -98,7 +100,7 @@ public boolean test(K t) {
98100
};
99101
}
100102
};
101-
return new FlowableDistinct<T, K>(source, keySelector, p);
103+
return RxJavaPlugins.onAssembly(new FlowableDistinct<T, K>(source, keySelector, p));
102104
}
103105

104106
@Override

src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public void subscribe(Subscriber<? super T> child) {
117117
}
118118
}
119119
};
120-
return new FlowablePublish<T>(onSubscribe, source, curr, bufferSize);
120+
return RxJavaPlugins.onAssembly(new FlowablePublish<T>(onSubscribe, source, curr, bufferSize));
121121
}
122122

123123
private FlowablePublish(Publisher<T> onSubscribe, Publisher<T> source,

src/main/java/io/reactivex/internal/operators/flowable/FlowableRepeatWhen.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public void subscribeActual(Subscriber<? super T> s) {
3939

4040
SerializedSubscriber<T> z = new SerializedSubscriber<T>(s);
4141

42-
FlowProcessor<Object> processor = new UnicastProcessor<Object>(8).toSerialized();
42+
FlowableProcessor<Object> processor = new UnicastProcessor<Object>(8).toSerialized();
4343

4444
Publisher<?> when;
4545

@@ -137,13 +137,13 @@ static abstract class WhenSourceSubscriber<T, U> extends SubscriptionArbiter imp
137137

138138
protected final Subscriber<? super T> actual;
139139

140-
protected final FlowProcessor<U> processor;
140+
protected final FlowableProcessor<U> processor;
141141

142142
protected final Subscription receiver;
143143

144144
private long produced;
145145

146-
public WhenSourceSubscriber(Subscriber<? super T> actual, FlowProcessor<U> processor,
146+
public WhenSourceSubscriber(Subscriber<? super T> actual, FlowableProcessor<U> processor,
147147
Subscription receiver) {
148148
this.actual = actual;
149149
this.processor = processor;
@@ -183,7 +183,7 @@ static final class RepeatWhenSubscriber<T> extends WhenSourceSubscriber<T, Objec
183183
/** */
184184
private static final long serialVersionUID = -2680129890138081029L;
185185

186-
public RepeatWhenSubscriber(Subscriber<? super T> actual, FlowProcessor<Object> processor,
186+
public RepeatWhenSubscriber(Subscriber<? super T> actual, FlowableProcessor<Object> processor,
187187
Subscription receiver) {
188188
super(actual, processor, receiver);
189189
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public void accept(Disposable r) {
113113
*/
114114
public static <T> ConnectableFlowable<T> observeOn(final ConnectableFlowable<T> co, final Scheduler scheduler) {
115115
final Flowable<T> observable = co.observeOn(scheduler);
116-
return new ConnectableFlowable<T>() {
116+
return RxJavaPlugins.onAssembly(new ConnectableFlowable<T>() {
117117
@Override
118118
public void connect(Consumer<? super Disposable> connection) {
119119
co.connect(connection);
@@ -123,7 +123,7 @@ public void connect(Consumer<? super Disposable> connection) {
123123
protected void subscribeActual(Subscriber<? super T> s) {
124124
observable.subscribe(s);
125125
}
126-
};
126+
});
127127
}
128128

129129
/**
@@ -248,7 +248,7 @@ public void subscribe(Subscriber<? super T> child) {
248248
}
249249
}
250250
};
251-
return new FlowableReplay<T>(onSubscribe, source, curr, bufferFactory);
251+
return RxJavaPlugins.onAssembly(new FlowableReplay<T>(onSubscribe, source, curr, bufferFactory));
252252
}
253253

254254
private FlowableReplay(Publisher<T> onSubscribe, Flowable<T> source,

src/main/java/io/reactivex/internal/operators/flowable/FlowableRetryWhen.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public FlowableRetryWhen(Publisher<T> source,
3737
public void subscribeActual(Subscriber<? super T> s) {
3838
SerializedSubscriber<T> z = new SerializedSubscriber<T>(s);
3939

40-
FlowProcessor<Throwable> processor = new UnicastProcessor<Throwable>(8).toSerialized();
40+
FlowableProcessor<Throwable> processor = new UnicastProcessor<Throwable>(8).toSerialized();
4141

4242
Publisher<?> when;
4343

@@ -67,7 +67,7 @@ static final class RetryWhenSubscriber<T> extends WhenSourceSubscriber<T, Throwa
6767
/** */
6868
private static final long serialVersionUID = -2680129890138081029L;
6969

70-
public RetryWhenSubscriber(Subscriber<? super T> actual, FlowProcessor<Throwable> processor,
70+
public RetryWhenSubscriber(Subscriber<? super T> actual, FlowableProcessor<Throwable> processor,
7171
Subscription receiver) {
7272
super(actual, processor, receiver);
7373
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableScalarXMap.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.reactivex.functions.Function;
2323
import io.reactivex.internal.functions.ObjectHelper;
2424
import io.reactivex.internal.subscriptions.*;
25+
import io.reactivex.plugins.RxJavaPlugins;
2526

2627
/**
2728
* Utility classes to work with scalar-sourced XMap operators (where X == { flat, concat, switch }).
@@ -104,7 +105,7 @@ public static <T, R> boolean tryScalarXMapSubscribe(Publisher<T> source,
104105
* @return the new Flowable instance
105106
*/
106107
public static <T, U> Flowable<U> scalarXMap(final T value, final Function<? super T, ? extends Publisher<? extends U>> mapper) {
107-
return new ScalarXMapFlowable<T, U>(value, mapper);
108+
return RxJavaPlugins.onAssembly(new ScalarXMapFlowable<T, U>(value, mapper));
108109
}
109110

110111
/**

src/main/java/io/reactivex/internal/operators/observable/ObservableCache.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.reactivex.exceptions.Exceptions;
2121
import io.reactivex.internal.disposables.SequentialDisposable;
2222
import io.reactivex.internal.util.*;
23+
import io.reactivex.plugins.RxJavaPlugins;
2324

2425
/**
2526
* An observable which auto-connects to another observable, caches the elements
@@ -39,7 +40,7 @@ public final class ObservableCache<T> extends AbstractObservableWithUpstream<T,
3940
* @param source the source Observable to cache
4041
* @return the CachedObservable instance
4142
*/
42-
public static <T> ObservableCache<T> from(Observable<T> source) {
43+
public static <T> Observable<T> from(Observable<T> source) {
4344
return from(source, 16);
4445
}
4546

@@ -50,12 +51,12 @@ public static <T> ObservableCache<T> from(Observable<T> source) {
5051
* @param capacityHint the hint for the internal buffer size
5152
* @return the CachedObservable instance
5253
*/
53-
public static <T> ObservableCache<T> from(Observable<T> source, int capacityHint) {
54+
public static <T> Observable<T> from(Observable<T> source, int capacityHint) {
5455
if (capacityHint < 1) {
5556
throw new IllegalArgumentException("capacityHint > 0 required");
5657
}
5758
CacheState<T> state = new CacheState<T>(source, capacityHint);
58-
return new ObservableCache<T>(source, state);
59+
return RxJavaPlugins.onAssembly(new ObservableCache<T>(source, state));
5960
}
6061

6162
/**

src/main/java/io/reactivex/internal/operators/observable/ObservablePublish.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
2424
import io.reactivex.internal.util.NotificationLite;
2525
import io.reactivex.observables.ConnectableObservable;
26+
import io.reactivex.plugins.RxJavaPlugins;
2627

2728
/**
2829
* A connectable observable which shares an underlying source and dispatches source values to subscribers in a backpressure-aware
@@ -114,12 +115,12 @@ public void subscribe(Observer<? super T> child) {
114115
}
115116
}
116117
};
117-
return new ObservablePublish<T>(onSubscribe, source, curr, bufferSize);
118+
return RxJavaPlugins.onAssembly(new ObservablePublish<T>(onSubscribe, source, curr, bufferSize));
118119
}
119120

120121
public static <T, R> Observable<R> create(final ObservableSource<T> source,
121122
final Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final int bufferSize) {
122-
return new Observable<R>() {
123+
return RxJavaPlugins.onAssembly(new Observable<R>() {
123124
@Override
124125
protected void subscribeActual(Observer<? super R> o) {
125126
ConnectableObservable<T> op = ObservablePublish.create(source, bufferSize);
@@ -145,7 +146,7 @@ public void accept(Disposable r) {
145146
}
146147
});
147148
}
148-
};
149+
});
149150
}
150151

151152
private ObservablePublish(ObservableSource<T> onSubscribe, ObservableSource<T> source,

src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.reactivex.internal.disposables.*;
2727
import io.reactivex.internal.util.NotificationLite;
2828
import io.reactivex.observables.ConnectableObservable;
29+
import io.reactivex.plugins.RxJavaPlugins;
2930
import io.reactivex.schedulers.Timed;
3031

3132
public final class ObservableReplay<T> extends ConnectableObservable<T> implements ObservableWithUpstream<T> {
@@ -58,7 +59,7 @@ public Object call() {
5859
public static <U, R> Observable<R> multicastSelector(
5960
final Callable<? extends ConnectableObservable<U>> connectableFactory,
6061
final Function<? super Observable<U>, ? extends ObservableSource<R>> selector) {
61-
return new Observable<R>() {
62+
return RxJavaPlugins.onAssembly(new Observable<R>() {
6263
@Override
6364
protected void subscribeActual(Observer<? super R> child) {
6465
ConnectableObservable<U> co;
@@ -83,7 +84,7 @@ public void accept(Disposable r) {
8384
}
8485
});
8586
}
86-
};
87+
});
8788
}
8889

8990
/**
@@ -96,7 +97,7 @@ public void accept(Disposable r) {
9697
*/
9798
public static <T> ConnectableObservable<T> observeOn(final ConnectableObservable<T> co, final Scheduler scheduler) {
9899
final Observable<T> observable = co.observeOn(scheduler);
99-
return new ConnectableObservable<T>() {
100+
return RxJavaPlugins.onAssembly(new ConnectableObservable<T>() {
100101
@Override
101102
public void connect(Consumer<? super Disposable> connection) {
102103
co.connect(connection);
@@ -106,7 +107,7 @@ public void connect(Consumer<? super Disposable> connection) {
106107
protected void subscribeActual(Observer<? super T> observer) {
107108
observable.subscribe(observer);
108109
}
109-
};
110+
});
110111
}
111112

112113
/**
@@ -234,7 +235,7 @@ public void subscribe(Observer<? super T> child) {
234235
}
235236
}
236237
};
237-
return new ObservableReplay<T>(onSubscribe, source, curr, bufferFactory);
238+
return RxJavaPlugins.onAssembly(new ObservableReplay<T>(onSubscribe, source, curr, bufferFactory));
238239
}
239240

240241
private ObservableReplay(ObservableSource<T> onSubscribe, Observable<T> source,

src/main/java/io/reactivex/internal/operators/observable/ObservableScalarXMap.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.reactivex.internal.disposables.EmptyDisposable;
2323
import io.reactivex.internal.functions.ObjectHelper;
2424
import io.reactivex.internal.fuseable.QueueDisposable;
25+
import io.reactivex.plugins.RxJavaPlugins;
2526

2627
/**
2728
* Utility classes to work with scalar-sourced XMap operators (where X == { flat, concat, switch }).
@@ -105,7 +106,7 @@ public static <T, R> boolean tryScalarXMapSubscribe(ObservableSource<T> source,
105106
*/
106107
public static <T, U> Observable<U> scalarXMap(T value,
107108
Function<? super T, ? extends ObservableSource<? extends U>> mapper) {
108-
return new ScalarXMapObservable<T, U>(value, mapper);
109+
return RxJavaPlugins.onAssembly(new ScalarXMapObservable<T, U>(value, mapper));
109110
}
110111

111112
/**

src/main/java/io/reactivex/observables/ConnectableObservable.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.reactivex.functions.Consumer;
2121
import io.reactivex.internal.functions.Functions;
2222
import io.reactivex.internal.operators.observable.*;
23+
import io.reactivex.plugins.RxJavaPlugins;
2324

2425
/**
2526
* A {@code ConnectableObservable} resembles an ordinary {@link Flowable}, except that it does not begin
@@ -75,7 +76,7 @@ public void accept(Disposable d) {
7576
* @see <a href="http://reactivex.io/documentation/operators/refcount.html">ReactiveX documentation: RefCount</a>
7677
*/
7778
public Observable<T> refCount() {
78-
return new ObservableRefCount<T>(this);
79+
return RxJavaPlugins.onAssembly(new ObservableRefCount<T>(this));
7980
}
8081

8182
/**
@@ -119,8 +120,8 @@ public Observable<T> autoConnect(int numberOfSubscribers) {
119120
public Observable<T> autoConnect(int numberOfSubscribers, Consumer<? super Disposable> connection) {
120121
if (numberOfSubscribers <= 0) {
121122
this.connect(connection);
122-
return this;
123+
return RxJavaPlugins.onAssembly(this);
123124
}
124-
return new ObservableAutoConnect<T>(this, numberOfSubscribers, connection);
125+
return RxJavaPlugins.onAssembly(new ObservableAutoConnect<T>(this, numberOfSubscribers, connection));
125126
}
126127
}

0 commit comments

Comments
 (0)