Skip to content

Commit 7692618

Browse files
committed
add OperatorTakeLastOne
1 parent 5b75d32 commit 7692618

File tree

4 files changed

+321
-1
lines changed

4 files changed

+321
-1
lines changed

src/main/java/rx/Observable.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7771,7 +7771,10 @@ public final Observable<T> takeFirst(Func1<? super T, Boolean> predicate) {
77717771
* @see <a href="http://reactivex.io/documentation/operators/takelast.html">ReactiveX operators documentation: TakeLast</a>
77727772
*/
77737773
public final Observable<T> takeLast(final int count) {
7774-
return lift(new OperatorTakeLast<T>(count));
7774+
if (count == 1 )
7775+
return lift(OperatorTakeLastOne.<T>instance());
7776+
else
7777+
return lift(new OperatorTakeLast<T>(count));
77757778
}
77767779

77777780
/**
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package rx.internal.operators;
2+
3+
import java.util.concurrent.atomic.AtomicInteger;
4+
5+
import rx.Observable.Operator;
6+
import rx.Producer;
7+
import rx.Subscriber;
8+
9+
public class OperatorTakeLastOne<T> implements Operator<T, T> {
10+
11+
private static class Holder {
12+
static final OperatorTakeLastOne<Object> INSTANCE = new OperatorTakeLastOne<Object>();
13+
}
14+
15+
@SuppressWarnings("unchecked")
16+
public static <T> OperatorTakeLastOne<T> instance() {
17+
return (OperatorTakeLastOne<T>) Holder.INSTANCE;
18+
}
19+
20+
private OperatorTakeLastOne() {
21+
22+
}
23+
24+
@Override
25+
public Subscriber<? super T> call(Subscriber<? super T> child) {
26+
final ParentSubscriber<T> parent = new ParentSubscriber<T>(child);
27+
child.setProducer(new Producer() {
28+
29+
@Override
30+
public void request(long n) {
31+
parent.requestMore(n);
32+
}
33+
});
34+
child.add(parent);
35+
return parent;
36+
}
37+
38+
private static class ParentSubscriber<T> extends Subscriber<T> {
39+
40+
private final static int NOT_REQUESTED_NOT_COMPLETED = 0;
41+
private final static int NOT_REQUESTED_COMPLETED = 1;
42+
private final static int REQUESTED_NOT_COMPLETED = 2;
43+
private final static int REQUESTED_COMPLETED = 3;
44+
45+
/*
46+
* These are the expected state transitions:
47+
*
48+
* NOT_REQUESTED_NOT_COMPLETED --> REQUESTED_NOT_COMPLETED
49+
* | |
50+
* V V
51+
* NOT_REQUESTED_COMPLETED --> REQUESTED_COMPLETED
52+
*
53+
* Once at REQUESTED_COMPLETED we emit the last value.
54+
*/
55+
56+
//Used as the initial value of last
57+
private static final Object ABSENT = new Object();
58+
59+
// the downstream subscriber
60+
private final Subscriber<? super T> child;
61+
62+
@SuppressWarnings("unchecked")
63+
// we can get away with this cast at runtime because of type erasure
64+
private T last = (T) ABSENT;
65+
66+
// holds the current state of the stream so that we can make atomic updates to it
67+
private final AtomicInteger state = new AtomicInteger(
68+
NOT_REQUESTED_NOT_COMPLETED);
69+
70+
ParentSubscriber(Subscriber<? super T> child) {
71+
this.child = child;
72+
}
73+
74+
void requestMore(long n) {
75+
if (n > 0) {
76+
// CAS loop to atomically change state given that onCompleted()
77+
// or another requestMore() may be acting concurrently
78+
while (true) {
79+
//read the value of state and then try state transitions
80+
//only if the value of state does not change in the meantime
81+
//(in another requestMore() or onCompleted()). If the value
82+
// has changed and we expect to do a transition still then
83+
// we loop and try again.
84+
final int s = state.get();
85+
if (s == NOT_REQUESTED_NOT_COMPLETED) {
86+
if (state.compareAndSet(NOT_REQUESTED_NOT_COMPLETED,
87+
REQUESTED_NOT_COMPLETED)) {
88+
request(Long.MAX_VALUE);
89+
return;
90+
}
91+
} else if (s == NOT_REQUESTED_COMPLETED) {
92+
if (state.compareAndSet(NOT_REQUESTED_COMPLETED, REQUESTED_COMPLETED)) {
93+
emit();
94+
return;
95+
}
96+
} else
97+
// already requested so we exit
98+
return;
99+
}
100+
}
101+
}
102+
103+
@Override
104+
public void onCompleted() {
105+
// CAS loop to atomically change state given that requestMore()
106+
// may be acting concurrently
107+
while (true) {
108+
//read the value of state and then try state transitions
109+
//only if the value of state does not change in the meantime
110+
//(in another requestMore()). If the value has changed and
111+
// we expect to do a transition still then we loop and try again.
112+
final int s = state.get();
113+
if (s == NOT_REQUESTED_NOT_COMPLETED) {
114+
if (state.compareAndSet(NOT_REQUESTED_NOT_COMPLETED, NOT_REQUESTED_COMPLETED)) {
115+
return;
116+
}
117+
} else if (s == REQUESTED_NOT_COMPLETED) {
118+
if (state.compareAndSet(REQUESTED_NOT_COMPLETED, REQUESTED_COMPLETED)) {
119+
emit();
120+
return;
121+
}
122+
} else
123+
// already completed so we exit
124+
return;
125+
}
126+
}
127+
128+
/**
129+
* If not unsubscribed then emits last value and completed to the child subscriber.
130+
*/
131+
private void emit() {
132+
if (isUnsubscribed()) {
133+
// release for gc
134+
last = null;
135+
return;
136+
}
137+
// synchronize to ensure that last is safely published
138+
synchronized (this) {
139+
if (last != ABSENT) {
140+
T t = last;
141+
// release for gc
142+
last = null;
143+
child.onNext(t);
144+
}
145+
}
146+
if (!isUnsubscribed())
147+
child.onCompleted();
148+
}
149+
150+
@Override
151+
public void onError(Throwable e) {
152+
child.onError(e);
153+
}
154+
155+
@Override
156+
public void onNext(T t) {
157+
last = t;
158+
}
159+
160+
}
161+
162+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package rx.operators;
2+
3+
import org.openjdk.jmh.annotations.Benchmark;
4+
5+
import rx.Observable;
6+
import rx.internal.operators.OperatorTakeLast;
7+
import rx.internal.operators.OperatorTakeLastOne;
8+
9+
public class OperatorTakeLastOnePerf {
10+
11+
private static final int MANY = 1000000;
12+
private static final int SOME = 100;
13+
private static final int FEW = 5;
14+
15+
@Benchmark
16+
public void takeLastOneUsingTakeLast_Many() {
17+
Observable.range(1, MANY).lift(new OperatorTakeLast<Integer>(1)).subscribe();
18+
}
19+
20+
@Benchmark
21+
public void takeLastOneUsingTakeLast_Few() {
22+
Observable.range(1, FEW).lift(new OperatorTakeLast<Integer>(1)).subscribe();
23+
}
24+
25+
@Benchmark
26+
public void takeLastOneUsingTakeLast_Some() {
27+
Observable.range(1, SOME).lift(new OperatorTakeLast<Integer>(1)).subscribe();
28+
}
29+
30+
@Benchmark
31+
public void takeLastOneUsingTakeLastOne_Many() {
32+
Observable.range(1, MANY).lift(OperatorTakeLastOne.instance()).subscribe();
33+
}
34+
35+
@Benchmark
36+
public void takeLastOneUsingTakeLastOne_Few() {
37+
Observable.range(1, FEW).lift(OperatorTakeLastOne.instance()).subscribe();
38+
}
39+
40+
@Benchmark
41+
public void takeLastOneUsingTakeLastOne_Some() {
42+
Observable.range(1, SOME).lift(OperatorTakeLastOne.instance()).subscribe();
43+
}
44+
45+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package rx.internal.operators;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertTrue;
5+
6+
import java.util.ArrayList;
7+
import java.util.Arrays;
8+
import java.util.Collections;
9+
import java.util.List;
10+
import java.util.concurrent.atomic.AtomicBoolean;
11+
12+
import org.junit.Test;
13+
14+
import rx.Observable;
15+
import rx.Subscriber;
16+
import rx.functions.Action0;
17+
import rx.observers.TestSubscriber;
18+
19+
public class OperatorTakeLastOneTest {
20+
21+
@Test
22+
public void testLastOfManyReturnsLast() {
23+
TestSubscriber<Integer> s = new TestSubscriber<Integer>();
24+
Observable.range(1, 10).takeLast(1).subscribe(s);
25+
s.assertReceivedOnNext(Arrays.asList(10));
26+
s.assertNoErrors();
27+
s.assertTerminalEvent();
28+
s.assertUnsubscribed();
29+
}
30+
31+
@Test
32+
public void testLastOfEmptyReturnsEmpty() {
33+
TestSubscriber<Object> s = new TestSubscriber<Object>();
34+
Observable.empty().takeLast(1).subscribe(s);
35+
s.assertReceivedOnNext(Collections.emptyList());
36+
s.assertNoErrors();
37+
s.assertTerminalEvent();
38+
s.assertUnsubscribed();
39+
}
40+
41+
@Test
42+
public void testLastOfOneReturnsLast() {
43+
TestSubscriber<Integer> s = new TestSubscriber<Integer>();
44+
Observable.just(1).takeLast(1).subscribe(s);
45+
s.assertReceivedOnNext(Arrays.asList(1));
46+
s.assertNoErrors();
47+
s.assertTerminalEvent();
48+
s.assertUnsubscribed();
49+
}
50+
51+
@Test
52+
public void testUnsubscribesFromUpstream() {
53+
final AtomicBoolean unsubscribed = new AtomicBoolean(false);
54+
Action0 unsubscribeAction = new Action0() {
55+
@Override
56+
public void call() {
57+
unsubscribed.set(true);
58+
}
59+
};
60+
Observable.just(1).doOnUnsubscribe(unsubscribeAction)
61+
.takeLast(1).subscribe();
62+
assertTrue(unsubscribed.get());
63+
}
64+
65+
@Test
66+
public void testLastWithBackpressure() {
67+
MySubscriber<Integer> s = new MySubscriber<Integer>(0);
68+
Observable.just(1).takeLast(1).subscribe(s);
69+
assertEquals(0, s.list.size());
70+
s.requestMore(1);
71+
assertEquals(1, s.list.size());
72+
}
73+
74+
private static class MySubscriber<T> extends Subscriber<T> {
75+
76+
private long initialRequest;
77+
78+
MySubscriber(long initialRequest) {
79+
this.initialRequest = initialRequest;
80+
}
81+
82+
final List<T> list = new ArrayList<T>();
83+
84+
public void requestMore(long n) {
85+
request(n);
86+
}
87+
88+
@Override
89+
public void onStart() {
90+
request(initialRequest);
91+
}
92+
93+
@Override
94+
public void onCompleted() {
95+
96+
}
97+
98+
@Override
99+
public void onError(Throwable e) {
100+
101+
}
102+
103+
@Override
104+
public void onNext(T t) {
105+
list.add(t);
106+
}
107+
108+
}
109+
110+
}

0 commit comments

Comments
 (0)