Skip to content

Commit 00bc07d

Browse files
committed
Added support for sever connect method calls
1 parent 74d82ea commit 00bc07d

File tree

5 files changed

+201
-18
lines changed

5 files changed

+201
-18
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.mockito.Mockito;
3737
import org.mockito.MockitoAnnotations;
3838

39+
import rx.observables.ConnectableObservable;
3940
import rx.observables.GroupedObservable;
4041
import rx.operators.OperationAll;
4142
import rx.operators.OperationConcat;
@@ -48,6 +49,7 @@
4849
import rx.operators.OperationMerge;
4950
import rx.operators.OperationMergeDelayError;
5051
import rx.operators.OperationMostRecent;
52+
import rx.operators.OperatorMulticast;
5153
import rx.operators.OperationNext;
5254
import rx.operators.OperationObserveOn;
5355
import rx.operators.OperationOnErrorResumeNextViaFunction;
@@ -72,6 +74,7 @@
7274
import rx.plugins.RxJavaErrorHandler;
7375
import rx.plugins.RxJavaObservableExecutionHook;
7476
import rx.plugins.RxJavaPlugins;
77+
import rx.subjects.Subject;
7578
import rx.subscriptions.BooleanSubscription;
7679
import rx.subscriptions.Subscriptions;
7780
import rx.util.AtomicObservableSubscription;
@@ -585,6 +588,17 @@ public void call(Object args) {
585588
});
586589
}
587590

