Skip to content

Commit 43a912f

Browse files
Merge pull request #2590 from akarnokd/ZipHangFix
Zip: fixed unbounded downstream requesting above Long.MAX_VALUE
2 parents 0cecfc5 + 01e97fc commit 43a912f

File tree

2 files changed

+28
-2
lines changed

2 files changed

+28
-2
lines changed

src/main/java/rx/internal/operators/OperatorZip.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ public Subscriber<? super Observable[]> call(final Subscriber<? super R> child)
117117
return subscriber;
118118
}
119119

120+
@SuppressWarnings("rawtypes")
120121
private final class ZipSubscriber extends Subscriber<Observable[]> {
121122

122123
final Subscriber<? super R> child;
@@ -158,7 +159,8 @@ public void onNext(Observable[] observables) {
158159
}
159160

160161
private static final class ZipProducer<R> extends AtomicLong implements Producer {
161-
162+
/** */
163+
private static final long serialVersionUID = -1216676403723546796L;
162164
private Zip<R> zipper;
163165

164166
public ZipProducer(Zip<R> zipper) {
@@ -167,7 +169,7 @@ public ZipProducer(Zip<R> zipper) {
167169

168170
@Override
169171
public void request(long n) {
170-
addAndGet(n);
172+
BackpressureUtils.getAndAddRequest(this, n);
171173
// try and claim emission if no other threads are doing so
172174
zipper.tick();
173175
}
@@ -179,6 +181,7 @@ private static final class Zip<R> {
179181
private final FuncN<? extends R> zipFunction;
180182
private final CompositeSubscription childSubscription = new CompositeSubscription();
181183

184+
@SuppressWarnings("unused")
182185
volatile long counter;
183186
@SuppressWarnings("rawtypes")
184187
static final AtomicLongFieldUpdater<Zip> COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(Zip.class, "counter");

src/test/java/rx/internal/operators/OperatorZipTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1243,4 +1243,27 @@ public Integer call(Integer i1, Integer i2) {
12431243
}
12441244
assertEquals(expected, zip2.toList().toBlocking().single());
12451245
}
1246+
@Test
1247+
public void testUnboundedDownstreamOverrequesting() {
1248+
Observable<Integer> source = Observable.range(1, 2).zipWith(Observable.range(1, 2), new Func2<Integer, Integer, Integer>() {
1249+
@Override
1250+
public Integer call(Integer t1, Integer t2) {
1251+
return t1 + 10 * t2;
1252+
}
1253+
});
1254+
1255+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
1256+
@Override
1257+
public void onNext(Integer t) {
1258+
super.onNext(t);
1259+
requestMore(5);
1260+
}
1261+
};
1262+
1263+
source.subscribe(ts);
1264+
1265+
ts.assertNoErrors();
1266+
ts.assertTerminalEvent();
1267+
ts.assertReceivedOnNext(Arrays.asList(11, 22));
1268+
}
12461269
}

0 commit comments

Comments
 (0)