Skip to content

Commit b3c611c

Browse files
committed
ConnectableObservable autoConnect operator
1 parent d2a5d5d commit b3c611c

File tree

4 files changed

+289
-8
lines changed

4 files changed

+289
-8
lines changed

src/main/java/rx/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3533,13 +3533,13 @@ public final Observable<T> cache() {
35333533
* <dd>{@code cache} does not operate by default on a particular {@link Scheduler}.</dd>
35343534
* </dl>
35353535
*
3536-
* @param capacity hint for number of items to cache (for optimizing underlying data structure)
3536+
* @param capacityHint hint for number of items to cache (for optimizing underlying data structure)
35373537
* @return an Observable that, when first subscribed to, caches all of its items and notifications for the
35383538
* benefit of subsequent subscribers
35393539
* @see <a href="http://reactivex.io/documentation/operators/replay.html">ReactiveX operators documentation: Replay</a>
35403540
*/
3541-
public final Observable<T> cache(int capacity) {
3542-
return CachedObservable.from(this, capacity);
3541+
public final Observable<T> cache(int capacityHint) {
3542+
return CachedObservable.from(this, capacityHint);
35433543
}
35443544

35453545
/**
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
20+
import rx.Observable.OnSubscribe;
21+
import rx.*;
22+
import rx.functions.Action1;
23+
import rx.observables.ConnectableObservable;
24+
25+
/**
26+
* Wraps a ConnectableObservable and calls its connect() method once
27+
* the specified number of Subscribers have subscribed.
28+
*
29+
* @param <T> the value type of the chain
30+
*/
31+
public final class OnSubscribeAutoConnect<T> implements OnSubscribe<T> {
32+
final ConnectableObservable<? extends T> source;
33+
final int numberOfSubscribers;
34+
final Action1<? super Subscription> connection;
35+
final AtomicInteger clients;
36+
37+
public OnSubscribeAutoConnect(ConnectableObservable<? extends T> source,
38+
int numberOfSubscribers,
39+
Action1<? super Subscription> connection) {
40+
if (numberOfSubscribers <= 0) {
41+
throw new IllegalArgumentException("numberOfSubscribers > 0 required");
42+
}
43+
this.source = source;
44+
this.numberOfSubscribers = numberOfSubscribers;
45+
this.connection = connection;
46+
this.clients = new AtomicInteger();
47+
}
48+
@Override
49+
public void call(Subscriber<? super T> child) {
50+
source.unsafeSubscribe(child);
51+
if (clients.incrementAndGet() == numberOfSubscribers) {
52+
source.connect(connection);
53+
}
54+
}
55+
}

src/main/java/rx/observables/ConnectableObservable.java

Lines changed: 56 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,10 @@
1515
*/
1616
package rx.observables;
1717

18-
import rx.Observable;
19-
import rx.Subscriber;
20-
import rx.Subscription;
21-
import rx.functions.Action1;
22-
import rx.internal.operators.OnSubscribeRefCount;
18+
import rx.*;
19+
import rx.annotations.Experimental;
20+
import rx.functions.*;
21+
import rx.internal.operators.*;
2322

2423
/**
2524
* A {@code ConnectableObservable} resembles an ordinary {@link Observable}, except that it does not begin
@@ -80,4 +79,56 @@ public void call(Subscription t1) {
8079
public Observable<T> refCount() {
8180
return create(new OnSubscribeRefCount<T>(this));
8281
}
82+
83+
/**
84+
* Returns an Observable that automatically connects to this ConnectableObservable
85+
* when the first Subscriber subscribes.
86+
*
87+
* @return an Observable that automatically connects to this ConnectableObservable
88+
* when the first Subscriber subscribes
89+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
90+
*/
91+
@Experimental
92+
public Observable<T> autoConnect() {
93+
return autoConnect(1);
94+
}
95+
/**
96+
* Returns an Observable that automatically connects to this ConnectableObservable
97+
* when the specified number of Subscribers subscribe to it.
98+
*
99+
* @param numberOfSubscribers the number of subscribers to await before calling connect
100+
* on the ConnectableObservable. A non-positive value indicates
101+
* an immediate connection.
102+
* @return an Observable that automatically connects to this ConnectableObservable
103+
* when the specified number of Subscribers subscribe to it
104+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
105+
*/
106+
@Experimental
107+
public Observable<T> autoConnect(int numberOfSubscribers) {
108+
return autoConnect(numberOfSubscribers, Actions.empty());
109+
}
110+
111+
/**
112+
* Returns an Observable that automatically connects to this ConnectableObservable
113+
* when the specified number of Subscribers subscribe to it and calls the
114+
* specified callback with the Subscription associated with the established connection.
115+
*
116+
* @param numberOfSubscribers the number of subscribers to await before calling connect
117+
* on the ConnectableObservable. A non-positive value indicates
118+
* an immediate connection.
119+
* @param connection the callback Action1 that will receive the Subscription representing the
120+
* established connection
121+
* @return an Observable that automatically connects to this ConnectableObservable
122+
* when the specified number of Subscribers subscribe to it and calls the
123+
* specified callback with the Subscription associated with the established connection
124+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number)
125+
*/
126+
@Experimental
127+
public Observable<T> autoConnect(int numberOfSubscribers, Action1<? super Subscription> connection) {
128+
if (numberOfSubscribers <= 0) {
129+
this.connect(connection);
130+
return this;
131+
}
132+
return create(new OnSubscribeAutoConnect<T>(this, numberOfSubscribers, connection));
133+
}
83134
}
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.observables;
17+
18+
import java.util.concurrent.atomic.*;
19+
20+
import org.junit.*;
21+
22+
import rx.*;
23+
import rx.functions.*;
24+
import rx.observers.TestSubscriber;
25+
26+
public class ConnectableObservableTest {
27+
@Test
28+
public void testAutoConnect() {
29+
final AtomicInteger run = new AtomicInteger();
30+
31+
ConnectableObservable<Integer> co = Observable.defer(new Func0<Observable<Integer>>() {
32+
@Override
33+
public Observable<Integer> call() {
34+
return Observable.just(run.incrementAndGet());
35+
}
36+
}).publish();
37+
38+
Observable<Integer> source = co.autoConnect();
39+
40+
Assert.assertEquals(0, run.get());
41+
42+
TestSubscriber<Integer> ts1 = TestSubscriber.create();
43+
source.subscribe(ts1);
44+
45+
ts1.assertCompleted();
46+
ts1.assertNoErrors();
47+
ts1.assertValue(1);
48+
49+
Assert.assertEquals(1, run.get());
50+
51+
TestSubscriber<Integer> ts2 = TestSubscriber.create();
52+
source.subscribe(ts2);
53+
54+
ts2.assertNotCompleted();
55+
ts2.assertNoErrors();
56+
ts2.assertNoValues();
57+
58+
Assert.assertEquals(1, run.get());
59+
}
60+
@Test
61+
public void testAutoConnect0() {
62+
final AtomicInteger run = new AtomicInteger();
63+
64+
ConnectableObservable<Integer> co = Observable.defer(new Func0<Observable<Integer>>() {
65+
@Override
66+
public Observable<Integer> call() {
67+
return Observable.just(run.incrementAndGet());
68+
}
69+
}).publish();
70+
71+
Observable<Integer> source = co.autoConnect(0);
72+
73+
Assert.assertEquals(1, run.get());
74+
75+
TestSubscriber<Integer> ts1 = TestSubscriber.create();
76+
source.subscribe(ts1);
77+
78+
ts1.assertNotCompleted();
79+
ts1.assertNoErrors();
80+
ts1.assertNoValues();
81+
82+
Assert.assertEquals(1, run.get());
83+
84+
TestSubscriber<Integer> ts2 = TestSubscriber.create();
85+
source.subscribe(ts2);
86+
87+
ts2.assertNotCompleted();
88+
ts2.assertNoErrors();
89+
ts2.assertNoValues();
90+
91+
Assert.assertEquals(1, run.get());
92+
}
93+
@Test
94+
public void testAutoConnect2() {
95+
final AtomicInteger run = new AtomicInteger();
96+
97+
ConnectableObservable<Integer> co = Observable.defer(new Func0<Observable<Integer>>() {
98+
@Override
99+
public Observable<Integer> call() {
100+
return Observable.just(run.incrementAndGet());
101+
}
102+
}).publish();
103+
104+
Observable<Integer> source = co.autoConnect(2);
105+
106+
Assert.assertEquals(0, run.get());
107+
108+
TestSubscriber<Integer> ts1 = TestSubscriber.create();
109+
source.subscribe(ts1);
110+
111+
ts1.assertNotCompleted();
112+
ts1.assertNoErrors();
113+
ts1.assertNoValues();
114+
115+
Assert.assertEquals(0, run.get());
116+
117+
TestSubscriber<Integer> ts2 = TestSubscriber.create();
118+
source.subscribe(ts2);
119+
120+
Assert.assertEquals(1, run.get());
121+
122+
ts1.assertCompleted();
123+
ts1.assertNoErrors();
124+
ts1.assertValue(1);
125+
126+
ts2.assertCompleted();
127+
ts2.assertNoErrors();
128+
ts2.assertValue(1);
129+
130+
}
131+
132+
@Test
133+
public void testAutoConnectUnsubscribe() {
134+
final AtomicInteger run = new AtomicInteger();
135+
136+
ConnectableObservable<Integer> co = Observable.defer(new Func0<Observable<Integer>>() {
137+
@Override
138+
public Observable<Integer> call() {
139+
return Observable.range(run.incrementAndGet(), 10);
140+
}
141+
}).publish();
142+
143+
final AtomicReference<Subscription> conn = new AtomicReference<Subscription>();
144+
145+
Observable<Integer> source = co.autoConnect(1, new Action1<Subscription>() {
146+
@Override
147+
public void call(Subscription t) {
148+
conn.set(t);
149+
}
150+
});
151+
152+
Assert.assertEquals(0, run.get());
153+
154+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
155+
@Override
156+
public void onNext(Integer t) {
157+
super.onNext(t);
158+
Subscription s = conn.get();
159+
if (s != null) {
160+
s.unsubscribe();
161+
} else {
162+
onError(new NullPointerException("No connection reference"));
163+
}
164+
}
165+
};
166+
167+
source.subscribe(ts);
168+
169+
ts.assertNotCompleted();
170+
ts.assertNoErrors();
171+
ts.assertValue(1);
172+
173+
Assert.assertTrue("Connection not unsubscribed?", conn.get().isUnsubscribed());
174+
}
175+
}

0 commit comments

Comments
 (0)