Skip to content

Commit 490ef86

Browse files
committed
Removed static variant of refCount
1 parent d4b04d8 commit 490ef86

File tree

2 files changed

+26
-70
lines changed

2 files changed

+26
-70
lines changed

rxjava-core/src/main/java/rx/observables/ConnectableObservable.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,16 +54,6 @@ protected ConnectableObservable(OnSubscribeFunc<T> onSubscribe) {
5454
* @return a {@link Observable}
5555
*/
5656
public Observable<T> refCount() {
57-
return refCount(this);
58-
}
59-
60-
/**
61-
* Returns an observable sequence that stays connected to the source as long
62-
* as there is at least one subscription to the observable sequence.
63-
* @return a {@link Observable}
64-
* @param that a {@link ConnectableObservable}
65-
*/
66-
public static <T> Observable<T> refCount(ConnectableObservable<T> that) {
67-
return Observable.create(OperationRefCount.refCount(that));
57+
return Observable.create(OperationRefCount.refCount(this));
6858
}
6959
}

rxjava-core/src/test/java/rx/RefCountTests.java

Lines changed: 25 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@
33
import org.junit.Before;
44
import org.junit.Test;
55
import org.mockito.MockitoAnnotations;
6-
import rx.observables.ConnectableObservable;
76
import rx.subscriptions.Subscriptions;
7+
import rx.util.functions.Action0;
88

9-
import static org.mockito.Mockito.*;
9+
import java.util.concurrent.atomic.AtomicInteger;
10+
11+
import static org.junit.Assert.assertEquals;
12+
import static org.mockito.Mockito.mock;
1013

1114
public class RefCountTests {
1215

@@ -16,67 +19,30 @@ public void setUp() {
1619
}
1720

1821
@Test
19-
public void subscriptionToUnderlyingOnFirstSubscription() {
20-
@SuppressWarnings("unchecked")
21-
ConnectableObservable<Integer> connectable = mock(ConnectableObservable.class);
22-
Observable<Integer> refCounted = ConnectableObservable.refCount(connectable);
23-
@SuppressWarnings("unchecked")
24-
Observer<Integer> observer = mock(Observer.class);
25-
when(connectable.subscribe(any(Observer.class))).thenReturn(Subscriptions.empty());
26-
when(connectable.connect()).thenReturn(Subscriptions.empty());
27-
refCounted.subscribe(observer);
28-
verify(connectable, times(1)).subscribe(any(Observer.class));
29-
verify(connectable, times(1)).connect();
30-
}
31-
32-
@Test
33-
public void noSubscriptionToUnderlyingOnSecondSubscription() {
34-
@SuppressWarnings("unchecked")
35-
ConnectableObservable<Integer> connectable = mock(ConnectableObservable.class);
36-
Observable<Integer> refCounted = ConnectableObservable.refCount(connectable);
37-
@SuppressWarnings("unchecked")
38-
Observer<Integer> observer = mock(Observer.class);
39-
when(connectable.subscribe(any(Observer.class))).thenReturn(Subscriptions.empty());
40-
when(connectable.connect()).thenReturn(Subscriptions.empty());
41-
refCounted.subscribe(observer);
42-
refCounted.subscribe(observer);
43-
verify(connectable, times(2)).subscribe(any(Observer.class));
44-
verify(connectable, times(1)).connect();
45-
}
46-
47-
@Test
48-
public void unsubscriptionFromUnderlyingOnLastUnsubscription() {
49-
@SuppressWarnings("unchecked")
50-
ConnectableObservable<Integer> connectable = mock(ConnectableObservable.class);
51-
Observable<Integer> refCounted = ConnectableObservable.refCount(connectable);
52-
@SuppressWarnings("unchecked")
53-
Observer<Integer> observer = mock(Observer.class);
54-
Subscription underlying = mock(Subscription.class);
55-
when(connectable.subscribe(any(Observer.class))).thenReturn(underlying);
56-
Subscription connection = mock(Subscription.class);
57-
when(connectable.connect()).thenReturn(connection);
58-
Subscription first = refCounted.subscribe(observer);
59-
first.unsubscribe();
60-
verify(underlying, times(1)).unsubscribe();
61-
verify(connection, times(1)).unsubscribe();
62-
}
63-
64-
@Test
65-
public void noUnsubscriptionFromUnderlyingOnFirstUnsubscription() {
66-
@SuppressWarnings("unchecked")
67-
ConnectableObservable<Integer> connectable = mock(ConnectableObservable.class);
68-
Observable<Integer> refCounted = ConnectableObservable.refCount(connectable);
69-
@SuppressWarnings("unchecked")
22+
public void onlyFirstShouldSubscribeAndLastUnsubscribe() {
23+
final AtomicInteger subscriptionCount = new AtomicInteger();
24+
final AtomicInteger unsubscriptionCount = new AtomicInteger();
25+
Observable<Integer> observable = Observable.create(new Observable.OnSubscribeFunc<Integer>() {
26+
@Override
27+
public Subscription onSubscribe(Observer<? super Integer> observer) {
28+
subscriptionCount.incrementAndGet();
29+
return Subscriptions.create(new Action0() {
30+
@Override
31+
public void call() {
32+
unsubscriptionCount.incrementAndGet();
33+
}
34+
});
35+
}
36+
});
37+
Observable<Integer> refCounted = observable.publish().refCount();
7038
Observer<Integer> observer = mock(Observer.class);
71-
Subscription underlying = mock(Subscription.class);
72-
when(connectable.subscribe(any(Observer.class))).thenReturn(underlying);
73-
Subscription connection = mock(Subscription.class);
74-
when(connectable.connect()).thenReturn(connection);
7539
Subscription first = refCounted.subscribe(observer);
40+
assertEquals(1, subscriptionCount.get());
7641
Subscription second = refCounted.subscribe(observer);
42+
assertEquals(1, subscriptionCount.get());
7743
first.unsubscribe();
44+
assertEquals(0, unsubscriptionCount.get());
7845
second.unsubscribe();
79-
verify(underlying, times(2)).unsubscribe();
80-
verify(connection, times(1)).unsubscribe();
46+
assertEquals(1, unsubscriptionCount.get());
8147
}
8248
}

0 commit comments

Comments
 (0)