File tree Expand file tree Collapse file tree 2 files changed +71
-0
lines changed
src/test/java/rx/internal/operators Expand file tree Collapse file tree 2 files changed +71
-0
lines changed Original file line number Diff line number Diff line change 32
32
import rx .Subscriber ;
33
33
import rx .functions .Func1 ;
34
34
import rx .observers .TestSubscriber ;
35
+ import rx .schedulers .Schedulers ;
35
36
36
37
public class OperatorOnErrorReturnTest {
37
38
@@ -145,6 +146,41 @@ public String call(Throwable t1) {
145
146
verify (observer , Mockito .never ()).onNext ("three" );
146
147
verify (observer , times (1 )).onNext ("resume" );
147
148
}
149
+
150
+ @ Test
151
+ public void testBackpressure () {
152
+ TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
153
+ Observable .range (0 , 100000 )
154
+ .onErrorReturn (new Func1 <Throwable , Integer >() {
155
+
156
+ @ Override
157
+ public Integer call (Throwable t1 ) {
158
+ return 1 ;
159
+ }
160
+
161
+ })
162
+ .observeOn (Schedulers .computation ())
163
+ .map (new Func1 <Integer , Integer >() {
164
+ int c = 0 ;
165
+
166
+ @ Override
167
+ public Integer call (Integer t1 ) {
168
+ if (c ++ <= 1 ) {
169
+ // slow
170
+ try {
171
+ Thread .sleep (500 );
172
+ } catch (InterruptedException e ) {
173
+ e .printStackTrace ();
174
+ }
175
+ }
176
+ return t1 ;
177
+ }
178
+
179
+ })
180
+ .subscribe (ts );
181
+ ts .awaitTerminalEvent ();
182
+ ts .assertNoErrors ();
183
+ }
148
184
149
185
private static class TestObservable implements Observable .OnSubscribe <String > {
150
186
@@ -180,4 +216,7 @@ public void run() {
180
216
System .out .println ("done starting TestObservable thread" );
181
217
}
182
218
}
219
+
220
+
221
+
183
222
}
Original file line number Diff line number Diff line change 30
30
import rx .Observer ;
31
31
import rx .Subscriber ;
32
32
import rx .functions .Func1 ;
33
+ import rx .observers .TestSubscriber ;
34
+ import rx .schedulers .Schedulers ;
33
35
34
36
public class OperatorOnExceptionResumeNextViaObservableTest {
35
37
@@ -188,6 +190,36 @@ public String call(String s) {
188
190
verify (observer , Mockito .never ()).onError (any (Throwable .class ));
189
191
verify (observer , times (1 )).onCompleted ();
190
192
}
193
+
194
+
195
+ @ Test
196
+ public void testBackpressure () {
197
+ TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
198
+ Observable .range (0 , 100000 )
199
+ .onExceptionResumeNext (Observable .just (1 ))
200
+ .observeOn (Schedulers .computation ())
201
+ .map (new Func1 <Integer , Integer >() {
202
+ int c = 0 ;
203
+
204
+ @ Override
205
+ public Integer call (Integer t1 ) {
206
+ if (c ++ <= 1 ) {
207
+ // slow
208
+ try {
209
+ Thread .sleep (500 );
210
+ } catch (InterruptedException e ) {
211
+ e .printStackTrace ();
212
+ }
213
+ }
214
+ return t1 ;
215
+ }
216
+
217
+ })
218
+ .subscribe (ts );
219
+ ts .awaitTerminalEvent ();
220
+ ts .assertNoErrors ();
221
+ }
222
+
191
223
192
224
private static class TestObservable implements Observable .OnSubscribe <String > {
193
225
You can’t perform that action at this time.
0 commit comments