Skip to content

Commit 98f6cbc

Browse files
committed
Playing with Subject and ConnectedObservable
1 parent 04068c1 commit 98f6cbc

File tree

5 files changed

+192
-118
lines changed

5 files changed

+192
-118
lines changed
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package rx.observables;
2+
3+
import rx.Observable;
4+
import rx.Observer;
5+
import rx.Subscription;
6+
import rx.subjects.DefaultSubject;
7+
import rx.subjects.Subject;
8+
import rx.util.functions.Func1;
9+
10+
public class ConnectableObservable<T, R> extends Observable<T> {
11+
private final Observable<T> source;
12+
private final Subject<T, R> subject;
13+
14+
public static <T, R> ConnectableObservable create(final Observable<T> source, final Subject<T, R> subject) {
15+
return new ConnectableObservable<T, R>(source, subject, new Func1<Observer<T>, Subscription>() {
16+
@Override
17+
public Subscription call(Observer<T> observer) {
18+
return subject.subscribe(observer);
19+
}
20+
});
21+
}
22+
23+
protected ConnectableObservable(Observable<T> source, Subject<T, R> subject, Func1<Observer<T>, Subscription> onSubscribe) {
24+
super(onSubscribe);
25+
this.source = source;
26+
this.subject = subject;
27+
}
28+
29+
public Subscription connect() {
30+
return source.subscribe(new Observer<T>() {
31+
@Override
32+
public void onCompleted() {
33+
subject.onCompleted();
34+
}
35+
36+
@Override
37+
public void onError(Exception e) {
38+
subject.onError(e);
39+
}
40+
41+
@Override
42+
public void onNext(T args) {
43+
subject.onNext(args);
44+
}
45+
});
46+
}
47+
48+
49+
}

rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import rx.Observable;
2727
import rx.Observer;
2828
import rx.Subscription;
29-
import rx.subjects.Subject;
29+
import rx.subjects.DefaultSubject;
3030
import rx.subscriptions.Subscriptions;
3131
import rx.util.AtomicObservableSubscription;
3232
import rx.util.AtomicObserver;
@@ -174,7 +174,7 @@ public Boolean call(Integer input)
174174

