19
19
import static org .junit .Assert .assertFalse ;
20
20
import static org .junit .Assert .assertTrue ;
21
21
22
+ import java .util .Arrays ;
22
23
import java .util .List ;
23
24
import java .util .Vector ;
24
25
import java .util .concurrent .ExecutionException ;
28
29
import rx .Notification ;
29
30
import rx .Observable ;
30
31
import rx .Subscriber ;
32
+ import rx .functions .Action1 ;
33
+ import rx .observers .TestSubscriber ;
34
+ import rx .schedulers .Schedulers ;
31
35
32
36
public class OperatorMaterializeTest {
33
37
34
38
@ Test
35
39
public void testMaterialize1 () {
36
- // null will cause onError to be triggered before "three" can be returned
37
- final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable ("one" , "two" , null , "three" );
40
+ // null will cause onError to be triggered before "three" can be
41
+ // returned
42
+ final TestAsyncErrorObservable o1 = new TestAsyncErrorObservable ("one" , "two" , null ,
43
+ "three" );
38
44
39
45
TestObserver Observer = new TestObserver ();
40
46
Observable <Notification <String >> m = Observable .create (o1 ).materialize ();
@@ -53,7 +59,8 @@ public void testMaterialize1() {
53
59
assertTrue (Observer .notifications .get (0 ).isOnNext ());
54
60
assertEquals ("two" , Observer .notifications .get (1 ).getValue ());
55
61
assertTrue (Observer .notifications .get (1 ).isOnNext ());
56
- assertEquals (NullPointerException .class , Observer .notifications .get (2 ).getThrowable ().getClass ());
62
+ assertEquals (NullPointerException .class , Observer .notifications .get (2 ).getThrowable ()
63
+ .getClass ());
57
64
assertTrue (Observer .notifications .get (2 ).isOnError ());
58
65
}
59
66
@@ -93,6 +100,107 @@ public void testMultipleSubscribes() throws InterruptedException, ExecutionExcep
93
100
assertEquals (3 , m .toList ().toBlocking ().toFuture ().get ().size ());
94
101
}
95
102
103
+ @ Test
104
+ public void testBackpressureOnEmptyStream () {
105
+ TestSubscriber <Notification <Integer >> ts = TestSubscriber .create (0 );
106
+ Observable .<Integer > empty ().materialize ().subscribe (ts );
107
+ ts .assertNoValues ();
108
+ ts .requestMore (1 );
109
+ ts .assertValueCount (1 );
110
+ assertTrue (ts .getOnNextEvents ().get (0 ).isOnCompleted ());
111
+ ts .assertCompleted ();
112
+ }
113
+
114
+ @ Test
115
+ public void testBackpressureNoError () {
116
+ TestSubscriber <Notification <Integer >> ts = TestSubscriber .create (0 );
117
+ Observable .just (1 , 2 , 3 ).materialize ().subscribe (ts );
118
+ ts .assertNoValues ();
119
+ ts .requestMore (1 );
120
+ ts .assertValueCount (1 );
121
+ ts .requestMore (2 );
122
+ ts .assertValueCount (3 );
123
+ ts .requestMore (1 );
124
+ ts .assertValueCount (4 );
125
+ ts .assertCompleted ();
126
+ }
127
+
128
+ @ Test
129
+ public void testBackpressureNoErrorAsync () throws InterruptedException {
130
+ TestSubscriber <Notification <Integer >> ts = TestSubscriber .create (0 );
131
+ Observable .just (1 , 2 , 3 )
132
+ .materialize ()
133
+ .subscribeOn (Schedulers .computation ())
134
+ .subscribe (ts );
135
+ Thread .sleep (100 );
136
+ ts .assertNoValues ();
137
+ ts .requestMore (1 );
138
+ Thread .sleep (100 );
139
+ ts .assertValueCount (1 );
140
+ ts .requestMore (2 );
141
+ Thread .sleep (100 );
142
+ ts .assertValueCount (3 );
143
+ ts .requestMore (1 );
144
+ Thread .sleep (100 );
145
+ ts .assertValueCount (4 );
146
+ ts .assertCompleted ();
147
+ }
148
+
149
+ @ Test
150
+ public void testBackpressureWithError () {
151
+ TestSubscriber <Notification <Integer >> ts = TestSubscriber .create (0 );
152
+ Observable .<Integer > error (new IllegalArgumentException ()).materialize ().subscribe (ts );
153
+ ts .assertNoValues ();
154
+ ts .requestMore (1 );
155
+ ts .assertValueCount (1 );
156
+ ts .assertCompleted ();
157
+ }
158
+
159
+ @ Test
160
+ public void testBackpressureWithEmissionThenError () {
161
+ TestSubscriber <Notification <Integer >> ts = TestSubscriber .create (0 );
162
+ IllegalArgumentException ex = new IllegalArgumentException ();
163
+ Observable .from (Arrays .asList (1 )).concatWith (Observable .<Integer > error (ex )).materialize ()
164
+ .subscribe (ts );
165
+ ts .assertNoValues ();
166
+ ts .requestMore (1 );
167
+ ts .assertValueCount (1 );
168
+ assertTrue (ts .getOnNextEvents ().get (0 ).hasValue ());
169
+ ts .requestMore (1 );
170
+ ts .assertValueCount (2 );
171
+ assertTrue (ts .getOnNextEvents ().get (1 ).isOnError ());
172
+ assertTrue (ex == ts .getOnNextEvents ().get (1 ).getThrowable ());
173
+ ts .assertCompleted ();
174
+ }
175
+
176
+ @ Test
177
+ public void testWithCompletionCausingError () {
178
+ TestSubscriber <Notification <Integer >> ts = TestSubscriber .create ();
179
+ final RuntimeException ex = new RuntimeException ("boo" );
180
+ Observable .<Integer >empty ().materialize ().doOnNext (new Action1 <Object >() {
181
+ @ Override
182
+ public void call (Object t ) {
183
+ throw ex ;
184
+ }
185
+ }).subscribe (ts );
186
+ ts .assertError (ex );
187
+ ts .assertNoValues ();
188
+ ts .assertTerminalEvent ();
189
+ }
190
+
191
+ @ Test
192
+ public void testUnsubscribeJustBeforeCompletionNotificationShouldPreventThatNotificationArriving () {
193
+ TestSubscriber <Notification <Integer >> ts = TestSubscriber .create (0 );
194
+ IllegalArgumentException ex = new IllegalArgumentException ();
195
+ Observable .<Integer >empty ().materialize ()
196
+ .subscribe (ts );
197
+ ts .assertNoValues ();
198
+ ts .unsubscribe ();
199
+ ts .requestMore (1 );
200
+ ts .assertNoValues ();
201
+ ts .assertUnsubscribed ();
202
+ }
203
+
96
204
private static class TestObserver extends Subscriber <Notification <String >> {
97
205
98
206
boolean onCompleted = false ;
0 commit comments