Skip to content

Commit d31d46a

Browse files
committed
Merge pull request #3129 from akarnokd/RetryPredicateFix
Fix retry with predicate ignoring backpressure.
2 parents a3a0b1f + d000c10 commit d31d46a

File tree

3 files changed

+105
-54
lines changed

3 files changed

+105
-54
lines changed

src/main/java/rx/Observable.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6576,6 +6576,8 @@ public final Observable<T> retry(final long count) {
65766576
* <p>
65776577
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/retry.png" alt="">
65786578
* <dl>
6579+
* <dt><b>Backpressure Support:</b></dt>
6580+
* <dd>This operator honors backpressure.</td>
65796581
* <dt><b>Scheduler:</b></dt>
65806582
* <dd>{@code retry} operates by default on the {@code trampoline} {@link Scheduler}.</dd>
65816583
* </dl>

src/main/java/rx/internal/operators/OperatorRetryWithPredicate.java

Lines changed: 65 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616
package rx.internal.operators;
1717

1818
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
19+
1920
import rx.Observable;
21+
import rx.Producer;
2022
import rx.Scheduler;
2123
import rx.Subscriber;
2224
import rx.functions.Action0;
2325
import rx.functions.Func2;
26+
import rx.internal.producers.ProducerArbiter;
2427
import rx.schedulers.Schedulers;
2528
import rx.subscriptions.SerialSubscription;
2629

@@ -38,88 +41,99 @@ public Subscriber<? super Observable<T>> call(final Subscriber<? super T> child)
3841
final SerialSubscription serialSubscription = new SerialSubscription();
3942
// add serialSubscription so it gets unsubscribed if child is unsubscribed
4043
child.add(serialSubscription);
41-
42-
return new SourceSubscriber<T>(child, predicate, inner, serialSubscription);
44+
ProducerArbiter pa = new ProducerArbiter();
45+
child.setProducer(pa);
46+
return new SourceSubscriber<T>(child, predicate, inner, serialSubscription, pa);
4347
}
4448

