Skip to content

Commit fc0c0d7

Browse files
author
Aaron Tull
committed
Implemented Observable.x(ConversionFunc) to allow external extensions to Observables.
1 parent bb89744 commit fc0c0d7

File tree

3 files changed

+276
-8
lines changed

3 files changed

+276
-8
lines changed

src/main/java/rx/Observable.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,23 @@ public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<
109109
// cover for generics insanity
110110
}
111111

112+
/**
113+
* Passes all emitted values from {@code this} Observable to the provided {@link ConversionFunc} to be
114+
* collected and returned as a single value. Note that it is legal for a {@link ConversionFunc} to
115+
* return an Observable (enabling chaining).
116+
*
117+
* @param conversion a function that converts from this {@code Observable<T>} to an {@code R}
118+
* @return an instance of R created by the provided Conversion
119+
*/
120+
@Experimental
121+
public <R> R x(Func1<? super OnSubscribe<T>, ? extends R> conversion) {
122+
return conversion.call(new OnSubscribe<T>() {
123+
@Override
124+
public void call(Subscriber<? super T> subscriber) {
125+
subscriber.add(Observable.subscribe(subscriber, Observable.this));
126+
}});
127+
}
128+
112129
/**
113130
* Lifts a function to the current Observable and returns a new Observable that when subscribed to will pass
114131
* the values of the current Observable through the Operator function.
@@ -127,17 +144,17 @@ public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<
127144
* <dd>{@code lift} does not operate by default on a particular {@link Scheduler}.</dd>
128145
* </dl>
129146
*
130-
* @param lift the Operator that implements the Observable-operating function to be applied to the source
147+
* @param operator the Operator that implements the Observable-operating function to be applied to the source
131148
* Observable
132149
* @return an Observable that is the result of applying the lifted Operator to the source Observable
133150
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
134151
*/
135-
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> lift) {
152+
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
136153
return new Observable<R>(new OnSubscribe<R>() {
137154
@Override
138155
public void call(Subscriber<? super R> o) {
139156
try {
140-
Subscriber<? super T> st = hook.onLift(lift).call(o);
157+
Subscriber<? super T> st = hook.onLift(operator).call(o);
141158
try {
142159
// new Subscriber created and being subscribed with so 'onStart' it
143160
st.onStart();
@@ -163,7 +180,6 @@ public void call(Subscriber<? super R> o) {
163180
});
164181
}
165182

166-
167183
/**
168184
* Transform an Observable by applying a particular Transformer function to it.
169185
* <p>
@@ -7752,11 +7768,15 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
77527768
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
77537769
*/
77547770
public final Subscription subscribe(Subscriber<? super T> subscriber) {
7755-
// validate and proceed
7771+
return Observable.subscribe(subscriber, this);
7772+
}
7773+
7774+
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
7775+
// validate and proceed
77567776
if (subscriber == null) {
77577777
throw new IllegalArgumentException("observer can not be null");
77587778
}
7759-
if (onSubscribe == null) {
7779+
if (observable.onSubscribe == null) {
77607780
throw new IllegalStateException("onSubscribe function can not be null.");
77617781
/*
77627782
* the subscribe function can also be overridden but generally that's not the appropriate approach
@@ -7780,7 +7800,7 @@ public final Subscription subscribe(Subscriber<? super T> subscriber) {
77807800
// The code below is exactly the same an unsafeSubscribe but not used because it would add a sigificent depth to alreay huge call stacks.
77817801
try {
77827802
// allow the hook to intercept and/or decorate
7783-
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
7803+
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
77847804
return hook.onSubscribeReturn(subscriber);
77857805
} catch (Throwable e) {
77867806
// special handling for certain Throwable/Error/Exception types
Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
package rx;
2+
3+
import java.util.Arrays;
4+
import java.util.List;
5+
import java.util.concurrent.ConcurrentLinkedQueue;
6+
import java.util.concurrent.atomic.AtomicBoolean;
7+
import java.util.concurrent.atomic.AtomicReference;
8+
9+
import static junit.framework.Assert.*;
10+
11+
import org.junit.Test;
12+
13+
import rx.Observable.OnSubscribe;
14+
import rx.Observable.Operator;
15+
import rx.exceptions.OnErrorNotImplementedException;
16+
import rx.functions.Func1;
17+
import rx.functions.Func2;
18+
import rx.internal.operators.OperatorFilter;
19+
import rx.internal.operators.OperatorMap;
20+
import rx.observers.TestSubscriber;
21+
import rx.schedulers.Schedulers;
22+
23+
public class ObservableConversionTest {
24+
25+
public static class Cylon {}
26+
27+
public static class Jail {
28+
Object cylon;
29+
30+
Jail(Object cylon) {
31+
this.cylon = cylon;
32+
}
33+
}
34+
35+
public static class CylonDetectorObservable<T> {
36+
protected OnSubscribe<T> onSubscribe;
37+
38+
public static <T> CylonDetectorObservable<T> create(OnSubscribe<T> onSubscribe) {
39+
return new CylonDetectorObservable<T>(onSubscribe);
40+
}
41+
42+
protected CylonDetectorObservable(OnSubscribe<T> onSubscribe) {
43+
this.onSubscribe = onSubscribe;
44+
}
45+
46+
public void subscribe(Subscriber<T> subscriber) {
47+
onSubscribe.call(subscriber);
48+
}
49+
50+
public <R> CylonDetectorObservable<R> lift(Operator<? extends R, ? super T> operator) {
51+
return x(new RobotConversionFunc<T, R>(operator));
52+
}
53+
54+
public <R, O> O x(Func1<OnSubscribe<T>, O> operator) {
55+
return operator.call(onSubscribe);
56+
}
57+
58+
public <R> CylonDetectorObservable<? extends R> compose(Func1<CylonDetectorObservable<? super T>, CylonDetectorObservable<? extends R>> transformer) {
59+
return transformer.call(this);
60+
}
61+
62+
public final CylonDetectorObservable<T> beep(Func1<? super T, Boolean> predicate) {
63+
return lift(new OperatorFilter<T>(predicate));
64+
}
65+
66+
public final <R> CylonDetectorObservable<R> boop(Func1<? super T, ? extends R> func) {
67+
return lift(new OperatorMap<T, R>(func));
68+
}
69+
70+
public CylonDetectorObservable<String> DESTROY() {
71+
return boop(new Func1<T, String>() {
72+
@Override
73+
public String call(T t) {
74+
Object cylon = ((Jail) t).cylon;
75+
throwOutTheAirlock(cylon);
76+
if (t instanceof Jail) {
77+
String name = cylon.toString();
78+
return "Cylon '" + name + "' has been destroyed";
79+
}
80+
else {
81+
return "Cylon 'anonymous' has been destroyed";
82+
}
83+
}});
84+
}
85+
86+
private static void throwOutTheAirlock(Object cylon) {
87+
// ...
88+
}
89+
}
90+
91+
public static class RobotConversionFunc<T, R> implements Func1<OnSubscribe<T>, CylonDetectorObservable<R>> {
92+
private Operator<? extends R, ? super T> operator;
93+
94+
public RobotConversionFunc(Operator<? extends R, ? super T> operator) {
95+
this.operator = operator;
96+
}
97+
98+
@Override
99+
public CylonDetectorObservable<R> call(final OnSubscribe<T> onSubscribe) {
100+
return CylonDetectorObservable.create(new OnSubscribe<R>() {
101+
@Override
102+
public void call(Subscriber<? super R> o) {
103+
try {
104+
Subscriber<? super T> st = operator.call(o);
105+
try {
106+
st.onStart();
107+
onSubscribe.call(st);
108+
} catch (OnErrorNotImplementedException e) {
109+
throw e;
110+
} catch (Throwable e) {
111+
st.onError(e);
112+
}
113+
} catch (OnErrorNotImplementedException e) {
114+
throw e;
115+
} catch (Throwable e) {
116+
o.onError(e);
117+
}
118+
119+
}});
120+
}
121+
}
122+
123+
public static class ConvertToCylonDetector<T> implements Func1<OnSubscribe<T>, CylonDetectorObservable<T>> {
124+
@Override
125+
public CylonDetectorObservable<T> call(final OnSubscribe<T> onSubscribe) {
126+
return CylonDetectorObservable.create(onSubscribe);
127+
}
128+
}
129+
130+
public static class ConvertToObservable<T> implements Func1<OnSubscribe<T>, Observable<T>> {
131+
@Override
132+
public Observable<T> call(final OnSubscribe<T> onSubscribe) {
133+
return Observable.create(onSubscribe);
134+
}
135+
}
136+
137+
@Test
138+
public void testConversionBetweenObservableClasses() {
139+
final TestSubscriber<String> subscriber = new TestSubscriber<String>(new Subscriber<String>(){
140+
141+
@Override
142+
public void onCompleted() {
143+
System.out.println("Complete");
144+
}
145+
146+
@Override
147+
public void onError(Throwable e) {
148+
System.out.println("error: " + e.getMessage());
149+
e.printStackTrace();
150+
}
151+
152+
@Override
153+
public void onNext(String t) {
154+
System.out.println(t);
155+
}});
156+
List<Object> crewOfBattlestarGalactica = Arrays.asList(new Object[] {"William Adama", "Laura Roslin", "Lee Adama", new Cylon()});
157+
Observable.from(crewOfBattlestarGalactica)
158+
.x(new ConvertToCylonDetector<Object>())
159+
.beep(new Func1<Object, Boolean>(){
160+
@Override
161+
public Boolean call(Object t) {
162+
return t instanceof Cylon;
163+
}})
164+
.boop(new Func1<Object, Object>() {
165+
@Override
166+
public Jail call(Object cylon) {
167+
return new Jail(cylon);
168+
}})
169+
.DESTROY()
170+
.x(new ConvertToObservable<String>())
171+
.reduce("Cylon Detector finished. Report:\n", new Func2<String, String, String>() {
172+
@Override
173+
public String call(String a, String n) {
174+
return a + n + "\n";
175+
}})
176+
.subscribe(subscriber);
177+
subscriber.assertNoErrors();
178+
subscriber.assertCompleted();
179+
}
180+
181+
@Test
182+
public void testConvertToConcurrentQueue() {
183+
final AtomicReference<Throwable> thrown = new AtomicReference<Throwable>(null);
184+
final AtomicBoolean isFinished = new AtomicBoolean(false);
185+
ConcurrentLinkedQueue<? extends Integer> queue = Observable.range(0,5)
186+
.flatMap(new Func1<Integer, Observable<Integer>>(){
187+
@Override
188+
public Observable<Integer> call(final Integer i) {
189+
return Observable.range(0, 5)
190+
.observeOn(Schedulers.io())
191+
.map(new Func1<Integer, Integer>(){
192+
@Override
193+
public Integer call(Integer k) {
194+
try {
195+
Thread.sleep(System.currentTimeMillis() % 100);
196+
} catch (InterruptedException e) {
197+
e.printStackTrace();
198+
}
199+
return i + k;
200+
}});
201+
}})
202+
.x(new Func1<OnSubscribe<Integer>, ConcurrentLinkedQueue<Integer>>() {
203+
@Override
204+
public ConcurrentLinkedQueue<Integer> call(OnSubscribe<Integer> onSubscribe) {
205+
final ConcurrentLinkedQueue<Integer> q = new ConcurrentLinkedQueue<Integer>();
206+
onSubscribe.call(new Subscriber<Integer>(){
207+
@Override
208+
public void onCompleted() {
209+
isFinished.set(true);
210+
}
211+
212+
@Override
213+
public void onError(Throwable e) {
214+
thrown.set(e);
215+
}
216+
217+
@Override
218+
public void onNext(Integer t) {
219+
q.add(t);
220+
}});
221+
return q;
222+
}});
223+
224+
int x = 0;
225+
while(!isFinished.get()) {
226+
Integer i = queue.poll();
227+
if (i != null) {
228+
x++;
229+
System.out.println(x + " item: " + i);
230+
}
231+
}
232+
assertEquals(null, thrown.get());
233+
}
234+
}

src/test/java/rx/ObservableTests.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
import rx.functions.Func0;
5454
import rx.functions.Func1;
5555
import rx.functions.Func2;
56-
import rx.functions.Functions;
5756
import rx.observables.ConnectableObservable;
5857
import rx.observers.TestSubscriber;
5958
import rx.schedulers.TestScheduler;
@@ -1157,4 +1156,19 @@ public void testForEachWithNull() {
11571156
//
11581157
.forEach(null);
11591158
}
1159+
1160+
@Test
1161+
public void testExtend() {
1162+
final TestSubscriber<Object> subscriber = new TestSubscriber<Object>();
1163+
final Object value = new Object();
1164+
Observable.just(value).x(new Func1<OnSubscribe<Object>,Object>(){
1165+
@Override
1166+
public Object call(OnSubscribe<Object> onSubscribe) {
1167+
onSubscribe.call(subscriber);
1168+
subscriber.assertNoErrors();
1169+
subscriber.assertCompleted();
1170+
subscriber.assertValue(value);
1171+
return subscriber.getOnNextEvents().get(0);
1172+
}});
1173+
}
11601174
}

0 commit comments

Comments
 (0)