@@ -131,8 +131,10 @@ public void testSimpleAsyncConcat() {
131
131
132
132
@ Test
133
133
public void testNestedAsyncConcatLoop () throws Throwable {
134
- for (int i = 0 ; i < 100 ; i ++) {
135
- System .out .println ("testNestedAsyncConcat >> " + i );
134
+ for (int i = 0 ; i < 500 ; i ++) {
135
+ if (i % 10 == 0 ) {
136
+ System .out .println ("testNestedAsyncConcat >> " + i );
137
+ }
136
138
testNestedAsyncConcat ();
137
139
}
138
140
}
@@ -151,6 +153,9 @@ public void testNestedAsyncConcat() throws Throwable {
151
153
152
154
final AtomicReference <Thread > parent = new AtomicReference <>();
153
155
final CountDownLatch parentHasStarted = new CountDownLatch (1 );
156
+ final CountDownLatch parentHasFinished = new CountDownLatch (1 );
157
+
158
+
154
159
Observable <Observable <String >> observableOfObservables = Observable .create (new Publisher <Observable <String >>() {
155
160
156
161
@ Override
@@ -198,6 +203,7 @@ public void run() {
198
203
} finally {
199
204
System .out .println ("Done parent Observable" );
200
205
observer .onComplete ();
206
+ parentHasFinished .countDown ();
201
207
}
202
208
}
203
209
}));
@@ -246,6 +252,15 @@ public void run() {
246
252
throw new RuntimeException ("failed waiting on threads" , e );
247
253
}
248
254
255
+ try {
256
+ // wait for the parent to complete
257
+ if (!parentHasFinished .await (5 , TimeUnit .SECONDS )) {
258
+ fail ("Parent didn't finish within the time limit" );
259
+ }
260
+ } catch (Throwable e ) {
261
+ throw new RuntimeException ("failed waiting on threads" , e );
262
+ }
263
+
249
264
inOrder .verify (observer , times (1 )).onNext ("seven" );
250
265
inOrder .verify (observer , times (1 )).onNext ("eight" );
251
266
inOrder .verify (observer , times (1 )).onNext ("nine" );
0 commit comments