Skip to content

Commit 80cd791

Browse files
Merge pull request ReactiveX#345 from benjchristensen/unit-test-cleanup-with-create
Remove unnecessary Observable constructor
2 parents 65ac7c2 + 8c7afbf commit 80cd791

19 files changed

+197
-325
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -102,17 +102,15 @@
102102
* <p>
103103
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/legend.png">
104104
* <p>
105-
* For more information see the <a href="https://github.com/Netflix/RxJava/wiki/Observable">RxJava
106-
* Uncyclo</a>
105+
* For more information see the <a href="https://github.com/Netflix/RxJava/wiki/Observable">RxJava Uncyclo</a>
107106
*
108107
* @param <T>
109108
*/
110109
public class Observable<T> {
111110

112-
//TODO use a consistent parameter naming scheme (for example: for all operators that modify a source Observable, the parameter representing that source Observable should have the same name, e.g. "source" -- currently such parameters are named any of "sequence", "that", "source", "items", or "observable")
113-
114-
private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
115-
111+
/**
112+
* Executed when 'subscribe' is invoked.
113+
*/
116114
private final OnSubscribeFunc<T> onSubscribe;
117115

118116
/**
@@ -125,7 +123,7 @@ public static interface OnSubscribeFunc<T> extends Function {
125123
public Subscription onSubscribe(Observer<? super T> t1);
126124

127125
}
128-
126+
129127
/**
130128
* Observable with Function to execute when subscribed to.
131129
* <p>
@@ -139,10 +137,8 @@ protected Observable(OnSubscribeFunc<T> onSubscribe) {
139137
this.onSubscribe = onSubscribe;
140138
}
141139

142-
protected Observable() {
143-
this(null);
144-
//TODO should this be made private to prevent it? It really serves no good purpose and only confuses things. Unit tests are incorrectly using it today
145-
}
140+
private final static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
141+
146142

147143
/**
148144
* An {@link Observer} must call an Observable's {@code subscribe} method in order to

rxjava-core/src/main/java/rx/observables/BlockingObservable.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,14 +64,6 @@ protected BlockingObservable(OnSubscribeFunc<T> onSubscribe) {
6464
super(onSubscribe);
6565
}
6666

67-
/**
68-
* Used to prevent public instantiation
69-
*/
70-
@SuppressWarnings("unused")
71-
private BlockingObservable() {
72-
// prevent public instantiation
73-
}
74-
7567
/**
7668
* Convert an Observable into a BlockingObservable.
7769
*/

rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ public void testCombineLatestWithFunctionThatThrowsAnException() {
362362
TestObservable w1 = new TestObservable();
363363
TestObservable w2 = new TestObservable();
364364

365-
Observable<String> combined = Observable.create(combineLatest(w1, w2, new Func2<String, String, String>() {
365+
Observable<String> combined = Observable.create(combineLatest(Observable.create(w1), Observable.create(w2), new Func2<String, String, String>() {
366366
@Override
367367
public String call(String v1, String v2) {
368368
throw new RuntimeException("I don't work.");
@@ -387,7 +387,7 @@ public void testCombineLatestDifferentLengthObservableSequences1() {
387387
TestObservable w2 = new TestObservable();
388388
TestObservable w3 = new TestObservable();
389389

390-
Observable<String> combineLatestW = Observable.create(combineLatest(w1, w2, w3, getConcat3StringsCombineLatestFunction()));
390+
Observable<String> combineLatestW = Observable.create(combineLatest(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsCombineLatestFunction()));
391391
combineLatestW.subscribe(w);
392392

393393
/* simulate sending data */
@@ -425,7 +425,7 @@ public void testCombineLatestDifferentLengthObservableSequences2() {
425425
TestObservable w2 = new TestObservable();
426426
TestObservable w3 = new TestObservable();
427427

428-
Observable<String> combineLatestW = Observable.create(combineLatest(w1, w2, w3, getConcat3StringsCombineLatestFunction()));
428+
Observable<String> combineLatestW = Observable.create(combineLatest(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsCombineLatestFunction()));
429429
combineLatestW.subscribe(w);
430430

431431
/* simulate sending data */
@@ -461,7 +461,7 @@ public void testCombineLatestWithInterleavingSequences() {
461461
TestObservable w2 = new TestObservable();
462462
TestObservable w3 = new TestObservable();
463463

464-
Observable<String> combineLatestW = Observable.create(combineLatest(w1, w2, w3, getConcat3StringsCombineLatestFunction()));
464+
Observable<String> combineLatestW = Observable.create(combineLatest(Observable.create(w1), Observable.create(w2), Observable.create(w3), getConcat3StringsCombineLatestFunction()));
465465
combineLatestW.subscribe(w);
466466

467467
/* simulate sending data */
@@ -936,12 +936,12 @@ private static String getStringValue(Object o) {
936936
}
937937
}
938938

939-
private static class TestObservable extends Observable<String> {
939+
private static class TestObservable implements OnSubscribeFunc<String> {
940940

941941
Observer<? super String> observer;
942942

943943
@Override
944-
public Subscription subscribe(Observer<? super String> observer) {
944+
public Subscription onSubscribe(Observer<? super String> observer) {
945945
// just store the variable where it can be accessed so we can manually trigger it
946946
this.observer = observer;
947947
return Subscriptions.empty();

rxjava-core/src/main/java/rx/operators/OperationConcat.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ public void testSimpleAsyncConcat() {
255255
TestObservable<String> o1 = new TestObservable<String>("one", "two", "three");
256256
TestObservable<String> o2 = new TestObservable<String>("four", "five", "six");
257257

258-
Observable.concat(o1, o2).subscribe(observer);
258+
Observable.concat(Observable.create(o1), Observable.create(o2)).subscribe(observer);
259259

260260
try {
261261
// wait for async observables to complete
@@ -301,12 +301,12 @@ public void run() {
301301
// emit first
302302
if (!s.isUnsubscribed()) {
303303
System.out.println("Emit o1");
304-
observer.onNext(o1);
304+
observer.onNext(Observable.create(o1));
305305
}
306306
// emit second
307307
if (!s.isUnsubscribed()) {
308308
System.out.println("Emit o2");
309-
observer.onNext(o2);
309+
observer.onNext(Observable.create(o2));
310310
}
311311

312312
// wait until sometime later and emit third
@@ -317,7 +317,7 @@ public void run() {
317317
}
318318
if (!s.isUnsubscribed()) {
319319
System.out.println("Emit o3");
320-
observer.onNext(o3);
320+
observer.onNext(Observable.create(o3));
321321
}
322322

323323
} catch (Throwable e) {
@@ -404,7 +404,7 @@ public void testBlockedObservableOfObservables() {
404404
final CountDownLatch callOnce = new CountDownLatch(1);
405405
final CountDownLatch okToContinue = new CountDownLatch(1);
406406
TestObservable<Observable<String>> observableOfObservables = new TestObservable<Observable<String>>(callOnce, okToContinue, odds, even);
407-
OnSubscribeFunc<String> concatF = concat(observableOfObservables);
407+
OnSubscribeFunc<String> concatF = concat(Observable.create(observableOfObservables));
408408
Observable<String> concat = Observable.create(concatF);
409409
concat.subscribe(observer);
410410
try {
@@ -443,8 +443,8 @@ public void testConcatConcurrentWithInfinity() {
443443
@SuppressWarnings("unchecked")
444444
Observer<String> aObserver = mock(Observer.class);
445445
@SuppressWarnings("unchecked")
446-
TestObservable<Observable<String>> observableOfObservables = new TestObservable<Observable<String>>(w1, w2);
447-
OnSubscribeFunc<String> concatF = concat(observableOfObservables);
446+
TestObservable<Observable<String>> observableOfObservables = new TestObservable<Observable<String>>(Observable.create(w1), Observable.create(w2));
447+
OnSubscribeFunc<String> concatF = concat(Observable.create(observableOfObservables));
448448

449449
Observable<String> concat = Observable.create(concatF);
450450

@@ -485,8 +485,8 @@ public void testConcatNonBlockingObservables() {
485485
@Override
486486
public Subscription onSubscribe(Observer<? super Observable<String>> observer) {
487487
// simulate what would happen in an observable
488-
observer.onNext(w1);
489-
observer.onNext(w2);
488+
observer.onNext(Observable.create(w1));
489+
observer.onNext(Observable.create(w2));
490490
observer.onCompleted();
491491

492492
return new Subscription() {
@@ -540,7 +540,7 @@ public void testConcatUnsubscribe() {
540540
@SuppressWarnings("unchecked")
541541
final Observer<String> aObserver = mock(Observer.class);
542542
@SuppressWarnings("unchecked")
543-
final Observable<String> concat = Observable.create(concat(w1, w2));
543+
final Observable<String> concat = Observable.create(concat(Observable.create(w1), Observable.create(w2)));
544544
final SafeObservableSubscription s1 = new SafeObservableSubscription();
545545

546546
try {
@@ -583,8 +583,8 @@ public void testConcatUnsubscribeConcurrent() {
583583
@SuppressWarnings("unchecked")
584584
Observer<String> aObserver = mock(Observer.class);
585585
@SuppressWarnings("unchecked")
586-
TestObservable<Observable<String>> observableOfObservables = new TestObservable<Observable<String>>(w1, w2);
587-
OnSubscribeFunc<String> concatF = concat(observableOfObservables);
586+
TestObservable<Observable<String>> observableOfObservables = new TestObservable<Observable<String>>(Observable.create(w1), Observable.create(w2));
587+
OnSubscribeFunc<String> concatF = concat(Observable.create(observableOfObservables));
588588

589589
Observable<String> concat = Observable.create(concatF);
590590

@@ -616,7 +616,7 @@ public void testConcatUnsubscribeConcurrent() {
616616
verify(aObserver, never()).onError(any(Throwable.class));
617617
}
618618

619-
private static class TestObservable<T> extends Observable<T> {
619+
private static class TestObservable<T> implements OnSubscribeFunc<T> {
620620

621621
private final Subscription s = new Subscription() {
622622

@@ -656,7 +656,7 @@ public TestObservable(T seed, int size) {
656656
}
657657

658658
@Override
659-
public Subscription subscribe(final Observer<? super T> observer) {
659+
public Subscription onSubscribe(final Observer<? super T> observer) {
660660
t = new Thread(new Runnable() {
661661

662662
@Override

rxjava-core/src/main/java/rx/operators/OperationMaterialize.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public void testMaterialize1() {
9393
final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", null, "three");
9494

9595
TestObserver Observer = new TestObserver();
96-
Observable<Notification<String>> m = Observable.create(materialize(o1));
96+
Observable<Notification<String>> m = Observable.create(materialize(Observable.create(o1)));
9797
m.subscribe(Observer);
9898

9999
try {
@@ -118,7 +118,7 @@ public void testMaterialize2() {
118118
final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable("one", "two", "three");
119119

120120
TestObserver Observer = new TestObserver();
121-
Observable<Notification<String>> m = Observable.create(materialize(o1));
121+
Observable<Notification<String>> m = Observable.create(materialize(Observable.create(o1)));
122122
m.subscribe(Observer);
123123

124124
try {
@@ -143,7 +143,7 @@ public void testMaterialize2() {
143143
public void testMultipleSubscribes() throws InterruptedException, ExecutionException {
144144
final TestAsyncErrorObservable o = new TestAsyncErrorObservable("one", "two", null, "three");
145145

146-
Observable<Notification<String>> m = Observable.create(materialize(o));
146+
Observable<Notification<String>> m = Observable.create(materialize(Observable.create(o)));
147147

148148
assertEquals(3, m.toList().toBlockingObservable().toFuture().get().size());
149149
assertEquals(3, m.toList().toBlockingObservable().toFuture().get().size());
@@ -174,7 +174,7 @@ public void onNext(Notification<String> value) {
174174

175175
}
176176

177-
private static class TestAsyncErrorObservable extends Observable<String> {
177+
private static class TestAsyncErrorObservable implements OnSubscribeFunc<String> {
178178

179179
String[] valuesToReturn;
180180

@@ -185,7 +185,7 @@ private static class TestAsyncErrorObservable extends Observable<String> {
185185
volatile Thread t;
186186

187187
@Override
188-
public Subscription subscribe(final Observer<? super String> observer) {
188+
public Subscription onSubscribe(final Observer<? super String> observer) {
189189
t = new Thread(new Runnable() {
190190

191191
@Override

0 commit comments

Comments
 (0)