Skip to content

Commit 5f99006

Browse files
Merge branch 'master' of github.com:johnhmarks/RxJava into pull-407-refCount
2 parents 7408048 + 52c7a3f commit 5f99006

File tree

5 files changed

+220
-1
lines changed

5 files changed

+220
-1
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@
8585
import rx.plugins.RxJavaErrorHandler;
8686
import rx.plugins.RxJavaObservableExecutionHook;
8787
import rx.plugins.RxJavaPlugins;
88+
import rx.subjects.AsyncSubject;
8889
import rx.subjects.PublishSubject;
8990
import rx.subjects.ReplaySubject;
9091
import rx.subjects.Subject;
@@ -3657,6 +3658,14 @@ public ConnectableObservable<T> publish() {
36573658
return OperationMulticast.multicast(this, PublishSubject.<T> create());
36583659
}
36593660

3661+
/**
3662+
* Returns a {@link ConnectableObservable} that shares a single subscription that contains the last notification only.
3663+
* @return a {@link ConnectableObservable}
3664+
*/
3665+
public ConnectableObservable<T> publishLast() {
3666+
return OperationMulticast.multicast(this, AsyncSubject.<T> create());
3667+
}
3668+
36603669
/**
36613670
* Synonymous with <code>reduce()</code>.
36623671
* <p>
@@ -4414,7 +4423,7 @@ public Boolean call(T t) {
44144423
* For why this is being used see https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
44154424
*
44164425
* NOTE: If strong reasons for not depending on package names comes up then the implementation of this method can change to looking for a marker interface.
4417-
*
4426+
*
44184427
* @param o
44194428
* @return {@code true} if the given function is an internal implementation, and {@code false} otherwise.
44204429
*/

rxjava-core/src/main/java/rx/observables/ConnectableObservable.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
import rx.Observable;
1919
import rx.Observer;
2020
import rx.Subscription;
21+
import rx.operators.OperationRefCount;
22+
import rx.util.functions.Func1;
2123

2224
/**
2325
* A ConnectableObservable resembles an ordinary {@link Observable}, except that it does not begin
@@ -46,4 +48,22 @@ protected ConnectableObservable(OnSubscribeFunc<T> onSubscribe) {
4648
*/
4749
public abstract Subscription connect();
4850

51+
/**
52+
* Returns an observable sequence that stays connected to the source as long
53+
* as there is at least one subscription to the observable sequence.
54+
* @return a {@link Observable}
55+
*/
56+
public Observable<T> refCount() {
57+
return refCount(this);
58+
}
59+
60+
/**
61+
* Returns an observable sequence that stays connected to the source as long
62+
* as there is at least one subscription to the observable sequence.
63+
* @return a {@link Observable}
64+
* @param that a {@link ConnectableObservable}
65+
*/
66+
public static <T> Observable<T> refCount(ConnectableObservable<T> that) {
67+
return Observable.create(OperationRefCount.refCount(that));
68+
}
4969
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import rx.Observable;
19+
import rx.Observer;
20+
import rx.Subscription;
21+
import rx.observables.ConnectableObservable;
22+
import rx.subscriptions.Subscriptions;
23+
import rx.util.functions.Action0;
24+
25+
/**
26+
* Returns an observable sequence that stays connected to the source as long
27+
* as there is at least one subscription to the observable sequence.
28+
*/
29+
public final class OperationRefCount<T> {
30+
public static <T> Observable.OnSubscribeFunc<T> refCount(ConnectableObservable<T> connectableObservable) {
31+
return new RefCount<T>(connectableObservable);
32+
}
33+
34+
private static class RefCount<T> implements Observable.OnSubscribeFunc<T> {
35+
private final ConnectableObservable<T> innerConnectableObservable;
36+
private final Object gate = new Object();
37+
private int count = 0;
38+
private Subscription connection = null;
39+
40+
public RefCount(ConnectableObservable<T> innerConnectableObservable) {
41+
this.innerConnectableObservable = innerConnectableObservable;
42+
}
43+
44+
@Override
45+
public Subscription onSubscribe(Observer<? super T> observer) {
46+
final Subscription subscription = innerConnectableObservable.subscribe(observer);
47+
synchronized (gate) {
48+
if (count++ == 0) {
49+
connection = innerConnectableObservable.connect();
50+
}
51+
}
52+
return Subscriptions.create(new Action0() {
53+
@Override
54+
public void call() {
55+
synchronized (gate) {
56+
if (--count == 0) {
57+
connection.unsubscribe();
58+
connection = null;
59+
}
60+
}
61+
subscription.unsubscribe();
62+
}
63+
});
64+
}
65+
}
66+
}

rxjava-core/src/test/java/rx/ObservableTests.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,48 @@ public void call(String v) {
489489
}
490490
}
491491

