File tree Expand file tree Collapse file tree 2 files changed +67
-0
lines changed
src/test/java/rx/internal/operators Expand file tree Collapse file tree 2 files changed +67
-0
lines changed Original file line number Diff line number Diff line change @@ -309,4 +309,39 @@ public void run() {
309
309
}
310
310
311
311
}
312
+
313
+ @ Test
314
+ public void testBackpressure () {
315
+ TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
316
+ Observable .range (0 , 100000 )
317
+ .onErrorResumeNext (new Func1 <Throwable , Observable <Integer >>() {
318
+
319
+ @ Override
320
+ public Observable <Integer > call (Throwable t1 ) {
321
+ return Observable .just (1 );
322
+ }
323
+
324
+ })
325
+ .observeOn (Schedulers .computation ())
326
+ .map (new Func1 <Integer , Integer >() {
327
+ int c = 0 ;
328
+
329
+ @ Override
330
+ public Integer call (Integer t1 ) {
331
+ if (c ++ <= 1 ) {
332
+ // slow
333
+ try {
334
+ Thread .sleep (500 );
335
+ } catch (InterruptedException e ) {
336
+ e .printStackTrace ();
337
+ }
338
+ }
339
+ return t1 ;
340
+ }
341
+
342
+ })
343
+ .subscribe (ts );
344
+ ts .awaitTerminalEvent ();
345
+ ts .assertNoErrors ();
346
+ }
312
347
}
Original file line number Diff line number Diff line change 21
21
import static org .mockito .Mockito .times ;
22
22
import static org .mockito .Mockito .verify ;
23
23
24
+ import java .util .concurrent .TimeUnit ;
25
+
24
26
import org .junit .Test ;
25
27
import org .mockito .Mockito ;
26
28
29
31
import rx .Subscriber ;
30
32
import rx .Subscription ;
31
33
import rx .functions .Func1 ;
34
+ import rx .observers .TestSubscriber ;
35
+ import rx .schedulers .Schedulers ;
32
36
33
37
public class OperatorOnErrorResumeNextViaObservableTest {
34
38
@@ -143,4 +147,32 @@ public void run() {
143
147
System .out .println ("done starting TestObservable thread" );
144
148
}
145
149
}
150
+
151
+ @ Test
152
+ public void testBackpressure () {
153
+ TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
154
+ Observable .range (0 , 100000 )
155
+ .onErrorResumeNext (Observable .just (1 ))
156
+ .observeOn (Schedulers .computation ())
157
+ .map (new Func1 <Integer , Integer >() {
158
+ int c = 0 ;
159
+
160
+ @ Override
161
+ public Integer call (Integer t1 ) {
162
+ if (c ++ <= 1 ) {
163
+ // slow
164
+ try {
165
+ Thread .sleep (500 );
166
+ } catch (InterruptedException e ) {
167
+ e .printStackTrace ();
168
+ }
169
+ }
170
+ return t1 ;
171
+ }
172
+
173
+ })
174
+ .subscribe (ts );
175
+ ts .awaitTerminalEvent ();
176
+ ts .assertNoErrors ();
177
+ }
146
178
}
You can’t perform that action at this time.
0 commit comments