25
25
import java .util .concurrent .atomic .AtomicBoolean ;
26
26
import java .util .concurrent .atomic .AtomicReference ;
27
27
28
- import org .junit .Assert ;
29
- import org .junit .Before ;
30
28
import org .junit .Test ;
31
29
32
30
import org .mockito .InOrder ;
@@ -58,7 +56,7 @@ public final class OperationConcat {
58
56
* <p/>
59
57
*
60
58
* Beware that concat(o1,o2).subscribe() is a blocking call from
61
- * which it is impossible to unsubscribe.
59
+ * which it is impossible to unsubscribe if observables are running on same thread .
62
60
*
63
61
* @param sequences An observable sequence of elements to project.
64
62
* @return An observable sequence whose elements are the result of combining the output from the list of Observables.
@@ -182,55 +180,7 @@ public void testConcatWithList() {
182
180
}
183
181
184
182
@ Test
185
- public void testConcatUnsubscribe () {
186
- final CountDownLatch callOnce = new CountDownLatch (1 );
187
- final CountDownLatch okToContinue = new CountDownLatch (1 );
188
- final TestObservable <String > w1 = new TestObservable <String >("one" , "two" , "three" );
189
- final TestObservable <String > w2 = new TestObservable <String >(callOnce , okToContinue , "four" , "five" , "six" );
190
-
191
- @ SuppressWarnings ("unchecked" )
192
- final Observer <String > aObserver = mock (Observer .class );
193
- @ SuppressWarnings ("unchecked" )
194
- final Observable <String > concat = Observable .create (concat (w1 , w2 ));
195
- final AtomicObservableSubscription s1 = new AtomicObservableSubscription ();
196
- Thread t = new Thread () {
197
- @ Override
198
- public void run () {
199
- // NB: this statement does not complete until after "six" has been delivered.
200
- s1 .wrap (concat .subscribe (aObserver ));
201
- }
202
- };
203
- t .start ();
204
- try {
205
- //Block main thread to allow observable "w1" to complete and observable "w2" to call onNext once.
206
- callOnce .await ();
207
- // NB: This statement has no effect, since s1 cannot possibly
208
- // wrap anything until "six" has been delivered, which cannot
209
- // happen until we okToContinue.countDown()
210
- s1 .unsubscribe ();
211
- //Unblock the observable to continue.
212
- okToContinue .countDown ();
213
- w1 .t .join ();
214
- w2 .t .join ();
215
- } catch (Exception e ) {
216
- e .printStackTrace ();
217
- fail (e .getMessage ());
218
- }
219
-
220
- InOrder inOrder = inOrder (aObserver );
221
- inOrder .verify (aObserver , times (1 )).onNext ("one" );
222
- inOrder .verify (aObserver , times (1 )).onNext ("two" );
223
- inOrder .verify (aObserver , times (1 )).onNext ("three" );
224
- inOrder .verify (aObserver , times (1 )).onNext ("four" );
225
- // NB: you might hope that five and six are not delivered, but see above.
226
- inOrder .verify (aObserver , times (1 )).onNext ("five" );
227
- inOrder .verify (aObserver , times (1 )).onNext ("six" );
228
- inOrder .verify (aObserver , times (1 )).onCompleted ();
229
-
230
- }
231
-
232
- @ Test
233
- public void testMergeObservableOfObservables () {
183
+ public void testConcatObservableOfObservables () {
234
184
@ SuppressWarnings ("unchecked" )
235
185
Observer <String > observer = mock (Observer .class );
236
186
@@ -260,8 +210,10 @@ public void unsubscribe() {
260
210
}
261
211
262
212
});
263
- Observable <String > concat = Observable .create (concat (observableOfObservables ));
213
+ Observable <String > concat = Observable .create (concat (observableOfObservables ));
214
+
264
215
concat .subscribe (observer );
216
+
265
217
verify (observer , times (7 )).onNext (anyString ());
266
218
}
267
219
@@ -454,7 +406,141 @@ public void testBlockedObservableOfObservables() {
454
406
verify (observer , times (1 )).onNext ("4" );
455
407
verify (observer , times (1 )).onNext ("6" );
456
408
}
409
+
410
+ @ Test
411
+ public void testConcatConcurrentWithInfinity () {
412
+ final TestObservable <String > w1 = new TestObservable <String >("one" , "two" , "three" );
413
+ //This observable will send "hello" MAX_VALUE time.
414
+ final TestObservable <String > w2 = new TestObservable <String >("hello" , Integer .MAX_VALUE );
457
415
416
+ @ SuppressWarnings ("unchecked" )
417
+ Observer <String > aObserver = mock (Observer .class );
418
+ @ SuppressWarnings ("unchecked" )
419
+ TestObservable <Observable <String >> observableOfObservables = new TestObservable <Observable <String >>(w1 , w2 );
420
+ Func1 <Observer <String >, Subscription > concatF = concat (observableOfObservables );
421
+
422
+ Observable <String > concat = Observable .create (concatF );
423
+
424
+ concat .take (50 ).subscribe (aObserver );
425
+
426
+ //Wait for the thread to start up.
427
+ try {
428
+ Thread .sleep (25 );
429
+ w1 .t .join ();
430
+ w2 .t .join ();
431
+ } catch (InterruptedException e ) {
432
+ // TODO Auto-generated catch block
433
+ e .printStackTrace ();
434
+ }
435
+
436
+ InOrder inOrder = inOrder (aObserver );
437
+ inOrder .verify (aObserver , times (1 )).onNext ("one" );
438
+ inOrder .verify (aObserver , times (1 )).onNext ("two" );
439
+ inOrder .verify (aObserver , times (1 )).onNext ("three" );
440
+ inOrder .verify (aObserver , times (47 )).onNext ("hello" );
441
+ verify (aObserver , times (1 )).onCompleted ();
442
+ verify (aObserver , never ()).onError (any (Exception .class ));
443
+
444
+ }
445
+
446
+
447
+ /**
448
+ * The outer observable is running on the same thread and subscribe() in this case is a blocking call. Calling unsubscribe() is no-op because the sequence is complete.
449
+ */
450
+ @ Test
451
+ public void testConcatUnsubscribe () {
452
+ final CountDownLatch callOnce = new CountDownLatch (1 );
453
+ final CountDownLatch okToContinue = new CountDownLatch (1 );
454
+ final TestObservable <String > w1 = new TestObservable <String >("one" , "two" , "three" );
455
+ final TestObservable <String > w2 = new TestObservable <String >(callOnce , okToContinue , "four" , "five" , "six" );
456
+
457
+ @ SuppressWarnings ("unchecked" )
458
+ final Observer <String > aObserver = mock (Observer .class );
459
+ @ SuppressWarnings ("unchecked" )
460
+ final Observable <String > concat = Observable .create (concat (w1 , w2 ));
461
+ final AtomicObservableSubscription s1 = new AtomicObservableSubscription ();
462
+ Thread t = new Thread () {
463
+ @ Override
464
+ public void run () {
465
+ // NB: this statement does not complete until after "six" has been delivered.
466
+ s1 .wrap (concat .subscribe (aObserver ));
467
+ }
468
+ };
469
+ t .start ();
470
+ try {
471
+ //Block main thread to allow observable "w1" to complete and observable "w2" to call onNext once.
472
+ callOnce .await ();
473
+ // NB: This statement has no effect, since s1 cannot possibly
474
+ // wrap anything until "six" has been delivered, which cannot
475
+ // happen until we okToContinue.countDown()
476
+ s1 .unsubscribe ();
477
+ //Unblock the observable to continue.
478
+ okToContinue .countDown ();
479
+ w1 .t .join ();
480
+ w2 .t .join ();
481
+ } catch (Exception e ) {
482
+ e .printStackTrace ();
483
+ fail (e .getMessage ());
484
+ }
485
+
486
+ InOrder inOrder = inOrder (aObserver );
487
+ inOrder .verify (aObserver , times (1 )).onNext ("one" );
488
+ inOrder .verify (aObserver , times (1 )).onNext ("two" );
489
+ inOrder .verify (aObserver , times (1 )).onNext ("three" );
490
+ inOrder .verify (aObserver , times (1 )).onNext ("four" );
491
+ // NB: you might hope that five and six are not delivered, but see above.
492
+ inOrder .verify (aObserver , times (1 )).onNext ("five" );
493
+ inOrder .verify (aObserver , times (1 )).onNext ("six" );
494
+ inOrder .verify (aObserver , times (1 )).onCompleted ();
495
+
496
+ }
497
+
498
+ /**
499
+ * All observables will be running in different threads so subscribe() is unblocked. CountDownLatch is only used in order to call unsubscribe() in a predictable manner.
500
+ */
501
+ @ Test
502
+ public void testConcatUnsubscribeConcurrent () {
503
+ final CountDownLatch callOnce = new CountDownLatch (1 );
504
+ final CountDownLatch okToContinue = new CountDownLatch (1 );
505
+ final TestObservable <String > w1 = new TestObservable <String >("one" , "two" , "three" );
506
+ final TestObservable <String > w2 = new TestObservable <String >(callOnce , okToContinue , "four" , "five" , "six" );
507
+
508
+ @ SuppressWarnings ("unchecked" )
509
+ Observer <String > aObserver = mock (Observer .class );
510
+ @ SuppressWarnings ("unchecked" )
511
+ TestObservable <Observable <String >> observableOfObservables = new TestObservable <Observable <String >>(w1 , w2 );
512
+ Func1 <Observer <String >, Subscription > concatF = concat (observableOfObservables );
513
+
514
+ Observable <String > concat = Observable .create (concatF );
515
+
516
+ Subscription s1 = concat .subscribe (aObserver );
517
+
518
+ try {
519
+ //Block main thread to allow observable "w1" to complete and observable "w2" to call onNext exactly once.
520
+ callOnce .await ();
521
+ //"four" from w2 has been processed by onNext()
522
+ s1 .unsubscribe ();
523
+ //"five" and "six" will NOT be processed by onNext()
524
+ //Unblock the observable to continue.
525
+ okToContinue .countDown ();
526
+ w1 .t .join ();
527
+ w2 .t .join ();
528
+ } catch (Exception e ) {
529
+ e .printStackTrace ();
530
+ fail (e .getMessage ());
531
+ }
532
+
533
+ InOrder inOrder = inOrder (aObserver );
534
+ inOrder .verify (aObserver , times (1 )).onNext ("one" );
535
+ inOrder .verify (aObserver , times (1 )).onNext ("two" );
536
+ inOrder .verify (aObserver , times (1 )).onNext ("three" );
537
+ inOrder .verify (aObserver , times (1 )).onNext ("four" );
538
+ inOrder .verify (aObserver , never ()).onNext ("five" );
539
+ inOrder .verify (aObserver , never ()).onNext ("six" );
540
+ verify (aObserver , never ()).onCompleted ();
541
+ verify (aObserver , never ()).onError (any (Exception .class ));
542
+ }
543
+
458
544
private static class TestObservable <T > extends Observable <T > {
459
545
460
546
private final Subscription s = new Subscription () {
@@ -471,32 +557,48 @@ public void unsubscribe() {
471
557
private boolean subscribed = true ;
472
558
private final CountDownLatch once ;
473
559
private final CountDownLatch okToContinue ;
474
-
560
+ private final T seed ;
561
+ private final int size ;
562
+
475
563
public TestObservable (T ... values ) {
476
564
this (null , null , values );
477
565
}
478
566
479
567
public TestObservable (CountDownLatch once , CountDownLatch okToContinue , T ... values ) {
480
568
this .values = Arrays .asList (values );
569
+ this .size = this .values .size ();
481
570
this .once = once ;
482
571
this .okToContinue = okToContinue ;
572
+ this .seed = null ;
483
573
}
484
574
575
+ public TestObservable (T seed , int size ) {
576
+ values = null ;
577
+ once = null ;
578
+ okToContinue = null ;
579
+ this .seed = seed ;
580
+ this .size = size ;
581
+ }
582
+
583
+
485
584
@ Override
486
585
public Subscription subscribe (final Observer <T > observer ) {
487
586
t = new Thread (new Runnable () {
488
587
489
588
@ Override
490
589
public void run () {
491
590
try {
492
- while (count < values .size () && subscribed ) {
493
- observer .onNext (values .get (count ));
591
+ while (count < size && subscribed ) {
592
+ if (null != values )
593
+ observer .onNext (values .get (count ));
594
+ else
595
+ observer .onNext (seed );
494
596
count ++;
495
597
//Unblock the main thread to call unsubscribe.
496
598
if (null != once )
497
599
once .countDown ();
498
600
//Block until the main thread has called unsubscribe.
499
- if (null != once )
601
+ if (null != okToContinue )
500
602
okToContinue .await ();
501
603
}
502
604
if (subscribed )
0 commit comments