Skip to content

Commit 880d6aa

Browse files
Merge pull request ReactiveX#140 from mairbek/TakeLast
Implemented TakeLast operator
2 parents dbb4dc1 + cc4789d commit 880d6aa

File tree

2 files changed

+205
-0
lines changed

2 files changed

+205
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import rx.operators.OperationSkip;
4747
import rx.operators.OperationSynchronize;
4848
import rx.operators.OperationTake;
49+
import rx.operators.OperationTakeLast;
4950
import rx.operators.OperationToObservableFuture;
5051
import rx.operators.OperationToObservableIterable;
5152
import rx.operators.OperationToObservableList;
@@ -1347,6 +1348,22 @@ public static <T> Observable<T> take(final Observable<T> items, final int num) {
13471348
return _create(OperationTake.take(items, num));
13481349
}
13491350

1351+
/**
1352+
* Returns an Observable that emits the last <code>count</code> items emitted by the source
1353+
* Observable.
1354+
*
1355+
* @param items
1356+
* the source Observable
1357+
* @param count
1358+
* the number of items from the end of the sequence emitted by the source
1359+
* Observable to emit
1360+
* @return an Observable that only emits the last <code>count</code> items emitted by the source
1361+
* Observable
1362+
*/
1363+
public static <T> Observable<T> takeLast(final Observable<T> items, final int count) {
1364+
return _create(OperationTakeLast.takeLast(items, count));
1365+
}
1366+
13501367
/**
13511368
* Returns an Observable that emits a single item, a list composed of all the items emitted by
13521369
* the source Observable.
@@ -2279,6 +2296,20 @@ public Observable<T> take(final int num) {
22792296
return take(this, num);
22802297
}
22812298

2299+
/**
2300+
* Returns an Observable that emits the last <code>count</code> items emitted by the source
2301+
* Observable.
2302+
*
2303+
* @param count
2304+
* the number of items from the end of the sequence emitted by the source
2305+
* Observable to emit
2306+
* @return an Observable that only emits the last <code>count</code> items emitted by the source
2307+
* Observable
2308+
*/
2309+
public Observable<T> takeLast(final int count) {
2310+
return takeLast(this, count);
2311+
}
2312+
22822313
/**
22832314
* Returns an Observable that emits a single item, a list composed of all the items emitted by
22842315
* the source Observable.
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import org.junit.Test;
19+
import rx.Observable;
20+
import rx.Observer;
21+
import rx.Subscription;
22+
import rx.util.AtomicObservableSubscription;
23+
import rx.util.functions.Func1;
24+
25+
import java.util.Iterator;
26+
import java.util.concurrent.LinkedBlockingDeque;
27+
import java.util.concurrent.atomic.AtomicInteger;
28+
29+
import static org.mockito.Matchers.any;
30+
import static org.mockito.Mockito.*;
31+
32+
/**
33+
* Returns a specified number of contiguous elements from the end of an observable sequence.
34+
*/
35+
public final class OperationTakeLast {
36+
37+
public static <T> Func1<Observer<T>, Subscription> takeLast(final Observable<T> items, final int count) {
38+
return new Func1<Observer<T>, Subscription>() {
39+
40+
@Override
41+
public Subscription call(Observer<T> observer) {
42+
return new TakeLast<T>(items, count).call(observer);
43+
}
44+
45+
};
46+
}
47+
48+
private static class TakeLast<T> implements Func1<Observer<T>, Subscription> {
49+
private final int count;
50+
private final Observable<T> items;
51+
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
52+
53+
TakeLast(final Observable<T> items, final int count) {
54+
this.count = count;
55+
this.items = items;
56+
}
57+
58+
public Subscription call(Observer<T> observer) {
59+
return subscription.wrap(items.subscribe(new ItemObserver(observer)));
60+
}
61+
62+
private class ItemObserver implements Observer<T> {
63+
64+
private LinkedBlockingDeque<T> deque = new LinkedBlockingDeque<T>(count);
65+
private final Observer<T> observer;
66+
67+
public ItemObserver(Observer<T> observer) {
68+
this.observer = observer;
69+
}
70+
71+
@Override
72+
public void onCompleted() {
73+
Iterator<T> reverse = deque.descendingIterator();
74+
while (reverse.hasNext()) {
75+
observer.onNext(reverse.next());
76+
}
77+
observer.onCompleted();
78+
}
79+
80+
@Override
81+
public void onError(Exception e) {
82+
observer.onError(e);
83+
}
84+
85+
@Override
86+
public void onNext(T args) {
87+
while (!deque.offerFirst(args)) {
88+
deque.removeLast();
89+
}
90+
}
91+
92+
}
93+
94+
}
95+
96+
public static class UnitTest {
97+
98+
@Test
99+
public void testTakeLastEmpty() {
100+
Observable<String> w = Observable.toObservable();
101+
Observable<String> take = Observable.create(takeLast(w, 2));
102+
103+
@SuppressWarnings("unchecked")
104+
Observer<String> aObserver = mock(Observer.class);
105+
take.subscribe(aObserver);
106+
verify(aObserver, never()).onNext(any(String.class));
107+
verify(aObserver, never()).onError(any(Exception.class));
108+
verify(aObserver, times(1)).onCompleted();
109+
}
110+
111+
@Test
112+
public void testTakeLast1() {
113+
Observable<String> w = Observable.toObservable("one", "two", "three");
114+
Observable<String> take = Observable.create(takeLast(w, 2));
115+
116+
@SuppressWarnings("unchecked")
117+
Observer<String> aObserver = mock(Observer.class);
118+
take.subscribe(aObserver);
119+
verify(aObserver, times(1)).onNext("two");
120+
verify(aObserver, times(1)).onNext("three");
121+
verify(aObserver, never()).onNext("one");
122+
verify(aObserver, never()).onError(any(Exception.class));
123+
verify(aObserver, times(1)).onCompleted();
124+
}
125+
126+
@Test
127+
public void testTakeLast2() {
128+
Observable<String> w = Observable.toObservable("one");
129+
Observable<String> take = Observable.create(takeLast(w, 10));
130+
131+
@SuppressWarnings("unchecked")
132+
Observer<String> aObserver = mock(Observer.class);
133+
take.subscribe(aObserver);
134+
verify(aObserver, times(1)).onNext("one");
135+
verify(aObserver, never()).onError(any(Exception.class));
136+
verify(aObserver, times(1)).onCompleted();
137+
}
138+
139+
@Test
140+
public void testTakeLastOrdering() {
141+
Observable<String> w = Observable.toObservable("one", "two", "three");
142+
Observable<String> take = Observable.create(takeLast(w, 2));
143+
144+
@SuppressWarnings("unchecked")
145+
Observer<String> aObserver = mock(Observer.class);
146+
take.subscribe(countingWrapper(aObserver));
147+
verify(aObserver, times(1)).onNext("two_1");
148+
verify(aObserver, times(1)).onNext("three_2");
149+
}
150+
151+
152+
private static Observer<String> countingWrapper(final Observer<String> underlying) {
153+
return new Observer<String>() {
154+
private final AtomicInteger counter = new AtomicInteger();
155+
@Override
156+
public void onCompleted() {
157+
underlying.onCompleted();
158+
}
159+
160+
@Override
161+
public void onError(Exception e) {
162+
underlying.onCompleted();
163+
}
164+
165+
@Override
166+
public void onNext(String args) {
167+
underlying.onNext(args + "_" + counter.incrementAndGet());
168+
}
169+
};
170+
}
171+
172+
}
173+
174+
}

0 commit comments

Comments
 (0)