-
Notifications
You must be signed in to change notification settings - Fork 7.6k
rewrite OnSubscribeRefCount to handle synchronous source #1695
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a4af991
055057d
eb6ae37
b8da4a9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -137,7 +137,13 @@ public void call() { | |
})); | ||
|
||
// now that everything is hooked up let's subscribe | ||
source.unsafeSubscribe(subscription); | ||
// as long as the subscription is not null | ||
boolean subscriptionIsNull; | ||
synchronized(guard) { | ||
subscriptionIsNull = subscription == null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What causes this? Is it because the subscription has been unsubscribed already? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep it is in response to an existing RefCount test testing subscription
|
||
} | ||
if (!subscriptionIsNull) | ||
source.unsafeSubscribe(subscription); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,6 +16,7 @@ | |
package rx; | ||
|
||
import static org.junit.Assert.assertEquals; | ||
import static org.junit.Assert.assertTrue; | ||
import static org.mockito.Matchers.any; | ||
import static org.mockito.Mockito.inOrder; | ||
import static org.mockito.Mockito.mock; | ||
|
@@ -25,6 +26,7 @@ | |
import java.util.ArrayList; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicInteger; | ||
|
||
|
@@ -34,11 +36,14 @@ | |
import org.mockito.MockitoAnnotations; | ||
|
||
import rx.Observable.OnSubscribe; | ||
import rx.Observable.Operator; | ||
import rx.functions.Action0; | ||
import rx.functions.Action1; | ||
import rx.functions.Func2; | ||
import rx.observables.ConnectableObservable; | ||
import rx.observers.Subscribers; | ||
import rx.observers.TestSubscriber; | ||
import rx.schedulers.Schedulers; | ||
import rx.schedulers.TestScheduler; | ||
import rx.subjects.ReplaySubject; | ||
import rx.subscriptions.Subscriptions; | ||
|
@@ -237,4 +242,49 @@ public Integer call(Integer t1, Integer t2) { | |
ts2.assertNoErrors(); | ||
ts2.assertReceivedOnNext(Arrays.asList(30)); | ||
} | ||
|
||
@Test | ||
public void testRefCountUnsubscribeForSynchronousSource() throws InterruptedException { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This test passes for me already, without the changes above. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi Ben. I started with a failing unit test against 0.20.4 and in the
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No it doesn't hang, it passes just fine. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Enjoy your holiday! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If I run this test individually, it doesn't hang but if I run a full build, it hangs. |
||
final CountDownLatch latch = new CountDownLatch(1); | ||
Observable<Long> o = synchronousInterval().lift(detectUnsubscription(latch)); | ||
Subscriber<Long> sub = Subscribers.empty(); | ||
o.publish().refCount().subscribeOn(Schedulers.computation()).subscribe(sub); | ||
Thread.sleep(100); | ||
sub.unsubscribe(); | ||
assertTrue(latch.await(3, TimeUnit.SECONDS)); | ||
} | ||
|
||
@Test | ||
public void testSubscribeToPublishWithAlreadyUnsubscribedSubscriber() { | ||
Subscriber<Object> sub = Subscribers.empty(); | ||
sub.unsubscribe(); | ||
ConnectableObservable<Object> o = Observable.empty().publish(); | ||
o.subscribe(sub); | ||
o.connect(); | ||
} | ||
|
||
private Operator<Long, Long> detectUnsubscription(final CountDownLatch latch) { | ||
return new Operator<Long,Long>(){ | ||
@Override | ||
public Subscriber<? super Long> call(Subscriber<? super Long> subscriber) { | ||
latch.countDown(); | ||
return Subscribers.from(subscriber); | ||
}}; | ||
} | ||
|
||
private Observable<Long> synchronousInterval() { | ||
return Observable.create(new OnSubscribe<Long>() { | ||
|
||
@Override | ||
public void call(Subscriber<? super Long> subscriber) { | ||
while (!subscriber.isUnsubscribed()) { | ||
try { | ||
Thread.sleep(100); | ||
} catch (InterruptedException e) { | ||
} | ||
subscriber.onNext(1L); | ||
} | ||
}}); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't this block right here on a synchronous source and then deadlock any other subscriber trying to subscribe since the lock is now held?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No because connect is called with an action that allows the subscription to
be read and in our case recorded. The action also unlocks.
On 3 Oct 2014 11:38, "Ben Christensen" [email protected] wrote: