Skip to content

Commit dcda8a8

Browse files
committed
revert synchronization on consumerCapacity and adjust static helper method
1 parent 8ac6bec commit dcda8a8

File tree

2 files changed

+18
-21
lines changed

2 files changed

+18
-21
lines changed

src/main/java/rx/internal/operators/OnSubscribeRedo.java

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -315,12 +315,10 @@ public void onError(Throwable e) {
315315
@Override
316316
public void onNext(Object t) {
317317
if (!isLocked.get() && !child.isUnsubscribed()) {
318-
synchronized(consumerCapacity) {
319-
if (consumerCapacity.get() > 0) {
320-
worker.schedule(subscribeToSource);
321-
} else {
322-
resumeBoundary.compareAndSet(false, true);
323-
}
318+
if (consumerCapacity.get() > 0) {
319+
worker.schedule(subscribeToSource);
320+
} else {
321+
resumeBoundary.compareAndSet(false, true);
324322
}
325323
}
326324
}
@@ -337,16 +335,13 @@ public void setProducer(Producer producer) {
337335

338336
@Override
339337
public void request(final long n) {
340-
long c;
341-
synchronized(consumerCapacity) {
342-
c = BackpressureUtils.getAndAddRequest(consumerCapacity, n);
343-
Producer producer = currentProducer.get();
344-
if (producer != null) {
345-
producer.request(n);
346-
} else
347-
if (c == 0 && resumeBoundary.compareAndSet(true, false)) {
348-
worker.schedule(subscribeToSource);
349-
}
338+
long c = BackpressureUtils.getAndAddRequest(consumerCapacity, n);
339+
Producer producer = currentProducer.get();
340+
if (producer != null) {
341+
producer.request(n);
342+
} else
343+
if (c == 0 && resumeBoundary.compareAndSet(true, false)) {
344+
worker.schedule(subscribeToSource);
350345
}
351346
}
352347
});

src/test/java/rx/internal/operators/OperatorRetryTest.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -706,7 +706,7 @@ public void testRetryWithBackpressure() throws InterruptedException {
706706
}
707707
}
708708

709-
@Test(timeout = 150000)
709+
@Test(timeout = 15000)
710710
public void testRetryWithBackpressureParallel() throws InterruptedException {
711711
final int NUM_RETRIES = RxRingBuffer.SIZE * 2;
712712
int ncpu = Runtime.getRuntime().availableProcessors();
@@ -738,7 +738,6 @@ public void run() {
738738
}
739739
if (ts.getOnNextEvents().size() != NUM_RETRIES + 2) {
740740
data.put(j, ts.getOnNextEvents());
741-
System.out.println(j+"->"+ts.getOnNextEvents());
742741
}
743742
} catch (Throwable t) {
744743
timeouts.incrementAndGet();
@@ -784,10 +783,10 @@ static <T> StringBuilder allSequenceFrequency(Map<Integer, List<T>> its) {
784783
}
785784
static <T> StringBuilder sequenceFrequency(Iterable<T> it) {
786785
StringBuilder sb = new StringBuilder();
787-
786+
788787
Object prev = null;
789788
int cnt = 0;
790-
789+
791790
for (Object curr : it) {
792791
if (sb.length() > 0) {
793792
if (!curr.equals(prev)) {
@@ -806,7 +805,10 @@ static <T> StringBuilder sequenceFrequency(Iterable<T> it) {
806805
}
807806
prev = curr;
808807
}
809-
808+
if (cnt > 1) {
809+
sb.append(" x ").append(cnt);
810+
}
811+
810812
return sb;
811813
}
812814
@Test(timeout = 3000)

0 commit comments

Comments
 (0)