Skip to content

Commit 47e6c67

Browse files
authored
1.x: deprecate create(), add alternatives (#5086)
1 parent a37e292 commit 47e6c67

File tree

99 files changed

+595
-529
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

99 files changed

+595
-529
lines changed

src/main/java/rx/Completable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2249,7 +2249,7 @@ public final <R> R to(Func1<? super Completable, R> converter) {
22492249
* @return the new Observable created
22502250
*/
22512251
public final <T> Observable<T> toObservable() {
2252-
return Observable.create(new Observable.OnSubscribe<T>() {
2252+
return Observable.unsafeCreate(new Observable.OnSubscribe<T>() {
22532253
@Override
22542254
public void call(Subscriber<? super T> s) {
22552255
unsafeSubscribe(s);

src/main/java/rx/Observable.java

Lines changed: 139 additions & 73 deletions
Large diffs are not rendered by default.

src/main/java/rx/Single.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ public interface Transformer<T, R> extends Func1<Single<T>, Single<R>> {
202202
*/
203203
private static <T> Observable<T> asObservable(Single<T> t) {
204204
// is this sufficient, or do I need to keep the outer Single and subscribe to it?
205-
return Observable.create(new SingleToObservable<T>(t.onSubscribe));
205+
return Observable.unsafeCreate(new SingleToObservable<T>(t.onSubscribe));
206206
}
207207

208208
/* *********************************************************************************************************

src/main/java/rx/internal/operators/EmptyObservableHolder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public enum EmptyObservableHolder implements OnSubscribe<Object> {
2828
;
2929

3030
/** The singleton instance. */
31-
static final Observable<Object> EMPTY = Observable.create(INSTANCE);
31+
static final Observable<Object> EMPTY = Observable.unsafeCreate(INSTANCE);
3232

3333

3434
/**

src/main/java/rx/internal/operators/NeverObservableHolder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public enum NeverObservableHolder implements OnSubscribe<Object> {
2828
;
2929

3030
/** The singleton instance. */
31-
static final Observable<Object> NEVER = Observable.create(INSTANCE);
31+
static final Observable<Object> NEVER = Observable.unsafeCreate(INSTANCE);
3232

3333
/**
3434
* Returns a type-corrected singleton instance of the never Observable.

src/main/java/rx/internal/operators/OnSubscribeFromEmitter.java renamed to src/main/java/rx/internal/operators/OnSubscribeCreate.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@
2929
import rx.plugins.RxJavaHooks;
3030
import rx.subscriptions.SerialSubscription;
3131

32-
public final class OnSubscribeFromEmitter<T> implements OnSubscribe<T> {
32+
public final class OnSubscribeCreate<T> implements OnSubscribe<T> {
3333

3434
final Action1<Emitter<T>> Emitter;
3535

3636
final Emitter.BackpressureMode backpressure;
3737

38-
public OnSubscribeFromEmitter(Action1<Emitter<T>> Emitter, Emitter.BackpressureMode backpressure) {
38+
public OnSubscribeCreate(Action1<Emitter<T>> Emitter, Emitter.BackpressureMode backpressure) {
3939
this.Emitter = Emitter;
4040
this.backpressure = backpressure;
4141
}
@@ -268,7 +268,7 @@ public void onError(Throwable e) {
268268

269269
@Override
270270
void onOverflow() {
271-
onError(new MissingBackpressureException("fromEmitter: could not emit value due to lack of requests"));
271+
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
272272
}
273273

274274
}

src/main/java/rx/internal/operators/OnSubscribeFlattenIterable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,9 @@ public static <T, R> Observable<R> createFrom(Observable<? extends T> source,
7070
Func1<? super T, ? extends Iterable<? extends R>> mapper, int prefetch) {
7171
if (source instanceof ScalarSynchronousObservable) {
7272
T scalar = ((ScalarSynchronousObservable<? extends T>) source).get();
73-
return Observable.create(new OnSubscribeScalarFlattenIterable<T, R>(scalar, mapper));
73+
return Observable.unsafeCreate(new OnSubscribeScalarFlattenIterable<T, R>(scalar, mapper));
7474
}
75-
return Observable.create(new OnSubscribeFlattenIterable<T, R>(source, mapper, prefetch));
75+
return Observable.unsafeCreate(new OnSubscribeFlattenIterable<T, R>(source, mapper, prefetch));
7676
}
7777

7878
static final class FlattenIterableSubscriber<T, R> extends Subscriber<T> {

src/main/java/rx/internal/operators/OnSubscribeGroupJoin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public void onNext(T1 args) {
170170
leftMap().put(id, subjSerial);
171171
}
172172

173-
Observable<T2> window = Observable.create(new WindowObservableFunc<T2>(subj, cancel));
173+
Observable<T2> window = Observable.unsafeCreate(new WindowObservableFunc<T2>(subj, cancel));
174174

175175
Observable<D1> duration = leftDuration.call(args);
176176

src/main/java/rx/internal/operators/OnSubscribeRedo.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
* limitations under the License.
3232
*/
3333

34-
import static rx.Observable.create; // NOPMD
34+
import static rx.Observable.unsafeCreate; // NOPMD
3535

3636
import java.util.concurrent.atomic.*;
3737

@@ -133,11 +133,11 @@ public static <T> Observable<T> retry(Observable<T> source, final long count) {
133133
}
134134

135135
public static <T> Observable<T> retry(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler) {
136-
return create(new OnSubscribeRedo<T>(source, notificationHandler, true, false, Schedulers.trampoline()));
136+
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, true, false, Schedulers.trampoline()));
137137
}
138138

139139
public static <T> Observable<T> retry(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
140-
return create(new OnSubscribeRedo<T>(source, notificationHandler, true, false, scheduler));
140+
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, true, false, scheduler));
141141
}
142142

143143
public static <T> Observable<T> repeat(Observable<T> source) {
@@ -163,15 +163,15 @@ public static <T> Observable<T> repeat(Observable<T> source, final long count, S
163163
}
164164

165165
public static <T> Observable<T> repeat(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler) {
166-
return create(new OnSubscribeRedo<T>(source, notificationHandler, false, true, Schedulers.trampoline()));
166+
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, false, true, Schedulers.trampoline()));
167167
}
168168

169169
public static <T> Observable<T> repeat(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
170-
return create(new OnSubscribeRedo<T>(source, notificationHandler, false, true, scheduler));
170+
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, false, true, scheduler));
171171
}
172172

173173
public static <T> Observable<T> redo(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> notificationHandler, Scheduler scheduler) {
174-
return create(new OnSubscribeRedo<T>(source, notificationHandler, false, false, scheduler));
174+
return unsafeCreate(new OnSubscribeRedo<T>(source, notificationHandler, false, false, scheduler));
175175
}
176176

177177
private OnSubscribeRedo(Observable<T> source, Func1<? super Observable<? extends Notification<?>>, ? extends Observable<?>> f, boolean stopOnComplete, boolean stopOnError,

src/main/java/rx/internal/operators/OperatorPublish.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public static <T, R> Observable<R> create(final Observable<? extends T> source,
123123

124124
public static <T, R> Observable<R> create(final Observable<? extends T> source,
125125
final Func1<? super Observable<T>, ? extends Observable<R>> selector, final boolean delayError) {
126-
return create(new OnSubscribe<R>() {
126+
return unsafeCreate(new OnSubscribe<R>() {
127127
@Override
128128
public void call(final Subscriber<? super R> child) {
129129
final OnSubscribePublishMulticast<T> op = new OnSubscribePublishMulticast<T>(RxRingBuffer.SIZE, delayError);
@@ -155,7 +155,7 @@ public void setProducer(Producer p) {
155155
child.add(op);
156156
child.add(subscriber);
157157

158-
selector.call(Observable.create(op)).unsafeSubscribe(subscriber);
158+
selector.call(Observable.unsafeCreate(op)).unsafeSubscribe(subscriber);
159159

160160
source.unsafeSubscribe(op.subscriber());
161161
}

src/main/java/rx/internal/operators/OperatorReplay.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public Object call() {
5858
public static <T, U, R> Observable<R> multicastSelector(
5959
final Func0<? extends ConnectableObservable<U>> connectableFactory,
6060
final Func1<? super Observable<U>, ? extends Observable<R>> selector) {
61-
return Observable.create(new OnSubscribe<R>() {
61+
return Observable.unsafeCreate(new OnSubscribe<R>() {
6262
@Override
6363
public void call(final Subscriber<? super R> child) {
6464
ConnectableObservable<U> co;

src/main/java/rx/internal/util/ScalarSynchronousObservable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public void call() {
122122
};
123123
}
124124

125-
return create(new ScalarAsyncOnSubscribe<T>(t, onSchedule));
125+
return unsafeCreate(new ScalarAsyncOnSubscribe<T>(t, onSchedule));
126126
}
127127

128128
/** The OnSubscribe callback for the Observable constructor. */
@@ -225,7 +225,7 @@ public String toString() {
225225
* @return the new observable
226226
*/
227227
public <R> Observable<R> scalarFlatMap(final Func1<? super T, ? extends Observable<? extends R>> func) {
228-
return create(new OnSubscribe<R>() {
228+
return unsafeCreate(new OnSubscribe<R>() {
229229
@Override
230230
public void call(final Subscriber<? super R> child) {
231231
Observable<? extends R> o = func.call(t);

src/main/java/rx/observables/AsyncOnSubscribe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
/**
3434
* A utility class to create {@code OnSubscribe<T>} functions that respond correctly to back
3535
* pressure requests from subscribers. This is an improvement over
36-
* {@link rx.Observable#create(OnSubscribe) Observable.create(OnSubscribe)} which does not provide
36+
* {@link rx.Observable#unsafeCreate(OnSubscribe) Observable.create(OnSubscribe)} which does not provide
3737
* any means of managing back pressure requests out-of-the-box. This variant of an OnSubscribe
3838
* function allows for the asynchronous processing of requests.
3939
*

src/main/java/rx/observables/ConnectableObservable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void call(Subscription t1) {
7676
* @see <a href="http://reactivex.io/documentation/operators/refcount.html">ReactiveX documentation: RefCount</a>
7777
*/
7878
public Observable<T> refCount() {
79-
return create(new OnSubscribeRefCount<T>(this));
79+
return unsafeCreate(new OnSubscribeRefCount<T>(this));
8080
}
8181

8282
/**
@@ -125,6 +125,6 @@ public Observable<T> autoConnect(int numberOfSubscribers, Action1<? super Subscr
125125
this.connect(connection);
126126
return this;
127127
}
128-
return create(new OnSubscribeAutoConnect<T>(this, numberOfSubscribers, connection));
128+
return unsafeCreate(new OnSubscribeAutoConnect<T>(this, numberOfSubscribers, connection));
129129
}
130130
}

src/main/java/rx/observables/SyncOnSubscribe.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
/**
3030
* A utility class to create {@code OnSubscribe<T>} functions that responds correctly to back
3131
* pressure requests from subscribers. This is an improvement over
32-
* {@link rx.Observable#create(OnSubscribe) Observable.create(OnSubscribe)} which does not provide
32+
* {@link rx.Observable#unsafeCreate(OnSubscribe) Observable.create(OnSubscribe)} which does not provide
3333
* any means of managing back pressure requests out-of-the-box.
3434
*
3535
* @param <S>

src/main/java/rx/plugins/RxJavaObservableExecutionHook.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
*/
4242
public abstract class RxJavaObservableExecutionHook { // NOPMD
4343
/**
44-
* Invoked during the construction by {@link Observable#create(OnSubscribe)}
44+
* Invoked during the construction by {@link Observable#unsafeCreate(OnSubscribe)}
4545
* <p>
4646
* This can be used to decorate or replace the <code>onSubscribe</code> function or just perform extra
4747
* logging, metrics and other such things and pass through the function.

src/perf/java/rx/OneItemPerf.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public void call(SingleSubscriber<? super T> t) {
7070
@Setup
7171
public void setup() {
7272
scalar = Observable.just(1);
73-
one = Observable.create(new OnSubscribe<Integer>() {
73+
one = Observable.unsafeCreate(new OnSubscribe<Integer>() {
7474
@Override
7575
public void call(Subscriber<? super Integer> t) {
7676
t.setProducer(new SingleProducer<Integer>(t, 1));

src/perf/java/rx/jmh/InputWithIncrementingInteger.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public void setup(final Blackhole bh) {
4444
final int size = getSize();
4545
observable = Observable.range(0, size);
4646

47-
firehose = Observable.create(new OnSubscribe<Integer>() {
47+
firehose = Observable.unsafeCreate(new OnSubscribe<Integer>() {
4848

4949
@Override
5050
public void call(Subscriber<? super Integer> s) {

src/perf/java/rx/operators/FromComparison.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ public void setup() {
4949

5050
Arrays.fill(array, 1);
5151

52-
iterableSource = Observable.create(new OnSubscribeFromIterable<Integer>(Arrays.asList(array)));
53-
arraySource = Observable.create(new OnSubscribeFromArray<Integer>(array));
52+
iterableSource = Observable.unsafeCreate(new OnSubscribeFromIterable<Integer>(Arrays.asList(array)));
53+
arraySource = Observable.unsafeCreate(new OnSubscribeFromArray<Integer>(array));
5454
}
5555

5656
@Benchmark

src/perf/java/rx/operators/OperatorRangePerf.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public static class InputUsingRequest {
4343

4444
@Setup
4545
public void setup(final Blackhole bh) {
46-
observable = Observable.create(new OnSubscribeRange(0, size));
46+
observable = Observable.unsafeCreate(new OnSubscribeRange(0, size));
4747
this.bh = bh;
4848
}
4949

@@ -91,7 +91,7 @@ public static class InputWithoutRequest {
9191

9292
@Setup
9393
public void setup(final Blackhole bh) {
94-
observable = Observable.create(new OnSubscribeRange(0, size));
94+
observable = Observable.unsafeCreate(new OnSubscribeRange(0, size));
9595
this.bh = bh;
9696

9797
}

src/perf/java/rx/operators/OperatorSerializePerf.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public void serializedSingleStream(Input input) throws InterruptedException {
5959
@Benchmark
6060
public void serializedTwoStreamsHighlyContended(final Input input) throws InterruptedException {
6161
LatchedObserver<Integer> o = input.newLatchedObserver();
62-
Observable.create(new OnSubscribe<Integer>() {
62+
Observable.unsafeCreate(new OnSubscribe<Integer>() {
6363

6464
@Override
6565
public void call(Subscriber<? super Integer> s) {
@@ -101,7 +101,7 @@ public Integer call(Long t1) {
101101
@Benchmark
102102
public void serializedTwoStreamsSlightlyContended(final InputWithInterval input) throws InterruptedException {
103103
LatchedObserver<Integer> o = input.newLatchedObserver();
104-
Observable.create(new OnSubscribe<Integer>() {
104+
Observable.unsafeCreate(new OnSubscribe<Integer>() {
105105

106106
@Override
107107
public void call(Subscriber<? super Integer> s) {
@@ -118,7 +118,7 @@ public void call(Subscriber<? super Integer> s) {
118118
@Benchmark
119119
public void serializedTwoStreamsOneFastOneSlow(final InputWithInterval input) throws InterruptedException {
120120
LatchedObserver<Integer> o = input.newLatchedObserver();
121-
Observable.create(new OnSubscribe<Integer>() {
121+
Observable.unsafeCreate(new OnSubscribe<Integer>() {
122122

123123
@Override
124124
public void call(final Subscriber<? super Integer> s) {

src/perf/java/rx/operators/OperatorTakeLastOnePerf.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public void takeLastOneUsingTakeLast(Input input) {
4646

4747
@Benchmark
4848
public void takeLastOneUsingTakeLastOne(Input input) {
49-
Observable.create(new OnSubscribeTakeLastOne<Integer>(input.observable)).subscribe(input.observer);
49+
Observable.unsafeCreate(new OnSubscribeTakeLastOne<Integer>(input.observable)).subscribe(input.observer);
5050
}
5151

5252
}

src/test/java/rx/BackpressureTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -593,7 +593,7 @@ private static Observable<Integer> incrementingIntegers(final AtomicInteger coun
593593
}
594594

595595
private static Observable<Integer> incrementingIntegers(final AtomicInteger counter, final ConcurrentLinkedQueue<Thread> threadsSeen) {
596-
return Observable.create(new OnSubscribe<Integer>() {
596+
return Observable.unsafeCreate(new OnSubscribe<Integer>() {
597597

598598
final AtomicLong requested = new AtomicLong();
599599

@@ -637,7 +637,7 @@ public void request(long n) {
637637
* @return
638638
*/
639639
private static Observable<Integer> firehose(final AtomicInteger counter) {
640-
return Observable.create(new OnSubscribe<Integer>() {
640+
return Observable.unsafeCreate(new OnSubscribe<Integer>() {
641641

642642
int i;
643643

src/test/java/rx/CompletableTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,7 @@ public void call(CompletableSubscriber cs) {
391391
cs.onError(e);
392392
}
393393
})
394-
.andThen(Observable.<String>create(new Observable.OnSubscribe<String>() {
394+
.andThen(Observable.<String>unsafeCreate(new Observable.OnSubscribe<String>() {
395395
@Override
396396
public void call(Subscriber<? super String> s) {
397397
hasRun.set(true);

src/test/java/rx/ConcatTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public void testConcatCovariance4() {
149149
Media media = new Media();
150150
HorrorMovie horrorMovie2 = new HorrorMovie();
151151

152-
Observable<Movie> o1 = Observable.create(new OnSubscribe<Movie>() {
152+
Observable<Movie> o1 = Observable.unsafeCreate(new OnSubscribe<Movie>() {
153153

154154
@Override
155155
public void call(Subscriber<? super Movie> o) {

src/test/java/rx/EventStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ private EventStream() {
3232
throw new IllegalStateException("No instances!");
3333
}
3434
public static Observable<Event> getEventStream(final String type, final int numInstances) {
35-
return Observable.create(new OnSubscribe<Event>() {
35+
return Observable.unsafeCreate(new OnSubscribe<Event>() {
3636

3737
@Override
3838
public void call(final Subscriber<? super Event> subscriber) {

src/test/java/rx/MergeTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public void testMergeCovariance3() {
8080
@Test
8181
public void testMergeCovariance4() {
8282

83-
Observable<Movie> o1 = Observable.create(new OnSubscribe<Movie>() {
83+
Observable<Movie> o1 = Observable.unsafeCreate(new OnSubscribe<Movie>() {
8484

8585
@Override
8686
public void call(Subscriber<? super Movie> o) {

0 commit comments

Comments
 (0)