Skip to content

Commit c7e3b2f

Browse files
committed
Fixes wrong request accounting in AbstractOnSubscribe
1 parent 9f2fc67 commit c7e3b2f

File tree

3 files changed

+54
-15
lines changed

3 files changed

+54
-15
lines changed

src/main/java/rx/internal/operators/BackpressureUtils.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@
2222
* Utility functions for use with backpressure.
2323
*
2424
*/
25-
final class BackpressureUtils {
26-
25+
public final class BackpressureUtils {
26+
/** Utility class, no instances. */
27+
private BackpressureUtils() {
28+
throw new IllegalStateException("No instances!");
29+
}
2730
/**
2831
* Adds {@code n} to {@code requested} field and returns the value prior to
2932
* addition once the addition is successful (uses CAS semantics). If
@@ -37,7 +40,7 @@ final class BackpressureUtils {
3740
* the number of requests to add to the requested count
3841
* @return requested value just prior to successful addition
3942
*/
40-
static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object, long n) {
43+
public static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object, long n) {
4144
// add n to field but check for overflow
4245
while (true) {
4346
long current = requested.get(object);
@@ -63,7 +66,7 @@ static <T> long getAndAddRequest(AtomicLongFieldUpdater<T> requested, T object,
6366
* the number of requests to add to the requested count
6467
* @return requested value just prior to successful addition
6568
*/
66-
static <T> long getAndAddRequest(AtomicLong requested, long n) {
69+
public static <T> long getAndAddRequest(AtomicLong requested, long n) {
6770
// add n to field but check for overflow
6871
while (true) {
6972
long current = requested.get();

src/main/java/rx/observables/AbstractOnSubscribe.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -332,20 +332,22 @@ private SubscriptionProducer(SubscriptionState<T, S> state) {
332332
}
333333
@Override
334334
public void request(long n) {
335-
if (n == Long.MAX_VALUE) {
336-
for (; !state.subscriber.isUnsubscribed(); ) {
337-
if (!doNext()) {
338-
break;
339-
}
340-
}
341-
} else
342335
if (n > 0 && state.requestCount.getAndAdd(n) == 0) {
343-
if (!state.subscriber.isUnsubscribed()) {
344-
do {
336+
// fast-path
337+
if (n == Long.MAX_VALUE) {
338+
for (; !state.subscriber.isUnsubscribed(); ) {
345339
if (!doNext()) {
346340
break;
347341
}
348-
} while (state.requestCount.decrementAndGet() > 0 && !state.subscriber.isUnsubscribed());
342+
}
343+
} else {
344+
if (!state.subscriber.isUnsubscribed()) {
345+
do {
346+
if (!doNext()) {
347+
break;
348+
}
349+
} while (state.requestCount.decrementAndGet() > 0 && !state.subscriber.isUnsubscribed());
350+
}
349351
}
350352
}
351353
}

src/test/java/rx/observables/AbstractOnSubscribeTest.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,13 @@
1616

1717
package rx.observables;
1818

19-
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.*;
2020
import static org.mockito.Matchers.any;
2121
import static org.mockito.Mockito.*;
2222

2323
import java.util.*;
2424
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.atomic.AtomicReference;
2526

2627
import org.junit.Test;
2728
import org.mockito.InOrder;
@@ -503,4 +504,37 @@ public void testMissingEmission() {
503504
verify(o, never()).onNext(any(Object.class));
504505
verify(o).onError(any(IllegalStateException.class));
505506
}
507+
508+
@Test
509+
public void testCanRequestInOnNext() {
510+
AbstractOnSubscribe<Integer, Void> aos = new AbstractOnSubscribe<Integer, Void>() {
511+
@Override
512+
protected void next(SubscriptionState<Integer, Void> state) {
513+
state.onNext(1);
514+
state.onCompleted();
515+
}
516+
};
517+
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
518+
aos.toObservable().subscribe(new Subscriber<Integer>() {
519+
520+
@Override
521+
public void onCompleted() {
522+
523+
}
524+
525+
@Override
526+
public void onError(Throwable e) {
527+
exception.set(e);
528+
}
529+
530+
@Override
531+
public void onNext(Integer t) {
532+
request(1);
533+
}
534+
});
535+
if (exception.get()!=null) {
536+
exception.get().printStackTrace();
537+
}
538+
assertNull(exception.get());
539+
}
506540
}

0 commit comments

Comments
 (0)