File tree Expand file tree Collapse file tree 1 file changed +30
-0
lines changed
src/test/java/rx/internal/operators Expand file tree Collapse file tree 1 file changed +30
-0
lines changed Original file line number Diff line number Diff line change 29
29
import rx .Notification ;
30
30
import rx .Observable ;
31
31
import rx .Subscriber ;
32
+ import rx .TestUtil ;
33
+ import rx .functions .Action0 ;
32
34
import rx .functions .Action1 ;
33
35
import rx .observers .TestSubscriber ;
34
36
import rx .schedulers .Schedulers ;
37
+ import rx .subjects .PublishSubject ;
35
38
36
39
public class OperatorMaterializeTest {
37
40
@@ -201,6 +204,33 @@ public void testUnsubscribeJustBeforeCompletionNotificationShouldPreventThatNoti
201
204
ts .assertUnsubscribed ();
202
205
}
203
206
207
+ @ Test
208
+ public void testConcurrency () {
209
+ for (int i = 0 ; i < 1000 ; i ++) {
210
+ final TestSubscriber <Notification <Integer >> ts = TestSubscriber .create (0 );
211
+ final PublishSubject <Integer > ps = PublishSubject .create ();
212
+ Action0 publishAction = new Action0 () {
213
+ @ Override
214
+ public void call () {
215
+ ps .onCompleted ();
216
+ }
217
+ };
218
+
219
+ Action0 requestAction = new Action0 () {
220
+ @ Override
221
+ public void call () {
222
+ ts .requestMore (1 );
223
+ }
224
+ };
225
+
226
+ ps .materialize ().subscribe (ts );
227
+ TestUtil .race (publishAction , requestAction );
228
+ ts .assertValueCount (1 );
229
+ ts .assertTerminalEvent ();
230
+ ts .assertNoErrors ();
231
+ }
232
+ }
233
+
204
234
private static class TestObserver extends Subscriber <Notification <String >> {
205
235
206
236
boolean onCompleted ;
You can’t perform that action at this time.
0 commit comments