Skip to content

Operator: throttleWithTimeout #366

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import rx.operators.OperationTakeLast;
import rx.operators.OperationTakeUntil;
import rx.operators.OperationTakeWhile;
import rx.operators.OperationThrottleWithTimeout;
import rx.operators.OperationTimestamp;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
Expand Down Expand Up @@ -1809,6 +1810,39 @@ public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler
return create(OperationInterval.interval(interval, unit, scheduler));
}

/**
* Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer reset on each `onNext` call.
* <p>
* NOTE: If the timeout is set higher than the rate of traffic then this will drop all data.
*
* @param timeout
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
* @param unit
* The {@link TimeUnit} for the timeout.
*
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
*/
public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
return create(OperationThrottleWithTimeout.throttleWithTimeout(this, timeout, unit));
}

/**
* Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer reset on each `onNext` call.
* <p>
* NOTE: If the timeout is set higher than the rate of traffic then this will drop all data.
*
* @param timeout
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
* @param unit
* The unit of time for the specified timeout.
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
*/
public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) {
return create(OperationThrottleWithTimeout.throttleWithTimeout(this, timeout, unit, scheduler));
}

/**
* Wraps each item emitted by a source Observable in a {@link Timestamped} object.
* <p>
Expand Down
27 changes: 22 additions & 5 deletions rxjava-core/src/main/java/rx/concurrency/TestScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,22 @@
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func2;

public class TestScheduler extends Scheduler {
private final Queue<TimedAction<?>> queue = new PriorityQueue<TimedAction<?>>(11, new CompareActionsByTime());

private static class TimedAction<T> {

private final long time;
private final Func2<? super Scheduler, ? super T, ? extends Subscription> action;
private final T state;
private final TestScheduler scheduler;
private final AtomicBoolean isCancelled = new AtomicBoolean(false);

private TimedAction(TestScheduler scheduler, long time, Func2<? super Scheduler, ? super T, ? extends Subscription> action, T state) {
this.time = time;
Expand All @@ -41,6 +43,10 @@ private TimedAction(TestScheduler scheduler, long time, Func2<? super Scheduler,
this.scheduler = scheduler;
}

public void cancel() {
isCancelled.set(true);
}

@Override
public String toString() {
return String.format("TimedAction(time = %d, action = %s)", time, action.toString());
Expand Down Expand Up @@ -84,8 +90,12 @@ private void triggerActions(long targetTimeInNanos) {
}
time = current.time;
queue.remove();
// because the queue can have wildcards we have to ignore the type T for the state
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);

// Only execute if the TimedAction has not yet been cancelled
if (!current.isCancelled.get()) {
// because the queue can have wildcards we have to ignore the type T for the state
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);
}
}
time = targetTimeInNanos;
}
Expand All @@ -97,7 +107,14 @@ public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ?

@Override
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
queue.add(new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state));
return Subscriptions.empty();
final TimedAction<T> timedAction = new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state);
queue.add(timedAction);

return new Subscription() {
@Override
public void unsubscribe() {
timedAction.cancel();
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,250 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.concurrency.Schedulers;
import rx.concurrency.TestScheduler;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func1;

/**
* This operation is used to filter out bursts of events. This is done by ignoring the events from an observable which are too
* quickly followed up with other values. Values which are not followed up by other values within the specified timeout are published
* as soon as the timeout expires.
*/
public final class OperationThrottleWithTimeout {

/**
* This operation filters out events which are published too quickly in succession. This is done by dropping events which are
* followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet)
* the last received event is published.
*
* @param items
* The {@link Observable} which is publishing events.
* @param timeout
* How long each event has to be the 'last event' before it gets published.
* @param unit
* The unit of time for the specified timeout.
* @return A {@link Func1} which performs the throttle operation.
*/
public static <T> OnSubscribeFunc<T> throttleWithTimeout(Observable<T> items, long timeout, TimeUnit unit) {
return throttleWithTimeout(items, timeout, unit, Schedulers.threadPoolForComputation());
}

/**
* This operation filters out events which are published too quickly in succession. This is done by dropping events which are
* followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet)
* the last received event is published.
*
* @param items
* The {@link Observable} which is publishing events.
* @param timeout
* How long each event has to be the 'last event' before it gets published.
* @param unit
* The unit of time for the specified timeout.
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return A {@link Func1} which performs the throttle operation.
*/
public static <T> OnSubscribeFunc<T> throttleWithTimeout(final Observable<T> items, final long timeout, final TimeUnit unit, final Scheduler scheduler) {
return new OnSubscribeFunc<T>() {
@Override
public Subscription onSubscribe(Observer<? super T> observer) {
return new Throttle<T>(items, timeout, unit, scheduler).onSubscribe(observer);
}
};
}

private static class Throttle<T> implements OnSubscribeFunc<T> {

private final Observable<T> items;
private final long timeout;
private final TimeUnit unit;
private final Scheduler scheduler;

public Throttle(Observable<T> items, long timeout, TimeUnit unit, Scheduler scheduler) {
this.items = items;
this.timeout = timeout;
this.unit = unit;
this.scheduler = scheduler;
}

@Override
public Subscription onSubscribe(Observer<? super T> observer) {
return items.subscribe(new ThrottledObserver<T>(observer, timeout, unit, scheduler));
}
}

private static class ThrottledObserver<T> implements Observer<T> {

private final Observer<? super T> observer;
private final long timeout;
private final TimeUnit unit;
private final Scheduler scheduler;

private final AtomicReference<Subscription> lastScheduledNotification = new AtomicReference<Subscription>();

public ThrottledObserver(Observer<? super T> observer, long timeout, TimeUnit unit, Scheduler scheduler) {
this.observer = observer;
this.timeout = timeout;
this.unit = unit;
this.scheduler = scheduler;
}

@Override
public void onCompleted() {
observer.onCompleted();
}

@Override
public void onError(Throwable e) {
lastScheduledNotification.get().unsubscribe();
observer.onError(e);
}

@Override
public void onNext(final T v) {
Subscription previousSubscription = lastScheduledNotification.getAndSet(scheduler.schedule(new Action0() {

@Override
public void call() {
observer.onNext(v);
}

}, timeout, unit));
// cancel previous if not already executed
if (previousSubscription != null) {
previousSubscription.unsubscribe();
}
}
}

public static class UnitTest {

private TestScheduler scheduler;
private Observer<String> observer;

@Before
@SuppressWarnings("unchecked")
public void before() {
scheduler = new TestScheduler();
observer = mock(Observer.class);
}

@Test
public void testThrottlingWithCompleted() {
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(Observer<? super String> observer) {
publishNext(observer, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires.
publishNext(observer, 400, "two"); // Should be published since "three" will arrive after the timeout expires.
publishNext(observer, 900, "three"); // Should be skipped since onCompleted will arrive before the timeout expires.
publishCompleted(observer, 1000); // Should be published as soon as the timeout expires.

return Subscriptions.empty();
}
});

Observable<String> sampled = Observable.create(OperationThrottleWithTimeout.throttleWithTimeout(source, 400, TimeUnit.MILLISECONDS, scheduler));
sampled.subscribe(observer);

scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS);
InOrder inOrder = inOrder(observer);
// must go to 800 since it must be 400 after when two is sent, which is at 400
scheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS);
inOrder.verify(observer, times(1)).onNext("two");
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testThrottlingWithError() {
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(Observer<? super String> observer) {
Exception error = new TestException();
publishNext(observer, 100, "one"); // Should be published since "two" will arrive after the timeout expires.
publishNext(observer, 600, "two"); // Should be skipped since onError will arrive before the timeout expires.
publishError(observer, 700, error); // Should be published as soon as the timeout expires.

return Subscriptions.empty();
}
});

