43
43
44
44
public final class OperationWindow extends ChunkedOperation {
45
45
46
- public static final Func0 WINDOW_MAKER = new Func0 () {
47
- @ Override
48
- public Object call () {
49
- return new Window ();
50
- }
51
- };
46
+ public static <T > Func0 <Window <T >> windowMaker () {
47
+ return new Func0 <Window <T >>() {
48
+ @ Override
49
+ public Window <T > call () {
50
+ return new Window <T >();
51
+ }
52
+ };
53
+ }
52
54
53
55
/**
54
56
* <p>This method creates a {@link rx.util.functions.Func1} object which represents the window operation. This operation takes
@@ -74,7 +76,7 @@ public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<T> sour
74
76
return new OnSubscribeFunc <Observable <T >>() {
75
77
@ Override
76
78
public Subscription onSubscribe (final Observer <? super Observable <T >> observer ) {
77
- NonOverlappingChunks <T , Observable <T >> windows = new NonOverlappingChunks <T , Observable <T >>(observer , WINDOW_MAKER );
79
+ NonOverlappingChunks <T , Observable <T >> windows = new NonOverlappingChunks <T , Observable <T >>(observer , windowMaker () );
78
80
ChunkCreator creator = new ObservableBasedSingleChunkCreator <T , Observable <T >>(windows , windowClosingSelector );
79
81
return source .subscribe (new ChunkObserver <T , Observable <T >>(windows , observer , creator ));
80
82
}
@@ -111,7 +113,7 @@ public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<T> sour
111
113
return new OnSubscribeFunc <Observable <T >>() {
112
114
@ Override
113
115
public Subscription onSubscribe (final Observer <? super Observable <T >> observer ) {
114
- OverlappingChunks <T , Observable <T >> windows = new OverlappingChunks <T , Observable <T >>(observer , WINDOW_MAKER );
116
+ OverlappingChunks <T , Observable <T >> windows = new OverlappingChunks <T , Observable <T >>(observer , windowMaker () );
115
117
ChunkCreator creator = new ObservableBasedMultiChunkCreator <T , Observable <T >>(windows , windowOpenings , windowClosingSelector );
116
118
return source .subscribe (new ChunkObserver <T , Observable <T >>(windows , observer , creator ));
117
119
}
@@ -166,7 +168,7 @@ public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<T> sour
166
168
return new OnSubscribeFunc <Observable <T >>() {
167
169
@ Override
168
170
public Subscription onSubscribe (final Observer <? super Observable <T >> observer ) {
169
- Chunks <T , Observable <T >> chunks = new SizeBasedChunks <T , Observable <T >>(observer , WINDOW_MAKER , count );
171
+ Chunks <T , Observable <T >> chunks = new SizeBasedChunks <T , Observable <T >>(observer , windowMaker () , count );
170
172
ChunkCreator creator = new SkippingChunkCreator <T , Observable <T >>(chunks , skip );
171
173
return source .subscribe (new ChunkObserver <T , Observable <T >>(chunks , observer , creator ));
172
174
}
@@ -221,7 +223,7 @@ public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<T> sour
221
223
return new OnSubscribeFunc <Observable <T >>() {
222
224
@ Override
223
225
public Subscription onSubscribe (final Observer <? super Observable <T >> observer ) {
224
- NonOverlappingChunks <T , Observable <T >> windows = new NonOverlappingChunks <T , Observable <T >>(observer , WINDOW_MAKER );
226
+ NonOverlappingChunks <T , Observable <T >> windows = new NonOverlappingChunks <T , Observable <T >>(observer , windowMaker () );
225
227
ChunkCreator creator = new TimeBasedChunkCreator <T , Observable <T >>(windows , timespan , unit , scheduler );
226
228
return source .subscribe (new ChunkObserver <T , Observable <T >>(windows , observer , creator ));
227
229
}
@@ -282,7 +284,7 @@ public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<T> sour
282
284
return new OnSubscribeFunc <Observable <T >>() {
283
285
@ Override
284
286
public Subscription onSubscribe (final Observer <? super Observable <T >> observer ) {
285
- Chunks <T , Observable <T >> chunks = new TimeAndSizeBasedChunks <T , Observable <T >>(observer , WINDOW_MAKER , count , timespan , unit , scheduler );
287
+ Chunks <T , Observable <T >> chunks = new TimeAndSizeBasedChunks <T , Observable <T >>(observer , windowMaker () , count , timespan , unit , scheduler );
286
288
ChunkCreator creator = new SingleChunkCreator <T , Observable <T >>(chunks );
287
289
return source .subscribe (new ChunkObserver <T , Observable <T >>(chunks , observer , creator ));
288
290
}
@@ -343,7 +345,7 @@ public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<T> sour
343
345
return new OnSubscribeFunc <Observable <T >>() {
344
346
@ Override
345
347
public Subscription onSubscribe (final Observer <? super Observable <T >> observer ) {
346
- OverlappingChunks <T , Observable <T >> windows = new TimeBasedChunks <T , Observable <T >>(observer , WINDOW_MAKER , timespan , unit , scheduler );
348
+ OverlappingChunks <T , Observable <T >> windows = new TimeBasedChunks <T , Observable <T >>(observer , windowMaker () , timespan , unit , scheduler );
347
349
ChunkCreator creator = new TimeBasedChunkCreator <T , Observable <T >>(windows , timeshift , unit , scheduler );
348
350
return source .subscribe (new ChunkObserver <T , Observable <T >>(windows , observer , creator ));
349
351
}
@@ -373,7 +375,6 @@ public static class UnitTest {
373
375
private TestScheduler scheduler ;
374
376
375
377
@ Before
376
- @ SuppressWarnings ("unchecked" )
377
378
public void before () {
378
379
scheduler = new TestScheduler ();
379
380
}
0 commit comments