Skip to content

Commit 229c4aa

Browse files
author
jmhofer
committed
removed raw Func0s from the chunks
1 parent 1231b94 commit 229c4aa

File tree

2 files changed

+13
-13
lines changed

2 files changed

+13
-13
lines changed

rxjava-core/src/main/java/rx/operators/ChunkedOperation.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ protected static class NonOverlappingChunks<T, C> extends Chunks<T, C> {
105105

106106
private final Object lock = new Object();
107107

108-
public NonOverlappingChunks(Observer<? super C> observer, Func0 chunkMaker) {
108+
public NonOverlappingChunks(Observer<? super C> observer, Func0<? extends Chunk<T, C>> chunkMaker) {
109109
super(observer, chunkMaker);
110110
}
111111

@@ -134,7 +134,7 @@ public void pushValue(T value) {
134134
* <C> The type of object being tracked by the {@link rx.operators.ChunkedOperation.Chunk}
135135
*/
136136
protected static class OverlappingChunks<T, C> extends Chunks<T, C> {
137-
public OverlappingChunks(Observer<? super C> observer, Func0 chunkMaker) {
137+
public OverlappingChunks(Observer<? super C> observer, Func0<? extends Chunk<T, C>> chunkMaker) {
138138
super(observer, chunkMaker);
139139
}
140140
}
@@ -158,7 +158,7 @@ protected static class TimeAndSizeBasedChunks<T, C> extends Chunks<T, C> {
158158
private final TimeUnit unit;
159159
private final int maxSize;
160160

161-
public TimeAndSizeBasedChunks(Observer<? super C> observer, Func0 chunkMaker, int maxSize, long maxTime, TimeUnit unit, Scheduler scheduler) {
161+
public TimeAndSizeBasedChunks(Observer<? super C> observer, Func0<? extends Chunk<T, C>> chunkMaker, int maxSize, long maxTime, TimeUnit unit, Scheduler scheduler) {
162162
super(observer, chunkMaker);
163163
this.maxSize = maxSize;
164164
this.maxTime = maxTime;
@@ -224,7 +224,7 @@ protected static class TimeBasedChunks<T, C> extends OverlappingChunks<T, C> {
224224
private final long time;
225225
private final TimeUnit unit;
226226

227-
public TimeBasedChunks(Observer<? super C> observer, Func0 chunkMaker, long time, TimeUnit unit, Scheduler scheduler) {
227+
public TimeBasedChunks(Observer<? super C> observer, Func0<? extends Chunk<T, C>> chunkMaker, long time, TimeUnit unit, Scheduler scheduler) {
228228
super(observer, chunkMaker);
229229
this.time = time;
230230
this.unit = unit;
@@ -263,7 +263,7 @@ protected static class SizeBasedChunks<T, C> extends Chunks<T, C> {
263263

264264
private final int size;
265265

266-
public SizeBasedChunks(Observer<? super C> observer, Func0 chunkMaker, int size) {
266+
public SizeBasedChunks(Observer<? super C> observer, Func0<? extends Chunk<T, C>> chunkMaker, int size) {
267267
super(observer, chunkMaker);
268268
this.size = size;
269269
}
@@ -295,15 +295,15 @@ protected static class Chunks<T, C> {
295295

296296
private final Queue<Chunk<T, C>> chunks = new ConcurrentLinkedQueue<Chunk<T, C>>();
297297
private final Observer<? super C> observer;
298-
private final Func0<Chunk<T, C>> chunkMaker;
298+
private final Func0<? extends Chunk<T, C>> chunkMaker;
299299

300300
/**
301301
* Constructs a new {@link ChunkedOperation.Chunks} object for the specified {@link rx.Observer}.
302302
*
303303
* @param observer
304304
* The {@link rx.Observer} to which this object will emit its internal {@link rx.operators.ChunkedOperation.Chunk} objects to when requested.
305305
*/
306-
public Chunks(Observer<? super C> observer, Func0 chunkMaker) {
306+
public Chunks(Observer<? super C> observer, Func0<? extends Chunk<T, C>> chunkMaker) {
307307
this.observer = observer;
308308
this.chunkMaker = chunkMaker;
309309
}

rxjava-core/src/main/java/rx/operators/OperationWindow.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<T> sour
7676
return new OnSubscribeFunc<Observable<T>>() {
7777
@Override
7878
public Subscription onSubscribe(final Observer<? super Observable<T>> observer) {
79-
NonOverlappingChunks<T, Observable<T>> windows = new NonOverlappingChunks<T, Observable<T>>(observer, windowMaker());
79+
NonOverlappingChunks<T, Observable<T>> windows = new NonOverlappingChunks<T, Observable<T>>(observer, OperationWindow.<T>windowMaker());
8080
ChunkCreator creator = new ObservableBasedSingleChunkCreator<T, Observable<T>>(windows, windowClosingSelector);
8181
return source.subscribe(new ChunkObserver<T, Observable<T>>(windows, observer, creator));
8282
}
@@ -113,7 +113,7 @@ public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<T> sour
113113
return new OnSubscribeFunc<Observable<T>>() {
114114
@Override
115115
public Subscription onSubscribe(final Observer<? super Observable<T>> observer) {
116-
OverlappingChunks<T, Observable<T>> windows = new OverlappingChunks<T, Observable<T>>(observer, windowMaker());
116+
OverlappingChunks<T, Observable<T>> windows = new OverlappingChunks<T, Observable<T>>(observer, OperationWindow.<T>windowMaker());
117117
ChunkCreator creator = new ObservableBasedMultiChunkCreator<T, Observable<T>>(windows, windowOpenings, windowClosingSelector);
118118
return source.subscribe(new ChunkObserver<T, Observable<T>>(windows, observer, creator));
119119
}
@@ -168,7 +168,7 @@ public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<T> sour
168168
return new OnSubscribeFunc<Observable<T>>() {
169169
@Override
170170
public Subscription onSubscribe(final Observer<? super Observable<T>> observer) {
171-
Chunks<T, Observable<T>> chunks = new SizeBasedChunks<T, Observable<T>>(observer, windowMaker(), count);
171+
Chunks<T, Observable<T>> chunks = new SizeBasedChunks<T, Observable<T>>(observer, OperationWindow.<T>windowMaker(), count);
172172
ChunkCreator creator = new SkippingChunkCreator<T, Observable<T>>(chunks, skip);
173173
return source.subscribe(new ChunkObserver<T, Observable<T>>(chunks, observer, creator));
174174
}
@@ -223,7 +223,7 @@ public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<T> sour
223223
return new OnSubscribeFunc<Observable<T>>() {
224224
@Override
225225
public Subscription onSubscribe(final Observer<? super Observable<T>> observer) {
226-
NonOverlappingChunks<T, Observable<T>> windows = new NonOverlappingChunks<T, Observable<T>>(observer, windowMaker());
226+
NonOverlappingChunks<T, Observable<T>> windows = new NonOverlappingChunks<T, Observable<T>>(observer, OperationWindow.<T>windowMaker());
227227
ChunkCreator creator = new TimeBasedChunkCreator<T, Observable<T>>(windows, timespan, unit, scheduler);
228228
return source.subscribe(new ChunkObserver<T, Observable<T>>(windows, observer, creator));
229229
}
@@ -284,7 +284,7 @@ public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<T> sour
284284
return new OnSubscribeFunc<Observable<T>>() {
285285
@Override
286286
public Subscription onSubscribe(final Observer<? super Observable<T>> observer) {
287-
Chunks<T, Observable<T>> chunks = new TimeAndSizeBasedChunks<T, Observable<T>>(observer, windowMaker(), count, timespan, unit, scheduler);
287+
Chunks<T, Observable<T>> chunks = new TimeAndSizeBasedChunks<T, Observable<T>>(observer, OperationWindow.<T>windowMaker(), count, timespan, unit, scheduler);
288288
ChunkCreator creator = new SingleChunkCreator<T, Observable<T>>(chunks);
289289
return source.subscribe(new ChunkObserver<T, Observable<T>>(chunks, observer, creator));
290290
}
@@ -345,7 +345,7 @@ public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<T> sour
345345
return new OnSubscribeFunc<Observable<T>>() {
346346
@Override
347347
public Subscription onSubscribe(final Observer<? super Observable<T>> observer) {
348-
OverlappingChunks<T, Observable<T>> windows = new TimeBasedChunks<T, Observable<T>>(observer, windowMaker(), timespan, unit, scheduler);
348+
OverlappingChunks<T, Observable<T>> windows = new TimeBasedChunks<T, Observable<T>>(observer, OperationWindow.<T>windowMaker(), timespan, unit, scheduler);
349349
ChunkCreator creator = new TimeBasedChunkCreator<T, Observable<T>>(windows, timeshift, unit, scheduler);
350350
return source.subscribe(new ChunkObserver<T, Observable<T>>(windows, observer, creator));
351351
}

0 commit comments

Comments
 (0)