Skip to content

Commit 61e6eb9

Browse files
committed
1. Update javadoc on subscribe() blocking if observables are running on the same thread.
2. Add test case with infinity observable. 3. Add test case to unsubscribe() with all observables running in different thread (non-blocking).
1 parent f2e4e81 commit 61e6eb9

File tree

1 file changed

+159
-57
lines changed

1 file changed

+159
-57
lines changed

rxjava-core/src/main/java/rx/operators/OperationConcat.java

Lines changed: 159 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
import java.util.concurrent.atomic.AtomicBoolean;
2626
import java.util.concurrent.atomic.AtomicReference;
2727

28-
import org.junit.Assert;
29-
import org.junit.Before;
3028
import org.junit.Test;
3129

3230
import org.mockito.InOrder;
@@ -58,7 +56,7 @@ public final class OperationConcat {
5856
* <p/>
5957
*
6058
* Beware that concat(o1,o2).subscribe() is a blocking call from
61-
* which it is impossible to unsubscribe.
59+
* which it is impossible to unsubscribe if observables are running on same thread.
6260
*
6361
* @param sequences An observable sequence of elements to project.
6462
* @return An observable sequence whose elements are the result of combining the output from the list of Observables.
@@ -182,55 +180,7 @@ public void testConcatWithList() {
182180
}
183181

184182
@Test
185-
public void testConcatUnsubscribe() {
186-
final CountDownLatch callOnce = new CountDownLatch(1);
187-
final CountDownLatch okToContinue = new CountDownLatch(1);
188-
final TestObservable<String> w1 = new TestObservable<String>("one", "two", "three");
189-
final TestObservable<String> w2 = new TestObservable<String>(callOnce, okToContinue, "four", "five", "six");
190-
191-
@SuppressWarnings("unchecked")
192-
final Observer<String> aObserver = mock(Observer.class);
193-
@SuppressWarnings("unchecked")
194-
final Observable<String> concat = Observable.create(concat(w1, w2));
195-
final AtomicObservableSubscription s1 = new AtomicObservableSubscription();
196-
Thread t = new Thread() {
197-
@Override
198-
public void run() {
199-
// NB: this statement does not complete until after "six" has been delivered.
200-
s1.wrap(concat.subscribe(aObserver));
201-
}
202-
};
203-
t.start();
204-
try {
205-
//Block main thread to allow observable "w1" to complete and observable "w2" to call onNext once.
206-
callOnce.await();
207-
// NB: This statement has no effect, since s1 cannot possibly
208-
// wrap anything until "six" has been delivered, which cannot
209-
// happen until we okToContinue.countDown()
210-
s1.unsubscribe();
211-
//Unblock the observable to continue.
212-
okToContinue.countDown();
213-
w1.t.join();
214-
w2.t.join();
215-
} catch (Exception e) {
216-
e.printStackTrace();
217-
fail(e.getMessage());
218-
}
219-
220-
InOrder inOrder = inOrder(aObserver);
221-
inOrder.verify(aObserver, times(1)).onNext("one");
222-
inOrder.verify(aObserver, times(1)).onNext("two");
223-
inOrder.verify(aObserver, times(1)).onNext("three");
224-
inOrder.verify(aObserver, times(1)).onNext("four");
225-
// NB: you might hope that five and six are not delivered, but see above.
226-
inOrder.verify(aObserver, times(1)).onNext("five");
227-
inOrder.verify(aObserver, times(1)).onNext("six");
228-
inOrder.verify(aObserver, times(1)).onCompleted();
229-
230-
}
231-
232-
@Test
233-
public void testMergeObservableOfObservables() {
183+
public void testConcatObservableOfObservables() {
234184
@SuppressWarnings("unchecked")
235185
Observer<String> observer = mock(Observer.class);
236186

@@ -260,8 +210,10 @@ public void unsubscribe() {
260210
}
261211

262212
});
263-
Observable<String> concat = Observable.create(concat(observableOfObservables));
213+
Observable<String> concat = Observable.create(concat(observableOfObservables));
214+
264215
concat.subscribe(observer);
216+
265217
verify(observer, times(7)).onNext(anyString());
266218
}
267219

