Skip to content

Operator: throttleFirst #367

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import rx.operators.OperationTakeLast;
import rx.operators.OperationTakeUntil;
import rx.operators.OperationTakeWhile;
import rx.operators.OperationThrottleFirst;
import rx.operators.OperationTimestamp;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
Expand Down Expand Up @@ -1809,6 +1810,34 @@ public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler
return create(OperationInterval.interval(interval, unit, scheduler));
}

/**
* Throttles to first value in each window.
*
* @param windowDuration
* Duration of windows within with the first value will be chosen.
* @param unit
* The unit of time for the specified timeout.
* @return Observable which performs the throttle operation.
*/
public Observable<T> throttleFirst(long windowDuration, TimeUnit unit) {
return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit));
}

/**
* Throttles to first value in each window.
*
* @param windowDuration
* Duration of windows within with the first value will be chosen.
* @param unit
* The unit of time for the specified timeout.
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return Observable which performs the throttle operation.
*/
public Observable<T> throttleFirst(long windowDuration, TimeUnit unit, Scheduler scheduler) {
return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit, scheduler));
}

/**
* Wraps each item emitted by a source Observable in a {@link Timestamped} object.
* <p>
Expand Down
27 changes: 22 additions & 5 deletions rxjava-core/src/main/java/rx/concurrency/TestScheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,22 @@
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func2;

public class TestScheduler extends Scheduler {
private final Queue<TimedAction<?>> queue = new PriorityQueue<TimedAction<?>>(11, new CompareActionsByTime());

private static class TimedAction<T> {

private final long time;
private final Func2<? super Scheduler, ? super T, ? extends Subscription> action;
private final T state;
private final TestScheduler scheduler;
private final AtomicBoolean isCancelled = new AtomicBoolean(false);

private TimedAction(TestScheduler scheduler, long time, Func2<? super Scheduler, ? super T, ? extends Subscription> action, T state) {
this.time = time;
Expand All @@ -41,6 +43,10 @@ private TimedAction(TestScheduler scheduler, long time, Func2<? super Scheduler,
this.scheduler = scheduler;
}

public void cancel() {
isCancelled.set(true);
}

@Override
public String toString() {
return String.format("TimedAction(time = %d, action = %s)", time, action.toString());
Expand Down Expand Up @@ -84,8 +90,12 @@ private void triggerActions(long targetTimeInNanos) {
}
time = current.time;
queue.remove();
// because the queue can have wildcards we have to ignore the type T for the state
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);

// Only execute if the TimedAction has not yet been cancelled
if (!current.isCancelled.get()) {
// because the queue can have wildcards we have to ignore the type T for the state
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);
}
}
time = targetTimeInNanos;
}
Expand All @@ -97,7 +107,14 @@ public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ?

@Override
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
queue.add(new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state));
return Subscriptions.empty();
final TimedAction<T> timedAction = new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state);
queue.add(timedAction);

return new Subscription() {
@Override
public void unsubscribe() {
timedAction.cancel();
}
};
}
}
185 changes: 185 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationThrottleFirst.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.concurrent.TimeUnit;

import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;

import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.concurrency.Schedulers;
import rx.concurrency.TestScheduler;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func1;

/**
* Throttle by windowing a stream and returning the first value in each window.
*/
public final class OperationThrottleFirst {

/**
* Throttles to first value in each window.
*
* @param items
* The {@link Observable} which is publishing events.
* @param windowDuration
* Duration of windows within with the first value will be chosen.
* @param unit
* The unit of time for the specified timeout.
* @return A {@link Func1} which performs the throttle operation.
*/
public static <T> OnSubscribeFunc<T> throttleFirst(Observable<T> items, long windowDuration, TimeUnit unit) {
return throttleFirst(items, windowDuration, unit, Schedulers.threadPoolForComputation());
}

/**
* Throttles to first value in each window.
*
* @param items
* The {@link Observable} which is publishing events.
* @param windowDuration
* Duration of windows within with the first value will be chosen.
* @param unit
* The unit of time for the specified timeout.
* @param scheduler
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
* @return A {@link Func1} which performs the throttle operation.
*/
public static <T> OnSubscribeFunc<T> throttleFirst(final Observable<T> items, final long windowDuration, final TimeUnit unit, final Scheduler scheduler) {
return new OnSubscribeFunc<T>() {
@Override
public Subscription onSubscribe(Observer<? super T> observer) {
return items.window(windowDuration, unit, scheduler).flatMap(new Func1<Observable<T>, Observable<T>>() {

@Override
public Observable<T> call(Observable<T> o) {
return o.takeFirst();
}
}).subscribe(observer);
}
};
}

public static class UnitTest {

private TestScheduler scheduler;
private Observer<String> observer;

@Before
@SuppressWarnings("unchecked")
public void before() {
scheduler = new TestScheduler();
observer = mock(Observer.class);
}

@Test
public void testThrottlingWithCompleted() {
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(Observer<? super String> observer) {
publishNext(observer, 100, "one"); // publish as it's first
publishNext(observer, 300, "two"); // skip as it's last within the first 400
publishNext(observer, 900, "three"); // publish
publishNext(observer, 905, "four"); // skip
publishCompleted(observer, 1000); // Should be published as soon as the timeout expires.

return Subscriptions.empty();
}
});

Observable<String> sampled = Observable.create(OperationThrottleFirst.throttleFirst(source, 400, TimeUnit.MILLISECONDS, scheduler));
sampled.subscribe(observer);

InOrder inOrder = inOrder(observer);

scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
inOrder.verify(observer, times(1)).onNext("one");
inOrder.verify(observer, times(0)).onNext("two");
inOrder.verify(observer, times(1)).onNext("three");
inOrder.verify(observer, times(0)).onNext("four");
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testThrottlingWithError() {
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(Observer<? super String> observer) {
Exception error = new TestException();
publishNext(observer, 100, "one"); // Should be published since it is first
publishNext(observer, 200, "two"); // Should be skipped since onError will arrive before the timeout expires
publishError(observer, 300, error); // Should be published as soon as the timeout expires.

return Subscriptions.empty();
}
});

