Skip to content

Commit a8f809a

Browse files
committed
prevent calling subscriber to FuncWithErrors with events after terminal event occurred (race condition)
1 parent 9c25306 commit a8f809a

File tree

1 file changed

+23
-20
lines changed

1 file changed

+23
-20
lines changed

src/test/java/rx/internal/operators/OperatorRetryTest.java

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,7 @@ public void request(long n) {
410410
}
411411
return;
412412
}
413-
if (n > 0 && req.getAndAdd(n) == 0 && (path.get() == 2 || path.compareAndSet(0, 2))) {
413+
if (n > 0 && req.getAndAdd(n) == 0 && (path.get() == 2 || path.compareAndSet(0, 2)) && !o.isUnsubscribed()) {
414414
int i = count.getAndIncrement();
415415
if (i < numFailures) {
416416
o.onNext("beginningEveryTime");
@@ -686,25 +686,28 @@ public void testTimeoutWithRetry() {
686686

687687
@Test//(timeout = 15000)
688688
public void testRetryWithBackpressure() throws InterruptedException {
689-
final int NUM_RETRIES = RxRingBuffer.SIZE * 2;
690-
for (int i = 0; i < 400; i++) {
691-
@SuppressWarnings("unchecked")
692-
Observer<String> observer = mock(Observer.class);
693-
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_RETRIES));
694-
TestSubscriber<String> ts = new TestSubscriber<String>(observer);
695-
origin.retry().observeOn(Schedulers.computation()).unsafeSubscribe(ts);
696-
ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
697-
698-
InOrder inOrder = inOrder(observer);
699-
// should have no errors
700-
verify(observer, never()).onError(any(Throwable.class));
701-
// should show NUM_RETRIES attempts
702-
inOrder.verify(observer, times(NUM_RETRIES + 1)).onNext("beginningEveryTime");
703-
// should have a single success
704-
inOrder.verify(observer, times(1)).onNext("onSuccessOnly");
705-
// should have a single successful onCompleted
706-
inOrder.verify(observer, times(1)).onCompleted();
707-
inOrder.verifyNoMoreInteractions();
689+
final int NUM_LOOPS = 1;
690+
for (int j=0;j<NUM_LOOPS;j++) {
691+
final int NUM_RETRIES = RxRingBuffer.SIZE * 2;
692+
for (int i = 0; i < 400; i++) {
693+
@SuppressWarnings("unchecked")
694+
Observer<String> observer = mock(Observer.class);
695+
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_RETRIES));
696+
TestSubscriber<String> ts = new TestSubscriber<String>(observer);
697+
origin.retry().observeOn(Schedulers.computation()).unsafeSubscribe(ts);
698+
ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
699+
700+
InOrder inOrder = inOrder(observer);
701+
// should have no errors
702+
verify(observer, never()).onError(any(Throwable.class));
703+
// should show NUM_RETRIES attempts
704+
inOrder.verify(observer, times(NUM_RETRIES + 1)).onNext("beginningEveryTime");
705+
// should have a single success
706+
inOrder.verify(observer, times(1)).onNext("onSuccessOnly");
707+
// should have a single successful onCompleted
708+
inOrder.verify(observer, times(1)).onCompleted();
709+
inOrder.verifyNoMoreInteractions();
710+
}
708711
}
709712
}
710713

0 commit comments

Comments
 (0)