Skip to content

Commit 7147a8d

Browse files
committed
Merge pull request #2897 from alexwen/window_overlap
Fix for overlapping windows.
2 parents d7d94ba + 01cddd5 commit 7147a8d

File tree

2 files changed

+39
-1
lines changed

2 files changed

+39
-1
lines changed

src/main/java/rx/internal/operators/OperatorWindowWithObservable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ public void onNext(T t) {
119119
do {
120120
drain(localQueue);
121121
if (once) {
122-
once = true;
122+
once = false;
123123
emitValue(t);
124124
}
125125

src/test/java/rx/internal/operators/OperatorWindowWithObservableTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,16 @@
2222
import static org.mockito.Mockito.verify;
2323

2424
import java.util.ArrayList;
25+
import java.util.Arrays;
2526
import java.util.List;
2627

2728
import org.junit.Test;
2829

2930
import rx.Observable;
3031
import rx.Observer;
3132
import rx.exceptions.TestException;
33+
import rx.functions.Func0;
34+
import rx.observers.TestSubscriber;
3235
import rx.subjects.PublishSubject;
3336

3437
public class OperatorWindowWithObservableTest {
@@ -252,4 +255,39 @@ public void onCompleted() {
252255
verify(o, never()).onCompleted();
253256
verify(o).onError(any(TestException.class));
254257
}
258+
259+
@Test
260+
public void testWindowNoDuplication() {
261+
final PublishSubject<Integer> source = PublishSubject.create();
262+
final TestSubscriber<Integer> tsw = new TestSubscriber<Integer>() {
263+
boolean once;
264+
@Override
265+
public void onNext(Integer t) {
266+
if (!once) {
267+
once = true;
268+
source.onNext(2);
269+
}
270+
super.onNext(t);
271+
}
272+
};
273+
TestSubscriber<Observable<Integer>> ts = new TestSubscriber<Observable<Integer>>() {
274+
@Override
275+
public void onNext(Observable<Integer> t) {
276+
t.subscribe(tsw);
277+
super.onNext(t);
278+
}
279+
};
280+
source.window(new Func0<Observable<Object>>() {
281+
@Override
282+
public Observable<Object> call() {
283+
return Observable.never();
284+
}
285+
}).subscribe(ts);
286+
287+
source.onNext(1);
288+
source.onCompleted();
289+
290+
assertEquals(1, ts.getOnNextEvents().size());
291+
assertEquals(Arrays.asList(1, 2), tsw.getOnNextEvents());
292+
}
255293
}

0 commit comments

Comments
 (0)