Skip to content

Commit 2e18a2b

Browse files
Fix Observable.window static/instance bug
- reported at ReactiveX#349 (comment)
1 parent 9cef1bd commit 2e18a2b

File tree

2 files changed

+59
-40
lines changed

2 files changed

+59
-40
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 20 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1509,8 +1509,6 @@ public Observable<List<T>> buffer(long timespan, long timeshift, TimeUnit unit,
15091509
* Observable produced by the specified {@link Func0} produces a {@link rx.util.Closing} object. The {@link Func0} will then be used to create a new Observable to listen for the end of the next
15101510
* window.
15111511
*
1512-
* @param source
1513-
* The source {@link Observable} which produces values.
15141512
* @param closingSelector
15151513
* The {@link Func0} which is used to produce an {@link Observable} for every window created.
15161514
* When this {@link Observable} produces a {@link rx.util.Closing} object, the associated window
@@ -1519,8 +1517,8 @@ public Observable<List<T>> buffer(long timespan, long timeshift, TimeUnit unit,
15191517
* An {@link Observable} which produces connected non-overlapping windows, which are emitted
15201518
* when the current {@link Observable} created with the {@link Func0} argument produces a {@link rx.util.Closing} object.
15211519
*/
1522-
public Observable<Observable<T>> window(Observable<? extends T> source, Func0<? extends Observable<? extends Closing>> closingSelector) {
1523-
return create(OperationWindow.window(source, closingSelector));
1520+
public Observable<Observable<T>> window(Func0<? extends Observable<? extends Closing>> closingSelector) {
1521+
return create(OperationWindow.window(this, closingSelector));
15241522
}
15251523

15261524
/**
@@ -1529,8 +1527,6 @@ public Observable<Observable<T>> window(Observable<? extends T> source, Func0<?
15291527
* Additionally the {@link Func0} argument is used to create an Observable which produces {@link rx.util.Closing} objects. When this Observable produces such an object, the associated window is
15301528
* emitted.
15311529
*
1532-
* @param source
1533-
* The source {@link Observable} which produces values.
15341530
* @param windowOpenings
15351531
* The {@link Observable} which when it produces a {@link rx.util.Opening} object, will cause
15361532
* another window to be created.
@@ -1541,34 +1537,30 @@ public Observable<Observable<T>> window(Observable<? extends T> source, Func0<?
15411537
* @return
15421538
* An {@link Observable} which produces windows which are created and emitted when the specified {@link Observable}s publish certain objects.
15431539
*/
1544-
public Observable<Observable<T>> window(Observable<? extends T> source, Observable<? extends Opening> windowOpenings, Func1<Opening, ? extends Observable<? extends Closing>> closingSelector) {
1545-
return create(OperationWindow.window(source, windowOpenings, closingSelector));
1540+
public Observable<Observable<T>> window(Observable<? extends Opening> windowOpenings, Func1<Opening, ? extends Observable<? extends Closing>> closingSelector) {
1541+
return create(OperationWindow.window(this, windowOpenings, closingSelector));
15461542
}
15471543

15481544
/**
15491545
* Creates an Observable which produces windows of collected values. This Observable produces connected
15501546
* non-overlapping windows, each containing "count" elements. When the source Observable completes or
15511547
* encounters an error, the current window is emitted, and the event is propagated.
15521548
*
1553-
* @param source
1554-
* The source {@link Observable} which produces values.
15551549
* @param count
15561550
* The maximum size of each window before it should be emitted.
15571551
* @return
15581552
* An {@link Observable} which produces connected non-overlapping windows containing at most
15591553
* "count" produced values.
15601554
*/
1561-
public Observable<Observable<T>> window(Observable<? extends T> source, int count) {
1562-
return create(OperationWindow.window(source, count));
1555+
public Observable<Observable<T>> window(int count) {
1556+
return create(OperationWindow.window(this, count));
15631557
}
15641558

15651559
/**
15661560
* Creates an Observable which produces windows of collected values. This Observable produces windows every
15671561
* "skip" values, each containing "count" elements. When the source Observable completes or encounters an error,
15681562
* the current window is emitted and the event is propagated.
15691563
*
1570-
* @param source
1571-
* The source {@link Observable} which produces values.
15721564
* @param count
15731565
* The maximum size of each window before it should be emitted.
15741566
* @param skip
@@ -1578,17 +1570,15 @@ public Observable<Observable<T>> window(Observable<? extends T> source, int coun
15781570
* An {@link Observable} which produces windows every "skipped" values containing at most
15791571
* "count" produced values.
15801572
*/
1581-
public Observable<Observable<T>> window(Observable<? extends T> source, int count, int skip) {
1582-
return create(OperationWindow.window(source, count, skip));
1573+
public Observable<Observable<T>> window(int count, int skip) {
1574+
return create(OperationWindow.window(this, count, skip));
15831575
}
15841576

15851577
/**
15861578
* Creates an Observable which produces windows of collected values. This Observable produces connected
15871579
* non-overlapping windows, each of a fixed duration specified by the "timespan" argument. When the source
15881580
* Observable completes or encounters an error, the current window is emitted and the event is propagated.
15891581
*
1590-
* @param source
1591-
* The source {@link Observable} which produces values.
15921582
* @param timespan
15931583
* The period of time each window is collecting values before it should be emitted, and
15941584
* replaced with a new window.
@@ -1597,17 +1587,15 @@ public Observable<Observable<T>> window(Observable<? extends T> source, int coun
15971587
* @return
15981588
* An {@link Observable} which produces connected non-overlapping windows with a fixed duration.
15991589
*/
1600-
public Observable<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit) {
1601-
return create(OperationWindow.window(source, timespan, unit));
1590+
public Observable<Observable<T>> window(long timespan, TimeUnit unit) {
1591+
return create(OperationWindow.window(this, timespan, unit));
16021592
}
16031593

16041594
/**
16051595
* Creates an Observable which produces windows of collected values. This Observable produces connected
16061596
* non-overlapping windows, each of a fixed duration specified by the "timespan" argument. When the source
16071597
* Observable completes or encounters an error, the current window is emitted and the event is propagated.
16081598
*
1609-
* @param source
1610-
* The source {@link Observable} which produces values.
16111599
* @param timespan
16121600
* The period of time each window is collecting values before it should be emitted, and
16131601
* replaced with a new window.
@@ -1618,8 +1606,8 @@ public Observable<Observable<T>> window(Observable<? extends T> source, long tim
16181606
* @return
16191607
* An {@link Observable} which produces connected non-overlapping windows with a fixed duration.
16201608
*/
1621-
public Observable<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit, Scheduler scheduler) {
1622-
return create(OperationWindow.window(source, timespan, unit, scheduler));
1609+
public Observable<Observable<T>> window(long timespan, TimeUnit unit, Scheduler scheduler) {
1610+
return create(OperationWindow.window(this, timespan, unit, scheduler));
16231611
}
16241612

16251613
/**
@@ -1628,8 +1616,6 @@ public Observable<Observable<T>> window(Observable<? extends T> source, long tim
16281616
* specified by the "count" argument (which ever is reached first). When the source Observable completes
16291617
* or encounters an error, the current window is emitted and the event is propagated.
16301618
*
1631-
* @param source
1632-
* The source {@link Observable} which produces values.
16331619
* @param timespan
16341620
* The period of time each window is collecting values before it should be emitted, and
16351621
* replaced with a new window.
@@ -1641,8 +1627,8 @@ public Observable<Observable<T>> window(Observable<? extends T> source, long tim
16411627
* An {@link Observable} which produces connected non-overlapping windows which are emitted after
16421628
* a fixed duration or when the window has reached maximum capacity (which ever occurs first).
16431629
*/
1644-
public Observable<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit, int count) {
1645-
return create(OperationWindow.window(source, timespan, unit, count));
1630+
public Observable<Observable<T>> window(long timespan, TimeUnit unit, int count) {
1631+
return create(OperationWindow.window(this, timespan, unit, count));
16461632
}
16471633

16481634
/**
@@ -1651,8 +1637,6 @@ public Observable<Observable<T>> window(Observable<? extends T> source, long tim
16511637
* specified by the "count" argument (which ever is reached first). When the source Observable completes
16521638
* or encounters an error, the current window is emitted and the event is propagated.
16531639
*
1654-
* @param source
1655-
* The source {@link Observable} which produces values.
16561640
* @param timespan
16571641
* The period of time each window is collecting values before it should be emitted, and
16581642
* replaced with a new window.
@@ -1666,8 +1650,8 @@ public Observable<Observable<T>> window(Observable<? extends T> source, long tim
16661650
* An {@link Observable} which produces connected non-overlapping windows which are emitted after
16671651
* a fixed duration or when the window has reached maximum capacity (which ever occurs first).
16681652
*/
1669-
public Observable<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit, int count, Scheduler scheduler) {
1670-
return create(OperationWindow.window(source, timespan, unit, count, scheduler));
1653+
public Observable<Observable<T>> window(long timespan, TimeUnit unit, int count, Scheduler scheduler) {
1654+
return create(OperationWindow.window(this, timespan, unit, count, scheduler));
16711655
}
16721656

16731657
/**
@@ -1676,8 +1660,6 @@ public Observable<Observable<T>> window(Observable<? extends T> source, long tim
16761660
* specified by the "timespan" argument. When the source Observable completes or encounters an error, the
16771661
* current window is emitted and the event is propagated.
16781662
*
1679-
* @param source
1680-
* The source {@link Observable} which produces values.
16811663
* @param timespan
16821664
* The period of time each window is collecting values before it should be emitted.
16831665
* @param timeshift
@@ -1688,8 +1670,8 @@ public Observable<Observable<T>> window(Observable<? extends T> source, long tim
16881670
* An {@link Observable} which produces new windows periodically, and these are emitted after
16891671
* a fixed timespan has elapsed.
16901672
*/
1691-
public Observable<Observable<T>> window(Observable<? extends T> source, long timespan, long timeshift, TimeUnit unit) {
1692-
return create(OperationWindow.window(source, timespan, timeshift, unit));
1673+
public Observable<Observable<T>> window(long timespan, long timeshift, TimeUnit unit) {
1674+
return create(OperationWindow.window(this, timespan, timeshift, unit));
16931675
}
16941676

16951677
/**
@@ -1698,8 +1680,6 @@ public Observable<Observable<T>> window(Observable<? extends T> source, long tim
16981680
* specified by the "timespan" argument. When the source Observable completes or encounters an error, the
16991681
* current window is emitted and the event is propagated.
17001682
*
1701-
* @param source
1702-
* The source {@link Observable} which produces values.
17031683
* @param timespan
17041684
* The period of time each window is collecting values before it should be emitted.
17051685
* @param timeshift
@@ -1712,8 +1692,8 @@ public Observable<Observable<T>> window(Observable<? extends T> source, long tim
17121692
* An {@link Observable} which produces new windows periodically, and these are emitted after
17131693
* a fixed timespan has elapsed.
17141694
*/
1715-
public Observable<Observable<T>> window(Observable<? extends T> source, long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) {
1716-
return create(OperationWindow.window(source, timespan, timeshift, unit, scheduler));
1695+
public Observable<Observable<T>> window(long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) {
1696+
return create(OperationWindow.window(this, timespan, timeshift, unit, scheduler));
17171697
}
17181698

17191699
/**
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package rx;
2+
3+
import static org.junit.Assert.*;
4+
5+
import java.util.ArrayList;
6+
import java.util.List;
7+
8+
import org.junit.Test;
9+
10+
import rx.util.functions.Action1;
11+
import rx.util.functions.Func1;
12+
13+
public class ObservableWindowTests {
14+
15+
@Test
16+
public void testWindow() {
17+
final ArrayList<List<Integer>> lists = new ArrayList<List<Integer>>();
18+
Observable.from(1, 2, 3, 4, 5, 6)
19+
.window(3).map(new Func1<Observable<Integer>, List<Integer>>() {
20+
21+
@Override
22+
public List<Integer> call(Observable<Integer> o) {
23+
return o.toList().toBlockingObservable().single();
24+
}
25+
26+
}).toBlockingObservable().forEach(new Action1<List<Integer>>() {
27+
28+
@Override
29+
public void call(List<Integer> t) {
30+
lists.add(t);
31+
}
32+
});
33+
34+
assertArrayEquals(lists.get(0).toArray(new Integer[3]), new Integer[] { 1, 2, 3 });
35+
assertArrayEquals(lists.get(1).toArray(new Integer[3]), new Integer[] { 4, 5, 6 });
36+
assertEquals(2, lists.size());
37+
38+
}
39+
}

0 commit comments

Comments
 (0)