591+
/**
592+
* Returns a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
593+
*
594+
* @param subject the subject to push source elements into.
595+
* @param <R> result type
596+
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
597+
*/
598+
public <R> ConnectableObservable<R> multicast(Subject<T, R> subject) {
599+
return multicast(this, subject);
600+
}
601+
588602
/**
589603
* Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence.
590604
*
@@ -2072,9 +2086,22 @@ public static <T> Iterable<T> mostRecent(Observable<T> source, T initialValue) {
20722086
return OperationMostRecent.mostRecent(source, initialValue);
20732087
}
20742088

2089+
/**
2090+
* Returns a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
2091+
*
2092+
* @param source the source sequence whose elements will be pushed into the specified subject.
2093+
* @param subject the subject to push source elements into.
2094+
* @param <T> source type
2095+
* @param <R> result type
2096+
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
2097+
*/
2098+
public static <T, R> ConnectableObservable<R> multicast(Observable<T> source, final Subject<T, R> subject) {
2099+
return OperatorMulticast.multicast(source, subject);
2100+
}
2101+
20752102
/**
20762103
* Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence.
2077-
*
2104+
*
20782105
* @param that
20792106
* the source Observable
20802107
* @return The single element in the observable sequence.

rxjava-core/src/main/java/rx/observables/ConnectableObservable.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package rx.observables;
217

318
import rx.Observable;

rxjava-core/src/main/java/rx/operators/OperatorMulticast.java

Lines changed: 128 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package rx.operators;
217

318
import org.junit.Test;
@@ -9,17 +24,24 @@
924
import rx.subjects.Subject;
1025
import rx.util.functions.Func1;
1126

27+
import javax.annotation.concurrent.GuardedBy;
28+
1229
import static org.mockito.Mockito.*;
1330

1431
public class OperatorMulticast {
1532
public static <T, R> ConnectableObservable<R> multicast(Observable<T> source, final Subject<T, R> subject) {
16-
return new MulticastConnectableObservable<T ,R>(source, subject);
33+
return new MulticastConnectableObservable<T, R>(source, subject);
1734
}
1835

1936
private static class MulticastConnectableObservable<T, R> extends ConnectableObservable<R> {
37+
private final Object lock = new Object();
38+
2039
private final Observable<T> source;
2140
private final Subject<T, R> subject;
2241

42+
@GuardedBy("lock")
43+
private Subscription subscription;
44+
2345
public MulticastConnectableObservable(Observable<T> source, final Subject<T, R> subject) {
2446
super(new Func1<Observer<R>, Subscription>() {
2547
@Override
@@ -32,22 +54,39 @@ public Subscription call(Observer<R> observer) {
3254
}
3355

3456
public Subscription connect() {
35-
return source.subscribe(new Observer<T>() {
36-
@Override
37-
public void onCompleted() {
38-
subject.onCompleted();
57+
synchronized (lock) {
58+
if (subscription == null) {
59+
subscription = source.subscribe(new Observer<T>() {
60+
@Override
61+
public void onCompleted() {
62+
subject.onCompleted();
63+
}
64+
65+
@Override
66+
public void onError(Exception e) {
67+
subject.onError(e);
68+
}
69+
70+
@Override
71+
public void onNext(T args) {
72+
subject.onNext(args);
73+
}
74+
});
3975
}
76+
}
4077

41-
@Override
42-
public void onError(Exception e) {
43-
subject.onError(e);
44-
}
4578

79+
return new Subscription() {
4680
@Override
47-
public void onNext(T args) {
48-
subject.onNext(args);
81+
public void unsubscribe() {
82+
synchronized (lock) {
83+
if (subscription != null) {
84+
subscription.unsubscribe();
85+
subscription = null;
86+
}
87+
}
4988
}
50-
});
89+
};
5190
}
5291

5392

@@ -57,8 +96,7 @@ public static class UnitTest {
5796

5897
@Test
5998
public void testMulticast() {
60-
Subscription s = mock(Subscription.class);
61-
TestObservable source = new TestObservable(s);
99+
TestObservable source = new TestObservable();
62100

63101
ConnectableObservable<String> multicasted = OperatorMulticast.multicast(source,
64102
DefaultSubject.<String>create());
@@ -83,6 +121,60 @@ public void testMulticast() {
83121

84122
}
85123

124+
@Test
125+
public void testMulticastConnectTwice() {
126+
TestObservable source = new TestObservable();
127+
128+
ConnectableObservable<String> multicasted = OperatorMulticast.multicast(source,
129+
DefaultSubject.<String>create());
130+
131+
Observer<String> observer = mock(Observer.class);
132+
multicasted.subscribe(observer);
133+
134+
source.sendOnNext("one");
135+
136+
multicasted.connect();
137+
multicasted.connect();
138+
139+
source.sendOnNext("two");
140+
source.sendOnCompleted();
141+
142+
verify(observer, never()).onNext("one");
143+
verify(observer, times(1)).onNext("two");
144+
verify(observer, times(1)).onCompleted();
145+
146+
}
147+
148+
@Test
149+
public void testMulticastDisconnect() {
150+
TestObservable source = new TestObservable();
151+
152+
ConnectableObservable<String> multicasted = OperatorMulticast.multicast(source,
153+
DefaultSubject.<String>create());
154+
155+
Observer<String> observer = mock(Observer.class);
156+
multicasted.subscribe(observer);
157+
158+
source.sendOnNext("one");
159+
160+
Subscription connection = multicasted.connect();
161+
source.sendOnNext("two");
162+
163+
connection.unsubscribe();
164+
source.sendOnNext("three");
165+
166+
multicasted.connect();
167+
source.sendOnNext("four");
168+
source.sendOnCompleted();
169+
170+
verify(observer, never()).onNext("one");
171+
verify(observer, times(1)).onNext("two");
172+
verify(observer, never()).onNext("three");
173+
verify(observer, times(1)).onNext("four");
174+
verify(observer, times(1)).onCompleted();
175+
176+
}
177+
86178

87179
private static class TestObservable extends Observable<String> {
88180

@@ -102,10 +194,29 @@ public void onNext(String args) {
102194
// Do nothing
103195
}
104196
};
105-
Subscription s;
197+
Subscription s = new Subscription() {
198+
@Override
199+
public void unsubscribe() {
200+
observer = new Observer<String>() {
201+
@Override
202+
public void onCompleted() {
203+
// Do nothing
204+
}
205+
206+
@Override
207+
public void onError(Exception e) {
208+
// Do nothing
209+
}
210+
211+
@Override
212+
public void onNext(String args) {
213+
// Do nothing
214+
}
215+
};
216+
}
217+
};
106218

107-
public TestObservable(Subscription s) {
108-
this.s = s;
219+
public TestObservable() {
109220
}
110221

111222
/* used to simulate subscription */

rxjava-core/src/main/java/rx/subjects/DefaultSubject.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package rx.subjects;
217

318
import java.util.ArrayList;

rxjava-core/src/main/java/rx/subjects/Subject.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package rx.subjects;
217

318
import rx.Observable;

0 commit comments

Comments
 (0)