Skip to content

Commit 5839720

Browse files
committed
Merge pull request #2332 from akarnokd/OperatorRetryTestFixAttempt
Operator retry test fix
2 parents b37c7ed + 939a3b1 commit 5839720

File tree

3 files changed

+92
-30
lines changed

3 files changed

+92
-30
lines changed

src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,18 +123,19 @@ public boolean offer(final E e) {
123123
}
124124
// local load of field to avoid repeated loads after volatile reads
125125
final E[] lElementBuffer = buffer;
126+
final long offset = calcElementOffset(producerIndex);
126127
if (producerIndex >= producerLookAhead) {
127-
if (null != lvElement(lElementBuffer, calcElementOffset(producerIndex + lookAheadStep))) {// LoadLoad
128+
if (null == lvElement(lElementBuffer, calcElementOffset(producerIndex + lookAheadStep))) {// LoadLoad
129+
producerLookAhead = producerIndex + lookAheadStep;
130+
}
131+
else if (null != lvElement(lElementBuffer, offset)){
128132
return false;
129133
}
130-
producerLookAhead = producerIndex + lookAheadStep;
131134
}
132-
long offset = calcElementOffset(producerIndex);
133135
producerIndex++; // do increment here so the ordered store give both a barrier
134136
soElement(lElementBuffer, offset, e);// StoreStore
135137
return true;
136-
}
137-
138+
}
138139
/**
139140
* {@inheritDoc}
140141
* <p>

src/main/java/rx/observers/TestSubscriber.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,9 @@
1616
package rx.observers;
1717

1818
import java.util.List;
19-
import java.util.concurrent.CountDownLatch;
20-
import java.util.concurrent.TimeUnit;
19+
import java.util.concurrent.*;
2120

22-
import rx.Notification;
23-
import rx.Observer;
24-
import rx.Subscriber;
21+
import rx.*;
2522

2623
/**
2724
* A {@code TestSubscriber} is a variety of {@link Subscriber} that you can use for unit testing, to perform
@@ -229,7 +226,9 @@ public void awaitTerminalEvent() {
229226
*/
230227
public void awaitTerminalEvent(long timeout, TimeUnit unit) {
231228
try {
232-
latch.await(timeout, unit);
229+
if (!latch.await(timeout, unit)) {
230+
throw new RuntimeException(new TimeoutException());
231+
}
233232
} catch (InterruptedException e) {
234233
throw new RuntimeException("Interrupted", e);
235234
}

src/test/java/rx/internal/operators/OperatorRetryTest.java

Lines changed: 81 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,33 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import static org.junit.Assert.*;
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.fail;
1920
import static org.mockito.Matchers.any;
2021
import static org.mockito.Mockito.*;
2122

22-
import java.util.concurrent.*;
23-
import java.util.concurrent.atomic.*;
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.ExecutorService;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
import java.util.concurrent.atomic.AtomicInteger;
29+
import java.util.concurrent.atomic.AtomicLong;
2430

2531
import org.junit.Test;
26-
import org.mockito.*;
32+
import org.mockito.InOrder;
33+
import org.mockito.Mockito;
2734

28-
import rx.*;
35+
import rx.Observable;
2936
import rx.Observable.OnSubscribe;
30-
import rx.functions.*;
37+
import rx.Observer;
38+
import rx.Producer;
39+
import rx.Subscriber;
40+
import rx.Subscription;
41+
import rx.functions.Action0;
42+
import rx.functions.Action1;
43+
import rx.functions.Func1;
44+
import rx.functions.Func2;
3145
import rx.internal.util.RxRingBuffer;
3246
import rx.observables.GroupedObservable;
3347
import rx.observers.TestSubscriber;
@@ -404,20 +418,25 @@ public void request(long n) {
404418
}
405419
return;
406420
}
407-
if (n > 0 && req.getAndAdd(1) == 0) {
421+
if (n > 0 && req.getAndAdd(n) == 0) {
408422
int i = count.getAndIncrement();
409423
if (i < numFailures) {
410424
o.onNext("beginningEveryTime");
411425
o.onError(new RuntimeException("forced failure: " + count.get()));
412-
} else
413-
if (i == numFailures) {
414-
o.onNext("beginningEveryTime");
415-
} else
416-
if (i > numFailures) {
417-
o.onNext("onSuccessOnly");
418-
o.onCompleted();
426+
req.decrementAndGet();
427+
} else {
428+
do {
429+
if (i == numFailures) {
430+
o.onNext("beginningEveryTime");
431+
} else
432+
if (i > numFailures) {
433+
o.onNext("onSuccessOnly");
434+
o.onCompleted();
435+
break;
436+
}
437+
i = count.getAndIncrement();
438+
} while (req.decrementAndGet() > 0);
419439
}
420-
req.decrementAndGet();
421440
}
422441
}
423442
});
@@ -675,15 +694,15 @@ public void testTimeoutWithRetry() {
675694
}
676695

