Skip to content

Commit 9c25306

Browse files
committed
add concurrency protection to FuncWithErrors
1 parent 222d4ad commit 9c25306

File tree

1 file changed

+4
-2
lines changed

1 file changed

+4
-2
lines changed

src/test/java/rx/internal/operators/OperatorRetryTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -395,9 +395,11 @@ public static class FuncWithErrors implements Observable.OnSubscribe<String> {
395395
public void call(final Subscriber<? super String> o) {
396396
o.setProducer(new Producer() {
397397
final AtomicLong req = new AtomicLong();
398+
// 0 = not set, 1 = fast path, 2 = backpressure
399+
final AtomicInteger path = new AtomicInteger(0);
398400
@Override
399401
public void request(long n) {
400-
if (n == Long.MAX_VALUE) {
402+
if (n == Long.MAX_VALUE && path.compareAndSet(0, 1)) {
401403
o.onNext("beginningEveryTime");
402404
int i = count.getAndIncrement();
403405
if (i < numFailures) {
@@ -408,7 +410,7 @@ public void request(long n) {
408410
}
409411
return;
410412
}
411-
if (n > 0 && req.getAndAdd(n) == 0) {
413+
if (n > 0 && req.getAndAdd(n) == 0 && (path.get() == 2 || path.compareAndSet(0, 2))) {
412414
int i = count.getAndIncrement();
413415
if (i < numFailures) {
414416
o.onNext("beginningEveryTime");

0 commit comments

Comments
 (0)