Skip to content

2.x: improve request accounting overhead in retry/repeat #5790

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 5, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public void subscribeActual(Subscriber<? super T> s) {
rs.subscribeNext();
}

// FIXME update to a fresh Rsc algorithm
static final class RepeatSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T> {

private static final long serialVersionUID = -7098360935104053232L;
Expand All @@ -45,6 +44,9 @@ static final class RepeatSubscriber<T> extends AtomicInteger implements Flowable
final SubscriptionArbiter sa;
final Publisher<? extends T> source;
long remaining;

long produced;

RepeatSubscriber(Subscriber<? super T> actual, long count, SubscriptionArbiter sa, Publisher<? extends T> source) {
this.actual = actual;
this.sa = sa;
Expand All @@ -59,8 +61,8 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(T t) {
produced++;
actual.onNext(t);
sa.produced(1L);
}
@Override
public void onError(Throwable t) {
Expand Down Expand Up @@ -90,6 +92,11 @@ void subscribeNext() {
if (sa.isCancelled()) {
return;
}
long p = produced;
if (p != 0L) {
produced = 0L;
sa.produced(p);
}
source.subscribe(this);

missed = addAndGet(-missed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public void subscribeActual(Subscriber<? super T> s) {
rs.subscribeNext();
}

// FIXME update to a fresh Rsc algorithm
static final class RepeatSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T> {

private static final long serialVersionUID = -7098360935104053232L;
Expand All @@ -47,6 +46,9 @@ static final class RepeatSubscriber<T> extends AtomicInteger implements Flowable
final SubscriptionArbiter sa;
final Publisher<? extends T> source;
final BooleanSupplier stop;

long produced;

RepeatSubscriber(Subscriber<? super T> actual, BooleanSupplier until, SubscriptionArbiter sa, Publisher<? extends T> source) {
this.actual = actual;
this.sa = sa;
Expand All @@ -61,8 +63,8 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(T t) {
produced++;
actual.onNext(t);
sa.produced(1L);
}
@Override
public void onError(Throwable t) {
Expand Down Expand Up @@ -93,6 +95,16 @@ void subscribeNext() {
if (getAndIncrement() == 0) {
int missed = 1;
for (;;) {
if (sa.isCancelled()) {
return;
}

long p = produced;
if (p != 0L) {
produced = 0L;
sa.produced(p);
}

source.subscribe(this);

missed = addAndGet(-missed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public void subscribeActual(Subscriber<? super T> s) {
rs.subscribeNext();
}

// FIXME update to a fresh Rsc algorithm
static final class RetryBiSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T> {

private static final long serialVersionUID = -7098360935104053232L;
Expand All @@ -50,6 +49,9 @@ static final class RetryBiSubscriber<T> extends AtomicInteger implements Flowabl
final Publisher<? extends T> source;
final BiPredicate<? super Integer, ? super Throwable> predicate;
int retries;

long produced;

RetryBiSubscriber(Subscriber<? super T> actual,
BiPredicate<? super Integer, ? super Throwable> predicate, SubscriptionArbiter sa, Publisher<? extends T> source) {
this.actual = actual;
Expand All @@ -65,8 +67,8 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(T t) {
produced++;
actual.onNext(t);
sa.produced(1L);
}
@Override
public void onError(Throwable t) {
Expand Down Expand Up @@ -100,6 +102,13 @@ void subscribeNext() {
if (sa.isCancelled()) {
return;
}

long p = produced;
if (p != 0L) {
produced = 0L;
sa.produced(p);
}

source.subscribe(this);

missed = addAndGet(-missed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,11 @@ public void subscribeActual(Subscriber<? super T> s) {
SubscriptionArbiter sa = new SubscriptionArbiter();
s.onSubscribe(sa);

RepeatSubscriber<T> rs = new RepeatSubscriber<T>(s, count, predicate, sa, source);
RetrySubscriber<T> rs = new RetrySubscriber<T>(s, count, predicate, sa, source);
rs.subscribeNext();
}

// FIXME update to a fresh Rsc algorithm
static final class RepeatSubscriber<T> extends AtomicInteger implements FlowableSubscriber<T> {
static final class RetrySubscriber<T> extends AtomicInteger implements FlowableSubscriber<T> {

private static final long serialVersionUID = -7098360935104053232L;

Expand All @@ -52,7 +51,10 @@ static final class RepeatSubscriber<T> extends AtomicInteger implements Flowable
final Publisher<? extends T> source;
final Predicate<? super Throwable> predicate;
long remaining;
RepeatSubscriber(Subscriber<? super T> actual, long count,

long produced;

RetrySubscriber(Subscriber<? super T> actual, long count,
Predicate<? super Throwable> predicate, SubscriptionArbiter sa, Publisher<? extends T> source) {
this.actual = actual;
this.sa = sa;
Expand All @@ -68,8 +70,8 @@ public void onSubscribe(Subscription s) {

@Override
public void onNext(T t) {
produced++;
actual.onNext(t);
sa.produced(1L);
}
@Override
public void onError(Throwable t) {
Expand Down Expand Up @@ -111,6 +113,13 @@ void subscribeNext() {
if (sa.isCancelled()) {
return;
}

long p = produced;
if (p != 0L) {
produced = 0L;
sa.produced(p);
}

source.subscribe(this);

missed = addAndGet(-missed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,19 @@ public boolean getAsBoolean() throws Exception {
.assertResult(1, 1, 1, 1, 1);
}

@Test
public void repeatUntilCancel() {
Flowable.just(1)
.repeatUntil(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
return true;
}
})
.test(2L, true)
.assertEmpty();
}

@Test
public void repeatLongPredicateInvalid() {
try {
Expand Down