Skip to content

Commit e901ffa

Browse files
Merge pull request #2589 from akarnokd/OperatorDistinctFix
Repeat/retry: fixed unbounded downstream requesting above Long.MAX_VALUE
2 parents 43a912f + a280480 commit e901ffa

File tree

2 files changed

+21
-5
lines changed

2 files changed

+21
-5
lines changed

src/main/java/rx/internal/operators/OnSubscribeRedo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ public void setProducer(Producer producer) {
323323

324324
@Override
325325
public void request(final long n) {
326-
long c = consumerCapacity.getAndAdd(n);
326+
long c = BackpressureUtils.getAndAddRequest(consumerCapacity, n);
327327
Producer producer = currentProducer.get();
328328
if (producer != null) {
329329
producer.request(n);

src/test/java/rx/internal/operators/OperatorRepeatTest.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,9 @@
1818
import static org.junit.Assert.assertArrayEquals;
1919
import static org.junit.Assert.assertEquals;
2020
import static org.mockito.Matchers.any;
21-
import static org.mockito.Mockito.mock;
22-
import static org.mockito.Mockito.never;
23-
import static org.mockito.Mockito.times;
24-
import static org.mockito.Mockito.verify;
21+
import static org.mockito.Mockito.*;
2522

23+
import java.util.Arrays;
2624
import java.util.concurrent.atomic.AtomicInteger;
2725

2826
import org.junit.Test;
@@ -33,6 +31,7 @@
3331
import rx.Subscriber;
3432
import rx.exceptions.TestException;
3533
import rx.functions.Func1;
34+
import rx.observers.TestSubscriber;
3635
import rx.schedulers.Schedulers;
3736

3837
public class OperatorRepeatTest {
@@ -158,4 +157,21 @@ public void testRepeatOne() {
158157
verify(o, times(1)).onNext(any());
159158
verify(o, never()).onError(any(Throwable.class));
160159
}
160+
161+
/** Issue #2587. */
162+
@Test
163+
public void testRepeatAndDistinctUnbounded() {
164+
Observable<Integer> src = Observable.from(Arrays.asList(1, 2, 3, 4, 5))
165+
.take(3)
166+
.repeat(3)
167+
.distinct();
168+
169+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
170+
171+
src.subscribe(ts);
172+
173+
ts.assertNoErrors();
174+
ts.assertTerminalEvent();
175+
ts.assertReceivedOnNext(Arrays.asList(1, 2, 3));
176+
}
161177
}

0 commit comments

Comments
 (0)