Skip to content

Commit 79bd261

Browse files
Merge pull request #2567 from akarnokd/RefCountDisconnectOnTerminalEvent
RefCount: disconnect all if upstream terminates
2 parents 1f08a67 + aa451ef commit 79bd261

File tree

2 files changed

+135
-44
lines changed

2 files changed

+135
-44
lines changed

src/main/java/rx/internal/operators/OnSubscribeRefCount.java

Lines changed: 44 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,8 @@ public void call(final Subscriber<? super T> subscriber) {
8080
}
8181
} else {
8282
try {
83-
// handle unsubscribing from the base subscription
84-
subscriber.add(disconnect());
85-
8683
// ready to subscribe to source so do it
87-
source.unsafeSubscribe(subscriber);
84+
doSubscribe(subscriber, baseSubscription);
8885
} finally {
8986
// release the read lock
9087
lock.unlock();
@@ -101,12 +98,8 @@ public void call(Subscription subscription) {
10198

10299
try {
103100
baseSubscription.add(subscription);
104-
105-
// handle unsubscribing from the base subscription
106-
subscriber.add(disconnect());
107-
108101
// ready to subscribe to source so do it
109-
source.unsafeSubscribe(subscriber);
102+
doSubscribe(subscriber, baseSubscription);
110103
} finally {
111104
// release the write lock
112105
lock.unlock();
@@ -115,18 +108,54 @@ public void call(Subscription subscription) {
115108
}
116109
};
117110
}
111+
112+
void doSubscribe(final Subscriber<? super T> subscriber, final CompositeSubscription currentBase) {
113+
// handle unsubscribing from the base subscription
114+
subscriber.add(disconnect(currentBase));
115+
116+
source.unsafeSubscribe(new Subscriber<T>(subscriber) {
117+
@Override
118+
public void onError(Throwable e) {
119+
cleanup();
120+
subscriber.onError(e);
121+
}
122+
@Override
123+
public void onNext(T t) {
124+
subscriber.onNext(t);
125+
}
126+
@Override
127+
public void onCompleted() {
128+
cleanup();
129+
subscriber.onCompleted();
130+
}
131+
void cleanup() {
132+
lock.lock();
133+
try {
134+
if (baseSubscription == currentBase) {
135+
baseSubscription.unsubscribe();
136+
baseSubscription = new CompositeSubscription();
137+
subscriptionCount.set(0);
138+
}
139+
} finally {
140+
lock.unlock();
141+
}
142+
}
143+
});
144+
}
118145

119-
private Subscription disconnect() {
146+
private Subscription disconnect(final CompositeSubscription current) {
120147
return Subscriptions.create(new Action0() {
121148
@Override
122149
public void call() {
123150
lock.lock();
124151
try {
125-
if (subscriptionCount.decrementAndGet() == 0) {
126-
baseSubscription.unsubscribe();
127-
// need a new baseSubscription because once
128-
// unsubscribed stays that way
129-
baseSubscription = new CompositeSubscription();
152+
if (baseSubscription == current) {
153+
if (subscriptionCount.decrementAndGet() == 0) {
154+
baseSubscription.unsubscribe();
155+
// need a new baseSubscription because once
156+
// unsubscribed stays that way
157+
baseSubscription = new CompositeSubscription();
158+
}
130159
}
131160
} finally {
132161
lock.unlock();

src/test/java/rx/internal/operators/OnSubscribeRefCountTest.java

Lines changed: 91 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -15,39 +15,24 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import static org.junit.Assert.assertEquals;
19-
import static org.junit.Assert.assertTrue;
20-
import static org.junit.Assert.fail;
18+
import static org.junit.Assert.*;
2119
import static org.mockito.Matchers.any;
22-
import static org.mockito.Mockito.inOrder;
23-
import static org.mockito.Mockito.mock;
24-
import static org.mockito.Mockito.never;
25-
import static org.mockito.Mockito.verify;
26-
27-
import java.util.ArrayList;
28-
import java.util.Arrays;
29-
import java.util.List;
30-
import java.util.concurrent.CountDownLatch;
31-
import java.util.concurrent.TimeUnit;
20+
import static org.mockito.Mockito.*;
21+
22+
import java.util.*;
23+
import java.util.concurrent.*;
3224
import java.util.concurrent.atomic.AtomicInteger;
3325

34-
import org.junit.Before;
35-
import org.junit.Test;
36-
import org.mockito.InOrder;
37-
import org.mockito.MockitoAnnotations;
26+
import org.junit.*;
27+
import org.mockito.*;
3828

39-
import rx.Observable;
29+
import rx.*;
4030
import rx.Observable.OnSubscribe;
31+
import rx.Observable;
4132
import rx.Observer;
42-
import rx.Subscriber;
43-
import rx.Subscription;
44-
import rx.functions.Action0;
45-
import rx.functions.Action1;
46-
import rx.functions.Func2;
47-
import rx.observers.Subscribers;
48-
import rx.observers.TestSubscriber;
49-
import rx.schedulers.Schedulers;
50-
import rx.schedulers.TestScheduler;
33+
import rx.functions.*;
34+
import rx.observers.*;
35+
import rx.schedulers.*;
5136
import rx.subjects.ReplaySubject;
5237
import rx.subscriptions.Subscriptions;
5338

@@ -285,6 +270,13 @@ public void call() {
285270
s.assertNoErrors();
286271
}
287272

273+
@Test
274+
public void testConnectUnsubscribeRaceConditionLoop() throws InterruptedException {
275+
for (int i = 0; i < 1000; i++) {
276+
testConnectUnsubscribeRaceCondition();
277+
}
278+
}
279+
288280
@Test
289281
public void testConnectUnsubscribeRaceCondition() throws InterruptedException {
290282
final AtomicInteger subUnsubCount = new AtomicInteger();
@@ -310,12 +302,14 @@ public void call() {
310302
});
311303

312304
TestSubscriber<Long> s = new TestSubscriber<Long>();
313-
o.publish().refCount().subscribeOn(Schedulers.newThread()).subscribe(s);
305+
306+
o.publish().refCount().subscribeOn(Schedulers.computation()).subscribe(s);
314307
System.out.println("send unsubscribe");
315308
// now immediately unsubscribe while subscribeOn is racing to subscribe
316309
s.unsubscribe();
317310
// this generally will mean it won't even subscribe as it is already unsubscribed by the time connect() gets scheduled
318-
311+
// give time to the counter to update
312+
Thread.sleep(1);
319313
// either we subscribed and then unsubscribed, or we didn't ever even subscribe
320314
assertEquals(0, subUnsubCount.get());
321315

@@ -532,4 +526,72 @@ public Integer call(Integer t1, Integer t2) {
532526
ts2.assertReceivedOnNext(Arrays.asList(30));
533527
}
534528

529+
@Test(timeout = 10000)
530+
public void testUpstreamErrorAllowsRetry() throws InterruptedException {
531+
final AtomicInteger intervalSubscribed = new AtomicInteger();
532+
Observable<String> interval =
533+
Observable.interval(200,TimeUnit.MILLISECONDS)
534+
.doOnSubscribe(
535+
new Action0() {
536+
@Override
537+
public void call() {
538+
System.out.println("Subscribing to interval " + intervalSubscribed.incrementAndGet());
539+
}
540+
}
541+
)
542+
.flatMap(new Func1<Long, Observable<String>>() {
543+
@Override
544+
public Observable<String> call(Long t1) {
545+
return Observable.defer(new Func0<Observable<String>>() {
546+
@Override
547+
public Observable<String> call() {
548+
return Observable.<String>error(new Exception("Some exception"));
549+
}
550+
});
551+
}
552+
})
553+
.onErrorResumeNext(new Func1<Throwable, Observable<String>>() {
554+
@Override
555+
public Observable<String> call(Throwable t1) {
556+
return Observable.error(t1);
557+
}
558+
})
559+
.publish()
560+
.refCount();
561+
562+
interval
563+
.doOnError(new Action1<Throwable>() {
564+
@Override
565+
public void call(Throwable t1) {
566+
System.out.println("Subscriber 1 onError: " + t1);
567+
}
568+
})
569+
.retry(5)
570+
.subscribe(new Action1<String>() {
571+
@Override
572+
public void call(String t1) {
573+
System.out.println("Subscriber 1: " + t1);
574+
}
575+
});
576+
Thread.sleep(100);
577+
interval
578+
.doOnError(new Action1<Throwable>() {
579+
@Override
580+
public void call(Throwable t1) {
581+
System.out.println("Subscriber 2 onError: " + t1);
582+
}
583+
})
584+
.retry(5)
585+
.subscribe(new Action1<String>() {
586+
@Override
587+
public void call(String t1) {
588+
System.out.println("Subscriber 2: " + t1);
589+
}
590+
});
591+
592+
Thread.sleep(1300);
593+
594+
System.out.println(intervalSubscribed.get());
595+
assertEquals(6, intervalSubscribed.get());
596+
}
535597
}

0 commit comments

Comments
 (0)