Skip to content

Commit 8741a21

Browse files
akarnokdakarnokd
authored andcommitted
Fixed the error function.
1 parent 7563d80 commit 8741a21

File tree

1 file changed

+16
-4
lines changed

1 file changed

+16
-4
lines changed

src/test/java/rx/internal/operators/OperatorRetryTest.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ public void request(long n) {
418418
}
419419
return;
420420
}
421-
if (n > 0 && req.getAndAdd(1) == 0) {
421+
if (n > 0 && req.getAndAdd(n) == 0) {
422422
int i = count.getAndIncrement();
423423
if (i < numFailures) {
424424
o.onNext("beginningEveryTime");
@@ -722,20 +722,31 @@ public void testRetryWithBackpressureParallel() throws InterruptedException {
722722
int ncpu = Runtime.getRuntime().availableProcessors();
723723
ExecutorService exec = Executors.newFixedThreadPool(Math.max(ncpu / 2, 1));
724724
final AtomicInteger timeouts = new AtomicInteger();
725-
int m = 300;
725+
final AtomicInteger data = new AtomicInteger();
726+
int m = 2000;
726727
final CountDownLatch cdl = new CountDownLatch(m);
727728
for (int i = 0; i < m; i++) {
728729
final int j = i;
729730
exec.execute(new Runnable() {
730731
@Override
731732
public void run() {
732733
try {
734+
final AtomicInteger nexts = new AtomicInteger();
733735
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_RETRIES));
734736
TestSubscriber<String> ts = new TestSubscriber<String>();
735737
origin.retry().observeOn(Schedulers.computation()).unsafeSubscribe(ts);
736-
if (!ts.awaitTerminalEvent(10, TimeUnit.SECONDS)) {
738+
if (!ts.awaitTerminalEvent(2, TimeUnit.SECONDS)) {
737739
timeouts.incrementAndGet();
738-
System.out.println(j + " | " + cdl.getCount() + " !!!");
740+
System.out.println(j + " | " + cdl.getCount() + " !!! " + nexts.get());
741+
}
742+
if (ts.getOnNextEvents().size() != NUM_RETRIES + 2) {
743+
data.incrementAndGet();
744+
}
745+
if (ts.getOnErrorEvents().size() != 0) {
746+
data.incrementAndGet();
747+
}
748+
if (ts.getOnCompletedEvents().size() != 1) {
749+
data.incrementAndGet();
739750
}
740751
} catch (Throwable t) {
741752
timeouts.incrementAndGet();
@@ -747,6 +758,7 @@ public void run() {
747758
exec.shutdown();
748759
cdl.await();
749760
assertEquals(0, timeouts.get());
761+
assertEquals(0, data.get());
750762

751763
}
752764
@Test(timeout = 3000)

0 commit comments

Comments
 (0)