Skip to content

Commit 939a3b1

Browse files
committed
Unbroken TestSubscriber
1 parent 8741a21 commit 939a3b1

File tree

2 files changed

+9
-12
lines changed

2 files changed

+9
-12
lines changed

src/main/java/rx/observers/TestSubscriber.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,9 @@
1616
package rx.observers;
1717

1818
import java.util.List;
19-
import java.util.concurrent.CountDownLatch;
20-
import java.util.concurrent.TimeUnit;
19+
import java.util.concurrent.*;
2120

22-
import rx.Notification;
23-
import rx.Observer;
24-
import rx.Subscriber;
21+
import rx.*;
2522

2623
/**
2724
* A {@code TestSubscriber} is a variety of {@link Subscriber} that you can use for unit testing, to perform
@@ -227,9 +224,11 @@ public void awaitTerminalEvent() {
227224
* @throws RuntimeException
228225
* if the Subscriber is interrupted before the Observable is able to complete
229226
*/
230-
public boolean awaitTerminalEvent(long timeout, TimeUnit unit) {
227+
public void awaitTerminalEvent(long timeout, TimeUnit unit) {
231228
try {
232-
return latch.await(timeout, unit);
229+
if (!latch.await(timeout, unit)) {
230+
throw new RuntimeException(new TimeoutException());
231+
}
233232
} catch (InterruptedException e) {
234233
throw new RuntimeException("Interrupted", e);
235234
}

src/test/java/rx/internal/operators/OperatorRetryTest.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -730,15 +730,12 @@ public void testRetryWithBackpressureParallel() throws InterruptedException {
730730
exec.execute(new Runnable() {
731731
@Override
732732
public void run() {
733+
final AtomicInteger nexts = new AtomicInteger();
733734
try {
734-
final AtomicInteger nexts = new AtomicInteger();
735735
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_RETRIES));
736736
TestSubscriber<String> ts = new TestSubscriber<String>();
737737
origin.retry().observeOn(Schedulers.computation()).unsafeSubscribe(ts);
738-
if (!ts.awaitTerminalEvent(2, TimeUnit.SECONDS)) {
739-
timeouts.incrementAndGet();
740-
System.out.println(j + " | " + cdl.getCount() + " !!! " + nexts.get());
741-
}
738+
ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
742739
if (ts.getOnNextEvents().size() != NUM_RETRIES + 2) {
743740
data.incrementAndGet();
744741
}
@@ -750,6 +747,7 @@ public void run() {
750747
}
751748
} catch (Throwable t) {
752749
timeouts.incrementAndGet();
750+
System.out.println(j + " | " + cdl.getCount() + " !!! " + nexts.get());
753751
}
754752
cdl.countDown();
755753
}

0 commit comments

Comments
 (0)