492+
@Test
493+
public void testPublishLast() throws InterruptedException {
494+
final AtomicInteger count = new AtomicInteger();
495+
ConnectableObservable<String> connectable = Observable.create(new OnSubscribeFunc<String>() {
496+
@Override
497+
public Subscription onSubscribe(final Observer<? super String> observer) {
498+
count.incrementAndGet();
499+
final BooleanSubscription subscription = new BooleanSubscription();
500+
new Thread(new Runnable() {
501+
@Override
502+
public void run() {
503+
observer.onNext("first");
504+
observer.onNext("last");
505+
observer.onCompleted();
506+
}
507+
}).start();
508+
return subscription;
509+
}
510+
}).publishLast();
511+
512+
// subscribe once
513+
final CountDownLatch latch = new CountDownLatch(1);
514+
connectable.subscribe(new Action1<String>() {
515+
@Override
516+
public void call(String value) {
517+
assertEquals("last", value);
518+
latch.countDown();
519+
}
520+
});
521+
522+
// subscribe twice
523+
connectable.subscribe(new Action1<String>() {
524+
@Override
525+
public void call(String _) {}
526+
});
527+
528+
Subscription subscription = connectable.connect();
529+
assertTrue(latch.await(1000, TimeUnit.MILLISECONDS));
530+
assertEquals(1, count.get());
531+
subscription.unsubscribe();
532+
}
533+
492534
@Test
493535
public void testReplay() throws InterruptedException {
494536
final AtomicInteger counter = new AtomicInteger();
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package rx;
2+
3+
import org.junit.Before;
4+
import org.junit.Test;
5+
import org.mockito.MockitoAnnotations;
6+
import rx.observables.ConnectableObservable;
7+
import rx.subscriptions.Subscriptions;
8+
9+
import static org.mockito.Mockito.*;
10+
11+
public class RefCountTests {
12+
13+
@Before
14+
public void setUp() {
15+
MockitoAnnotations.initMocks(this);
16+
}
17+
18+
@Test
19+
public void subscriptionToUnderlyingOnFirstSubscription() {
20+
@SuppressWarnings("unchecked")
21+
ConnectableObservable<Integer> connectable = mock(ConnectableObservable.class);
22+
Observable<Integer> refCounted = ConnectableObservable.refCount(connectable);
23+
@SuppressWarnings("unchecked")
24+
Observer<Integer> observer = mock(Observer.class);
25+
when(connectable.subscribe(any(Observer.class))).thenReturn(Subscriptions.empty());
26+
when(connectable.connect()).thenReturn(Subscriptions.empty());
27+
refCounted.subscribe(observer);
28+
verify(connectable, times(1)).subscribe(any(Observer.class));
29+
verify(connectable, times(1)).connect();
30+
}
31+
32+
@Test
33+
public void noSubscriptionToUnderlyingOnSecondSubscription() {
34+
@SuppressWarnings("unchecked")
35+
ConnectableObservable<Integer> connectable = mock(ConnectableObservable.class);
36+
Observable<Integer> refCounted = ConnectableObservable.refCount(connectable);
37+
@SuppressWarnings("unchecked")
38+
Observer<Integer> observer = mock(Observer.class);
39+
when(connectable.subscribe(any(Observer.class))).thenReturn(Subscriptions.empty());
40+
when(connectable.connect()).thenReturn(Subscriptions.empty());
41+
refCounted.subscribe(observer);
42+
refCounted.subscribe(observer);
43+
verify(connectable, times(2)).subscribe(any(Observer.class));
44+
verify(connectable, times(1)).connect();
45+
}
46+
47+
@Test
48+
public void unsubscriptionFromUnderlyingOnLastUnsubscription() {
49+
@SuppressWarnings("unchecked")
50+
ConnectableObservable<Integer> connectable = mock(ConnectableObservable.class);
51+
Observable<Integer> refCounted = ConnectableObservable.refCount(connectable);
52+
@SuppressWarnings("unchecked")
53+
Observer<Integer> observer = mock(Observer.class);
54+
Subscription underlying = mock(Subscription.class);
55+
when(connectable.subscribe(any(Observer.class))).thenReturn(underlying);
56+
Subscription connection = mock(Subscription.class);
57+
when(connectable.connect()).thenReturn(connection);
58+
Subscription first = refCounted.subscribe(observer);
59+
first.unsubscribe();
60+
verify(underlying, times(1)).unsubscribe();
61+
verify(connection, times(1)).unsubscribe();
62+
}
63+
64+
@Test
65+
public void noUnsubscriptionFromUnderlyingOnFirstUnsubscription() {
66+
@SuppressWarnings("unchecked")
67+
ConnectableObservable<Integer> connectable = mock(ConnectableObservable.class);
68+
Observable<Integer> refCounted = ConnectableObservable.refCount(connectable);
69+
@SuppressWarnings("unchecked")
70+
Observer<Integer> observer = mock(Observer.class);
71+
Subscription underlying = mock(Subscription.class);
72+
when(connectable.subscribe(any(Observer.class))).thenReturn(underlying);
73+
Subscription connection = mock(Subscription.class);
74+
when(connectable.connect()).thenReturn(connection);
75+
Subscription first = refCounted.subscribe(observer);
76+
Subscription second = refCounted.subscribe(observer);
77+
first.unsubscribe();
78+
second.unsubscribe();
79+
verify(underlying, times(2)).unsubscribe();
80+
verify(connection, times(1)).unsubscribe();
81+
}
82+
}

0 commit comments

Comments
 (0)