Skip to content

Commit 1ebd979

Browse files
nsk-mironovakarnokd
authored andcommitted
Observable/Flowable should unsubscribe from underlying subscription on dispose (#4536)
1 parent 56d5586 commit 1ebd979

File tree

6 files changed

+102
-2
lines changed

6 files changed

+102
-2
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,6 @@ void scheduleTimeout(final long idx) {
252252
public void run() {
253253
if (idx == index) {
254254
done = true;
255-
s.cancel();
256255
dispose();
257256

258257
actual.onError(new TimeoutException());
@@ -293,6 +292,7 @@ public void onComplete() {
293292
public void dispose() {
294293
worker.dispose();
295294
DisposableHelper.dispose(timer);
295+
s.cancel();
296296
}
297297

298298
@Override

src/main/java/io/reactivex/internal/operators/observable/ObservableTimeoutTimed.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,6 @@ void scheduleTimeout(final long idx) {
251251
public void run() {
252252
if (idx == index) {
253253
done = true;
254-
s.dispose();
255254
dispose();
256255

257256
actual.onError(new TimeoutException());
@@ -292,6 +291,7 @@ public void onComplete() {
292291
public void dispose() {
293292
worker.dispose();
294293
DisposableHelper.dispose(timer);
294+
s.dispose();
295295
}
296296

297297
@Override

src/test/java/io/reactivex/internal/operators/completable/CompletableTimeoutTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,14 @@
1414
package io.reactivex.internal.operators.completable;
1515

1616
import static org.junit.Assert.assertEquals;
17+
import static org.junit.Assert.assertFalse;
18+
import static org.junit.Assert.assertTrue;
1719

1820
import java.util.concurrent.*;
1921

22+
import io.reactivex.schedulers.TestScheduler;
23+
import io.reactivex.subjects.PublishSubject;
24+
import io.reactivex.subscribers.TestSubscriber;
2025
import org.junit.Test;
2126

2227
import io.reactivex.Completable;
@@ -56,4 +61,20 @@ public void run() throws Exception {
5661
assertEquals(1, call[0]);
5762
}
5863

64+
@Test
65+
public void shouldUnsubscribeFromUnderlyingSubscriptionOnDispose() {
66+
final PublishSubject<String> subject = PublishSubject.create();
67+
final TestScheduler scheduler = new TestScheduler();
68+
69+
final TestSubscriber<Void> observer = subject.toCompletable()
70+
.timeout(100, TimeUnit.MILLISECONDS, scheduler)
71+
.test();
72+
73+
assertTrue(subject.hasObservers());
74+
75+
observer.dispose();
76+
77+
assertFalse(subject.hasObservers());
78+
}
79+
5980
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTests.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16+
import static org.junit.Assert.assertFalse;
17+
import static org.junit.Assert.assertTrue;
1618
import static org.mockito.Matchers.*;
1719
import static org.mockito.Mockito.*;
1820

@@ -358,6 +360,21 @@ public void subscribe(Subscriber<? super String> subscriber) {
358360
verify(s, times(1)).cancel();
359361
}
360362

363+
@Test
364+
public void shouldUnsubscribeFromUnderlyingSubscriptionOnDispose() {
365+
final PublishProcessor<String> subject = PublishProcessor.create();
366+
final TestScheduler scheduler = new TestScheduler();
367+
368+
final TestSubscriber<String> observer = subject
369+
.timeout(100, TimeUnit.MILLISECONDS, scheduler)
370+
.test();
371+
372+
assertTrue(subject.hasSubscribers());
373+
374+
observer.dispose();
375+
376+
assertFalse(subject.hasSubscribers());
377+
}
361378

362379
@Test
363380
public void timedAndOther() {

src/test/java/io/reactivex/internal/operators/observable/ObservableTimeoutTests.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
package io.reactivex.internal.operators.observable;
1515

16+
import static org.junit.Assert.assertFalse;
17+
import static org.junit.Assert.assertTrue;
1618
import static org.mockito.Matchers.*;
1719
import static org.mockito.Mockito.*;
1820

@@ -357,6 +359,22 @@ public void subscribe(Observer<? super String> NbpSubscriber) {
357359
verify(s, times(1)).dispose();
358360
}
359361

362+
@Test
363+
public void shouldUnsubscribeFromUnderlyingSubscriptionOnDispose() {
364+
final PublishSubject<String> subject = PublishSubject.create();
365+
final TestScheduler scheduler = new TestScheduler();
366+
367+
final TestObserver<String> observer = subject
368+
.timeout(100, TimeUnit.MILLISECONDS, scheduler)
369+
.test();
370+
371+
assertTrue(subject.hasObservers());
372+
373+
observer.dispose();
374+
375+
assertFalse(subject.hasObservers());
376+
}
377+
360378
@Test
361379
public void timedAndOther() {
362380
Observable.never().timeout(100, TimeUnit.MILLISECONDS, Observable.just(1))
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.single;
15+
16+
import io.reactivex.schedulers.TestScheduler;
17+
import io.reactivex.subjects.PublishSubject;
18+
import io.reactivex.subscribers.TestSubscriber;
19+
import org.junit.Test;
20+
21+
import java.util.concurrent.TimeUnit;
22+
23+
import static org.junit.Assert.assertFalse;
24+
import static org.junit.Assert.assertTrue;
25+
26+
public class SingleTimeoutTests {
27+
28+
@Test
29+
public void shouldUnsubscribeFromUnderlyingSubscriptionOnDispose() {
30+
final PublishSubject<String> subject = PublishSubject.create();
31+
final TestScheduler scheduler = new TestScheduler();
32+
33+
final TestSubscriber<String> observer = subject.toSingle()
34+
.timeout(100, TimeUnit.MILLISECONDS, scheduler)
35+
.test();
36+
37+
assertTrue(subject.hasObservers());
38+
39+
observer.dispose();
40+
41+
assertFalse(subject.hasObservers());
42+
}
43+
44+
}

0 commit comments

Comments
 (0)