Skip to content

Commit 7563d80

Browse files
akarnokdakarnokd
authored andcommitted
Few adjustments.
1 parent b02e572 commit 7563d80

File tree

3 files changed

+78
-25
lines changed

3 files changed

+78
-25
lines changed

src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -123,18 +123,19 @@ public boolean offer(final E e) {
123123
}
124124
// local load of field to avoid repeated loads after volatile reads
125125
final E[] lElementBuffer = buffer;
126+
final long offset = calcElementOffset(producerIndex);
126127
if (producerIndex >= producerLookAhead) {
127-
if (null != lvElement(lElementBuffer, calcElementOffset(producerIndex + lookAheadStep))) {// LoadLoad
128+
if (null == lvElement(lElementBuffer, calcElementOffset(producerIndex + lookAheadStep))) {// LoadLoad
129+
producerLookAhead = producerIndex + lookAheadStep;
130+
}
131+
else if (null != lvElement(lElementBuffer, offset)){
128132
return false;
129133
}
130-
producerLookAhead = producerIndex + lookAheadStep;
131134
}
132-
long offset = calcElementOffset(producerIndex);
133135
producerIndex++; // do increment here so the ordered store give both a barrier
134136
soElement(lElementBuffer, offset, e);// StoreStore
135137
return true;
136-
}
137-
138+
}
138139
/**
139140
* {@inheritDoc}
140141
* <p>

src/main/java/rx/observers/TestSubscriber.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,9 +227,9 @@ public void awaitTerminalEvent() {
227227
* @throws RuntimeException
228228
* if the Subscriber is interrupted before the Observable is able to complete
229229
*/
230-
public void awaitTerminalEvent(long timeout, TimeUnit unit) {
230+
public boolean awaitTerminalEvent(long timeout, TimeUnit unit) {
231231
try {
232-
latch.await(timeout, unit);
232+
return latch.await(timeout, unit);
233233
} catch (InterruptedException e) {
234234
throw new RuntimeException("Interrupted", e);
235235
}

src/test/java/rx/internal/operators/OperatorRetryTest.java

Lines changed: 70 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,19 +15,33 @@
1515
*/
1616
package rx.internal.operators;
1717

18-
import static org.junit.Assert.*;
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.fail;
1920
import static org.mockito.Matchers.any;
2021
import static org.mockito.Mockito.*;
2122

22-
import java.util.concurrent.*;
23-
import java.util.concurrent.atomic.*;
23+
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.ExecutorService;
25+
import java.util.concurrent.Executors;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
import java.util.concurrent.atomic.AtomicInteger;
29+
import java.util.concurrent.atomic.AtomicLong;
2430

2531
import org.junit.Test;
26-
import org.mockito.*;
32+
import org.mockito.InOrder;
33+
import org.mockito.Mockito;
2734

28-
import rx.*;
35+
import rx.Observable;
2936
import rx.Observable.OnSubscribe;
30-
import rx.functions.*;
37+
import rx.Observer;
38+
import rx.Producer;
39+
import rx.Subscriber;
40+
import rx.Subscription;
41+
import rx.functions.Action0;
42+
import rx.functions.Action1;
43+
import rx.functions.Func1;
44+
import rx.functions.Func2;
3145
import rx.internal.util.RxRingBuffer;
3246
import rx.observables.GroupedObservable;
3347
import rx.observers.TestSubscriber;
@@ -409,15 +423,20 @@ public void request(long n) {
409423
if (i < numFailures) {
410424
o.onNext("beginningEveryTime");
411425
o.onError(new RuntimeException("forced failure: " + count.get()));
412-
} else
413-
if (i == numFailures) {
414-
o.onNext("beginningEveryTime");
415-
} else
416-
if (i > numFailures) {
417-
o.onNext("onSuccessOnly");
418-
o.onCompleted();
426+
req.decrementAndGet();
427+
} else {
428+
do {
429+
if (i == numFailures) {
430+
o.onNext("beginningEveryTime");
431+
} else
432+
if (i > numFailures) {
433+
o.onNext("onSuccessOnly");
434+
o.onCompleted();
435+
break;
436+
}
437+
i = count.getAndIncrement();
438+
} while (req.decrementAndGet() > 0);
419439
}
420-
req.decrementAndGet();
421440
}
422441
}
423442
});
@@ -675,15 +694,15 @@ public void testTimeoutWithRetry() {
675694
}
676695

677696
@Test(timeout = 10000)
678-
public void testRetryWithBackpressure() {
679-
for (int i = 0; i < 200; i++) {
697+
public void testRetryWithBackpressure() throws InterruptedException {
698+
final int NUM_RETRIES = RxRingBuffer.SIZE * 2;
699+
for (int i = 0; i < 400; i++) {
680700
@SuppressWarnings("unchecked")
681701
Observer<String> observer = mock(Observer.class);
682-
int NUM_RETRIES = RxRingBuffer.SIZE * 2;
683702
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_RETRIES));
684703
TestSubscriber<String> ts = new TestSubscriber<String>(observer);
685704
origin.retry().observeOn(Schedulers.computation()).unsafeSubscribe(ts);
686-
ts.awaitTerminalEvent();
705+
ts.awaitTerminalEvent(5, TimeUnit.SECONDS);
687706

688707
InOrder inOrder = inOrder(observer);
689708
// should have no errors
@@ -697,6 +716,39 @@ public void testRetryWithBackpressure() {
697716
inOrder.verifyNoMoreInteractions();
698717
}
699718
}
719+
@Test(timeout = 10000)
720+
public void testRetryWithBackpressureParallel() throws InterruptedException {
721+
final int NUM_RETRIES = RxRingBuffer.SIZE * 2;
722+
int ncpu = Runtime.getRuntime().availableProcessors();
723+
ExecutorService exec = Executors.newFixedThreadPool(Math.max(ncpu / 2, 1));
724+
final AtomicInteger timeouts = new AtomicInteger();
725+
int m = 300;
726+
final CountDownLatch cdl = new CountDownLatch(m);
727+
for (int i = 0; i < m; i++) {
728+
final int j = i;
729+
exec.execute(new Runnable() {
730+
@Override
731+
public void run() {
732+
try {
733+
Observable<String> origin = Observable.create(new FuncWithErrors(NUM_RETRIES));
734+
TestSubscriber<String> ts = new TestSubscriber<String>();
735+
origin.retry().observeOn(Schedulers.computation()).unsafeSubscribe(ts);
736+
if (!ts.awaitTerminalEvent(10, TimeUnit.SECONDS)) {
737+
timeouts.incrementAndGet();
738+
System.out.println(j + " | " + cdl.getCount() + " !!!");
739+
}
740+
} catch (Throwable t) {
741+
timeouts.incrementAndGet();
742+
}
743+
cdl.countDown();
744+
}
745+
});
746+
}
747+
exec.shutdown();
748+
cdl.await();
749+
assertEquals(0, timeouts.get());
750+
751+
}
700752
@Test(timeout = 3000)
701753
public void testIssue1900() throws InterruptedException {
702754
@SuppressWarnings("unchecked")

0 commit comments

Comments
 (0)