175175
@Test
176176
public void testTakeWhileOnSubject1() {
177-
Subject<Integer> s = Subject.create();
177+
DefaultSubject<Integer> s = DefaultSubject.create();
178178
Observable<Integer> w = (Observable<Integer>) s;
179179
Observable<Integer> take = Observable.create(takeWhile(w, new Func1<Integer, Boolean>()
180180
{
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package rx.operators;
2+
3+
import rx.Observable;
4+
import rx.Observer;
5+
import rx.Subscription;
6+
import rx.util.functions.Func1;
7+
8+
public class OperatorMulticast {
9+
public static <T> Func1<Observer<T>, Subscription> multicast(Observable<T> source, Func1<T, Boolean> predicate) {
10+
return null;
11+
}
12+
13+
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package rx.subjects;
2+
3+
import java.util.ArrayList;
4+
import java.util.List;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.atomic.AtomicBoolean;
7+
import java.util.concurrent.atomic.AtomicReference;
8+
9+
import junit.framework.Assert;
10+
11+
import org.junit.Test;
12+
13+
import rx.Notification;
14+
import rx.Observable;
15+
import rx.Observer;
16+
import rx.Subscription;
17+
import rx.util.AtomicObservableSubscription;
18+
import rx.util.SynchronizedObserver;
19+
import rx.util.functions.Action1;
20+
import rx.util.functions.Func1;
21+
22+
public class DefaultSubject<T> extends Subject<T, T> {
23+
public static <T> DefaultSubject<T> create() {
24+
final ConcurrentHashMap<Subscription, Observer<T>> observers = new ConcurrentHashMap<Subscription, Observer<T>>();
25+
26+
Func1<Observer<T>, Subscription> onSubscribe = new Func1<Observer<T>, Subscription>() {
27+
@Override
28+
public Subscription call(Observer<T> observer) {
29+
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
30+
31+
subscription.wrap(new Subscription() {
32+
@Override
33+
public void unsubscribe() {
34+
// on unsubscribe remove it from the map of outbound observers to notify
35+
observers.remove(subscription);
36+
}
37+
});
38+
39+
// on subscribe add it to the map of outbound observers to notify
40+
observers.put(subscription, new SynchronizedObserver<T>(observer, subscription));
41+
return subscription;
42+
}
43+
};
44+
45+
return new DefaultSubject<T>(onSubscribe, observers);
46+
}
47+
48+
private final ConcurrentHashMap<Subscription, Observer<T>> observers;
49+
50+
protected DefaultSubject(Func1<Observer<T>, Subscription> onSubscribe, ConcurrentHashMap<Subscription, Observer<T>> observers) {
51+
super(onSubscribe);
52+
this.observers = observers;
53+
}
54+
55+
@Override
56+
public void onCompleted() {
57+
for (Observer<T> observer : observers.values()) {
58+
observer.onCompleted();
59+
}
60+
}
61+
62+
@Override
63+
public void onError(Exception e) {
64+
for (Observer<T> observer : observers.values()) {
65+
observer.onError(e);
66+
}
67+
}
68+
69+
@Override
70+
public void onNext(T args) {
71+
for (Observer<T> observer : observers.values()) {
72+
observer.onNext(args);
73+
}
74+
}
75+
76+
public static class UnitTest {
77+
@Test
78+
public void test() {
79+
DefaultSubject<Integer> subject = DefaultSubject.create();
80+
final AtomicReference<List<Notification<String>>> actualRef = new AtomicReference<List<Notification<String>>>();
81+
82+
Observable<List<Notification<Integer>>> wNotificationsList = subject.materialize().toList();
83+
wNotificationsList.subscribe(new Action1<List<Notification<String>>>() {
84+
@Override
85+
public void call(List<Notification<String>> actual) {
86+
actualRef.set(actual);
87+
}
88+
});
89+
90+
Subscription sub = Observable.create(new Func1<Observer<Integer>, Subscription>() {
91+
@Override
92+
public Subscription call(final Observer<Integer> observer) {
93+
final AtomicBoolean stop = new AtomicBoolean(false);
94+
new Thread() {
95+
@Override
96+
public void run() {
97+
int i = 1;
98+
while (!stop.get()) {
99+
observer.onNext(i++);
100+
}
101+
observer.onCompleted();
102+
}
103+
}.start();
104+
return new Subscription() {
105+
@Override
106+
public void unsubscribe() {
107+
stop.set(true);
108+
}
109+
};
110+
}
111+
}).subscribe(subject);
112+
// the subject has received an onComplete from the first subscribe because
113+
// it is synchronous and the next subscribe won't do anything.
114+
Observable.toObservable(-1, -2, -3).subscribe(subject);
115+
116+
List<Notification<Integer>> expected = new ArrayList<Notification<Integer>>();
117+
expected.add(new Notification<Integer>(-1));
118+
expected.add(new Notification<Integer>(-2));
119+
expected.add(new Notification<Integer>(-3));
120+
expected.add(new Notification<Integer>());
121+
Assert.assertTrue(actualRef.get().containsAll(expected));
122+
123+
sub.unsubscribe();
124+
}
125+
}
126+
}
Lines changed: 2 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -1,126 +1,12 @@
11
package rx.subjects;
22

3-
import java.util.ArrayList;
4-
import java.util.List;
5-
import java.util.concurrent.ConcurrentHashMap;
6-
import java.util.concurrent.atomic.AtomicBoolean;
7-
import java.util.concurrent.atomic.AtomicReference;
8-
9-
import junit.framework.Assert;
10-
11-
import org.junit.Test;
12-
13-
import rx.Notification;
143
import rx.Observable;
154
import rx.Observer;
165
import rx.Subscription;
17-
import rx.util.AtomicObservableSubscription;
18-
import rx.util.SynchronizedObserver;
19-
import rx.util.functions.Action1;
206
import rx.util.functions.Func1;
217

22-
public class Subject<T> extends Observable<T> implements Observer<T> {
23-
public static <T> Subject<T> create() {
24-
final ConcurrentHashMap<Subscription, Observer<T>> observers = new ConcurrentHashMap<Subscription, Observer<T>>();
25-
26-
Func1<Observer<T>, Subscription> onSubscribe = new Func1<Observer<T>, Subscription>() {
27-
@Override
28-
public Subscription call(Observer<T> observer) {
29-
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
30-
31-
subscription.wrap(new Subscription() {
32-
@Override
33-
public void unsubscribe() {
34-
// on unsubscribe remove it from the map of outbound observers to notify
35-
observers.remove(subscription);
36-
}
37-
});
38-
39-
// on subscribe add it to the map of outbound observers to notify
40-
observers.put(subscription, new SynchronizedObserver<T>(observer, subscription));
41-
return subscription;
42-
}
43-
};
44-
45-
return new Subject<T>(onSubscribe, observers);
46-
}
47-
48-
private final ConcurrentHashMap<Subscription, Observer<T>> observers;
49-
50-
protected Subject(Func1<Observer<T>, Subscription> onSubscribe, ConcurrentHashMap<Subscription, Observer<T>> observers) {
8+
public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {
9+
protected Subject(Func1<Observer<R>, Subscription> onSubscribe) {
5110
super(onSubscribe);
52-
this.observers = observers;
53-
}
54-
55-
@Override
56-
public void onCompleted() {
57-
for (Observer<T> observer : observers.values()) {
58-
observer.onCompleted();
59-
}
60-
}
61-
62-
@Override
63-
public void onError(Exception e) {
64-
for (Observer<T> observer : observers.values()) {
65-
observer.onError(e);
66-
}
67-
}
68-
69-
@Override
70-
public void onNext(T args) {
71-
for (Observer<T> observer : observers.values()) {
72-
observer.onNext(args);
73-
}
74-
}
75-
76-
public static class UnitTest {
77-
@Test
78-
public void test() {
79-
Subject<Integer> subject = Subject.create();
80-
final AtomicReference<List<Notification<String>>> actualRef = new AtomicReference<List<Notification<String>>>();
81-
82-
Observable<List<Notification<Integer>>> wNotificationsList = subject.materialize().toList();
83-
wNotificationsList.subscribe(new Action1<List<Notification<String>>>() {
84-
@Override
85-
public void call(List<Notification<String>> actual) {
86-
actualRef.set(actual);
87-
}
88-
});
89-
90-
Subscription sub = Observable.create(new Func1<Observer<Integer>, Subscription>() {
91-
@Override
92-
public Subscription call(final Observer<Integer> observer) {
93-
final AtomicBoolean stop = new AtomicBoolean(false);
94-
new Thread() {
95-
@Override
96-
public void run() {
97-
int i = 1;
98-
while (!stop.get()) {
99-
observer.onNext(i++);
100-
}
101-
observer.onCompleted();
102-
}
103-
}.start();
104-
return new Subscription() {
105-
@Override
106-
public void unsubscribe() {
107-
stop.set(true);
108-
}
109-
};
110-
}
111-
}).subscribe(subject);
112-
// the subject has received an onComplete from the first subscribe because
113-
// it is synchronous and the next subscribe won't do anything.
114-
Observable.toObservable(-1, -2, -3).subscribe(subject);
115-
116-
List<Notification<Integer>> expected = new ArrayList<Notification<Integer>>();
117-
expected.add(new Notification<Integer>(-1));
118-
expected.add(new Notification<Integer>(-2));
119-
expected.add(new Notification<Integer>(-3));
120-
expected.add(new Notification<Integer>());
121-
Assert.assertTrue(actualRef.get().containsAll(expected));
122-
123-
sub.unsubscribe();
124-
}
12511
}
12612
}

0 commit comments

Comments
 (0)