Skip to content

Commit 4323b70

Browse files
committed
Implemented the 'SkipLast' operator
1 parent ea575d0 commit 4323b70

File tree

2 files changed

+224
-0
lines changed

2 files changed

+224
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import rx.operators.OperationSample;
6464
import rx.operators.OperationScan;
6565
import rx.operators.OperationSkip;
66+
import rx.operators.OperationSkipLast;
6667
import rx.operators.OperationSkipWhile;
6768
import rx.operators.OperationSubscribeOn;
6869
import rx.operators.OperationSum;
@@ -4054,6 +4055,32 @@ public Observable<T> skipWhile(Func1<? super T, Boolean> predicate) {
40544055
return create(OperationSkipWhile.skipWhile(this, predicate));
40554056
}
40564057

4058+
/**
4059+
* Bypasses a specified number of elements at the end of an observable
4060+
* sequence.
4061+
* <p>
4062+
* This operator accumulates a queue with a length enough to store the first
4063+
* count elements. As more elements are received, elements are taken from
4064+
* the front of the queue and produced on the result sequence. This causes
4065+
* elements to be delayed.
4066+
*
4067+
* @param source
4068+
* the source sequence.
4069+
* @param count
4070+
* number of elements to bypass at the end of the source
4071+
* sequence.
4072+
* @return An observable sequence containing the source sequence elements
4073+
* except for the bypassed ones at the end.
4074+
*
4075+
* @throws IndexOutOfBoundsException
4076+
* count is less than zero.
4077+
*
4078+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211750(v=vs.103).aspx">MSDN: Observable.SkipLast</a>
4079+
*/
4080+
public Observable<T> skipLast(int count) {
4081+
return create(OperationSkipLast.skipLast(this, count));
4082+
}
4083+
40574084
/**
40584085
* Returns an Observable that emits a single item, a list composed of all the items emitted by
40594086
* the source Observable.
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
package rx.operators;
2+
3+
import static org.mockito.Matchers.any;
4+
import static org.mockito.Mockito.inOrder;
5+
import static org.mockito.Mockito.mock;
6+
import static org.mockito.Mockito.never;
7+
import static org.mockito.Mockito.times;
8+
import static org.mockito.Mockito.verify;
9+
10+
import java.util.Deque;
11+
import java.util.LinkedList;
12+
import java.util.concurrent.locks.ReentrantLock;
13+
14+
import org.junit.Test;
15+
import org.mockito.InOrder;
16+
17+
import rx.Observable;
18+
import rx.Observable.OnSubscribeFunc;
19+
import rx.Observer;
20+
import rx.Subscription;
21+
22+
/**
23+
* Bypasses a specified number of elements at the end of an observable sequence.
24+
*/
25+
public class OperationSkipLast {
26+
27+
/**
28+
* Bypasses a specified number of elements at the end of an observable
29+
* sequence.
30+
* <p>
31+
* This operator accumulates a queue with a length enough to store the first
32+
* count elements. As more elements are received, elements are taken from
33+
* the front of the queue and produced on the result sequence. This causes
34+
* elements to be delayed.
35+
*
36+
* @param source
37+
* the source sequence.
38+
* @param count
39+
* number of elements to bypass at the end of the source
40+
* sequence.
41+
* @return An observable sequence containing the source sequence elements
42+
* except for the bypassed ones at the end.
43+
*
44+
* @throws IndexOutOfBoundsException
45+
* count is less than zero.
46+
*/
47+
public static <T> OnSubscribeFunc<T> skipLast(
48+
Observable<? extends T> source, int count) {
49+
return new SkipLast<T>(source, count);
50+
}
51+
52+
private static class SkipLast<T> implements OnSubscribeFunc<T> {
53+
private final int count;
54+
private final Observable<? extends T> source;
55+
56+
private SkipLast(Observable<? extends T> source, int count) {
57+
this.count = count;
58+
this.source = source;
59+
}
60+
61+
public Subscription onSubscribe(final Observer<? super T> observer) {
62+
if (count < 0) {
63+
throw new IndexOutOfBoundsException(
64+
"count could not be negative");
65+
}
66+
final SafeObservableSubscription subscription = new SafeObservableSubscription();
67+
return subscription.wrap(source.subscribe(new Observer<T>() {
68+
69+
private final ReentrantLock lock = new ReentrantLock();
70+
71+
/**
72+
* Store the last count elements until now.
73+
*/
74+
private final Deque<T> deque = new LinkedList<T>();
75+
76+
@Override
77+
public void onCompleted() {
78+
observer.onCompleted();
79+
}
80+
81+
@Override
82+
public void onError(Throwable e) {
83+
observer.onError(e);
84+
}
85+
86+
@Override
87+
public void onNext(T value) {
88+
lock.lock();
89+
try {
90+
deque.offerLast(value);
91+
if (deque.size() > count) {
92+
// Now deque has count + 1 elements, so the first
93+
// element in the deque definitely does not belong
94+
// to the last count elements of the source
95+
// sequence. We can emit it now.
96+
observer.onNext(deque.removeFirst());
97+
}
98+
} catch (Throwable ex) {
99+
observer.onError(ex);
100+
subscription.unsubscribe();
101+
} finally {
102+
lock.unlock();
103+
}
104+
}
105+
106+
}));
107+
}
108+
}
109+
110+
public static class UnitTest {
111+
112+
@Test
113+
public void testSkipLastEmpty() {
114+
Observable<String> w = Observable.empty();
115+
Observable<String> observable = Observable.create(skipLast(w, 2));
116+
117+
@SuppressWarnings("unchecked")
118+
Observer<String> aObserver = mock(Observer.class);
119+
observable.subscribe(aObserver);
120+
verify(aObserver, never()).onNext(any(String.class));
121+
verify(aObserver, never()).onError(any(Throwable.class));
122+
verify(aObserver, times(1)).onCompleted();
123+
}
124+
125+
@Test
126+
public void testSkipLast1() {
127+
Observable<String> w = Observable.from("one", "two", "three");
128+
Observable<String> observable = Observable.create(skipLast(w, 2));
129+
130+
@SuppressWarnings("unchecked")
131+
Observer<String> aObserver = mock(Observer.class);
132+
InOrder inOrder = inOrder(aObserver);
133+
observable.subscribe(aObserver);
134+
inOrder.verify(aObserver, never()).onNext("two");
135+
inOrder.verify(aObserver, never()).onNext("three");
136+
verify(aObserver, times(1)).onNext("one");
137+
verify(aObserver, never()).onError(any(Throwable.class));
138+
verify(aObserver, times(1)).onCompleted();
139+
}
140+
141+
@Test
142+
public void testSkipLast2() {
143+
Observable<String> w = Observable.from("one", "two");
144+
Observable<String> observable = Observable.create(skipLast(w, 2));
145+
146+
@SuppressWarnings("unchecked")
147+
Observer<String> aObserver = mock(Observer.class);
148+
observable.subscribe(aObserver);
149+
verify(aObserver, never()).onNext(any(String.class));
150+
verify(aObserver, never()).onError(any(Throwable.class));
151+
verify(aObserver, times(1)).onCompleted();
152+
}
153+
154+
@Test
155+
public void testSkipLastWithZeroCount() {
156+
Observable<String> w = Observable.from("one", "two");
157+
Observable<String> observable = Observable.create(skipLast(w, 0));
158+
159+
@SuppressWarnings("unchecked")
160+
Observer<String> aObserver = mock(Observer.class);
161+
observable.subscribe(aObserver);
162+
verify(aObserver, times(1)).onNext("one");
163+
verify(aObserver, times(1)).onNext("two");
164+
verify(aObserver, never()).onError(any(Throwable.class));
165+
verify(aObserver, times(1)).onCompleted();
166+
}
167+
168+
@Test
169+
public void testSkipLastWithNull() {
170+
Observable<String> w = Observable.from("one", null, "two");
171+
Observable<String> observable = Observable.create(skipLast(w, 1));
172+
173+
@SuppressWarnings("unchecked")
174+
Observer<String> aObserver = mock(Observer.class);
175+
observable.subscribe(aObserver);
176+
verify(aObserver, times(1)).onNext("one");
177+
verify(aObserver, times(1)).onNext(null);
178+
verify(aObserver, never()).onNext("two");
179+
verify(aObserver, never()).onError(any(Throwable.class));
180+
verify(aObserver, times(1)).onCompleted();
181+
}
182+
183+
@Test
184+
public void testSkipLastWithNegativeCount() {
185+
Observable<String> w = Observable.from("one");
186+
Observable<String> observable = Observable.create(skipLast(w, -1));
187+
188+
@SuppressWarnings("unchecked")
189+
Observer<String> aObserver = mock(Observer.class);
190+
observable.subscribe(aObserver);
191+
verify(aObserver, never()).onNext(any(String.class));
192+
verify(aObserver, times(1)).onError(
193+
any(IndexOutOfBoundsException.class));
194+
verify(aObserver, never()).onCompleted();
195+
}
196+
}
197+
}

0 commit comments

Comments
 (0)