Skip to content

Commit a73044a

Browse files
committed
OperatorSingle should request exactly what it needs
1 parent 9f2fc67 commit a73044a

File tree

2 files changed

+249
-36
lines changed

2 files changed

+249
-36
lines changed

src/main/java/rx/internal/operators/OperatorSingle.java

Lines changed: 81 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616
package rx.internal.operators;
1717

1818
import java.util.NoSuchElementException;
19+
import java.util.concurrent.atomic.AtomicLong;
1920

2021
import rx.Observable.Operator;
22+
import rx.Producer;
2123
import rx.Subscriber;
2224

2325
/**
@@ -44,53 +46,98 @@ private OperatorSingle(boolean hasDefaultValue, final T defaultValue) {
4446
}
4547

4648
@Override
47-
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
48-
return new Subscriber<T>(subscriber) {
49+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
4950

50-
private T value;
51-
private boolean isNonEmpty = false;
52-
private boolean hasTooManyElements = false;
51+
final AtomicLong requested = new AtomicLong(0);
52+
final ParentSubscriber<T> parent = new ParentSubscriber<T>(child, hasDefaultValue,
53+
defaultValue, requested);
54+
55+
child.setProducer(new Producer() {
5356

5457
@Override
55-
public void onNext(T value) {
56-
if (isNonEmpty) {
57-
hasTooManyElements = true;
58-
subscriber.onError(new IllegalArgumentException("Sequence contains too many elements"));
59-
unsubscribe();
60-
} else {
61-
this.value = value;
62-
isNonEmpty = true;
63-
// Issue: https://github.com/ReactiveX/RxJava/pull/1527
64-
// Because we cache a value and don't emit now, we need to request another one.
65-
request(1);
58+
public void request(long n) {
59+
if (n > 0) {
60+
// should request at most 2 in total to complete with no
61+
// items, emit the single item or decide that there are
62+
// too many items.
63+
long numToRequest = Math.min(2 - requested.get(), n);
64+
if (numToRequest > 0)
65+
parent.requestMore(numToRequest);
6666
}
6767
}
6868

69-
@Override
70-
public void onCompleted() {
71-
if (hasTooManyElements) {
72-
// We have already sent an onError message
69+
});
70+
child.add(parent);
71+
return parent;
72+
}
73+
74+
private static final class ParentSubscriber<T> extends Subscriber<T> {
75+
private final Subscriber<? super T> child;
76+
private final boolean hasDefaultValue;
77+
private final T defaultValue;
78+
private final AtomicLong requested;
79+
80+
private T value;
81+
private boolean isNonEmpty = false;
82+
private boolean hasTooManyElements = false;
83+
84+
85+
public ParentSubscriber(Subscriber<? super T> child, boolean hasDefaultValue,
86+
T defaultValue, AtomicLong requested) {
87+
this.child = child;
88+
this.hasDefaultValue = hasDefaultValue;
89+
this.defaultValue = defaultValue;
90+
this.requested = requested;
91+
}
92+
93+
private void requestMore(long n) {
94+
BackpressureUtils.getAndAddRequest(requested, n);
95+
request(n);
96+
}
97+
98+
@Override
99+
public void onNext(T value) {
100+
if (isNonEmpty) {
101+
hasTooManyElements = true;
102+
child.onError(new IllegalArgumentException("Sequence contains too many elements"));
103+
unsubscribe();
104+
} else {
105+
this.value = value;
106+
isNonEmpty = true;
107+
// Issue: https://github.com/ReactiveX/RxJava/pull/1527
108+
// Because we cache a value and don't emit now, we need to
109+
// request another one if insufficient have been requested
110+
// so far.
111+
long numToRequest = 2 - requested.get();
112+
if (numToRequest > 0)
113+
requestMore(numToRequest);
114+
}
115+
}
116+
117+
@Override
118+
public void onCompleted() {
119+
if (hasTooManyElements) {
120+
// We have already sent an onError message
121+
} else {
122+
if (isNonEmpty) {
123+
child.onNext(value);
124+
child.onCompleted();
73125
} else {
74-
if (isNonEmpty) {
75-
subscriber.onNext(value);
76-
subscriber.onCompleted();
126+
if (hasDefaultValue) {
127+
child.onNext(defaultValue);
128+
child.onCompleted();
77129
} else {
78-
if (hasDefaultValue) {
79-
subscriber.onNext(defaultValue);
80-
subscriber.onCompleted();
81-
} else {
82-
subscriber.onError(new NoSuchElementException("Sequence contains no elements"));
83-
}
130+
child.onError(new NoSuchElementException("Sequence contains no elements"));
84131
}
85132
}
86133
}
134+
}
87135

88-
@Override
89-
public void onError(Throwable e) {
90-
subscriber.onError(e);
91-
}
136+
@Override
137+
public void onError(Throwable e) {
138+
child.onError(e);
139+
}
92140

93-
};
94141
}
95142

96143
}

src/test/java/rx/internal/operators/OperatorSingleTest.java

Lines changed: 168 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,24 @@
1717

1818
import static org.junit.Assert.assertEquals;
1919
import static org.mockito.Matchers.isA;
20-
import static org.mockito.Mockito.*;
21-
20+
import static org.mockito.Mockito.inOrder;
21+
import static org.mockito.Mockito.mock;
22+
import static org.mockito.Mockito.spy;
23+
import static org.mockito.Mockito.times;
24+
25+
import java.util.ArrayList;
26+
import java.util.Arrays;
27+
import java.util.List;
2228
import java.util.NoSuchElementException;
29+
import java.util.concurrent.atomic.AtomicLong;
2330

2431
import org.junit.Test;
2532
import org.mockito.InOrder;
2633

2734
import rx.Observable;
2835
import rx.Observer;
2936
import rx.Subscriber;
37+
import rx.functions.Action1;
3038
import rx.functions.Func1;
3139
import rx.functions.Func2;
3240

@@ -73,6 +81,164 @@ public void testSingleWithEmpty() {
7381
isA(NoSuchElementException.class));
7482
inOrder.verifyNoMoreInteractions();
7583
}
84+
85+
@Test
86+
public void testSingleDoesNotRequestMoreThanItNeedsToEmitItem() {
87+
final AtomicLong request = new AtomicLong();
88+
Observable.just(1).doOnRequest(new Action1<Long>() {
89+
@Override
90+
public void call(Long n) {
91+
request.addAndGet(n);
92+
}
93+
}).toBlocking().single();
94+
assertEquals(2, request.get());
95+
}
96+
97+
@Test
98+
public void testSingleDoesNotRequestMoreThanItNeedsToEmitErrorFromEmpty() {
99+
final AtomicLong request = new AtomicLong();
100+
try {
101+
Observable.empty().doOnRequest(new Action1<Long>() {
102+
@Override
103+
public void call(Long n) {
104+
request.addAndGet(n);
105+
}
106+
}).toBlocking().single();
107+
} catch (NoSuchElementException e) {
108+
assertEquals(2, request.get());
109+
}
110+
}
111+
112+
@Test
113+
public void testSingleDoesNotRequestMoreThanItNeedsToEmitErrorFromMoreThanOne() {
114+
final AtomicLong request = new AtomicLong();
115+
try {
116+
Observable.just(1, 2).doOnRequest(new Action1<Long>() {
117+
@Override
118+
public void call(Long n) {
119+
request.addAndGet(n);
120+
}
121+
}).toBlocking().single();
122+
} catch (IllegalArgumentException e) {
123+
assertEquals(2, request.get());
124+
}
125+
}
126+
127+
@Test
128+
public void testSingleDoesNotRequestMoreThanItNeedsIf1Then2Requested() {
129+
final List<Long> requests = new ArrayList<Long>();
130+
Observable.just(1)
131+
//
132+
.doOnRequest(new Action1<Long>() {
133+
@Override
134+
public void call(Long n) {
135+
requests.add(n);
136+
}
137+
})
138+
//
139+
.single()
140+
//
141+
.subscribe(new Subscriber<Integer>() {
142+
143+
@Override
144+
public void onStart() {
145+
request(1);
146+
}
147+
148+
@Override
149+
public void onCompleted() {
150+
151+
}
152+
153+
@Override
154+
public void onError(Throwable e) {
155+
156+
}
157+
158+
@Override
159+
public void onNext(Integer t) {
160+
request(2);
161+
}
162+
});
163+
assertEquals(Arrays.asList(1L,1L), requests);
164+
}
165+
166+
@Test
167+
public void testSingleDoesNotRequestMoreThanItNeedsIf3Requested() {
168+
final List<Long> requests = new ArrayList<Long>();
169+
Observable.just(1)
170+
//
171+
.doOnRequest(new Action1<Long>() {
172+
@Override
173+
public void call(Long n) {
174+
requests.add(n);
175+
}
176+
})
177+
//
178+
.single()
179+
//
180+
.subscribe(new Subscriber<Integer>() {
181+
182+
@Override
183+
public void onStart() {
184+
request(3);
185+
}
186+
187+
@Override
188+
public void onCompleted() {
189+
190+
}
191+
192+
@Override
193+
public void onError(Throwable e) {
194+
195+
}
196+
197+
@Override
198+
public void onNext(Integer t) {
199+
}
200+
});
201+
assertEquals(Arrays.asList(2L), requests);
202+
}
203+
204+
@Test
205+
public void testSingleRequestsExactlyWhatItNeedsIf1Requested() {
206+
final List<Long> requests = new ArrayList<Long>();
207+
Observable.just(1)
208+
//
209+
.doOnRequest(new Action1<Long>() {
210+
@Override
211+
public void call(Long n) {
212+
requests.add(n);
213+
}
214+
})
215+
//
216+
.single()
217+
//
218+
.subscribe(new Subscriber<Integer>() {
219+
220+
@Override
221+
public void onStart() {
222+
request(1);
223+
}
224+
225+
@Override
226+
public void onCompleted() {
227+
228+
}
229+
230+
@Override
231+
public void onError(Throwable e) {
232+
233+
}
234+
235+
@Override
236+
public void onNext(Integer t) {
237+
}
238+
});
239+
assertEquals(Arrays.asList(1L, 1L), requests);
240+
}
241+
76242

77243
@Test
78244
public void testSingleWithPredicate() {

0 commit comments

Comments
 (0)