Skip to content

Commit 5e0a2c0

Browse files
author
jmhofer
committed
made the public window methods more generic via the basic (lol) super/extends fluff; also simplified api by removing a few useless super definitions (there's no super of Opening and Closing)
1 parent 229c4aa commit 5e0a2c0

File tree

4 files changed

+37
-35
lines changed

4 files changed

+37
-35
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1337,7 +1337,7 @@ public Observable<List<T>> buffer(Func0<? extends Observable<? extends Closing>>
13371337
* @return
13381338
* An {@link Observable} which produces buffers which are created and emitted when the specified {@link Observable}s publish certain objects.
13391339
*/
1340-
public Observable<List<T>> buffer(Observable<? extends Opening> bufferOpenings, Func1<? super Opening, ? extends Observable<? extends Closing>> bufferClosingSelector) {
1340+
public Observable<List<T>> buffer(Observable<? extends Opening> bufferOpenings, Func1<Opening, ? extends Observable<? extends Closing>> bufferClosingSelector) {
13411341
return create(OperationBuffer.buffer(this, bufferOpenings, bufferClosingSelector));
13421342
}
13431343

@@ -1520,7 +1520,7 @@ public Observable<List<T>> buffer(long timespan, long timeshift, TimeUnit unit,
15201520
* An {@link Observable} which produces connected non-overlapping windows, which are emitted
15211521
* when the current {@link Observable} created with the {@link Func0} argument produces a {@link rx.util.Closing} object.
15221522
*/
1523-
public Observable<Observable<T>> window(Observable<T> source, Func0<Observable<Closing>> closingSelector) {
1523+
public Observable<Observable<T>> window(Observable<? extends T> source, Func0<? extends Observable<? extends Closing>> closingSelector) {
15241524
return create(OperationWindow.window(source, closingSelector));
15251525
}
15261526

@@ -1542,7 +1542,7 @@ public Observable<Observable<T>> window(Observable<T> source, Func0<Observable<C
15421542
* @return
15431543
* An {@link Observable} which produces windows which are created and emitted when the specified {@link Observable}s publish certain objects.
15441544
*/
1545-
public Observable<Observable<T>> window(Observable<T> source, Observable<Opening> windowOpenings, Func1<Opening, Observable<Closing>> closingSelector) {
1545+
public Observable<Observable<T>> window(Observable<? extends T> source, Observable<? extends Opening> windowOpenings, Func1<Opening, ? extends Observable<? extends Closing>> closingSelector) {
15461546
return create(OperationWindow.window(source, windowOpenings, closingSelector));
15471547
}
15481548

@@ -1559,7 +1559,7 @@ public Observable<Observable<T>> window(Observable<T> source, Observable<Opening
15591559
* An {@link Observable} which produces connected non-overlapping windows containing at most
15601560
* "count" produced values.
15611561
*/
1562-
public Observable<Observable<T>> window(Observable<T> source, int count) {
1562+
public Observable<Observable<T>> window(Observable<? extends T> source, int count) {
15631563
return create(OperationWindow.window(source, count));
15641564
}
15651565

@@ -1579,7 +1579,7 @@ public Observable<Observable<T>> window(Observable<T> source, int count) {
15791579
* An {@link Observable} which produces windows every "skipped" values containing at most
15801580
* "count" produced values.
15811581
*/
1582-
public Observable<Observable<T>> window(Observable<T> source, int count, int skip) {
1582+
public Observable<Observable<T>> window(Observable<? extends T> source, int count, int skip) {
15831583
return create(OperationWindow.window(source, count, skip));
15841584
}
15851585

@@ -1598,7 +1598,7 @@ public Observable<Observable<T>> window(Observable<T> source, int count, int ski
15981598
* @return
15991599
* An {@link Observable} which produces connected non-overlapping windows with a fixed duration.
16001600
*/
1601-
public Observable<Observable<T>> window(Observable<T> source, long timespan, TimeUnit unit) {
1601+
public Observable<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit) {
16021602
return create(OperationWindow.window(source, timespan, unit));
16031603
}
16041604

@@ -1619,7 +1619,7 @@ public Observable<Observable<T>> window(Observable<T> source, long timespan, Tim
16191619
* @return
16201620
* An {@link Observable} which produces connected non-overlapping windows with a fixed duration.
16211621
*/
1622-
public Observable<Observable<T>> window(Observable<T> source, long timespan, TimeUnit unit, Scheduler scheduler) {
1622+
public Observable<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit, Scheduler scheduler) {
16231623
return create(OperationWindow.window(source, timespan, unit, scheduler));
16241624
}
16251625

@@ -1642,7 +1642,7 @@ public Observable<Observable<T>> window(Observable<T> source, long timespan, Tim
16421642
* An {@link Observable} which produces connected non-overlapping windows which are emitted after
16431643
* a fixed duration or when the window has reached maximum capacity (which ever occurs first).
16441644
*/
1645-
public Observable<Observable<T>> window(Observable<T> source, long timespan, TimeUnit unit, int count) {
1645+
public Observable<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit, int count) {
16461646
return create(OperationWindow.window(source, timespan, unit, count));
16471647
}
16481648

@@ -1667,7 +1667,7 @@ public Observable<Observable<T>> window(Observable<T> source, long timespan, Tim
16671667
* An {@link Observable} which produces connected non-overlapping windows which are emitted after
16681668
* a fixed duration or when the window has reached maximum capacity (which ever occurs first).
16691669
*/
1670-
public Observable<Observable<T>> window(Observable<T> source, long timespan, TimeUnit unit, int count, Scheduler scheduler) {
1670+
public Observable<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit, int count, Scheduler scheduler) {
16711671
return create(OperationWindow.window(source, timespan, unit, count, scheduler));
16721672
}
16731673

@@ -1689,7 +1689,7 @@ public Observable<Observable<T>> window(Observable<T> source, long timespan, Tim
16891689
* An {@link Observable} which produces new windows periodically, and these are emitted after
16901690
* a fixed timespan has elapsed.
16911691
*/
1692-
public Observable<Observable<T>> window(Observable<T> source, long timespan, long timeshift, TimeUnit unit) {
1692+
public Observable<Observable<T>> window(Observable<? extends T> source, long timespan, long timeshift, TimeUnit unit) {
16931693
return create(OperationWindow.window(source, timespan, timeshift, unit));
16941694
}
16951695

@@ -1713,7 +1713,7 @@ public Observable<Observable<T>> window(Observable<T> source, long timespan, lon
17131713
* An {@link Observable} which produces new windows periodically, and these are emitted after
17141714
* a fixed timespan has elapsed.
17151715
*/
1716-
public Observable<Observable<T>> window(Observable<T> source, long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) {
1716+
public Observable<Observable<T>> window(Observable<? extends T> source, long timespan, long timeshift, TimeUnit unit, Scheduler scheduler) {
17171717
return create(OperationWindow.window(source, timespan, timeshift, unit, scheduler));
17181718
}
17191719

rxjava-core/src/main/java/rx/operators/ChunkedOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,7 @@ protected static class ObservableBasedMultiChunkCreator<T, C> implements ChunkCr
495495

496496
private final SafeObservableSubscription subscription = new SafeObservableSubscription();
497497

498-
public ObservableBasedMultiChunkCreator(final OverlappingChunks<T, C> chunks, Observable<? extends Opening> openings, final Func1<? super Opening, ? extends Observable<? extends Closing>> chunkClosingSelector) {
498+
public ObservableBasedMultiChunkCreator(final OverlappingChunks<T, C> chunks, Observable<? extends Opening> openings, final Func1<Opening, ? extends Observable<? extends Closing>> chunkClosingSelector) {
499499
subscription.wrap(openings.subscribe(new Action1<Opening>() {
500500
@Override
501501
public void call(Opening opening) {

rxjava-core/src/main/java/rx/operators/OperationBuffer.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,14 @@
4242

4343
public final class OperationBuffer extends ChunkedOperation {
4444

45-
private static final Func0 BUFFER_MAKER = new Func0() {
46-
@Override
47-
public Object call() {
48-
return new Buffer();
49-
}
50-
};
45+
private static <T> Func0<Buffer<T>> bufferMaker() {
46+
return new Func0<Buffer<T>>() {
47+
@Override
48+
public Buffer<T> call() {
49+
return new Buffer<T>();
50+
}
51+
};
52+
}
5153

5254
/**
5355
* <p>This method creates a {@link Func1} object which represents the buffer operation. This operation takes
@@ -74,7 +76,7 @@ public static <T> OnSubscribeFunc<List<T>> buffer(final Observable<T> source, fi
7476

7577
@Override
7678
public Subscription onSubscribe(Observer<? super List<T>> observer) {
77-
NonOverlappingChunks<T, List<T>> buffers = new NonOverlappingChunks<T, List<T>>(observer, BUFFER_MAKER);
79+
NonOverlappingChunks<T, List<T>> buffers = new NonOverlappingChunks<T, List<T>>(observer, OperationBuffer.<T>bufferMaker());
7880
ChunkCreator creator = new ObservableBasedSingleChunkCreator<T, List<T>>(buffers, bufferClosingSelector);
7981
return source.subscribe(new ChunkObserver<T, List<T>>(buffers, observer, creator));
8082
}
@@ -106,11 +108,11 @@ public Subscription onSubscribe(Observer<? super List<T>> observer) {
106108
* @return
107109
* the {@link Func1} object representing the specified buffer operation.
108110
*/
109-
public static <T> OnSubscribeFunc<List<T>> buffer(final Observable<T> source, final Observable<? extends Opening> bufferOpenings, final Func1<? super Opening, ? extends Observable<? extends Closing>> bufferClosingSelector) {
111+
public static <T> OnSubscribeFunc<List<T>> buffer(final Observable<T> source, final Observable<? extends Opening> bufferOpenings, final Func1<Opening, ? extends Observable<? extends Closing>> bufferClosingSelector) {
110112
return new OnSubscribeFunc<List<T>>() {
111113
@Override
112114
public Subscription onSubscribe(final Observer<? super List<T>> observer) {
113-
OverlappingChunks<T, List<T>> buffers = new OverlappingChunks<T, List<T>>(observer, BUFFER_MAKER);
115+
OverlappingChunks<T, List<T>> buffers = new OverlappingChunks<T, List<T>>(observer, OperationBuffer.<T>bufferMaker());
114116
ChunkCreator creator = new ObservableBasedMultiChunkCreator<T, List<T>>(buffers, bufferOpenings, bufferClosingSelector);
115117
return source.subscribe(new ChunkObserver<T, List<T>>(buffers, observer, creator));
116118
}
@@ -165,7 +167,7 @@ public static <T> OnSubscribeFunc<List<T>> buffer(final Observable<T> source, fi
165167
return new OnSubscribeFunc<List<T>>() {
166168
@Override
167169
public Subscription onSubscribe(final Observer<? super List<T>> observer) {
168-
Chunks<T, List<T>> chunks = new SizeBasedChunks<T, List<T>>(observer, BUFFER_MAKER, count);
170+
Chunks<T, List<T>> chunks = new SizeBasedChunks<T, List<T>>(observer, OperationBuffer.<T>bufferMaker(), count);
169171
ChunkCreator creator = new SkippingChunkCreator<T, List<T>>(chunks, skip);
170172
return source.subscribe(new ChunkObserver<T, List<T>>(chunks, observer, creator));
171173
}
@@ -220,7 +222,7 @@ public static <T> OnSubscribeFunc<List<T>> buffer(final Observable<T> source, fi
220222
return new OnSubscribeFunc<List<T>>() {
221223
@Override
222224
public Subscription onSubscribe(final Observer<? super List<T>> observer) {
223-
NonOverlappingChunks<T, List<T>> buffers = new NonOverlappingChunks<T, List<T>>(observer, BUFFER_MAKER);
225+
NonOverlappingChunks<T, List<T>> buffers = new NonOverlappingChunks<T, List<T>>(observer, OperationBuffer.<T>bufferMaker());
224226
ChunkCreator creator = new TimeBasedChunkCreator<T, List<T>>(buffers, timespan, unit, scheduler);
225227
return source.subscribe(new ChunkObserver<T, List<T>>(buffers, observer, creator));
226228
}
@@ -281,7 +283,7 @@ public static <T> OnSubscribeFunc<List<T>> buffer(final Observable<T> source, fi
281283
return new OnSubscribeFunc<List<T>>() {
282284
@Override
283285
public Subscription onSubscribe(final Observer<? super List<T>> observer) {
284-
Chunks<T, List<T>> chunks = new TimeAndSizeBasedChunks<T, List<T>>(observer, BUFFER_MAKER, count, timespan, unit, scheduler);
286+
Chunks<T, List<T>> chunks = new TimeAndSizeBasedChunks<T, List<T>>(observer, OperationBuffer.<T>bufferMaker(), count, timespan, unit, scheduler);
285287
ChunkCreator creator = new SingleChunkCreator<T, List<T>>(chunks);
286288
return source.subscribe(new ChunkObserver<T, List<T>>(chunks, observer, creator));
287289
}
@@ -342,7 +344,7 @@ public static <T> OnSubscribeFunc<List<T>> buffer(final Observable<T> source, fi
342344
return new OnSubscribeFunc<List<T>>() {
343345
@Override
344346
public Subscription onSubscribe(final Observer<? super List<T>> observer) {
345-
OverlappingChunks<T, List<T>> buffers = new TimeBasedChunks<T, List<T>>(observer, BUFFER_MAKER, timespan, unit, scheduler);
347+
OverlappingChunks<T, List<T>> buffers = new TimeBasedChunks<T, List<T>>(observer, OperationBuffer.<T>bufferMaker(), timespan, unit, scheduler);
346348
ChunkCreator creator = new TimeBasedChunkCreator<T, List<T>>(buffers, timeshift, unit, scheduler);
347349
return source.subscribe(new ChunkObserver<T, List<T>>(buffers, observer, creator));
348350
}

rxjava-core/src/main/java/rx/operators/OperationWindow.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public Window<T> call() {
7272
* @return
7373
* the {@link rx.util.functions.Func1} object representing the specified window operation.
7474
*/
75-
public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<T> source, final Func0<Observable<Closing>> windowClosingSelector) {
75+
public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> source, final Func0<? extends Observable<? extends Closing>> windowClosingSelector) {
7676
return new OnSubscribeFunc<Observable<T>>() {
7777
@Override
7878
public Subscription onSubscribe(final Observer<? super Observable<T>> observer) {
@@ -109,7 +109,7 @@ public Subscription onSubscribe(final Observer<? super Observable<T>> observer)
109109
* @return
110110
* the {@link rx.util.functions.Func1} object representing the specified window operation.
111111
*/
112-
public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<T> source, final Observable<Opening> windowOpenings, final Func1<Opening, Observable<Closing>> windowClosingSelector) {
112+
public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> source, final Observable<? extends Opening> windowOpenings, final Func1<Opening, ? extends Observable<? extends Closing>> windowClosingSelector) {
113113
return new OnSubscribeFunc<Observable<T>>() {
114114
@Override
115115
public Subscription onSubscribe(final Observer<? super Observable<T>> observer) {
@@ -137,7 +137,7 @@ public Subscription onSubscribe(final Observer<? super Observable<T>> observer)
137137
* @return
138138
* the {@link rx.util.functions.Func1} object representing the specified window operation.
139139
*/
140-
public static <T> OnSubscribeFunc<Observable<T>> window(Observable<T> source, int count) {
140+
public static <T> OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, int count) {
141141
return window(source, count, count);
142142
}
143143

@@ -164,7 +164,7 @@ public static <T> OnSubscribeFunc<Observable<T>> window(Observable<T> source, in
164164
* @return
165165
* the {@link rx.util.functions.Func1} object representing the specified window operation.
166166
*/
167-
public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<T> source, final int count, final int skip) {
167+
public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> source, final int count, final int skip) {
168168
return new OnSubscribeFunc<Observable<T>>() {
169169
@Override
170170
public Subscription onSubscribe(final Observer<? super Observable<T>> observer) {
@@ -194,7 +194,7 @@ public Subscription onSubscribe(final Observer<? super Observable<T>> observer)
194194
* @return
195195
* the {@link rx.util.functions.Func1} object representing the specified window operation.
196196
*/
197-
public static <T> OnSubscribeFunc<Observable<T>> window(Observable<T> source, long timespan, TimeUnit unit) {
197+
public static <T> OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit) {
198198
return window(source, timespan, unit, Schedulers.threadPoolForComputation());
199199
}
200200

@@ -219,7 +219,7 @@ public static <T> OnSubscribeFunc<Observable<T>> window(Observable<T> source, lo
219219
* @return
220220
* the {@link rx.util.functions.Func1} object representing the specified window operation.
221221
*/
222-
public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<T> source, final long timespan, final TimeUnit unit, final Scheduler scheduler) {
222+
public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> source, final long timespan, final TimeUnit unit, final Scheduler scheduler) {
223223
return new OnSubscribeFunc<Observable<T>>() {
224224
@Override
225225
public Subscription onSubscribe(final Observer<? super Observable<T>> observer) {
@@ -252,7 +252,7 @@ public Subscription onSubscribe(final Observer<? super Observable<T>> observer)
252252
* @return
253253
* the {@link rx.util.functions.Func1} object representing the specified window operation.
254254
*/
255-
public static <T> OnSubscribeFunc<Observable<T>> window(Observable<T> source, long timespan, TimeUnit unit, int count) {
255+
public static <T> OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, long timespan, TimeUnit unit, int count) {
256256
return window(source, timespan, unit, count, Schedulers.threadPoolForComputation());
257257
}
258258

@@ -280,7 +280,7 @@ public static <T> OnSubscribeFunc<Observable<T>> window(Observable<T> source, lo
280280
* @return
281281
* the {@link rx.util.functions.Func1} object representing the specified window operation.
282282
*/
283-
public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<T> source, final long timespan, final TimeUnit unit, final int count, final Scheduler scheduler) {
283+
public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> source, final long timespan, final TimeUnit unit, final int count, final Scheduler scheduler) {
284284
return new OnSubscribeFunc<Observable<T>>() {
285285
@Override
286286
public Subscription onSubscribe(final Observer<? super Observable<T>> observer) {
@@ -313,7 +313,7 @@ public Subscription onSubscribe(final Observer<? super Observable<T>> observer)
313313
* @return
314314
* the {@link rx.util.functions.Func1} object representing the specified window operation.
315315
*/
316-
public static <T> OnSubscribeFunc<Observable<T>> window(Observable<T> source, long timespan, long timeshift, TimeUnit unit) {
316+
public static <T> OnSubscribeFunc<Observable<T>> window(Observable<? extends T> source, long timespan, long timeshift, TimeUnit unit) {
317317
return window(source, timespan, timeshift, unit, Schedulers.threadPoolForComputation());
318318
}
319319

@@ -341,7 +341,7 @@ public static <T> OnSubscribeFunc<Observable<T>> window(Observable<T> source, lo
341341
* @return
342342
* the {@link rx.util.functions.Func1} object representing the specified window operation.
343343
*/
344-
public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<T> source, final long timespan, final long timeshift, final TimeUnit unit, final Scheduler scheduler) {
344+
public static <T> OnSubscribeFunc<Observable<T>> window(final Observable<? extends T> source, final long timespan, final long timeshift, final TimeUnit unit, final Scheduler scheduler) {
345345
return new OnSubscribeFunc<Observable<T>>() {
346346
@Override
347347
public Subscription onSubscribe(final Observer<? super Observable<T>> observer) {

0 commit comments

Comments
 (0)