Skip to content

Commit d0339e5

Browse files
committed
TakeLast basic implementation
1 parent ccd17ac commit d0339e5

File tree

1 file changed

+139
-0
lines changed

1 file changed

+139
-0
lines changed
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import org.junit.Test;
19+
import rx.Observable;
20+
import rx.Observer;
21+
import rx.Subscription;
22+
import rx.util.AtomicObservableSubscription;
23+
import rx.util.functions.Func1;
24+
25+
import java.util.Iterator;
26+
import java.util.concurrent.ConcurrentLinkedQueue;
27+
import java.util.concurrent.LinkedBlockingDeque;
28+
import java.util.concurrent.atomic.AtomicInteger;
29+
30+
import static org.mockito.Matchers.any;
31+
import static org.mockito.Mockito.*;
32+
33+
public final class OperationTakeLast {
34+
35+
public static <T> Func1<Observer<T>, Subscription> takeLast(final Observable<T> items, final int count) {
36+
return new Func1<Observer<T>, Subscription>() {
37+
38+
@Override
39+
public Subscription call(Observer<T> observer) {
40+
return new TakeLast<T>(items, count).call(observer);
41+
}
42+
43+
};
44+
}
45+
46+
private static class TakeLast<T> implements Func1<Observer<T>, Subscription> {
47+
private final int count;
48+
private final Observable<T> items;
49+
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
50+
51+
TakeLast(final Observable<T> items, final int count) {
52+
this.count = count;
53+
this.items = items;
54+
}
55+
56+
public Subscription call(Observer<T> observer) {
57+
return subscription.wrap(items.subscribe(new ItemObserver(observer)));
58+
}
59+
60+
private class ItemObserver implements Observer<T> {
61+
62+
private LinkedBlockingDeque<T> deque = new LinkedBlockingDeque<T>(count);
63+
private final Observer<T> observer;
64+
65+
public ItemObserver(Observer<T> observer) {
66+
this.observer = observer;
67+
}
68+
69+
@Override
70+
public void onCompleted() {
71+
Iterator<T> reverse = deque.descendingIterator();
72+
while (reverse.hasNext()) {
73+
observer.onNext(reverse.next());
74+
}
75+
observer.onCompleted();
76+
}
77+
78+
@Override
79+
public void onError(Exception e) {
80+
observer.onError(e);
81+
}
82+
83+
@Override
84+
public void onNext(T args) {
85+
while (!deque.offerFirst(args)) {
86+
deque.removeLast();
87+
}
88+
}
89+
90+
}
91+
92+
}
93+
94+
public static class UnitTest {
95+
96+
@Test
97+
public void testTakeLastEmpty() {
98+
Observable<String> w = Observable.toObservable();
99+
Observable<String> take = Observable.create(takeLast(w, 2));
100+
101+
@SuppressWarnings("unchecked")
102+
Observer<String> aObserver = mock(Observer.class);
103+
take.subscribe(aObserver);
104+
verify(aObserver, never()).onNext(any(String.class));
105+
verify(aObserver, never()).onError(any(Exception.class));
106+
verify(aObserver, times(1)).onCompleted();
107+
}
108+
109+
@Test
110+
public void testTakeLast1() {
111+
Observable<String> w = Observable.toObservable("one", "two", "three");
112+
Observable<String> take = Observable.create(takeLast(w, 2));
113+
114+
@SuppressWarnings("unchecked")
115+
Observer<String> aObserver = mock(Observer.class);
116+
take.subscribe(aObserver);
117+
verify(aObserver, times(1)).onNext("two");
118+
verify(aObserver, times(1)).onNext("three");
119+
verify(aObserver, never()).onNext("one");
120+
verify(aObserver, never()).onError(any(Exception.class));
121+
verify(aObserver, times(1)).onCompleted();
122+
}
123+
124+
@Test
125+
public void testTakeLast2() {
126+
Observable<String> w = Observable.toObservable("one");
127+
Observable<String> take = Observable.create(takeLast(w, 10));
128+
129+
@SuppressWarnings("unchecked")
130+
Observer<String> aObserver = mock(Observer.class);
131+
take.subscribe(aObserver);
132+
verify(aObserver, times(1)).onNext("one");
133+
verify(aObserver, never()).onError(any(Exception.class));
134+
verify(aObserver, times(1)).onCompleted();
135+
}
136+
137+
}
138+
139+
}

0 commit comments

Comments
 (0)