Skip to content

Commit 59edd31

Browse files
committed
Merge pull request #2476 from akarnokd/WindowFixes0121
Fixed off-by-one error and value-drop in the window operator.
2 parents d16def7 + 4634535 commit 59edd31

File tree

2 files changed

+44
-29
lines changed

2 files changed

+44
-29
lines changed

src/main/java/rx/internal/operators/OperatorWindowWithTime.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@
3333

3434
/**
3535
* Creates windows of values into the source sequence with timed window creation, length and size bounds.
36-
* If timespan == timeshift, windows are non-overlapping but may not be continuous if size number of items were already
37-
* emitted. If more items arrive after the window has reached its size bound, those items are dropped.
36+
* If timespan == timeshift, windows are non-overlapping but always continuous, i.e., when the size bound is reached, a new
37+
* window is opened.
3838
*
3939
* <p>Note that this conforms the Rx.NET behavior, but does not match former RxJava
4040
* behavior, which operated as a regular buffer and mapped its lists to Observables.</p>
@@ -205,17 +205,17 @@ void replaceSubject() {
205205
}
206206
void emitValue(T t) {
207207
State<T> s = state;
208-
209-
if (s.consumer != null) {
210-
s.consumer.onNext(t);
211-
if (s.count == size) {
212-
s.consumer.onCompleted();
213-
s = s.clear();
214-
} else {
215-
s = s.next();
216-
}
208+
if (s.consumer == null) {
209+
replaceSubject();
210+
s = state;
211+
}
212+
s.consumer.onNext(t);
213+
if (s.count == size - 1) {
214+
s.consumer.onCompleted();
215+
s = s.clear();
216+
} else {
217+
s = s.next();
217218
}
218-
219219
state = s;
220220
}
221221

src/test/java/rx/internal/operators/OperatorWindowWithTimeTest.java

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,17 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import static org.junit.Assert.assertEquals;
19-
import static org.junit.Assert.fail;
18+
import static org.junit.Assert.*;
2019

21-
import java.util.ArrayList;
22-
import java.util.List;
20+
import java.util.*;
2321
import java.util.concurrent.TimeUnit;
2422

25-
import org.junit.Before;
26-
import org.junit.Test;
23+
import org.junit.*;
2724

25+
import rx.*;
2826
import rx.Observable;
2927
import rx.Observer;
30-
import rx.Scheduler;
31-
import rx.Subscriber;
32-
import rx.functions.Action0;
33-
import rx.functions.Action1;
28+
import rx.functions.*;
3429
import rx.schedulers.TestScheduler;
3530

3631
public class OperatorWindowWithTimeTest {
@@ -132,14 +127,14 @@ public void call() {
132127
}, delay, TimeUnit.MILLISECONDS);
133128
}
134129

135-
private Action1<Observable<String>> observeWindow(final List<String> list, final List<List<String>> lists) {
136-
return new Action1<Observable<String>>() {
130+
private <T> Action1<Observable<T>> observeWindow(final List<T> list, final List<List<T>> lists) {
131+
return new Action1<Observable<T>>() {
137132
@Override
138-
public void call(Observable<String> stringObservable) {
139-
stringObservable.subscribe(new Observer<String>() {
133+
public void call(Observable<T> stringObservable) {
134+
stringObservable.subscribe(new Observer<T>() {
140135
@Override
141136
public void onCompleted() {
142-
lists.add(new ArrayList<String>(list));
137+
lists.add(new ArrayList<T>(list));
143138
list.clear();
144139
}
145140

@@ -149,11 +144,31 @@ public void onError(Throwable e) {
149144
}
150145

151146
@Override
152-
public void onNext(String args) {
147+
public void onNext(T args) {
153148
list.add(args);
154149
}
155150
});
156151
}
157152
};
158153
}
159-
}
154+
@Test
155+
public void testExactWindowSize() {
156+
Observable<Observable<Integer>> source = Observable.range(1, 10).window(1, TimeUnit.MINUTES, 3, scheduler);
157+
158+
final List<Integer> list = new ArrayList<Integer>();
159+
final List<List<Integer>> lists = new ArrayList<List<Integer>>();
160+
161+
source.subscribe(observeWindow(list, lists));
162+
163+
assertEquals(4, lists.size());
164+
assertEquals(3, lists.get(0).size());
165+
assertEquals(Arrays.asList(1, 2, 3), lists.get(0));
166+
assertEquals(3, lists.get(1).size());
167+
assertEquals(Arrays.asList(4, 5, 6), lists.get(1));
168+
assertEquals(3, lists.get(2).size());
169+
assertEquals(Arrays.asList(7, 8, 9), lists.get(2));
170+
assertEquals(1, lists.get(3).size());
171+
assertEquals(Arrays.asList(10), lists.get(3));
172+
}
173+
174+
}

0 commit comments

Comments
 (0)