Skip to content

Commit 1b25f07

Browse files
committed
Merge pull request #2950 from davidmoten/group-by-request-overflow
OperatorGroupBy - check for request overflow and don't decrement when at Long.MAX_VALUE
2 parents 24ca4f7 + 1236f07 commit 1b25f07

File tree

2 files changed

+60
-3
lines changed

2 files changed

+60
-3
lines changed

src/main/java/rx/internal/operators/OperatorGroupBy.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ public void onError(Throwable e) {
194194
// If we already have items queued when a request comes in we vend those and decrement the outstanding request count
195195

196196
void requestFromGroupedObservable(long n, GroupState<K, T> group) {
197-
group.requested.getAndAdd(n);
197+
BackpressureUtils.getAndAddRequest(group.requested, n);
198198
if (group.count.getAndIncrement() == 0) {
199199
pollQueue(group);
200200
}
@@ -330,13 +330,19 @@ private void cleanupGroup(Object key) {
330330
private void emitItem(GroupState<K, T> groupState, Object item) {
331331
Queue<Object> q = groupState.buffer;
332332
AtomicLong keyRequested = groupState.requested;
333+
//don't need to check for requested being Long.MAX_VALUE because this
334+
//field is capped at MAX_QUEUE_SIZE
333335
REQUESTED.decrementAndGet(this);
334336
// short circuit buffering
335337
if (keyRequested != null && keyRequested.get() > 0 && (q == null || q.isEmpty())) {
336338
@SuppressWarnings("unchecked")
337339
Observer<Object> obs = (Observer<Object>)groupState.getObserver();
338340
nl.accept(obs, item);
339-
keyRequested.decrementAndGet();
341+
if (keyRequested.get() != Long.MAX_VALUE) {
342+
// best endeavours check (no CAS loop here) because we mainly care about
343+
// the initial request being Long.MAX_VALUE and that value being conserved.
344+
keyRequested.decrementAndGet();
345+
}
340346
} else {
341347
q.add(item);
342348
BUFFERED_COUNT.incrementAndGet(this);
@@ -381,7 +387,11 @@ private void drainIfPossible(GroupState<K, T> groupState) {
381387
@SuppressWarnings("unchecked")
382388
Observer<Object> obs = (Observer<Object>)groupState.getObserver();
383389
nl.accept(obs, t);
384-
groupState.requested.decrementAndGet();
390+
if (groupState.requested.get()!=Long.MAX_VALUE) {
391+
// best endeavours check (no CAS loop here) because we mainly care about
392+
// the initial request being Long.MAX_VALUE and that value being conserved.
393+
groupState.requested.decrementAndGet();
394+
}
385395
BUFFERED_COUNT.decrementAndGet(this);
386396

387397
// if we have used up all the events we requested from upstream then figure out what to ask for this time based on the empty space in the buffer

src/test/java/rx/internal/operators/OperatorGroupByTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import java.util.concurrent.ConcurrentLinkedQueue;
3535
import java.util.concurrent.CountDownLatch;
3636
import java.util.concurrent.TimeUnit;
37+
import java.util.concurrent.atomic.AtomicBoolean;
3738
import java.util.concurrent.atomic.AtomicInteger;
3839
import java.util.concurrent.atomic.AtomicReference;
3940

@@ -1454,4 +1455,50 @@ public Integer call(Integer i) {
14541455
assertEquals(Arrays.asList(e), inner1.getOnErrorEvents());
14551456
assertEquals(Arrays.asList(e), inner2.getOnErrorEvents());
14561457
}
1458+
1459+
@Test
1460+
public void testRequestOverflow() {
1461+
final AtomicBoolean completed = new AtomicBoolean(false);
1462+
Observable
1463+
.just(1, 2, 3)
1464+
// group into one group
1465+
.groupBy(new Func1<Integer, Integer>() {
1466+
@Override
1467+
public Integer call(Integer t) {
1468+
return 1;
1469+
}
1470+
})
1471+
// flatten
1472+
.concatMap(new Func1<GroupedObservable<Integer, Integer>, Observable<Integer>>() {
1473+
@Override
1474+
public Observable<Integer> call(GroupedObservable<Integer, Integer> g) {
1475+
return g;
1476+
}
1477+
})
1478+
.subscribe(new Subscriber<Integer>() {
1479+
1480+
@Override
1481+
public void onStart() {
1482+
request(2);
1483+
}
1484+
1485+
@Override
1486+
public void onCompleted() {
1487+
completed.set(true);
1488+
1489+
}
1490+
1491+
@Override
1492+
public void onError(Throwable e) {
1493+
1494+
}
1495+
1496+
@Override
1497+
public void onNext(Integer t) {
1498+
System.out.println(t);
1499+
//provoke possible request overflow
1500+
request(Long.MAX_VALUE-1);
1501+
}});
1502+
assertTrue(completed.get());
1503+
}
14571504
}

0 commit comments

Comments
 (0)