Skip to content

Commit 2ea065c

Browse files
committed
Created and wired an implementation for the throttle operation on Observables.
1 parent bafd440 commit 2ea065c

File tree

3 files changed

+351
-8
lines changed

3 files changed

+351
-8
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,16 @@
1515
*/
1616
package rx;
1717

18-
import static org.junit.Assert.*;
19-
import static org.mockito.Matchers.*;
20-
import static org.mockito.Mockito.*;
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertNotNull;
20+
import static org.junit.Assert.assertNull;
21+
import static org.junit.Assert.fail;
22+
import static org.mockito.Matchers.any;
23+
import static org.mockito.Matchers.anyInt;
24+
import static org.mockito.Matchers.anyString;
25+
import static org.mockito.Mockito.mock;
26+
import static org.mockito.Mockito.times;
27+
import static org.mockito.Mockito.verify;
2128

2229
import java.util.ArrayList;
2330
import java.util.Arrays;
@@ -65,6 +72,7 @@
6572
import rx.operators.OperationTakeLast;
6673
import rx.operators.OperationTakeUntil;
6774
import rx.operators.OperationTakeWhile;
75+
import rx.operators.OperationThrottle;
6876
import rx.operators.OperationTimestamp;
6977
import rx.operators.OperationToFuture;
7078
import rx.operators.OperationToIterator;
@@ -2095,6 +2103,29 @@ public Boolean call(T t, Integer integer)
20952103
}));
20962104
}
20972105

2106+
/**
2107+
* Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired.
2108+
*
2109+
* @param timeout The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
2110+
* @param unit The {@link TimeUnit} for the timeout.
2111+
* @return An {@link Observable} which filters out values which are too quickly followed up with never values.
2112+
*/
2113+
public Observable<T> throttle(long timeout, TimeUnit unit) {
2114+
return create(OperationThrottle.throttle(this, timeout, unit));
2115+
}
2116+
2117+
/**
2118+
* Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired.
2119+
*
2120+
* @param timeout The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
2121+
* @param unit The {@link TimeUnit} for the timeout.
2122+
* @param scheduler The {@link Scheduler} to use when timing incoming values.
2123+
* @return An {@link Observable} which filters out values which are too quickly followed up with never values.
2124+
*/
2125+
public Observable<T> throttle(long timeout, TimeUnit unit, Scheduler scheduler) {
2126+
return create(OperationThrottle.throttle(this, timeout, unit, scheduler));
2127+
}
2128+
20982129
/**
20992130
* Adds a timestamp to each item emitted by this observable.
21002131
* @return An observable sequence of timestamped items.

rxjava-core/src/main/java/rx/concurrency/TestScheduler.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,22 @@
1919
import java.util.PriorityQueue;
2020
import java.util.Queue;
2121
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2223

2324
import rx.Scheduler;
2425
import rx.Subscription;
25-
import rx.subscriptions.Subscriptions;
2626
import rx.util.functions.Func2;
2727

2828
public class TestScheduler extends Scheduler {
2929
private final Queue<TimedAction<?>> queue = new PriorityQueue<TimedAction<?>>(11, new CompareActionsByTime());
3030

3131
private static class TimedAction<T> {
32+
3233
private final long time;
3334
private final Func2<Scheduler, T, Subscription> action;
3435
private final T state;
3536
private final TestScheduler scheduler;
37+
private final AtomicBoolean isCancelled = new AtomicBoolean(false);
3638

3739
private TimedAction(TestScheduler scheduler, long time, Func2<Scheduler, T, Subscription> action, T state) {
3840
this.time = time;
@@ -41,6 +43,10 @@ private TimedAction(TestScheduler scheduler, long time, Func2<Scheduler, T, Subs
4143
this.scheduler = scheduler;
4244
}
4345

46+
public void cancel() {
47+
isCancelled.set(true);
48+
}
49+
4450
@Override
4551
public String toString() {
4652
return String.format("TimedAction(time = %d, action = %s)", time, action.toString());
@@ -85,8 +91,12 @@ private void triggerActions(long targetTimeInNanos) {
8591
}
8692
time = current.time;
8793
queue.remove();
88-
// because the queue can have wildcards we have to ignore the type T for the state
89-
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);
94+
95+
// Only execute if the TimedAction has not yet been cancelled
96+
if (!current.isCancelled.get()) {
97+
// because the queue can have wildcards we have to ignore the type T for the state
98+
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);
99+
}
90100
}
91101
}
92102

@@ -97,7 +107,14 @@ public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> acti
97107

98108
@Override
99109
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit) {
100-
queue.add(new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state));
101-
return Subscriptions.empty();
110+
final TimedAction<T> timedAction = new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state);
111+
queue.add(timedAction);
112+
113+
return new Subscription() {
114+
@Override
115+
public void unsubscribe() {
116+
timedAction.cancel();
117+
}
118+
};
102119
}
103120
}

0 commit comments

Comments
 (0)