Skip to content

Commit 072ffad

Browse files
committed
Merge pull request #3120 from ypresto/no-interrupt-for-sync
No InterruptedException with synchronous BlockingObservable
2 parents 3494c00 + 8ad2260 commit 072ffad

File tree

4 files changed

+215
-104
lines changed

4 files changed

+215
-104
lines changed

src/main/java/rx/internal/operators/BlockingOperatorToIterator.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ public T next() {
9393

9494
private Notification<? extends T> take() {
9595
try {
96+
Notification<? extends T> poll = notifications.poll();
97+
if (poll != null) {
98+
return poll;
99+
}
96100
return notifications.take();
97101
} catch (InterruptedException e) {
98102
subscription.unsubscribe();

src/main/java/rx/observables/BlockingObservable.java

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -123,17 +123,7 @@ public void onNext(T args) {
123123
onNext.call(args);
124124
}
125125
});
126-
// block until the subscription completes and then return
127-
try {
128-
latch.await();
129-
} catch (InterruptedException e) {
130-
subscription.unsubscribe();
131-
// set the interrupted flag again so callers can still get it
132-
// for more information see https://github.com/ReactiveX/RxJava/pull/147#issuecomment-13624780
133-
Thread.currentThread().interrupt();
134-
// using Runtime so it is not checked
135-
throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
136-
}
126+
awaitForComplete(latch, subscription);
137127

138128
if (exceptionFromOnError.get() != null) {
139129
if (exceptionFromOnError.get() instanceof RuntimeException) {
@@ -456,14 +446,7 @@ public void onNext(final T item) {
456446
returnItem.set(item);
457447
}
458448
});
459-
460-
try {
461-
latch.await();
462-
} catch (InterruptedException e) {
463-
subscription.unsubscribe();
464-
Thread.currentThread().interrupt();
465-
throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
466-
}
449+
awaitForComplete(latch, subscription);
467450

468451
if (returnException.get() != null) {
469452
if (returnException.get() instanceof RuntimeException) {
@@ -475,4 +458,23 @@ public void onNext(final T item) {
475458

476459
return returnItem.get();
477460
}
461+
462+
private void awaitForComplete(CountDownLatch latch, Subscription subscription) {
463+
if (latch.getCount() == 0) {
464+
// Synchronous observable completes before awaiting for it.
465+
// Skip await so InterruptedException will never be thrown.
466+
return;
467+
}
468+
// block until the subscription completes and then return
469+
try {
470+
latch.await();
471+
} catch (InterruptedException e) {
472+
subscription.unsubscribe();
473+
// set the interrupted flag again so callers can still get it
474+
// for more information see https://github.com/ReactiveX/RxJava/pull/147#issuecomment-13624780
475+
Thread.currentThread().interrupt();
476+
// using Runtime so it is not checked
477+
throw new RuntimeException("Interrupted while waiting for subscription to complete.", e);
478+
}
479+
}
478480
}

src/test/java/rx/internal/operators/BlockingOperatorNextTest.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,8 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import static org.junit.Assert.assertEquals;
19-
import static org.junit.Assert.assertFalse;
20-
import static org.junit.Assert.assertTrue;
21-
import static org.junit.Assert.fail;
22-
import static rx.internal.operators.BlockingOperatorNext.next;
18+
import org.junit.Assert;
19+
import org.junit.Test;
2320

2421
import java.util.Iterator;
2522
import java.util.NoSuchElementException;
@@ -28,19 +25,21 @@
2825
import java.util.concurrent.atomic.AtomicBoolean;
2926
import java.util.concurrent.atomic.AtomicInteger;
3027

31-
import org.junit.Assert;
32-
import org.junit.Test;
33-
3428
import rx.Observable;
3529
import rx.Subscriber;
3630
import rx.exceptions.TestException;
37-
import rx.internal.operators.BlockingOperatorNext;
3831
import rx.observables.BlockingObservable;
3932
import rx.schedulers.Schedulers;
4033
import rx.subjects.BehaviorSubject;
4134
import rx.subjects.PublishSubject;
4235
import rx.subjects.Subject;
4336

37+
import static org.junit.Assert.assertEquals;
38+
import static org.junit.Assert.assertFalse;
39+
import static org.junit.Assert.assertTrue;
40+
import static org.junit.Assert.fail;
41+
import static rx.internal.operators.BlockingOperatorNext.next;
42+
4443
public class BlockingOperatorNextTest {
4544

4645
private void fireOnNextInNewThread(final Subject<String, String> o, final String value) {
@@ -83,6 +82,13 @@ public void testNext() {
8382
assertTrue(it.hasNext());
8483
assertEquals("two", it.next());
8584

85+
fireOnNextInNewThread(obs, "three");
86+
try {
87+
assertEquals("three", it.next());
88+
} catch (NoSuchElementException e) {
89+
fail("Calling next() without hasNext() should wait for next fire");
90+
}
91+
8692
obs.onCompleted();
8793
assertFalse(it.hasNext());
8894
try {

0 commit comments

Comments
 (0)