Skip to content

Commit 2783fa2

Browse files
committed
Merge pull request #2238 from zsxwing/issue2191
Fix the bug that cache doesn't unsubscribe the source Observable when th...
2 parents 681163e + e26dd2b commit 2783fa2

File tree

2 files changed

+17
-3
lines changed

2 files changed

+17
-3
lines changed

src/main/java/rx/internal/operators/OnSubscribeCache.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,10 @@ public OnSubscribeCache(Observable<? extends T> source, int capacity) {
6464
@Override
6565
public void call(Subscriber<? super T> s) {
6666
if (SRC_SUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) {
67-
source.unsafeSubscribe(Subscribers.from(cache));
67+
source.subscribe(cache);
6868
/*
69-
* Note that we will never unsubscribe from 'source' as we want to receive and cache all of its values.
69+
* Note that we will never unsubscribe from 'source' unless we receive `onCompleted` or `onError`,
70+
* as we want to receive and cache all of its values.
7071
*
7172
* This means this should never be used on an infinite or very large sequence, similar to toList().
7273
*/

src/test/java/rx/internal/operators/OnSubscribeCacheTest.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
import static org.junit.Assert.assertEquals;
1919
import static org.junit.Assert.fail;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.times;
22+
import static org.mockito.Mockito.verify;
2023

2124
import java.util.Arrays;
2225
import java.util.concurrent.CountDownLatch;
@@ -27,10 +30,10 @@
2730

2831
import rx.Observable;
2932
import rx.Subscriber;
33+
import rx.functions.Action0;
3034
import rx.functions.Action1;
3135
import rx.functions.Func1;
3236
import rx.functions.Func2;
33-
import rx.internal.operators.OnSubscribeCache;
3437
import rx.observers.TestSubscriber;
3538
import rx.schedulers.Schedulers;
3639
import rx.subjects.AsyncSubject;
@@ -148,4 +151,14 @@ public void testWithPublishSubjectAndRepeat() {
148151
public void testWithReplaySubjectAndRepeat() {
149152
testWithCustomSubjectAndRepeat(ReplaySubject.<Integer> create(), 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3);
150153
}
154+
155+
@Test
156+
public void testUnsubscribeSource() {
157+
Action0 unsubscribe = mock(Action0.class);
158+
Observable<Integer> o = Observable.just(1).doOnUnsubscribe(unsubscribe).cache();
159+
o.subscribe();
160+
o.subscribe();
161+
o.subscribe();
162+
verify(unsubscribe, times(1)).call();
163+
}
151164
}

0 commit comments

Comments
 (0)