Skip to content

Commit 1cff8bf

Browse files
committed
Merge pull request #2914 from davidmoten/take-last-one
Optimization - use OperatorTakeLastOne for takeLast(1)
2 parents 5b75d32 + 9d96b79 commit 1cff8bf

File tree

4 files changed

+346
-1
lines changed

4 files changed

+346
-1
lines changed

src/main/java/rx/Observable.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7771,7 +7771,12 @@ public final Observable<T> takeFirst(Func1<? super T, Boolean> predicate) {
77717771
* @see <a href="http://reactivex.io/documentation/operators/takelast.html">ReactiveX operators documentation: TakeLast</a>
77727772
*/
77737773
public final Observable<T> takeLast(final int count) {
7774-
return lift(new OperatorTakeLast<T>(count));
7774+
if (count == 0)
7775+
return ignoreElements();
7776+
else if (count == 1 )
7777+
return lift(OperatorTakeLastOne.<T>instance());
7778+
else
7779+
return lift(new OperatorTakeLast<T>(count));
77757780
}
77767781

77777782
/**
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
package rx.internal.operators;
2+
3+
import java.util.concurrent.atomic.AtomicInteger;
4+
5+
import rx.Observable.Operator;
6+
import rx.Producer;
7+
import rx.Subscriber;
8+
9+
public class OperatorTakeLastOne<T> implements Operator<T, T> {
10+
11+
private static class Holder {
12+
static final OperatorTakeLastOne<Object> INSTANCE = new OperatorTakeLastOne<Object>();
13+
}
14+
15+
@SuppressWarnings("unchecked")
16+
public static <T> OperatorTakeLastOne<T> instance() {
17+
return (OperatorTakeLastOne<T>) Holder.INSTANCE;
18+
}
19+
20+
private OperatorTakeLastOne() {
21+
22+
}
23+
24+
@Override
25+
public Subscriber<? super T> call(Subscriber<? super T> child) {
26+
final ParentSubscriber<T> parent = new ParentSubscriber<T>(child);
27+
child.setProducer(new Producer() {
28+
29+
@Override
30+
public void request(long n) {
31+
parent.requestMore(n);
32+
}
33+
});
34+
child.add(parent);
35+
return parent;
36+
}
37+
38+
private static class ParentSubscriber<T> extends Subscriber<T> {
39+
40+
private final static int NOT_REQUESTED_NOT_COMPLETED = 0;
41+
private final static int NOT_REQUESTED_COMPLETED = 1;
42+
private final static int REQUESTED_NOT_COMPLETED = 2;
43+
private final static int REQUESTED_COMPLETED = 3;
44+
45+
/*
46+
* These are the expected state transitions:
47+
*
48+
* NOT_REQUESTED_NOT_COMPLETED --> REQUESTED_NOT_COMPLETED
49+
* | |
50+
* V V
51+
* NOT_REQUESTED_COMPLETED --> REQUESTED_COMPLETED
52+
*
53+
* Once at REQUESTED_COMPLETED we emit the last value if one exists
54+
*/
55+
56+
// Used as the initial value of last
57+
private static final Object ABSENT = new Object();
58+
59+
// the downstream subscriber
60+
private final Subscriber<? super T> child;
61+
62+
@SuppressWarnings("unchecked")
63+
// we can get away with this cast at runtime because of type erasure
64+
private T last = (T) ABSENT;
65+
66+
// holds the current state of the stream so that we can make atomic
67+
// updates to it
68+
private final AtomicInteger state = new AtomicInteger(NOT_REQUESTED_NOT_COMPLETED);
69+
70+
ParentSubscriber(Subscriber<? super T> child) {
71+
this.child = child;
72+
}
73+
74+
void requestMore(long n) {
75+
if (n > 0) {
76+
// CAS loop to atomically change state given that onCompleted()
77+
// or another requestMore() may be acting concurrently
78+
while (true) {
79+
// read the value of state and then try state transitions
80+
// only if the value of state does not change in the
81+
// meantime (in another requestMore() or onCompleted()). If
82+
// the value has changed and we expect to do a transition
83+
// still then we loop and try again.
84+
final int s = state.get();
85+
if (s == NOT_REQUESTED_NOT_COMPLETED) {
86+
if (state.compareAndSet(NOT_REQUESTED_NOT_COMPLETED,
87+
REQUESTED_NOT_COMPLETED)) {
88+
return;
89+
}
90+
} else if (s == NOT_REQUESTED_COMPLETED) {
91+
if (state.compareAndSet(NOT_REQUESTED_COMPLETED, REQUESTED_COMPLETED)) {
92+
emit();
93+
return;
94+
}
95+
} else
96+
// already requested so we exit
97+
return;
98+
}
99+
}
100+
}
101+
102+
@Override
103+
public void onCompleted() {
104+
//shortcut if an empty stream
105+
if (last == ABSENT) {
106+
child.onCompleted();
107+
return;
108+
}
109+
// CAS loop to atomically change state given that requestMore()
110+
// may be acting concurrently
111+
while (true) {
112+
// read the value of state and then try state transitions
113+
// only if the value of state does not change in the meantime
114+
// (in another requestMore()). If the value has changed and
115+
// we expect to do a transition still then we loop and try
116+
// again.
117+
final int s = state.get();
118+
if (s == NOT_REQUESTED_NOT_COMPLETED) {
119+
if (state.compareAndSet(NOT_REQUESTED_NOT_COMPLETED, NOT_REQUESTED_COMPLETED)) {
120+
return;
121+
}
122+
} else if (s == REQUESTED_NOT_COMPLETED) {
123+
if (state.compareAndSet(REQUESTED_NOT_COMPLETED, REQUESTED_COMPLETED)) {
124+
emit();
125+
return;
126+
}
127+
} else
128+
// already completed so we exit
129+
return;
130+
}
131+
}
132+
133+
/**
134+
* If not unsubscribed then emits last value and completed to the child
135+
* subscriber.
136+
*/
137+
private void emit() {
138+
if (isUnsubscribed()) {
139+
// release for gc
140+
last = null;
141+
return;
142+
}
143+
// Note that last is safely published despite not being volatile
144+
// because a CAS update must have happened in the current thread just before
145+
// emit() was called
146+
T t = last;
147+
// release for gc
148+
last = null;
149+
if (t != ABSENT) {
150+
try {
151+
child.onNext(t);
152+
} catch (Throwable e) {
153+
child.onError(e);
154+
return;
155+
}
156+
}
157+
if (!isUnsubscribed())
158+
child.onCompleted();
159+
}
160+
161+
@Override
162+
public void onError(Throwable e) {
163+
child.onError(e);
164+
}
165+
166+
@Override
167+
public void onNext(T t) {
168+
last = t;
169+
}
170+
171+
}
172+
173+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package rx.operators;
2+
3+
import org.openjdk.jmh.annotations.Benchmark;
4+
import org.openjdk.jmh.annotations.Param;
5+
import org.openjdk.jmh.annotations.Scope;
6+
import org.openjdk.jmh.annotations.State;
7+
8+
import rx.internal.operators.OperatorTakeLast;
9+
import rx.internal.operators.OperatorTakeLastOne;
10+
import rx.jmh.InputWithIncrementingInteger;
11+
12+
public class OperatorTakeLastOnePerf {
13+
14+
private static final OperatorTakeLast<Integer> TAKE_LAST = new OperatorTakeLast<Integer>(1);
15+
16+
@State(Scope.Thread)
17+
public static class Input extends InputWithIncrementingInteger {
18+
19+
@Param({ "5", "100", "1000000" })
20+
public int size;
21+
22+
@Override
23+
public int getSize() {
24+
return size;
25+
}
26+
27+
}
28+
29+
@Benchmark
30+
public void takeLastOneUsingTakeLast(Input input) {
31+
input.observable.lift(TAKE_LAST).subscribe(input.observer);
32+
}
33+
34+
@Benchmark
35+
public void takeLastOneUsingTakeLastOne(Input input) {
36+
input.observable.lift(OperatorTakeLastOne.<Integer>instance()).subscribe(input.observer);
37+
}
38+
39+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package rx.internal.operators;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertTrue;
5+
6+
import java.util.ArrayList;
7+
import java.util.Arrays;
8+
import java.util.Collections;
9+
import java.util.List;
10+
import java.util.concurrent.atomic.AtomicBoolean;
11+
import java.util.concurrent.atomic.AtomicInteger;
12+
13+
import org.junit.Test;
14+
15+
import rx.Observable;
16+
import rx.Subscriber;
17+
import rx.Subscription;
18+
import rx.functions.Action0;
19+
import rx.functions.Action1;
20+
import rx.observers.TestSubscriber;
21+
22+
public class OperatorTakeLastOneTest {
23+
24+
@Test
25+
public void testLastOfManyReturnsLast() {
26+
TestSubscriber<Integer> s = new TestSubscriber<Integer>();
27+
Observable.range(1, 10).takeLast(1).subscribe(s);
28+
s.assertReceivedOnNext(Arrays.asList(10));
29+
s.assertNoErrors();
30+
s.assertTerminalEvent();
31+
s.assertUnsubscribed();
32+
}
33+
34+
@Test
35+
public void testLastOfEmptyReturnsEmpty() {
36+
TestSubscriber<Object> s = new TestSubscriber<Object>();
37+
Observable.empty().takeLast(1).subscribe(s);
38+
s.assertReceivedOnNext(Collections.emptyList());
39+
s.assertNoErrors();
40+
s.assertTerminalEvent();
41+
s.assertUnsubscribed();
42+
}
43+
44+
@Test
45+
public void testLastOfOneReturnsLast() {
46+
TestSubscriber<Integer> s = new TestSubscriber<Integer>();
47+
Observable.just(1).takeLast(1).subscribe(s);
48+
s.assertReceivedOnNext(Arrays.asList(1));
49+
s.assertNoErrors();
50+
s.assertTerminalEvent();
51+
s.assertUnsubscribed();
52+
}
53+
54+
@Test
55+
public void testUnsubscribesFromUpstream() {
56+
final AtomicBoolean unsubscribed = new AtomicBoolean(false);
57+
Action0 unsubscribeAction = new Action0() {
58+
@Override
59+
public void call() {
60+
unsubscribed.set(true);
61+
}
62+
};
63+
Observable.just(1).doOnUnsubscribe(unsubscribeAction)
64+
.takeLast(1).subscribe();
65+
assertTrue(unsubscribed.get());
66+
}
67+
68+
@Test
69+
public void testLastWithBackpressure() {
70+
MySubscriber<Integer> s = new MySubscriber<Integer>(0);
71+
Observable.just(1).takeLast(1).subscribe(s);
72+
assertEquals(0, s.list.size());
73+
s.requestMore(1);
74+
assertEquals(1, s.list.size());
75+
}
76+
77+
@Test
78+
public void testTakeLastZeroProcessesAllItemsButIgnoresThem() {
79+
final AtomicInteger upstreamCount = new AtomicInteger();
80+
final int num = 10;
81+
int count = Observable.range(1,num).doOnNext(new Action1<Integer>() {
82+
83+
@Override
84+
public void call(Integer t) {
85+
upstreamCount.incrementAndGet();
86+
}})
87+
.takeLast(0).count().toBlocking().single();
88+
assertEquals(num, upstreamCount.get());
89+
assertEquals(0, count);
90+
}
91+
92+
private static class MySubscriber<T> extends Subscriber<T> {
93+
94+
private long initialRequest;
95+
96+
MySubscriber(long initialRequest) {
97+
this.initialRequest = initialRequest;
98+
}
99+
100+
final List<T> list = new ArrayList<T>();
101+
102+
public void requestMore(long n) {
103+
request(n);
104+
}
105+
106+
@Override
107+
public void onStart() {
108+
request(initialRequest);
109+
}
110+
111+
@Override
112+
public void onCompleted() {
113+
114+
}
115+
116+
@Override
117+
public void onError(Throwable e) {
118+
119+
}
120+
121+
@Override
122+
public void onNext(T t) {
123+
list.add(t);
124+
}
125+
126+
}
127+
128+
}

0 commit comments

Comments
 (0)