Skip to content

2.x: Add Single.delay overload that delays errors #5616

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 51 additions & 6 deletions src/main/java/io/reactivex/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -1617,32 +1617,53 @@ public final Flowable<T> concatWith(SingleSource<? extends T> other) {
}

/**
* Delays the emission of the success or error signal from the current Single by
* the specified amount.
* Delays the emission of the success signal from the current Single by the specified amount.
* An error signal will not be delayed.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code delay} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
*
* @param time the time amount to delay the signals
* @param time the amount of time the success signal should be delayed for
* @param unit the time unit
* @return the new Single instance
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Single<T> delay(long time, TimeUnit unit) {
return delay(time, unit, Schedulers.computation());
return delay(time, unit, Schedulers.computation(), false);
}

/**
* Delays the emission of the success or error signal from the current Single by the specified amount.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code delay} operates by default on the {@code computation} {@link Scheduler}.</dd>
* </dl>
*
* @param time the amount of time the success or error signal should be delayed for
* @param unit the time unit
* @param delayError if true, both success and error signals are delayed. if false, only success signals are delayed.
* @return the new Single instance
* @since 2.1.5 - experimental
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.COMPUTATION)
public final Single<T> delay(long time, TimeUnit unit, boolean delayError) {
return delay(time, unit, Schedulers.computation(), delayError);
}

/**
* Delays the emission of the success signal from the current Single by the specified amount.
* An error signal will not be delayed.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify the {@link Scheduler} where the non-blocking wait and emission happens</dd>
* </dl>
*
* @param time the time amount to delay the emission of the success signal
* @param time the amount of time the success signal should be delayed for
* @param unit the time unit
* @param scheduler the target scheduler to use for the non-blocking wait and emission
* @return the new Single instance
Expand All @@ -1654,9 +1675,33 @@ public final Single<T> delay(long time, TimeUnit unit) {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Single<T> delay(final long time, final TimeUnit unit, final Scheduler scheduler) {
return delay(time, unit, scheduler, false);
}

/**
* Delays the emission of the success or error signal from the current Single by the specified amount.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify the {@link Scheduler} where the non-blocking wait and emission happens</dd>
* </dl>
*
* @param time the amount of time the success or error signal should be delayed for
* @param unit the time unit
* @param scheduler the target scheduler to use for the non-blocking wait and emission
* @param delayError if true, both success and error signals are delayed. if false, only success signals are delayed.
* @return the new Single instance
* @throws NullPointerException
* if unit is null, or
* if scheduler is null
* @since 2.1.5 - experimental
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Single<T> delay(final long time, final TimeUnit unit, final Scheduler scheduler, boolean delayError) {
ObjectHelper.requireNonNull(unit, "unit is null");
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleDelay<T>(this, time, unit, scheduler));
return RxJavaPlugins.onAssembly(new SingleDelay<T>(this, time, unit, scheduler, delayError));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@

public final class SingleDelay<T> extends Single<T> {


final SingleSource<? extends T> source;
final long time;
final TimeUnit unit;
final Scheduler scheduler;
final boolean delayError;

public SingleDelay(SingleSource<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
public SingleDelay(SingleSource<? extends T> source, long time, TimeUnit unit, Scheduler scheduler, boolean delayError) {
this.source = source;
this.time = time;
this.unit = unit;
this.scheduler = scheduler;
this.delayError = delayError;
}

@Override
Expand Down Expand Up @@ -63,7 +64,7 @@ public void onSuccess(final T value) {

@Override
public void onError(final Throwable e) {
sd.replace(scheduler.scheduleDirect(new OnError(e), 0, unit));
sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit));
}

final class OnSuccess implements Runnable {
Expand Down
2 changes: 2 additions & 0 deletions src/test/java/io/reactivex/ParamValidationCheckerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,9 @@ public void checkParallelFlowable() {

// negative time is considered as zero time
addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class));
addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Boolean.TYPE));
addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Scheduler.class));
addOverride(new ParamOverride(Single.class, 0, ParamMode.ANY, "delay", Long.TYPE, TimeUnit.class, Scheduler.class, Boolean.TYPE));


// zero repeat is allowed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,38 +27,64 @@
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.*;
import io.reactivex.internal.subscriptions.BooleanSubscription;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;

public class SingleDelayTest {
@Test
public void delay() throws Exception {
final AtomicInteger value = new AtomicInteger();
public void delayOnSuccess() {
final TestScheduler scheduler = new TestScheduler();
final TestObserver<Integer> observer = Single.just(1)
.delay(5, TimeUnit.SECONDS, scheduler)
.test();

Single.just(1).delay(200, TimeUnit.MILLISECONDS)
.subscribe(new BiConsumer<Integer, Throwable>() {
@Override
public void accept(Integer v, Throwable e) throws Exception {
value.set(v);
}
});
scheduler.advanceTimeTo(2, TimeUnit.SECONDS);
observer.assertNoValues();

Thread.sleep(100);
scheduler.advanceTimeTo(5, TimeUnit.SECONDS);
observer.assertValue(1);
}

@Test
public void delayOnError() {
final TestScheduler scheduler = new TestScheduler();
final TestObserver<?> observer = Single.error(new TestException())
.delay(5, TimeUnit.SECONDS, scheduler)
.test();

scheduler.triggerActions();
observer.assertError(TestException.class);
}

assertEquals(0, value.get());
@Test
public void delayedErrorOnSuccess() {
final TestScheduler scheduler = new TestScheduler();
final TestObserver<Integer> observer = Single.just(1)
.delay(5, TimeUnit.SECONDS, scheduler, true)
.test();

Thread.sleep(200);
scheduler.advanceTimeTo(2, TimeUnit.SECONDS);
observer.assertNoValues();

assertEquals(1, value.get());
scheduler.advanceTimeTo(5, TimeUnit.SECONDS);
observer.assertValue(1);
}

@Test
public void delayError() {
Single.error(new TestException()).delay(5, TimeUnit.SECONDS)
.test()
.awaitDone(1, TimeUnit.SECONDS)
.assertFailure(TestException.class);
public void delayedErrorOnError() {
final TestScheduler scheduler = new TestScheduler();
final TestObserver<?> observer = Single.error(new TestException())
.delay(5, TimeUnit.SECONDS, scheduler, true)
.test();

scheduler.advanceTimeTo(2, TimeUnit.SECONDS);
observer.assertNoErrors();

scheduler.advanceTimeTo(5, TimeUnit.SECONDS);
observer.assertError(TestException.class);
}

@Test
Expand Down