Skip to content

Commit da1ab61

Browse files
committed
Added takeLast to Observable
1 parent d0339e5 commit da1ab61

File tree

2 files changed

+33
-2
lines changed

2 files changed

+33
-2
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1347,6 +1347,22 @@ public static <T> Observable<T> take(final Observable<T> items, final int num) {
13471347
return _create(OperationTake.take(items, num));
13481348
}
13491349

1350+
/**
1351+
* Returns an Observable that emits the last <code>count</code> items emitted by the source
1352+
* Observable.
1353+
*
1354+
* @param items
1355+
* the source Observable
1356+
* @param count
1357+
* the number of items from the end of the sequence emitted by the source
1358+
* Observable to emit
1359+
* @return an Observable that only emits the last <code>count</code> items emitted by the source
1360+
* Observable
1361+
*/
1362+
public static <T> Observable<T> takeLast(final Observable<T> items, final int count) {
1363+
return _create(OperationTakeLast.takeLast(items, count));
1364+
}
1365+
13501366
/**
13511367
* Returns an Observable that emits a single item, a list composed of all the items emitted by
13521368
* the source Observable.
@@ -2235,6 +2251,20 @@ public Observable<T> take(final int num) {
22352251
return take(this, num);
22362252
}
22372253

2254+
/**
2255+
* Returns an Observable that emits the last <code>count</code> items emitted by the source
2256+
* Observable.
2257+
*
2258+
* @param count
2259+
* the number of items from the end of the sequence emitted by the source
2260+
* Observable to emit
2261+
* @return an Observable that only emits the last <code>count</code> items emitted by the source
2262+
* Observable
2263+
*/
2264+
public Observable<T> takeLast(final int count) {
2265+
return takeLast(this, count);
2266+
}
2267+
22382268
/**
22392269
* Returns an Observable that emits a single item, a list composed of all the items emitted by
22402270
* the source Observable.

rxjava-core/src/main/java/rx/operators/OperationTakeLast.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,14 @@
2323
import rx.util.functions.Func1;
2424

2525
import java.util.Iterator;
26-
import java.util.concurrent.ConcurrentLinkedQueue;
2726
import java.util.concurrent.LinkedBlockingDeque;
28-
import java.util.concurrent.atomic.AtomicInteger;
2927

3028
import static org.mockito.Matchers.any;
3129
import static org.mockito.Mockito.*;
3230

31+
/**
32+
* Returns a specified number of contiguous elements from the end of an observable sequence.
33+
*/
3334
public final class OperationTakeLast {
3435

3536
public static <T> Func1<Observer<T>, Subscription> takeLast(final Observable<T> items, final int count) {

0 commit comments

Comments
 (0)