Skip to content

Commit 82685fb

Browse files
committed
use boolean flag for completion in FuncWithErrors because o.onError and o.onCompleted may be buffered in o so that unsubscription is delayed
1 parent a8f809a commit 82685fb

File tree

1 file changed

+5
-1
lines changed

1 file changed

+5
-1
lines changed

src/test/java/rx/internal/operators/OperatorRetryTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,8 @@ public void call(final Subscriber<? super String> o) {
397397
final AtomicLong req = new AtomicLong();
398398
// 0 = not set, 1 = fast path, 2 = backpressure
399399
final AtomicInteger path = new AtomicInteger(0);
400+
volatile boolean done = false;
401+
400402
@Override
401403
public void request(long n) {
402404
if (n == Long.MAX_VALUE && path.compareAndSet(0, 1)) {
@@ -410,11 +412,12 @@ public void request(long n) {
410412
}
411413
return;
412414
}
413-
if (n > 0 && req.getAndAdd(n) == 0 && (path.get() == 2 || path.compareAndSet(0, 2)) && !o.isUnsubscribed()) {
415+
if (n > 0 && req.getAndAdd(n) == 0 && (path.get() == 2 || path.compareAndSet(0, 2)) && !done) {
414416
int i = count.getAndIncrement();
415417
if (i < numFailures) {
416418
o.onNext("beginningEveryTime");
417419
o.onError(new RuntimeException("forced failure: " + (i + 1)));
420+
done = true;
418421
} else {
419422
do {
420423
if (i == numFailures) {
@@ -423,6 +426,7 @@ public void request(long n) {
423426
if (i > numFailures) {
424427
o.onNext("onSuccessOnly");
425428
o.onCompleted();
429+
done = true;
426430
break;
427431
}
428432
i = count.getAndIncrement();

0 commit comments

Comments
 (0)