Skip to content

Commit 4fd16ee

Browse files
authored
2.x: improve request accounting overhead in retry/repeat (#5790)
* 2.x: improve request accounting overhead in retry/repeat * Test repeatUntil cancelled case
1 parent 6aea3f0 commit 4fd16ee

File tree

5 files changed

+61
-11
lines changed

5 files changed

+61
-11
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableRepeat.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ public void subscribeActual(Subscriber<? super T> s) {
3636
rs.subscribeNext();
3737
}
3838

39-
// FIXME update to a fresh Rsc algorithm
4039
static final class RepeatSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T> {
4140

4241
private static final long serialVersionUID = -7098360935104053232L;
@@ -45,6 +44,9 @@ static final class RepeatSubscriber<T> extends AtomicInteger implements Flowable
4544
final SubscriptionArbiter sa;
4645
final Publisher<? extends T> source;
4746
long remaining;
47+
48+
long produced;
49+
4850
RepeatSubscriber(Subscriber<? super T> actual, long count, SubscriptionArbiter sa, Publisher<? extends T> source) {
4951
this.actual = actual;
5052
this.sa = sa;
@@ -59,8 +61,8 @@ public void onSubscribe(Subscription s) {
5961

6062
@Override
6163
public void onNext(T t) {
64+
produced++;
6265
actual.onNext(t);
63-
sa.produced(1L);
6466
}
6567
@Override
6668
public void onError(Throwable t) {
@@ -90,6 +92,11 @@ void subscribeNext() {
9092
if (sa.isCancelled()) {
9193
return;
9294
}
95+
long p = produced;
96+
if (p != 0L) {
97+
produced = 0L;
98+
sa.produced(p);
99+
}
93100
source.subscribe(this);
94101

95102
missed = addAndGet(-missed);

src/main/java/io/reactivex/internal/operators/flowable/FlowableRepeatUntil.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ public void subscribeActual(Subscriber<? super T> s) {
3838
rs.subscribeNext();
3939
}
4040

41-
// FIXME update to a fresh Rsc algorithm
4241
static final class RepeatSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T> {
4342

4443
private static final long serialVersionUID = -7098360935104053232L;
@@ -47,6 +46,9 @@ static final class RepeatSubscriber<T> extends AtomicInteger implements Flowable
4746
final SubscriptionArbiter sa;
4847
final Publisher<? extends T> source;
4948
final BooleanSupplier stop;
49+
50+
long produced;
51+
5052
RepeatSubscriber(Subscriber<? super T> actual, BooleanSupplier until, SubscriptionArbiter sa, Publisher<? extends T> source) {
5153
this.actual = actual;
5254
this.sa = sa;
@@ -61,8 +63,8 @@ public void onSubscribe(Subscription s) {
6163

6264
@Override
6365
public void onNext(T t) {
66+
produced++;
6467
actual.onNext(t);
65-
sa.produced(1L);
6668
}
6769
@Override
6870
public void onError(Throwable t) {
@@ -93,6 +95,16 @@ void subscribeNext() {
9395
if (getAndIncrement() == 0) {
9496
int missed = 1;
9597
for (;;) {
98+
if (sa.isCancelled()) {
99+
return;
100+
}
101+
102+
long p = produced;
103+
if (p != 0L) {
104+
produced = 0L;
105+
sa.produced(p);
106+
}
107+
96108
source.subscribe(this);
97109

98110
missed = addAndGet(-missed);

src/main/java/io/reactivex/internal/operators/flowable/FlowableRetryBiPredicate.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ public void subscribeActual(Subscriber<? super T> s) {
4040
rs.subscribeNext();
4141
}
4242

43-
// FIXME update to a fresh Rsc algorithm
4443
static final class RetryBiSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T> {
4544

4645
private static final long serialVersionUID = -7098360935104053232L;
@@ -50,6 +49,9 @@ static final class RetryBiSubscriber<T> extends AtomicInteger implements Flowabl
5049
final Publisher<? extends T> source;
5150
final BiPredicate<? super Integer, ? super Throwable> predicate;
5251
int retries;
52+
53+
long produced;
54+
5355
RetryBiSubscriber(Subscriber<? super T> actual,
5456
BiPredicate<? super Integer, ? super Throwable> predicate, SubscriptionArbiter sa, Publisher<? extends T> source) {
5557
this.actual = actual;
@@ -65,8 +67,8 @@ public void onSubscribe(Subscription s) {
6567

6668
@Override
6769
public void onNext(T t) {
70+
produced++;
6871
actual.onNext(t);
69-
sa.produced(1L);
7072
}
7173
@Override
7274
public void onError(Throwable t) {
@@ -100,6 +102,13 @@ void subscribeNext() {
100102
if (sa.isCancelled()) {
101103
return;
102104
}
105+
106+
long p = produced;
107+
if (p != 0L) {
108+
produced = 0L;
109+
sa.produced(p);
110+
}
111+
103112
source.subscribe(this);
104113

105114
missed = addAndGet(-missed);

src/main/java/io/reactivex/internal/operators/flowable/FlowableRetryPredicate.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,11 @@ public void subscribeActual(Subscriber<? super T> s) {
3838
SubscriptionArbiter sa = new SubscriptionArbiter();
3939
s.onSubscribe(sa);
4040

41-
RepeatSubscriber<T> rs = new RepeatSubscriber<T>(s, count, predicate, sa, source);
41+
RetrySubscriber<T> rs = new RetrySubscriber<T>(s, count, predicate, sa, source);
4242
rs.subscribeNext();
4343
}
4444

45-
// FIXME update to a fresh Rsc algorithm
46-
static final class RepeatSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T> {
45+
static final class RetrySubscriber<T> extends AtomicInteger implements FlowableSubscriber<T> {
4746

4847
private static final long serialVersionUID = -7098360935104053232L;
4948

@@ -52,7 +51,10 @@ static final class RepeatSubscriber<T> extends AtomicInteger implements Flowable
5251
final Publisher<? extends T> source;
5352
final Predicate<? super Throwable> predicate;
5453
long remaining;
55-
RepeatSubscriber(Subscriber<? super T> actual, long count,
54+
55+
long produced;
56+
57+
RetrySubscriber(Subscriber<? super T> actual, long count,
5658
Predicate<? super Throwable> predicate, SubscriptionArbiter sa, Publisher<? extends T> source) {
5759
this.actual = actual;
5860
this.sa = sa;
@@ -68,8 +70,8 @@ public void onSubscribe(Subscription s) {
6870

6971
@Override
7072
public void onNext(T t) {
73+
produced++;
7174
actual.onNext(t);
72-
sa.produced(1L);
7375
}
7476
@Override
7577
public void onError(Throwable t) {
@@ -111,6 +113,13 @@ void subscribeNext() {
111113
if (sa.isCancelled()) {
112114
return;
113115
}
116+
117+
long p = produced;
118+
if (p != 0L) {
119+
produced = 0L;
120+
sa.produced(p);
121+
}
122+
114123
source.subscribe(this);
115124

116125
missed = addAndGet(-missed);

src/test/java/io/reactivex/internal/operators/flowable/FlowableRepeatTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,19 @@ public boolean getAsBoolean() throws Exception {
259259
.assertResult(1, 1, 1, 1, 1);
260260
}
261261

262+
@Test
263+
public void repeatUntilCancel() {
264+
Flowable.just(1)
265+
.repeatUntil(new BooleanSupplier() {
266+
@Override
267+
public boolean getAsBoolean() throws Exception {
268+
return true;
269+
}
270+
})
271+
.test(2L, true)
272+
.assertEmpty();
273+
}
274+
262275
@Test
263276
public void repeatLongPredicateInvalid() {
264277
try {

0 commit comments

Comments
 (0)