4549
static final class SourceSubscriber<T> extends Subscriber<Observable<T>> {
4650
final Subscriber<? super T> child;
4751
final Func2<Integer, Throwable, Boolean> predicate;
4852
final Scheduler.Worker inner;
4953
final SerialSubscription serialSubscription;
54+
final ProducerArbiter pa;
5055

5156
volatile int attempts;
5257
@SuppressWarnings("rawtypes")
5358
static final AtomicIntegerFieldUpdater<SourceSubscriber> ATTEMPTS_UPDATER
5459
= AtomicIntegerFieldUpdater.newUpdater(SourceSubscriber.class, "attempts");
5560

56-
public SourceSubscriber(Subscriber<? super T> child, final Func2<Integer, Throwable, Boolean> predicate, Scheduler.Worker inner,
57-
SerialSubscription serialSubscription) {
61+
public SourceSubscriber(Subscriber<? super T> child,
62+
final Func2<Integer, Throwable, Boolean> predicate,
63+
Scheduler.Worker inner,
64+
SerialSubscription serialSubscription,
65+
ProducerArbiter pa) {
5866
this.child = child;
5967
this.predicate = predicate;
6068
this.inner = inner;
6169
this.serialSubscription = serialSubscription;
70+
this.pa = pa;
6271
}
6372

6473

6574
@Override
66-
public void onCompleted() {
67-
// ignore as we expect a single nested Observable<T>
68-
}
75+
public void onCompleted() {
76+
// ignore as we expect a single nested Observable<T>
77+
}
6978

70-
@Override
71-
public void onError(Throwable e) {
72-
child.onError(e);
73-
}
79+
@Override
80+
public void onError(Throwable e) {
81+
child.onError(e);
82+
}
7483

75-
@Override
76-
public void onNext(final Observable<T> o) {
77-
inner.schedule(new Action0() {
84+
@Override
85+
public void onNext(final Observable<T> o) {
86+
inner.schedule(new Action0() {
7887

79-
@Override
80-
public void call() {
81-
final Action0 _self = this;
82-
ATTEMPTS_UPDATER.incrementAndGet(SourceSubscriber.this);
88+
@Override
89+
public void call() {
90+
final Action0 _self = this;
91+
ATTEMPTS_UPDATER.incrementAndGet(SourceSubscriber.this);
8392

84-
// new subscription each time so if it unsubscribes itself it does not prevent retries
85-
// by unsubscribing the child subscription
86-
Subscriber<T> subscriber = new Subscriber<T>() {
87-
boolean done;
88-
@Override
89-
public void onCompleted() {
90-
if (!done) {
91-
done = true;
92-
child.onCompleted();
93-
}
93+
// new subscription each time so if it unsubscribes itself it does not prevent retries
94+
// by unsubscribing the child subscription
95+
Subscriber<T> subscriber = new Subscriber<T>() {
96+
boolean done;
97+
@Override
98+
public void onCompleted() {
99+
if (!done) {
100+
done = true;
101+
child.onCompleted();
94102
}
103+
}
95104

96-
@Override
97-
public void onError(Throwable e) {
98-
if (!done) {
99-
done = true;
100-
if (predicate.call(attempts, e) && !inner.isUnsubscribed()) {
101-
// retry again
102-
inner.schedule(_self);
103-
} else {
104-
// give up and pass the failure
105-
child.onError(e);
106-
}
105+
@Override
106+
public void onError(Throwable e) {
107+
if (!done) {
108+
done = true;
109+
if (predicate.call(attempts, e) && !inner.isUnsubscribed()) {
110+
// retry again
111+
inner.schedule(_self);
112+
} else {
113+
// give up and pass the failure
114+
child.onError(e);
107115
}
108116
}
117+
}
109118

110-
@Override
111-
public void onNext(T v) {
112-
if (!done) {
113-
child.onNext(v);
114-
}
119+
@Override
120+
public void onNext(T v) {
121+
if (!done) {
122+
child.onNext(v);
123+
pa.produced(1);
115124
}
125+
}
116126

117-
};
118-
// register this Subscription (and unsubscribe previous if exists)
119-
serialSubscription.set(subscriber);
120-
o.unsafeSubscribe(subscriber);
121-
}
122-
});
123-
}
127+
@Override
128+
public void setProducer(Producer p) {
129+
pa.setProducer(p);
130+
}
131+
};
132+
// register this Subscription (and unsubscribe previous if exists)
133+
serialSubscription.set(subscriber);
134+
o.unsafeSubscribe(subscriber);
135+
}
136+
});
137+
}
124138
}
125139
}

src/test/java/rx/internal/operators/OperatorRetryWithPredicateTest.java

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,27 @@
2020
import static org.mockito.Mockito.*;
2121

2222
import java.io.IOException;
23+
import java.util.ArrayList;
2324
import java.util.Arrays;
2425
import java.util.Collections;
2526
import java.util.List;
2627
import java.util.concurrent.CopyOnWriteArrayList;
2728
import java.util.concurrent.TimeUnit;
28-
import java.util.concurrent.atomic.*;
29+
import java.util.concurrent.atomic.AtomicBoolean;
30+
import java.util.concurrent.atomic.AtomicInteger;
2931

3032
import org.junit.Test;
3133
import org.mockito.InOrder;
3234

33-
import rx.*;
35+
import rx.Observable;
3436
import rx.Observable.OnSubscribe;
37+
import rx.Observer;
38+
import rx.Subscriber;
39+
import rx.Subscription;
3540
import rx.exceptions.TestException;
36-
import rx.functions.*;
41+
import rx.functions.Action1;
42+
import rx.functions.Func1;
43+
import rx.functions.Func2;
3744
import rx.observers.TestSubscriber;
3845
import rx.subjects.PublishSubject;
3946

@@ -360,4 +367,32 @@ public void call(Long t) {
360367
}});
361368
assertEquals(Arrays.asList(1L,1L,2L,3L), list);
362369
}
370+
@Test
371+
public void testBackpressure() {
372+
final List<Long> requests = new ArrayList<Long>();
373+
374+
Observable<Integer> source = Observable
375+
.just(1)
376+
.concatWith(Observable.<Integer>error(new TestException()))
377+
.doOnRequest(new Action1<Long>() {
378+
@Override
379+
public void call(Long t) {
380+
requests.add(t);
381+
}
382+
});
383+
384+
TestSubscriber<Integer> ts = TestSubscriber.create(3);
385+
source
386+
.retry(new Func2<Integer, Throwable, Boolean>() {
387+
@Override
388+
public Boolean call(Integer t1, Throwable t2) {
389+
return t1 < 3;
390+
}
391+
}).subscribe(ts);
392+
393+
assertEquals(Arrays.asList(3L, 2L, 1L), requests);
394+
ts.assertValues(1, 1, 1);
395+
ts.assertNotCompleted();
396+
ts.assertNoErrors();
397+
}
363398
}

0 commit comments

Comments
 (0)