Observable<String> sampled = Observable.create(OperationThrottleWithTimeout.throttleWithTimeout(source, 400, TimeUnit.MILLISECONDS, scheduler));
sampled.subscribe(observer);

scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS);
InOrder inOrder = inOrder(observer);
// 100 + 400 means it triggers at 500
scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS);
inOrder.verify(observer).onNext("one");
scheduler.advanceTimeTo(701, TimeUnit.MILLISECONDS);
inOrder.verify(observer).onError(any(TestException.class));
inOrder.verifyNoMoreInteractions();
}

private <T> void publishCompleted(final Observer<T> observer, long delay) {
scheduler.schedule(new Action0() {
@Override
public void call() {
observer.onCompleted();
}
}, delay, TimeUnit.MILLISECONDS);
}

private <T> void publishError(final Observer<T> observer, long delay, final Exception error) {
scheduler.schedule(new Action0() {
@Override
public void call() {
observer.onError(error);
}
}, delay, TimeUnit.MILLISECONDS);
}

private <T> void publishNext(final Observer<T> observer, final long delay, final T value) {
scheduler.schedule(new Action0() {
@Override
public void call() {
observer.onNext(value);
}
}, delay, TimeUnit.MILLISECONDS);
}

@SuppressWarnings("serial")
private class TestException extends Exception {
}

}

}
Loading