Skip to content

Commit fbb27ac

Browse files
Rename DefaultSubject to PublishSubject
- #242
1 parent 6bff9aa commit fbb27ac

File tree

3 files changed

+45
-24
lines changed

3 files changed

+45
-24
lines changed

rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import rx.Observable;
2727
import rx.Observer;
2828
import rx.Subscription;
29-
import rx.subjects.DefaultSubject;
29+
import rx.subjects.PublishSubject;
3030
import rx.subscriptions.Subscriptions;
3131
import rx.util.AtomicObservableSubscription;
3232
import rx.util.AtomicObserver;
@@ -174,7 +174,7 @@ public Boolean call(Integer input)
174174

175175
@Test
176176
public void testTakeWhileOnSubject1() {
177-
DefaultSubject<Integer> s = DefaultSubject.create();
177+
PublishSubject<Integer> s = PublishSubject.create();
178178
Observable<Integer> w = (Observable<Integer>) s;
179179
Observable<Integer> take = Observable.create(takeWhile(w, new Func1<Integer, Boolean>()
180180
{

rxjava-core/src/main/java/rx/operators/OperatorMulticast.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import rx.Observer;
2121
import rx.Subscription;
2222
import rx.observables.ConnectableObservable;
23-
import rx.subjects.DefaultSubject;
23+
import rx.subjects.PublishSubject;
2424
import rx.subjects.Subject;
2525
import rx.util.functions.Func1;
2626

@@ -96,7 +96,7 @@ public void testMulticast() {
9696
TestObservable source = new TestObservable();
9797

9898
ConnectableObservable<String> multicasted = OperatorMulticast.multicast(source,
99-
DefaultSubject.<String>create());
99+
PublishSubject.<String>create());
100100

101101
Observer<String> observer = mock(Observer.class);
102102
multicasted.subscribe(observer);
@@ -123,7 +123,7 @@ public void testMulticastConnectTwice() {
123123
TestObservable source = new TestObservable();
124124

125125
ConnectableObservable<String> multicasted = OperatorMulticast.multicast(source,
126-
DefaultSubject.<String>create());
126+
PublishSubject.<String>create());
127127

128128
Observer<String> observer = mock(Observer.class);
129129
multicasted.subscribe(observer);
@@ -147,7 +147,7 @@ public void testMulticastDisconnect() {
147147
TestObservable source = new TestObservable();
148148

149149
ConnectableObservable<String> multicasted = OperatorMulticast.multicast(source,
150-
DefaultSubject.<String>create());
150+
PublishSubject.<String>create());
151151

152152
Observer<String> observer = mock(Observer.class);
153153
multicasted.subscribe(observer);

rxjava-core/src/main/java/rx/subjects/DefaultSubject.java renamed to rxjava-core/src/main/java/rx/subjects/PublishSubject.java

Lines changed: 39 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,29 @@
3939
import rx.util.functions.Func0;
4040
import rx.util.functions.Func1;
4141

42-
public class DefaultSubject<T> extends Subject<T, T> {
43-
public static <T> DefaultSubject<T> create() {
42+
/**
43+
* Subject that publishes a single event to each {@link Observer} that has subscribed.
44+
* <p>
45+
* Example usage:
46+
* <p>
47+
* <pre> {@code
48+
49+
PublishSubject<Object> subject = PublishSubject.create();
50+
// observer1 will receive all onNext events
51+
subject.subscribe(observer1);
52+
subject.onNext("one");
53+
subject.onNext("two");
54+
// observer2 will only receive "three" and onCompleted
55+
subject.subscribe(observer2);
56+
subject.onNext("three");
57+
subject.onCompleted();
58+
59+
} </pre>
60+
*
61+
* @param <T>
62+
*/
63+
public class PublishSubject<T> extends Subject<T, T> {
64+
public static <T> PublishSubject<T> create() {
4465
final ConcurrentHashMap<Subscription, Observer<T>> observers = new ConcurrentHashMap<Subscription, Observer<T>>();
4566

4667
Func1<Observer<T>, Subscription> onSubscribe = new Func1<Observer<T>, Subscription>() {
@@ -62,12 +83,12 @@ public void unsubscribe() {
6283
}
6384
};
6485

65-
return new DefaultSubject<T>(onSubscribe, observers);
86+
return new PublishSubject<T>(onSubscribe, observers);
6687
}
6788

6889
private final ConcurrentHashMap<Subscription, Observer<T>> observers;
6990

70-
protected DefaultSubject(Func1<Observer<T>, Subscription> onSubscribe, ConcurrentHashMap<Subscription, Observer<T>> observers) {
91+
protected PublishSubject(Func1<Observer<T>, Subscription> onSubscribe, ConcurrentHashMap<Subscription, Observer<T>> observers) {
7192
super(onSubscribe);
7293
this.observers = observers;
7394
}
@@ -96,7 +117,7 @@ public void onNext(T args) {
96117
public static class UnitTest {
97118
@Test
98119
public void test() {
99-
DefaultSubject<Integer> subject = DefaultSubject.create();
120+
PublishSubject<Integer> subject = PublishSubject.create();
100121
final AtomicReference<List<Notification<String>>> actualRef = new AtomicReference<List<Notification<String>>>();
101122

102123
Observable<List<Notification<Integer>>> wNotificationsList = subject.materialize().toList();
@@ -147,7 +168,7 @@ public void unsubscribe() {
147168

148169
@Test
149170
public void testCompleted() {
150-
DefaultSubject<Object> subject = DefaultSubject.create();
171+
PublishSubject<Object> subject = PublishSubject.create();
151172

152173
@SuppressWarnings("unchecked")
153174
Observer<String> aObserver = mock(Observer.class);
@@ -188,7 +209,7 @@ private void assertNeverObserver(Observer<String> aObserver)
188209

189210
@Test
190211
public void testError() {
191-
DefaultSubject<Object> subject = DefaultSubject.create();
212+
PublishSubject<Object> subject = PublishSubject.create();
192213

193214
@SuppressWarnings("unchecked")
194215
Observer<String> aObserver = mock(Observer.class);
@@ -222,7 +243,7 @@ private void assertErrorObserver(Observer<String> aObserver)
222243

223244
@Test
224245
public void testSubscribeMidSequence() {
225-
DefaultSubject<Object> subject = DefaultSubject.create();
246+
PublishSubject<Object> subject = PublishSubject.create();
226247

227248
@SuppressWarnings("unchecked")
228249
Observer<String> aObserver = mock(Observer.class);
@@ -255,7 +276,7 @@ private void assertCompletedStartingWithThreeObserver(Observer<String> aObserver
255276

256277
@Test
257278
public void testUnsubscribeFirstObserver() {
258-
DefaultSubject<Object> subject = DefaultSubject.create();
279+
PublishSubject<Object> subject = PublishSubject.create();
259280

260281
@SuppressWarnings("unchecked")
261282
Observer<String> aObserver = mock(Observer.class);
@@ -290,31 +311,31 @@ private void assertObservedUntilTwo(Observer<String> aObserver)
290311
@Test
291312
public void testUnsubscribe()
292313
{
293-
UnsubscribeTester.test(new Func0<DefaultSubject<Object>>()
314+
UnsubscribeTester.test(new Func0<PublishSubject<Object>>()
294315
{
295316
@Override
296-
public DefaultSubject<Object> call()
317+
public PublishSubject<Object> call()
297318
{
298-
return DefaultSubject.create();
319+
return PublishSubject.create();
299320
}
300-
}, new Action1<DefaultSubject<Object>>()
321+
}, new Action1<PublishSubject<Object>>()
301322
{
302323
@Override
303-
public void call(DefaultSubject<Object> DefaultSubject)
324+
public void call(PublishSubject<Object> DefaultSubject)
304325
{
305326
DefaultSubject.onCompleted();
306327
}
307-
}, new Action1<DefaultSubject<Object>>()
328+
}, new Action1<PublishSubject<Object>>()
308329
{
309330
@Override
310-
public void call(DefaultSubject<Object> DefaultSubject)
331+
public void call(PublishSubject<Object> DefaultSubject)
311332
{
312333
DefaultSubject.onError(new Exception());
313334
}
314-
}, new Action1<DefaultSubject<Object>>()
335+
}, new Action1<PublishSubject<Object>>()
315336
{
316337
@Override
317-
public void call(DefaultSubject<Object> DefaultSubject)
338+
public void call(PublishSubject<Object> DefaultSubject)
318339
{
319340
DefaultSubject.onNext("one");
320341
}

0 commit comments

Comments
 (0)