Observable<String> sampled = Observable.create(OperationThrottleFirst.throttleFirst(source, 400, TimeUnit.MILLISECONDS, scheduler));
sampled.subscribe(observer);

InOrder inOrder = inOrder(observer);

scheduler.advanceTimeTo(400, TimeUnit.MILLISECONDS);
inOrder.verify(observer).onNext("one");
inOrder.verify(observer).onError(any(TestException.class));
inOrder.verifyNoMoreInteractions();
}

private <T> void publishCompleted(final Observer<T> observer, long delay) {
scheduler.schedule(new Action0() {
@Override
public void call() {
observer.onCompleted();
}
}, delay, TimeUnit.MILLISECONDS);
}

private <T> void publishError(final Observer<T> observer, long delay, final Exception error) {
scheduler.schedule(new Action0() {
@Override
public void call() {
observer.onError(error);
}
}, delay, TimeUnit.MILLISECONDS);
}

private <T> void publishNext(final Observer<T> observer, long delay, final T value) {
scheduler.schedule(new Action0() {
@Override
public void call() {
observer.onNext(value);
}
}, delay, TimeUnit.MILLISECONDS);
}

@SuppressWarnings("serial")
private class TestException extends Exception {
}

}

}
46 changes: 46 additions & 0 deletions rxjava-core/src/test/java/rx/ThrottleFirstTests.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package rx;

import static org.mockito.Mockito.*;

import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.mockito.InOrder;

import rx.concurrency.TestScheduler;
import rx.subjects.PublishSubject;

public class ThrottleFirstTests {

@Test
public void testThrottle() {
@SuppressWarnings("unchecked")
Observer<Integer> observer = mock(Observer.class);
TestScheduler s = new TestScheduler();
PublishSubject<Integer> o = PublishSubject.create();
o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(observer);

// send events with simulated time increments
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
o.onNext(1); // deliver
o.onNext(2); // skip
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
o.onNext(3); // deliver
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
o.onNext(4); // skip
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
o.onNext(5); // skip
o.onNext(6); // skip
s.advanceTimeTo(1001, TimeUnit.MILLISECONDS);
o.onNext(7); // deliver
s.advanceTimeTo(1501, TimeUnit.MILLISECONDS);
o.onCompleted();

InOrder inOrder = inOrder(observer);
inOrder.verify(observer).onNext(1);
inOrder.verify(observer).onNext(3);
inOrder.verify(observer).onNext(7);
inOrder.verify(observer).onCompleted();
inOrder.verifyNoMoreInteractions();
}
}