Skip to content

Commit 3aaed26

Browse files
Anton Rutkevichakarnokd
Anton Rutkevich
authored andcommitted
1.x: Fix multiple values produced by throttleFirst with TestScheduler (#4397)
When throttleFirst was operating on a TestScheduler, it delivered all items passed to it untill TestScheduler's time would change to a non-zero value.
1 parent 1a7de42 commit 3aaed26

File tree

2 files changed

+44
-2
lines changed

2 files changed

+44
-2
lines changed

src/main/java/rx/internal/operators/OperatorThrottleFirst.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public OperatorThrottleFirst(long windowDuration, TimeUnit unit, Scheduler sched
3838
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
3939
return new Subscriber<T>(subscriber) {
4040

41-
private long lastOnNext;
41+
private long lastOnNext = -1;
4242

4343
@Override
4444
public void onStart() {
@@ -48,7 +48,7 @@ public void onStart() {
4848
@Override
4949
public void onNext(T v) {
5050
long now = scheduler.now();
51-
if (lastOnNext == 0 || now - lastOnNext >= timeInMilliseconds) {
51+
if (lastOnNext == -1 || now - lastOnNext >= timeInMilliseconds) {
5252
lastOnNext = now;
5353
subscriber.onNext(v);
5454
}

src/test/java/rx/internal/operators/OperatorThrottleFirstTest.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import static org.mockito.Mockito.inOrder;
2020
import static org.mockito.Mockito.mock;
2121
import static org.mockito.Mockito.times;
22+
import static org.mockito.Mockito.verify;
23+
import static org.mockito.Mockito.verifyNoMoreInteractions;
2224

2325
import java.util.concurrent.TimeUnit;
2426

@@ -172,4 +174,44 @@ public void timed() {
172174
ts.assertNoErrors();
173175
ts.assertCompleted();
174176
}
177+
178+
@Test
179+
public void throttleWithoutAdvancingTimeOfTestScheduler() {
180+
@SuppressWarnings("unchecked")
181+
Observer<Integer> observer = mock(Observer.class);
182+
TestScheduler s = new TestScheduler();
183+
PublishSubject<Integer> o = PublishSubject.create();
184+
o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(observer);
185+
186+
// send events without calling advanceTimeBy/To
187+
o.onNext(1); // deliver
188+
o.onNext(2); // skip
189+
o.onNext(3); // skip
190+
o.onCompleted();
191+
192+
verify(observer).onNext(1);
193+
verify(observer).onCompleted();
194+
verifyNoMoreInteractions(observer);
195+
}
196+
197+
@Test
198+
public void throttleWithTestSchedulerTimeOfZero() {
199+
@SuppressWarnings("unchecked")
200+
Observer<Integer> observer = mock(Observer.class);
201+
TestScheduler s = new TestScheduler();
202+
PublishSubject<Integer> o = PublishSubject.create();
203+
o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(observer);
204+
205+
s.advanceTimeBy(0, TimeUnit.MILLISECONDS);
206+
207+
// send events while TestScheduler's time is 0
208+
o.onNext(1); // deliver
209+
o.onNext(2); // skip
210+
o.onNext(3); // skip
211+
o.onCompleted();
212+
213+
verify(observer).onNext(1);
214+
verify(observer).onCompleted();
215+
verifyNoMoreInteractions(observer);
216+
}
175217
}

0 commit comments

Comments
 (0)