Skip to content

Commit f84565f

Browse files
Merge pull request #373 from jmhofer/throttle-debounce-fix
a few warnings, javadoc, and one missing scheduler parameter
2 parents df4d569 + 965a212 commit f84565f

File tree

8 files changed

+24
-66
lines changed

8 files changed

+24
-66
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 20 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import java.util.ArrayList;
2121
import java.util.Arrays;
22-
import java.util.Collection;
2322
import java.util.List;
2423
import java.util.concurrent.Future;
2524
import java.util.concurrent.TimeUnit;
@@ -544,7 +543,7 @@ public static <T> Observable<T> from(Iterable<? extends T> iterable) {
544543
* <p>Implementation note: the entire iterable sequence will be immediately emitted each time an {@link Observer} subscribes. Since this occurs before the {@link Subscription} is returned,
545544
* it in not possible to unsubscribe from the sequence before it completes.
546545
*
547-
* @param array
546+
* @param items
548547
* the source sequence
549548
* @param <T>
550549
* the type of items in the {@link Iterable} sequence and the type of items to be
@@ -1434,7 +1433,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends Observable<
14341433
}
14351434

14361435
/**
1437-
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
1436+
* This behaves like {@link #merge(Observable, Observable)} except that if any of the merged Observables
14381437
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
14391438
* refrain from propagating that error notification until all of the merged Observables have
14401439
* finished emitting items.
@@ -1462,7 +1461,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Obse
14621461
}
14631462

14641463
/**
1465-
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
1464+
* This behaves like {@link #merge(Observable, Observable, Observable)} except that if any of the merged Observables
14661465
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
14671466
* refrain from propagating that error notification until all of the merged Observables have
14681467
* finished emitting items.
@@ -1492,7 +1491,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Obse
14921491
}
14931492

14941493
/**
1495-
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
1494+
* This behaves like {@link #merge(Observable, Observable, Observable, Observable)} except that if any of the merged Observables
14961495
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
14971496
* refrain from propagating that error notification until all of the merged Observables have
14981497
* finished emitting items.
@@ -1524,7 +1523,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Obse
15241523
}
15251524

15261525
/**
1527-
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
1526+
* This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable)} except that if any of the merged Observables
15281527
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
15291528
* refrain from propagating that error notification until all of the merged Observables have
15301529
* finished emitting items.
@@ -1558,7 +1557,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Obse
15581557
}
15591558

15601559
/**
1561-
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
1560+
* This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable, Observable)} except that if any of the merged Observables
15621561
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
15631562
* refrain from propagating that error notification until all of the merged Observables have
15641563
* finished emitting items.
@@ -1594,7 +1593,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Obse
15941593
}
15951594

15961595
/**
1597-
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
1596+
* This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable, Observable, Observable)} except that if any of the merged Observables
15981597
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
15991598
* refrain from propagating that error notification until all of the merged Observables have
16001599
* finished emitting items.
@@ -1632,7 +1631,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Obse
16321631
}
16331632

16341633
/**
1635-
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
1634+
* This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable, Observable, Observable, Observable)} except that if any of the merged Observables
16361635
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
16371636
* refrain from propagating that error notification until all of the merged Observables have
16381637
* finished emitting items.
@@ -1672,7 +1671,7 @@ public static <T> Observable<T> mergeDelayError(Observable<? extends T> t1, Obse
16721671
}
16731672

16741673
/**
1675-
* This behaves like {@link #merge(Observable...)} except that if any of the merged Observables
1674+
* This behaves like {@link #merge(Observable, Observable, Observable, Observable, Observable, Observable, Observable, Observable, Observable)} except that if any of the merged Observables
16761675
* notify of an error via {@link Observer#onError onError}, {@code mergeDelayError} will
16771676
* refrain from propagating that error notification until all of the merged Observables have
16781677
* finished emitting items.
@@ -1832,7 +1831,7 @@ public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler
18321831
* The {@link TimeUnit} for the timeout.
18331832
*
18341833
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
1835-
* @see {@link #throttleWithTimeout};
1834+
* @see #throttleWithTimeout(long, TimeUnit)
18361835
*/
18371836
public Observable<T> debounce(long timeout, TimeUnit unit) {
18381837
return create(OperationDebounce.debounce(this, timeout, unit));
@@ -1860,10 +1859,10 @@ public Observable<T> debounce(long timeout, TimeUnit unit) {
18601859
* @param scheduler
18611860
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
18621861
* @return Observable which performs the throttle operation.
1863-
* @see {@link #throttleWithTimeout};
1862+
* @see #throttleWithTimeout(long, TimeUnit, Scheduler)
18641863
*/
18651864
public Observable<T> debounce(long timeout, TimeUnit unit, Scheduler scheduler) {
1866-
return create(OperationDebounce.debounce(this, timeout, unit));
1865+
return create(OperationDebounce.debounce(this, timeout, unit, scheduler));
18671866
}
18681867

18691868
/**
@@ -1887,7 +1886,7 @@ public Observable<T> debounce(long timeout, TimeUnit unit, Scheduler scheduler)
18871886
* The {@link TimeUnit} for the timeout.
18881887
*
18891888
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
1890-
* @see {@link #debounce}
1889+
* @see #debounce(long, TimeUnit)
18911890
*/
18921891
public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
18931892
return create(OperationDebounce.debounce(this, timeout, unit));
@@ -1907,7 +1906,7 @@ public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
19071906
* @param scheduler
19081907
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
19091908
* @return Observable which performs the throttle operation.
1910-
* @see {@link #debounce}
1909+
* @see #debounce(long, TimeUnit, Scheduler)
19111910
*/
19121911
public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) {
19131912
return create(OperationDebounce.debounce(this, timeout, unit, scheduler));
@@ -1920,12 +1919,10 @@ public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler
19201919
* <p>
19211920
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleFirst.png">
19221921
*
1923-
* @param skipDuration
1922+
* @param windowDuration
19241923
* Time to wait before sending another value after emitting last value.
19251924
* @param unit
19261925
* The unit of time for the specified timeout.
1927-
* @param scheduler
1928-
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
19291926
* @return Observable which performs the throttle operation.
19301927
*/
19311928
public Observable<T> throttleFirst(long windowDuration, TimeUnit unit) {
@@ -1963,7 +1960,7 @@ public Observable<T> throttleFirst(long skipDuration, TimeUnit unit, Scheduler s
19631960
* @param unit
19641961
* The unit of time for the specified interval.
19651962
* @return Observable which performs the throttle operation.
1966-
* @see {@link #sample(long, TimeUnit)}
1963+
* @see #sample(long, TimeUnit)
19671964
*/
19681965
public Observable<T> throttleLast(long intervalDuration, TimeUnit unit) {
19691966
return sample(intervalDuration, unit);
@@ -1981,7 +1978,7 @@ public Observable<T> throttleLast(long intervalDuration, TimeUnit unit) {
19811978
* @param unit
19821979
* The unit of time for the specified interval.
19831980
* @return Observable which performs the throttle operation.
1984-
* @see {@link #sample(long, TimeUnit, Scheduler)}
1981+
* @see #sample(long, TimeUnit, Scheduler)
19851982
*/
19861983
public Observable<T> throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler) {
19871984
return sample(intervalDuration, unit, scheduler);
@@ -2727,7 +2724,7 @@ public Observable<Observable<T>> window(int count) {
27272724
* The maximum size of each window before it should be emitted.
27282725
* @param skip
27292726
* How many produced values need to be skipped before starting a new window. Note that when "skip" and
2730-
* "count" are equals that this is the same operation as {@link Observable#window(Observable, int)}.
2727+
* "count" are equals that this is the same operation as {@link #window(int)}.
27312728
* @return
27322729
* An {@link Observable} which produces windows every "skipped" values containing at most
27332730
* "count" produced values.
@@ -3334,9 +3331,6 @@ public Observable<T> retry(int retryCount) {
33343331
* <p>
33353332
* For example, if an Observable fails on first time but emits [1, 2] then succeeds the second time and
33363333
* emits [1, 2, 3, 4, 5] then the complete output would be [1, 2, 1, 2, 3, 4, 5, onCompleted].
3337-
*
3338-
* @param retryCount
3339-
* Number of retry attempts before failing.
33403334
* @return Observable with retry logic.
33413335
*/
33423336
public Observable<T> retry() {
@@ -3657,7 +3651,7 @@ public Observable<T> takeWhileWithIndex(final Func2<? super T, ? super Integer,
36573651
* @return an Observable that emits only the very first item from the source, or none if the
36583652
* source Observable completes without emitting a single item.
36593653
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177%28v=vs.103%29.aspx">MSDN: Observable.First</a>
3660-
* @see {@link #first()}
3654+
* @see #first()
36613655
*/
36623656
public Observable<T> takeFirst() {
36633657
return first();
@@ -3672,7 +3666,7 @@ public Observable<T> takeFirst() {
36723666
* @return an Observable that emits only the very first item satisfying the given condition from the source,
36733667
* or none if the source Observable completes without emitting a single matching item.
36743668
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177%28v=vs.103%29.aspx">MSDN: Observable.First</a>
3675-
* @see {@link #first(Func1)}
3669+
* @see #first(Func1)
36763670
*/
36773671
public Observable<T> takeFirst(Func1<? super T, Boolean> predicate) {
36783672
return first(predicate);
@@ -3812,8 +3806,6 @@ public Observable<T> startWith(Iterable<T> values) {
38123806
*
38133807
* @param t1
38143808
* item to include
3815-
* @param values
3816-
* Iterable of the items you want the modified Observable to emit first
38173809
* @return an Observable that exhibits the modified behavior
38183810
*/
38193811
public Observable<T> startWith(T t1) {
@@ -3829,8 +3821,6 @@ public Observable<T> startWith(T t1) {
38293821
* item to include
38303822
* @param t2
38313823
* item to include
3832-
* @param values
3833-
* Iterable of the items you want the modified Observable to emit first
38343824
* @return an Observable that exhibits the modified behavior
38353825
*/
38363826
public Observable<T> startWith(T t1, T t2) {
@@ -3848,8 +3838,6 @@ public Observable<T> startWith(T t1, T t2) {
38483838
* item to include
38493839
* @param t3
38503840
* item to include
3851-
* @param values
3852-
* Iterable of the items you want the modified Observable to emit first
38533841
* @return an Observable that exhibits the modified behavior
38543842
*/
38553843
public Observable<T> startWith(T t1, T t2, T t3) {
@@ -3869,8 +3857,6 @@ public Observable<T> startWith(T t1, T t2, T t3) {
38693857
* item to include
38703858
* @param t4
38713859
* item to include
3872-
* @param values
3873-
* Iterable of the items you want the modified Observable to emit first
38743860
* @return an Observable that exhibits the modified behavior
38753861
*/
38763862
public Observable<T> startWith(T t1, T t2, T t3, T t4) {
@@ -3892,8 +3878,6 @@ public Observable<T> startWith(T t1, T t2, T t3, T t4) {
38923878
* item to include
38933879
* @param t5
38943880
* item to include
3895-
* @param values
3896-
* Iterable of the items you want the modified Observable to emit first
38973881
* @return an Observable that exhibits the modified behavior
38983882
*/
38993883
public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5) {
@@ -3917,8 +3901,6 @@ public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5) {
39173901
* item to include
39183902
* @param t6
39193903
* item to include
3920-
* @param values
3921-
* Iterable of the items you want the modified Observable to emit first
39223904
* @return an Observable that exhibits the modified behavior
39233905
*/
39243906
public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5, T t6) {
@@ -3944,8 +3926,6 @@ public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5, T t6) {
39443926
* item to include
39453927
* @param t7
39463928
* item to include
3947-
* @param values
3948-
* Iterable of the items you want the modified Observable to emit first
39493929
* @return an Observable that exhibits the modified behavior
39503930
*/
39513931
public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7) {
@@ -3973,8 +3953,6 @@ public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7) {
39733953
* item to include
39743954
* @param t8
39753955
* item to include
3976-
* @param values
3977-
* Iterable of the items you want the modified Observable to emit first
39783956
* @return an Observable that exhibits the modified behavior
39793957
*/
39803958
public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8) {
@@ -4004,8 +3982,6 @@ public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8) {
40043982
* item to include
40053983
* @param t9
40063984
* item to include
4007-
* @param values
4008-
* Iterable of the items you want the modified Observable to emit first
40093985
* @return an Observable that exhibits the modified behavior
40103986
*/
40113987
public Observable<T> startWith(T t1, T t2, T t3, T t4, T t5, T t6, T t7, T t8, T t9) {

rxjava-core/src/main/java/rx/operators/ChunkedOperation.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ protected interface ChunkCreator {
6161
*
6262
* @param <T>
6363
* The type of objects which this {@link Chunk} can hold.
64-
* <C> The type of object being tracked by the {@link Chunk}
64+
* @param <C>
65+
* The type of object being tracked by the {@link Chunk}
6566
*/
6667
protected abstract static class Chunk<T, C> {
6768
protected final List<T> contents = new ArrayList<T>();
@@ -78,7 +79,7 @@ public void pushValue(T value) {
7879

7980
/**
8081
* @return
81-
* The mutable underlying {@link C} which contains all the
82+
* The mutable underlying {@code C} which contains all the
8283
* recorded values in this {@link Chunk} object.
8384
*/
8485
abstract public C getContents();

rxjava-core/src/main/java/rx/operators/OperationGroupBy.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,6 @@ public void testEmpty() {
287287
public void testError() {
288288
Observable<String> sourceStrings = Observable.from("one", "two", "three", "four", "five", "six");
289289
Observable<String> errorSource = Observable.error(new RuntimeException("forced failure"));
290-
@SuppressWarnings("unchecked")
291290
Observable<String> source = Observable.concat(sourceStrings, errorSource);
292291

293292
Observable<GroupedObservable<Integer, String>> grouped = Observable.create(groupBy(source, length));

rxjava-core/src/main/java/rx/operators/OperationMap.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,6 @@ public void before() {
148148
public void testMap() {
149149
Map<String, String> m1 = getMap("One");
150150
Map<String, String> m2 = getMap("Two");
151-
@SuppressWarnings("unchecked")
152151
Observable<Map<String, String>> observable = Observable.from(m1, m2);
153152

154153
Observable<String> m = Observable.create(map(observable, new Func1<Map<String, String>, String>() {
@@ -176,7 +175,6 @@ public void testMapMany() {
176175
/* now simulate the behavior to take those IDs and perform nested async calls based on them */
177176
Observable<String> m = Observable.create(mapMany(ids, new Func1<Integer, Observable<String>>() {
178177

179-
@SuppressWarnings("unchecked")
180178
@Override
181179
public Observable<String> call(Integer id) {
182180
/* simulate making a nested async call which creates another Observable */
@@ -215,15 +213,12 @@ public String call(Map<String, String> map) {
215213
public void testMapMany2() {
216214
Map<String, String> m1 = getMap("One");
217215
Map<String, String> m2 = getMap("Two");
218-
@SuppressWarnings("unchecked")
219216
Observable<Map<String, String>> observable1 = Observable.from(m1, m2);
220217

221218
Map<String, String> m3 = getMap("Three");
222219
Map<String, String> m4 = getMap("Four");
223-
@SuppressWarnings("unchecked")
224220
Observable<Map<String, String>> observable2 = Observable.from(m3, m4);
225221

226-
@SuppressWarnings("unchecked")
227222
Observable<Observable<Map<String, String>>> observable = Observable.from(observable1, observable2);
228223

229224
Observable<String> m = Observable.create(mapMany(observable, new Func1<Observable<Map<String, String>>, Observable<String>>() {

rxjava-core/src/main/java/rx/operators/OperationRetry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,6 @@ public Subscription onSubscribe(Observer<? super String> o) {
198198
}
199199
return Subscriptions.empty();
200200
}
201-
};
201+
}
202202
}
203203
}

rxjava-core/src/main/java/rx/operators/OperationTakeUntil.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,6 @@ public static <T, E> Observable<T> takeUntil(final Observable<? extends T> sourc
5050
Observable<Notification<T>> s = Observable.create(new SourceObservable<T>(source));
5151
Observable<Notification<T>> o = Observable.create(new OtherObservable<T, E>(other));
5252

53-
@SuppressWarnings("unchecked")
54-
/**
55-
* In JDK 7 we could use 'varargs' instead of 'unchecked'.
56-
* See http://stackoverflow.com/questions/1445233/is-it-possible-to-solve-the-a-generic-array-of-t-is-created-for-a-varargs-param
57-
* and http://hg.openjdk.java.net/jdk7/tl/langtools/rev/46cf751559ae
58-
*/
5953
Observable<Notification<T>> result = Observable.merge(s, o);
6054

6155
return result.takeWhile(new Func1<Notification<T>, Boolean>() {

0 commit comments

Comments
 (0)