Skip to content

Commit 49c0032

Browse files
Operator Scan Backpressure Fix
Problem 1) The initial value was being emitted before subscription which caused issues with request/producer state, particularly if filter() was used to skip that initial value and then called request(1) before the real request had been sent. Problem 2) The initial value was not accounted for by the request so it was sending 1 more value than requested. It now modifies the request to account for it. Problem 3) Redo relied upon these nuances to work. I've fixed this by using a simpler implementation that just maintains state within a map function.
1 parent 57dbf3c commit 49c0032

File tree

3 files changed

+219
-21
lines changed

3 files changed

+219
-21
lines changed

src/main/java/rx/internal/operators/OnSubscribeRedo.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -74,18 +74,24 @@ public RedoFinite(long count) {
7474

7575
@Override
7676
public Observable<?> call(Observable<? extends Notification<?>> ts) {
77-
final Notification<Long> first = count < 0 ? Notification.<Long> createOnCompleted() : Notification.createOnNext(0l);
77+
return ts.map(new Func1<Notification<?>, Notification<?>>() {
7878

79-
return ts.scan(first, new Func2<Notification<Long>, Notification<?>, Notification<Long>>() {
80-
@SuppressWarnings("unchecked")
79+
int num=0;
80+
8181
@Override
82-
public Notification<Long> call(Notification<Long> n, Notification<?> term) {
83-
final long value = n.getValue();
84-
if (value < count)
85-
return Notification.createOnNext(value + 1);
86-
else
87-
return (Notification<Long>) term;
82+
public Notification<?> call(Notification<?> terminalNotification) {
83+
if(count == 0) {
84+
return terminalNotification;
85+
}
86+
87+
num++;
88+
if(num <= count) {
89+
return Notification.createOnNext(num);
90+
} else {
91+
return terminalNotification;
92+
}
8893
}
94+
8995
}).dematerialize();
9096
}
9197
}
@@ -146,6 +152,9 @@ public static <T> Observable<T> repeat(Observable<T> source, final long count) {
146152
}
147153

148154
public static <T> Observable<T> repeat(Observable<T> source, final long count, Scheduler scheduler) {
155+
if(count == 0) {
156+
return Observable.empty();
157+
}
149158
if (count < 0)
150159
throw new IllegalArgumentException("count >= 0 expected");
151160
return repeat(source, new RedoFinite(count - 1), scheduler);

src/main/java/rx/internal/operators/OperatorScan.java

Lines changed: 51 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import java.util.concurrent.atomic.AtomicBoolean;
19+
1820
import rx.Observable.Operator;
21+
import rx.Producer;
1922
import rx.Subscriber;
2023
import rx.exceptions.OnErrorThrowable;
2124
import rx.functions.Func2;
@@ -70,37 +73,73 @@ public OperatorScan(final Func2<R, ? super T, R> accumulator) {
7073
}
7174

7275
@Override
73-
public Subscriber<? super T> call(final Subscriber<? super R> observer) {
74-
if (initialValue != NO_INITIAL_VALUE) {
75-
observer.onNext(initialValue);
76-
}
77-
return new Subscriber<T>(observer) {
76+
public Subscriber<? super T> call(final Subscriber<? super R> child) {
77+
return new Subscriber<T>(child) {
7878
private R value = initialValue;
79+
boolean initialized = false;
7980

8081
@SuppressWarnings("unchecked")
8182
@Override
82-
public void onNext(T value) {
83+
public void onNext(T currentValue) {
84+
emitInitialValueIfNeeded(child);
85+
8386
if (this.value == NO_INITIAL_VALUE) {
8487
// if there is NO_INITIAL_VALUE then we know it is type T for both so cast T to R
85-
this.value = (R) value;
88+
this.value = (R) currentValue;
8689
} else {
8790
try {
88-
this.value = accumulator.call(this.value, value);
91+
this.value = accumulator.call(this.value, currentValue);
8992
} catch (Throwable e) {
90-
observer.onError(OnErrorThrowable.addValueAsLastCause(e, value));
93+
child.onError(OnErrorThrowable.addValueAsLastCause(e, currentValue));
9194
}
9295
}
93-
observer.onNext(this.value);
96+
child.onNext(this.value);
9497
}
9598

9699
@Override
97100
public void onError(Throwable e) {
98-
observer.onError(e);
101+
child.onError(e);
99102
}
100103

101104
@Override
102105
public void onCompleted() {
103-
observer.onCompleted();
106+
emitInitialValueIfNeeded(child);
107+
child.onCompleted();
108+
}
109+
110+
private void emitInitialValueIfNeeded(final Subscriber<? super R> child) {
111+
if (!initialized) {
112+
initialized = true;
113+
// we emit first time through if we have an initial value
114+
if (initialValue != NO_INITIAL_VALUE) {
115+
child.onNext(initialValue);
116+
}
117+
}
118+
}
119+
120+
/**
121+
* We want to adjust the requested value by subtracting 1 if we have an initial value
122+
*/
123+
@Override
124+
public void setProducer(final Producer producer) {
125+
child.setProducer(new Producer() {
126+
127+
final AtomicBoolean once = new AtomicBoolean();
128+
129+
@Override
130+
public void request(long n) {
131+
if (once.compareAndSet(false, true)) {
132+
if (initialValue == NO_INITIAL_VALUE) {
133+
producer.request(n);
134+
} else {
135+
producer.request(n - 1);
136+
}
137+
} else {
138+
// pass-thru after first time
139+
producer.request(n);
140+
}
141+
}
142+
});
104143
}
105144
};
106145
}

src/test/java/rx/internal/operators/OperatorScanTest.java

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.*;
1819
import static org.mockito.Matchers.any;
1920
import static org.mockito.Matchers.anyInt;
2021
import static org.mockito.Matchers.anyString;
@@ -23,13 +24,21 @@
2324
import static org.mockito.Mockito.times;
2425
import static org.mockito.Mockito.verify;
2526

27+
import java.util.concurrent.atomic.AtomicInteger;
28+
2629
import org.junit.Before;
2730
import org.junit.Test;
2831
import org.mockito.MockitoAnnotations;
2932

3033
import rx.Observable;
3134
import rx.Observer;
35+
import rx.Subscriber;
36+
import rx.functions.Action1;
37+
import rx.functions.Func1;
3238
import rx.functions.Func2;
39+
import rx.internal.util.RxRingBuffer;
40+
import rx.observers.TestSubscriber;
41+
import rx.schedulers.Schedulers;
3342

3443
public class OperatorScanTest {
3544

@@ -116,4 +125,145 @@ public Integer call(Integer t1, Integer t2) {
116125
verify(observer, times(1)).onCompleted();
117126
verify(observer, never()).onError(any(Throwable.class));
118127
}
128+
129+
@Test
130+
public void shouldNotEmitUntilAfterSubscription() {
131+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
132+
Observable.range(1, 100).scan(0, new Func2<Integer, Integer, Integer>() {
133+
134+
@Override
135+
public Integer call(Integer t1, Integer t2) {
136+
return t1 + t2;
137+
}
138+
139+
}).filter(new Func1<Integer, Boolean>() {
140+
141+
@Override
142+
public Boolean call(Integer t1) {
143+
// this will cause request(1) when 0 is emitted
144+
return t1 > 0;
145+
}
146+
147+
}).subscribe(ts);
148+
149+
assertEquals(100, ts.getOnNextEvents().size());
150+
}
151+
152+
@Test
153+
public void testBackpressureWithInitialValue() {
154+
final AtomicInteger count = new AtomicInteger();
155+
Observable.range(1, 100)
156+
.scan(0, new Func2<Integer, Integer, Integer>() {
157+
158+
@Override
159+
public Integer call(Integer t1, Integer t2) {
160+
return t1 + t2;
161+
}
162+
163+
})
164+
.subscribe(new Subscriber<Integer>() {
165+
166+
@Override
167+
public void onStart() {
168+
request(10);
169+
}
170+
171+
@Override
172+
public void onCompleted() {
173+
174+
}
175+
176+
@Override
177+
public void onError(Throwable e) {
178+
fail(e.getMessage());
179+
e.printStackTrace();
180+
}
181+
182+
@Override
183+
public void onNext(Integer t) {
184+
count.incrementAndGet();
185+
}
186+
187+
});
188+
189+
// we only expect to receive 10 since we request(10)
190+
assertEquals(10, count.get());
191+
}
192+
193+
@Test
194+
public void testBackpressureWithoutInitialValue() {
195+
final AtomicInteger count = new AtomicInteger();
196+
Observable.range(1, 100)
197+
.scan(new Func2<Integer, Integer, Integer>() {
198+
199+
@Override
200+
public Integer call(Integer t1, Integer t2) {
201+
return t1 + t2;
202+
}
203+
204+
})
205+
.subscribe(new Subscriber<Integer>() {
206+
207+
@Override
208+
public void onStart() {
209+
request(10);
210+
}
211+
212+
@Override
213+
public void onCompleted() {
214+
215+
}
216+
217+
@Override
218+
public void onError(Throwable e) {
219+
fail(e.getMessage());
220+
e.printStackTrace();
221+
}
222+
223+
@Override
224+
public void onNext(Integer t) {
225+
count.incrementAndGet();
226+
}
227+
228+
});
229+
230+
// we only expect to receive 10 since we request(10)
231+
assertEquals(10, count.get());
232+
}
233+
234+
@Test
235+
public void testNoBackpressureWithInitialValue() {
236+
final AtomicInteger count = new AtomicInteger();
237+
Observable.range(1, 100)
238+
.scan(0, new Func2<Integer, Integer, Integer>() {
239+
240+
@Override
241+
public Integer call(Integer t1, Integer t2) {
242+
return t1 + t2;
243+
}
244+
245+
})
246+
.subscribe(new Subscriber<Integer>() {
247+
248+
@Override
249+
public void onCompleted() {
250+
251+
}
252+
253+
@Override
254+
public void onError(Throwable e) {
255+
fail(e.getMessage());
256+
e.printStackTrace();
257+
}
258+
259+
@Override
260+
public void onNext(Integer t) {
261+
count.incrementAndGet();
262+
}
263+
264+
});
265+
266+
// we only expect to receive 101 as we'll receive all 100 + the initial value
267+
assertEquals(101, count.get());
268+
}
119269
}

0 commit comments

Comments
 (0)