@@ -454,7 +406,141 @@ public void testBlockedObservableOfObservables() {
454406
verify(observer, times(1)).onNext("4");
455407
verify(observer, times(1)).onNext("6");
456408
}
409+
410+
@Test
411+
public void testConcatConcurrentWithInfinity() {
412+
final TestObservable<String> w1 = new TestObservable<String>("one", "two", "three");
413+
//This observable will send "hello" MAX_VALUE time.
414+
final TestObservable<String> w2 = new TestObservable<String>("hello", Integer.MAX_VALUE);
457415

416+
@SuppressWarnings("unchecked")
417+
Observer<String> aObserver = mock(Observer.class);
418+
@SuppressWarnings("unchecked")
419+
TestObservable<Observable<String>> observableOfObservables = new TestObservable<Observable<String>>(w1, w2);
420+
Func1<Observer<String>, Subscription> concatF = concat(observableOfObservables);
421+
422+
Observable<String> concat = Observable.create(concatF);
423+
424+
concat.take(50).subscribe(aObserver);
425+
426+
//Wait for the thread to start up.
427+
try {
428+
Thread.sleep(25);
429+
w1.t.join();
430+
w2.t.join();
431+
} catch (InterruptedException e) {
432+
// TODO Auto-generated catch block
433+
e.printStackTrace();
434+
}
435+
436+
InOrder inOrder = inOrder(aObserver);
437+
inOrder.verify(aObserver, times(1)).onNext("one");
438+
inOrder.verify(aObserver, times(1)).onNext("two");
439+
inOrder.verify(aObserver, times(1)).onNext("three");
440+
inOrder.verify(aObserver, times(47)).onNext("hello");
441+
verify(aObserver, times(1)).onCompleted();
442+
verify(aObserver, never()).onError(any(Exception.class));
443+
444+
}
445+
446+
447+
/**
448+
* The outer observable is running on the same thread and subscribe() in this case is a blocking call. Calling unsubscribe() is no-op because the sequence is complete.
449+
*/
450+
@Test
451+
public void testConcatUnsubscribe() {
452+
final CountDownLatch callOnce = new CountDownLatch(1);
453+
final CountDownLatch okToContinue = new CountDownLatch(1);
454+
final TestObservable<String> w1 = new TestObservable<String>("one", "two", "three");
455+
final TestObservable<String> w2 = new TestObservable<String>(callOnce, okToContinue, "four", "five", "six");
456+
457+
@SuppressWarnings("unchecked")
458+
final Observer<String> aObserver = mock(Observer.class);
459+
@SuppressWarnings("unchecked")
460+
final Observable<String> concat = Observable.create(concat(w1, w2));
461+
final AtomicObservableSubscription s1 = new AtomicObservableSubscription();
462+
Thread t = new Thread() {
463+
@Override
464+
public void run() {
465+
// NB: this statement does not complete until after "six" has been delivered.
466+
s1.wrap(concat.subscribe(aObserver));
467+
}
468+
};
469+
t.start();
470+
try {
471+
//Block main thread to allow observable "w1" to complete and observable "w2" to call onNext once.
472+
callOnce.await();
473+
// NB: This statement has no effect, since s1 cannot possibly
474+
// wrap anything until "six" has been delivered, which cannot
475+
// happen until we okToContinue.countDown()
476+
s1.unsubscribe();
477+
//Unblock the observable to continue.
478+
okToContinue.countDown();
479+
w1.t.join();
480+
w2.t.join();
481+
} catch (Exception e) {
482+
e.printStackTrace();
483+
fail(e.getMessage());
484+
}
485+
486+
InOrder inOrder = inOrder(aObserver);
487+
inOrder.verify(aObserver, times(1)).onNext("one");
488+
inOrder.verify(aObserver, times(1)).onNext("two");
489+
inOrder.verify(aObserver, times(1)).onNext("three");
490+
inOrder.verify(aObserver, times(1)).onNext("four");
491+
// NB: you might hope that five and six are not delivered, but see above.
492+
inOrder.verify(aObserver, times(1)).onNext("five");
493+
inOrder.verify(aObserver, times(1)).onNext("six");
494+
inOrder.verify(aObserver, times(1)).onCompleted();
495+
496+
}
497+
498+
/**
499+
* All observables will be running in different threads so subscribe() is unblocked. CountDownLatch is only used in order to call unsubscribe() in a predictable manner.
500+
*/
501+
@Test
502+
public void testConcatUnsubscribeConcurrent() {
503+
final CountDownLatch callOnce = new CountDownLatch(1);
504+
final CountDownLatch okToContinue = new CountDownLatch(1);
505+
final TestObservable<String> w1 = new TestObservable<String>("one", "two", "three");
506+
final TestObservable<String> w2 = new TestObservable<String>(callOnce, okToContinue, "four", "five", "six");
507+
508+
@SuppressWarnings("unchecked")
509+
Observer<String> aObserver = mock(Observer.class);
510+
@SuppressWarnings("unchecked")
511+
TestObservable<Observable<String>> observableOfObservables = new TestObservable<Observable<String>>(w1, w2);
512+
Func1<Observer<String>, Subscription> concatF = concat(observableOfObservables);
513+
514+
Observable<String> concat = Observable.create(concatF);
515+
516+
Subscription s1 = concat.subscribe(aObserver);
517+
518+
try {
519+
//Block main thread to allow observable "w1" to complete and observable "w2" to call onNext exactly once.
520+
callOnce.await();
521+
//"four" from w2 has been processed by onNext()
522+
s1.unsubscribe();
523+
//"five" and "six" will NOT be processed by onNext()
524+
//Unblock the observable to continue.
525+
okToContinue.countDown();
526+
w1.t.join();
527+
w2.t.join();
528+
} catch (Exception e) {
529+
e.printStackTrace();
530+
fail(e.getMessage());
531+
}
532+
533+
InOrder inOrder = inOrder(aObserver);
534+
inOrder.verify(aObserver, times(1)).onNext("one");
535+
inOrder.verify(aObserver, times(1)).onNext("two");
536+
inOrder.verify(aObserver, times(1)).onNext("three");
537+
inOrder.verify(aObserver, times(1)).onNext("four");
538+
inOrder.verify(aObserver, never()).onNext("five");
539+
inOrder.verify(aObserver, never()).onNext("six");
540+
verify(aObserver, never()).onCompleted();
541+
verify(aObserver, never()).onError(any(Exception.class));
542+
}
543+
458544
private static class TestObservable<T> extends Observable<T> {
459545

460546
private final Subscription s = new Subscription() {
@@ -471,32 +557,48 @@ public void unsubscribe() {
471557
private boolean subscribed = true;
472558
private final CountDownLatch once;
473559
private final CountDownLatch okToContinue;
474-
560+
private final T seed;
561+
private final int size;
562+
475563
public TestObservable(T... values) {
476564
this(null, null, values);
477565
}
478566

479567
public TestObservable(CountDownLatch once, CountDownLatch okToContinue, T... values) {
480568
this.values = Arrays.asList(values);
569+
this.size = this.values.size();
481570
this.once = once;
482571
this.okToContinue = okToContinue;
572+
this.seed = null;
483573
}
484574

575+
public TestObservable(T seed, int size) {
576+
values = null;
577+
once = null;
578+
okToContinue = null;
579+
this.seed = seed;
580+
this.size = size;
581+
}
582+
583+
485584
@Override
486585
public Subscription subscribe(final Observer<T> observer) {
487586
t = new Thread(new Runnable() {
488587

489588
@Override
490589
public void run() {
491590
try {
492-
while (count < values.size() && subscribed) {
493-
observer.onNext(values.get(count));
591+
while (count < size && subscribed) {
592+
if (null != values)
593+
observer.onNext(values.get(count));
594+
else
595+
observer.onNext(seed);
494596
count++;
495597
//Unblock the main thread to call unsubscribe.
496598
if (null != once)
497599
once.countDown();
498600
//Block until the main thread has called unsubscribe.
499-
if (null != once)
601+
if (null != okToContinue)
500602
okToContinue.await();
501603
}
502604
if (subscribed)

0 commit comments

Comments
 (0)