Skip to content

Commit 3ea9265

Browse files
akarnokdzsxwing
authored andcommitted
1.x: just() construction to call the onCreate execution hook (#3958)
This PR adds the call to RxJavaObservableExecutionHook.onCreate() to the just() - ScalarSynchronousObservable construction. Related: #2656.
1 parent d24c395 commit 3ea9265

File tree

2 files changed

+92
-10
lines changed

2 files changed

+92
-10
lines changed

src/main/java/rx/internal/util/ScalarSynchronousObservable.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import rx.internal.producers.SingleProducer;
2424
import rx.internal.schedulers.EventLoopsScheduler;
2525
import rx.observers.Subscribers;
26+
import rx.plugins.*;
2627

2728
/**
2829
* An Observable that emits a single constant scalar value to Subscribers.
@@ -33,6 +34,14 @@
3334
* @param <T> the value type
3435
*/
3536
public final class ScalarSynchronousObservable<T> extends Observable<T> {
37+
/**
38+
* The execution hook instance.
39+
* <p>
40+
* Can't be final to allow tests overriding it in place; if the class
41+
* has been initialized, the plugin reset has no effect because
42+
* how RxJavaPlugins was designed.
43+
*/
44+
static RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
3645
/**
3746
* Indicates that the Producer used by this Observable should be fully
3847
* threadsafe. It is possible, but unlikely that multiple concurrent
@@ -72,14 +81,7 @@ public static <T> ScalarSynchronousObservable<T> create(T t) {
7281
final T t;
7382

7483
protected ScalarSynchronousObservable(final T t) {
75-
super(new OnSubscribe<T>() {
76-
77-
@Override
78-
public void call(Subscriber<? super T> s) {
79-
s.setProducer(createProducer(s, t));
80-
}
81-
82-
});
84+
super(hook.onCreate(new JustOnSubscribe<T>(t)));
8385
this.t = t;
8486
}
8587

@@ -131,6 +133,20 @@ public void call() {
131133
return create(new ScalarAsyncOnSubscribe<T>(t, onSchedule));
132134
}
133135

136+
/** The OnSubscribe callback for the Observable constructor. */
137+
static final class JustOnSubscribe<T> implements OnSubscribe<T> {
138+
final T value;
139+
140+
JustOnSubscribe(T value) {
141+
this.value = value;
142+
}
143+
144+
@Override
145+
public void call(Subscriber<? super T> s) {
146+
s.setProducer(createProducer(s, value));
147+
}
148+
}
149+
134150
/**
135151
* The OnSubscribe implementation that creates the ScalarAsyncProducer for each
136152
* incoming subscriber.

src/test/java/rx/internal/util/ScalarSynchronousObservableTest.java

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@
1616

1717
package rx.internal.util;
1818

19-
import org.junit.Test;
19+
import java.util.concurrent.atomic.*;
2020

21-
import rx.Observable;
21+
import org.junit.*;
22+
23+
import rx.*;
24+
import rx.Observable.OnSubscribe;
2225
import rx.exceptions.TestException;
2326
import rx.functions.Func1;
2427
import rx.observers.TestSubscriber;
28+
import rx.plugins.*;
2529
import rx.schedulers.Schedulers;
2630

2731
public class ScalarSynchronousObservableTest {
@@ -230,4 +234,66 @@ public void onNext(Integer t) {
230234
ts.assertError(TestException.class);
231235
ts.assertNotCompleted();
232236
}
237+
238+
@Test
239+
public void hookCalled() {
240+
RxJavaObservableExecutionHook save = ScalarSynchronousObservable.hook;
241+
try {
242+
final AtomicInteger c = new AtomicInteger();
243+
244+
ScalarSynchronousObservable.hook = new RxJavaObservableExecutionHook() {
245+
@Override
246+
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
247+
c.getAndIncrement();
248+
return f;
249+
}
250+
};
251+
252+
int n = 10;
253+
254+
for (int i = 0; i < n; i++) {
255+
Observable.just(1).subscribe();
256+
}
257+
258+
Assert.assertEquals(n, c.get());
259+
} finally {
260+
ScalarSynchronousObservable.hook = save;
261+
}
262+
}
263+
264+
@Test
265+
public void hookChangesBehavior() {
266+
RxJavaObservableExecutionHook save = ScalarSynchronousObservable.hook;
267+
try {
268+
ScalarSynchronousObservable.hook = new RxJavaObservableExecutionHook() {
269+
@Override
270+
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
271+
if (f instanceof ScalarSynchronousObservable.JustOnSubscribe) {
272+
final T v = ((ScalarSynchronousObservable.JustOnSubscribe<T>) f).value;
273+
return new OnSubscribe<T>() {
274+
@Override
275+
public void call(Subscriber<? super T> t) {
276+
t.onNext(v);
277+
t.onNext(v);
278+
t.onCompleted();
279+
}
280+
};
281+
}
282+
return f;
283+
}
284+
};
285+
286+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
287+
288+
Observable.just(1).subscribe(ts);
289+
290+
ts.assertValues(1, 1);
291+
ts.assertNoErrors();
292+
ts.assertCompleted();
293+
294+
} finally {
295+
ScalarSynchronousObservable.hook = save;
296+
}
297+
}
298+
233299
}

0 commit comments

Comments
 (0)