Skip to content

Commit 90ef143

Browse files
akarnokdakarnokd
akarnokd
authored and
akarnokd
committed
ElementAt request management enhanced
1 parent c833083 commit 90ef143

File tree

1 file changed

+46
-13
lines changed

1 file changed

+46
-13
lines changed

src/main/java/rx/internal/operators/OperatorElementAt.java

Lines changed: 46 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import java.util.concurrent.atomic.AtomicBoolean;
19+
1820
import rx.Observable.Operator;
21+
import rx.Producer;
1922
import rx.Subscriber;
2023

2124
/**
@@ -45,40 +48,70 @@ private OperatorElementAt(int index, T defaultValue, boolean hasDefault) {
4548
}
4649

4750
@Override
48-
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
49-
return new Subscriber<T>(subscriber) {
51+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
52+
Subscriber<T> parent = new Subscriber<T>() {
5053

5154
private int currentIndex = 0;
5255

5356
@Override
5457
public void onNext(T value) {
55-
if (currentIndex == index) {
56-
subscriber.onNext(value);
57-
subscriber.onCompleted();
58-
} else {
59-
request(1);
58+
if (currentIndex++ == index) {
59+
child.onNext(value);
60+
child.onCompleted();
61+
unsubscribe();
6062
}
61-
currentIndex++;
6263
}
6364

6465
@Override
6566
public void onError(Throwable e) {
66-
subscriber.onError(e);
67+
child.onError(e);
6768
}
6869

6970
@Override
7071
public void onCompleted() {
7172
if (currentIndex <= index) {
7273
// If "subscriber.onNext(value)" is called, currentIndex must be greater than index
7374
if (hasDefault) {
74-
subscriber.onNext(defaultValue);
75-
subscriber.onCompleted();
75+
child.onNext(defaultValue);
76+
child.onCompleted();
7677
} else {
77-
subscriber.onError(new IndexOutOfBoundsException(index + " is out of bounds"));
78+
child.onError(new IndexOutOfBoundsException(index + " is out of bounds"));
7879
}
7980
}
8081
}
82+
83+
@Override
84+
public void setProducer(Producer p) {
85+
child.setProducer(new InnerProducer(p));
86+
}
8187
};
88+
child.add(parent);
89+
90+
return parent;
91+
}
92+
/**
93+
* A producer that wraps another Producer and requests Long.MAX_VALUE
94+
* when the first positive request() call comes in.
95+
*/
96+
static class InnerProducer extends AtomicBoolean implements Producer {
97+
/** */
98+
private static final long serialVersionUID = 1L;
99+
100+
final Producer actual;
101+
102+
public InnerProducer(Producer actual) {
103+
this.actual = actual;
104+
}
105+
@Override
106+
public void request(long n) {
107+
if (n < 0) {
108+
throw new IllegalArgumentException("n >= 0 required");
109+
}
110+
if (n > 0 && compareAndSet(false, true)) {
111+
// trigger the fast-path since the operator is going
112+
// to skip all but the indexth element
113+
actual.request(Long.MAX_VALUE);
114+
}
115+
}
82116
}
83-
84117
}

0 commit comments

Comments
 (0)