15
15
*/
16
16
package rx .operators ;
17
17
18
- import static org .mockito .Matchers .any ;
19
- import static org .mockito .Matchers .anyString ;
20
- import static org .mockito .Mockito .inOrder ;
21
- import static org .mockito .Mockito .mock ;
22
- import static org .mockito .Mockito .never ;
23
- import static org .mockito .Mockito .times ;
24
- import static org .mockito .Mockito .verify ;
18
+ import static org .mockito .Matchers .*;
19
+ import static org .mockito .Mockito .*;
25
20
26
21
import java .util .concurrent .Executors ;
27
22
import java .util .concurrent .TimeUnit ;
@@ -60,7 +55,7 @@ public final class OperationThrottle {
60
55
* @param unit The unit of time for the specified timeout.
61
56
* @return A {@link Func1} which performs the throttle operation.
62
57
*/
63
- public static <T > Func1 <Observer <T >, Subscription > throttle (final Observable <T > items , long timeout , TimeUnit unit ) {
58
+ public static <T > Func1 <Observer <T >, Subscription > throttle (Observable <T > items , long timeout , TimeUnit unit ) {
64
59
return throttle (items , timeout , unit , Schedulers .executor (Executors .newSingleThreadScheduledExecutor ()));
65
60
}
66
61
@@ -132,7 +127,7 @@ public void onError(Exception e) {
132
127
}
133
128
134
129
@ Override
135
- public void onNext (final T args ) {
130
+ public void onNext (T args ) {
136
131
throttle (new ThrottledOnNext <T >(observer , args ));
137
132
}
138
133
@@ -215,7 +210,7 @@ public void before() {
215
210
public void testThrottlingWithCompleted () {
216
211
Observable <String > source = Observable .create (new Func1 <Observer <String >, Subscription >() {
217
212
@ Override
218
- public Subscription call (final Observer <String > observser ) {
213
+ public Subscription call (Observer <String > observser ) {
219
214
publishNext (observser , 100 , "one" ); // Should be skipped since "two" will arrive before the timeout expires.
220
215
publishNext (observser , 400 , "two" ); // Should be published since "three" will arrive after the timeout expires.
221
216
publishNext (observser , 900 , "four" ); // Should be skipped since onCompleted will arrive before the timeout expires.
@@ -250,7 +245,7 @@ public Subscription call(final Observer<String> observser) {
250
245
public void testThrottlingWithError () {
251
246
Observable <String > source = Observable .create (new Func1 <Observer <String >, Subscription >() {
252
247
@ Override
253
- public Subscription call (final Observer <String > observser ) {
248
+ public Subscription call (Observer <String > observser ) {
254
249
Exception error = new TestException ();
255
250
publishNext (observser , 100 , "one" ); // Should be published since "two" will arrive after the timeout expires.
256
251
publishNext (observser , 600 , "two" ); // Should be skipped since onError will arrive before the timeout expires.
@@ -281,7 +276,7 @@ public Subscription call(final Observer<String> observser) {
281
276
verify (observer , times (1 )).onError (any (TestException .class ));
282
277
}
283
278
284
- private void publishCompleted (final Observer <String > observer , long delay ) {
279
+ private < T > void publishCompleted (final Observer <T > observer , long delay ) {
285
280
scheduler .schedule (new Action0 () {
286
281
@ Override
287
282
public void call () {
@@ -290,7 +285,7 @@ public void call() {
290
285
}, delay , TimeUnit .MILLISECONDS );
291
286
}
292
287
293
- private void publishError (final Observer <String > observer , long delay , final Exception error ) {
288
+ private < T > void publishError (final Observer <T > observer , long delay , final Exception error ) {
294
289
scheduler .schedule (new Action0 () {
295
290
@ Override
296
291
public void call () {
@@ -299,7 +294,7 @@ public void call() {
299
294
}, delay , TimeUnit .MILLISECONDS );
300
295
}
301
296
302
- private void publishNext (final Observer <String > observer , long delay , final String value ) {
297
+ private < T > void publishNext (final Observer <T > observer , long delay , final T value ) {
303
298
scheduler .schedule (new Action0 () {
304
299
@ Override
305
300
public void call () {
0 commit comments