23
23
24
24
import org .junit .*;
25
25
26
+ import org .junit .rules .TestName ;
26
27
import rx .Observable .OnSubscribe ;
27
28
import rx .exceptions .MissingBackpressureException ;
28
29
import rx .functions .*;
33
34
34
35
public class BackpressureTests {
35
36
37
+ @ Rule
38
+ public TestName testName = new TestName ();
39
+
36
40
@ After
37
41
public void doAfterTest () {
38
42
TestObstructionDetection .checkObstruction ();
@@ -424,25 +428,69 @@ public void testOnBackpressureDrop() {
424
428
.map (SLOW_PASS_THRU ).take (NUM ).subscribe (ts );
425
429
ts .awaitTerminalEvent ();
426
430
ts .assertNoErrors ();
427
-
428
-
431
+
429
432
List <Integer > onNextEvents = ts .getOnNextEvents ();
430
433
assertEquals (NUM , onNextEvents .size ());
431
434
432
435
Integer lastEvent = onNextEvents .get (NUM - 1 );
433
-
436
+
434
437
System .out .println ("testOnBackpressureDrop => Received: " + onNextEvents .size () + " Emitted: " + c .get () + " Last value: " + lastEvent );
435
438
// it drop, so we should get some number far higher than what would have sequentially incremented
436
439
assertTrue (NUM - 1 <= lastEvent .intValue ());
437
440
}
438
441
}
442
+
443
+ @ Test (timeout = 10000 )
444
+ public void testOnBackpressureDropWithCallback () {
445
+ for (int i = 0 ; i < 100 ; i ++) {
446
+ final AtomicInteger emitCount = new AtomicInteger ();
447
+ final AtomicInteger dropCount = new AtomicInteger ();
448
+ final AtomicInteger passCount = new AtomicInteger ();
449
+ final int NUM = (int ) (RxRingBuffer .SIZE * 1.5 ); // > 1 so that take doesn't prevent buffer overflow
450
+ TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
451
+ firehose (emitCount ).onBackpressureDrop (new Action1 <Integer >() {
452
+ @ Override
453
+ public void call (Integer i ) {
454
+ dropCount .incrementAndGet ();
455
+ }
456
+ })
457
+ .doOnNext (new Action1 <Integer >() {
458
+ @ Override
459
+ public void call (Integer integer ) {
460
+ passCount .incrementAndGet ();
461
+ }
462
+ })
463
+ .observeOn (Schedulers .computation ())
464
+ .map (SLOW_PASS_THRU ).take (NUM ).subscribe (ts );
465
+ ts .awaitTerminalEvent ();
466
+ ts .assertNoErrors ();
467
+
468
+ List <Integer > onNextEvents = ts .getOnNextEvents ();
469
+ Integer lastEvent = onNextEvents .get (NUM - 1 );
470
+ System .out .println (testName .getMethodName () + " => Received: " + onNextEvents .size () + " Passed: " + passCount .get () + " Dropped: " + dropCount .get () + " Emitted: " + emitCount .get () + " Last value: " + lastEvent );
471
+ assertEquals (NUM , onNextEvents .size ());
472
+ // in reality, NUM < passCount
473
+ assertTrue (NUM <= passCount .get ());
474
+ // it drop, so we should get some number far higher than what would have sequentially incremented
475
+ assertTrue (NUM - 1 <= lastEvent .intValue ());
476
+ assertTrue (0 < dropCount .get ());
477
+ assertEquals (emitCount .get (), passCount .get () + dropCount .get ());
478
+ }
479
+ }
480
+
439
481
@ Test (timeout = 10000 )
440
482
public void testOnBackpressureDropSynchronous () {
441
483
for (int i = 0 ; i < 100 ; i ++) {
484
+ final AtomicInteger dropCount = new AtomicInteger ();
442
485
int NUM = (int ) (RxRingBuffer .SIZE * 1.1 ); // > 1 so that take doesn't prevent buffer overflow
443
486
AtomicInteger c = new AtomicInteger ();
444
487
TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
445
- firehose (c ).onBackpressureDrop ()
488
+ firehose (c ).onBackpressureDrop (new Action1 <Integer >() {
489
+ @ Override
490
+ public void call (Integer i ) {
491
+ dropCount .incrementAndGet ();
492
+ }
493
+ })
446
494
.map (SLOW_PASS_THRU ).take (NUM ).subscribe (ts );
447
495
ts .awaitTerminalEvent ();
448
496
ts .assertNoErrors ();
@@ -452,9 +500,12 @@ public void testOnBackpressureDropSynchronous() {
452
500
453
501
Integer lastEvent = onNextEvents .get (NUM - 1 );
454
502
455
- System .out .println ("testOnBackpressureDrop => Received: " + onNextEvents .size () + " Emitted: " + c .get () + " Last value: " + lastEvent );
503
+ System .out .println ("testOnBackpressureDrop => Received: " + onNextEvents .size () + " Dropped: " + dropCount . get () + " Emitted: " + c .get () + " Last value: " + lastEvent );
456
504
// it drop, so we should get some number far higher than what would have sequentially incremented
457
505
assertTrue (NUM - 1 <= lastEvent .intValue ());
506
+ // no drop in synchronous mode
507
+ assertEquals (0 , dropCount .get ());
508
+ assertEquals (c .get (), onNextEvents .size ());
458
509
}
459
510
}
460
511
0 commit comments