677696
@Test(timeout = 10000)
678-
public void testRetryWithBackpressure() {
679-
for (int i = 0; i < 200; i++) {
697+
public void testRetryWithBackpressure() throws InterruptedException {
698+
final int NUM_RETRIES = RxRingBuffer.SIZE * 2;
699+
for (int i = 0; i < 400; i++) {
680700
@SuppressWarnings("unchecked")
681701
Observer<String> observer = mock(Observer.class);
682-
int NUM_RETRIES = RxRingBuffer.SIZE * 2;
683702
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_RETRIES));
684703
TestSubscriber<String> ts = new TestSubscriber<String>(observer);
685704
origin.retry().observeOn(Schedulers.computation()).unsafeSubscribe(ts);
686-
ts.awaitTerminalEvent();
705+
ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
687706

688707
InOrder inOrder = inOrder(observer);
689708
// should have no errors
@@ -697,6 +716,49 @@ public void testRetryWithBackpressure() {
697716
inOrder.verifyNoMoreInteractions();
698717
}
699718
}
719+
@Test(timeout = 10000)
720+
public void testRetryWithBackpressureParallel() throws InterruptedException {
721+
final int NUM_RETRIES = RxRingBuffer.SIZE * 2;
722+
int ncpu = Runtime.getRuntime().availableProcessors();
723+
ExecutorService exec = Executors.newFixedThreadPool(Math.max(ncpu / 2, 1));
724+
final AtomicInteger timeouts = new AtomicInteger();
725+
final AtomicInteger data = new AtomicInteger();
726+
int m = 2000;
727+
final CountDownLatch cdl = new CountDownLatch(m);
728+
for (int i = 0; i < m; i++) {
729+
final int j = i;
730+
exec.execute(new Runnable() {
731+
@Override
732+
public void run() {
733+
final AtomicInteger nexts = new AtomicInteger();
734+
try {
735+
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_RETRIES));
736+
TestSubscriber<String> ts = new TestSubscriber<String>();
737+
origin.retry().observeOn(Schedulers.computation()).unsafeSubscribe(ts);
738+
ts.awaitTerminalEvent(2, TimeUnit.SECONDS);
739+
if (ts.getOnNextEvents().size() != NUM_RETRIES + 2) {
740+
data.incrementAndGet();
741+
}
742+
if (ts.getOnErrorEvents().size() != 0) {
743+
data.incrementAndGet();
744+
}
745+
if (ts.getOnCompletedEvents().size() != 1) {
746+
data.incrementAndGet();
747+
}
748+
} catch (Throwable t) {
749+
timeouts.incrementAndGet();
750+
System.out.println(j + " | " + cdl.getCount() + " !!! " + nexts.get());
751+
}
752+
cdl.countDown();
753+
}
754+
});
755+
}
756+
exec.shutdown();
757+
cdl.await();
758+
assertEquals(0, timeouts.get());
759+
assertEquals(0, data.get());
760+
761+
}
700762
@Test(timeout = 3000)
701763
public void testIssue1900() throws InterruptedException {
702764
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)