Skip to content

Commit d912922

Browse files
committed
fix decrement of consumerCapacity so doesn't decrement Long.MAX_VALUE, remove c==0 check in request method because of race condition
1 parent 2532484 commit d912922

File tree

2 files changed

+24
-8
lines changed

2 files changed

+24
-8
lines changed

src/main/java/rx/internal/operators/OnSubscribeRedo.java

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -235,10 +235,23 @@ public void onError(Throwable e) {
235235
@Override
236236
public void onNext(T v) {
237237
if (!done) {
238-
if (consumerCapacity.get() != Long.MAX_VALUE) {
239-
consumerCapacity.decrementAndGet();
240-
}
241238
child.onNext(v);
239+
decrementConsumerCapacity();
240+
}
241+
}
242+
243+
private void decrementConsumerCapacity() {
244+
// use a CAS loop because we don't want to decrement the value
245+
// if it is Long.MAX_VALUE
246+
while (true) {
247+
long cc = consumerCapacity.get();
248+
if (cc != Long.MAX_VALUE) {
249+
if (consumerCapacity.compareAndSet(cc, cc - 1)) {
250+
break;
251+
}
252+
} else {
253+
break;
254+
}
242255
}
243256
}
244257

@@ -334,12 +347,12 @@ public void setProducer(Producer producer) {
334347

335348
@Override
336349
public void request(final long n) {
337-
long c = BackpressureUtils.getAndAddRequest(consumerCapacity, n);
350+
BackpressureUtils.getAndAddRequest(consumerCapacity, n);
338351
Producer producer = currentProducer.get();
339352
if (producer != null) {
340353
producer.request(n);
341354
} else
342-
if (c == 0 && resumeBoundary.compareAndSet(true, false)) {
355+
if (resumeBoundary.compareAndSet(true, false)) {
343356
worker.schedule(subscribeToSource);
344357
}
345358
}

src/test/java/rx/internal/operators/OperatorRetryTest.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -783,10 +783,10 @@ static <T> StringBuilder allSequenceFrequency(Map<Integer, List<T>> its) {
783783
}
784784
static <T> StringBuilder sequenceFrequency(Iterable<T> it) {
785785
StringBuilder sb = new StringBuilder();
786-
786+
787787
Object prev = null;
788788
int cnt = 0;
789-
789+
790790
for (Object curr : it) {
791791
if (sb.length() > 0) {
792792
if (!curr.equals(prev)) {
@@ -805,7 +805,10 @@ static <T> StringBuilder sequenceFrequency(Iterable<T> it) {
805805
}
806806
prev = curr;
807807
}
808-
808+
if (cnt > 1) {
809+
sb.append(" x ").append(cnt);
810+
}
811+
809812
return sb;
810813
}
811814
@Test(timeout = 3000)

0 commit comments

Comments
 (0)