Skip to content

Commit 994d8fc

Browse files
authored
2.x: last() to return Single (#4570)
1 parent 85da0a8 commit 994d8fc

File tree

10 files changed

+356
-124
lines changed

10 files changed

+356
-124
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8877,14 +8877,14 @@ public final <TRight, TLeftEnd, TRightEnd, R> Flowable<R> join(
88778877
* <dd>{@code last} does not operate by default on a particular {@link Scheduler}.</dd>
88788878
* </dl>
88798879
*
8880-
* @return a Flowable that emits the last item from the source Publisher or notifies Subscribers of an
8880+
* @return a Single that emits the last item from the source Publisher or notifies Subscribers of an
88818881
* error
88828882
* @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX operators documentation: Last</a>
88838883
*/
88848884
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
88858885
@SchedulerSupport(SchedulerSupport.NONE)
8886-
public final Flowable<T> last() {
8887-
return takeLast(1).single();
8886+
public final Single<T> last() {
8887+
return RxJavaPlugins.onAssembly(new FlowableLastSingle<T>(this, null));
88888888
}
88898889

88908890
/**
@@ -8902,14 +8902,15 @@ public final Flowable<T> last() {
89028902
*
89038903
* @param defaultItem
89048904
* the default item to emit if the source Publisher is empty
8905-
* @return a Flowable that emits only the last item emitted by the source Publisher, or a default item
8905+
* @return a Single that emits only the last item emitted by the source Publisher, or a default item
89068906
* if the source Publisher is empty
89078907
* @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX operators documentation: Last</a>
89088908
*/
89098909
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
89108910
@SchedulerSupport(SchedulerSupport.NONE)
8911-
public final Flowable<T> last(T defaultItem) {
8912-
return takeLast(1).single(defaultItem);
8911+
public final Single<T> last(T defaultItem) {
8912+
ObjectHelper.requireNonNull(defaultItem, "defaultItem");
8913+
return RxJavaPlugins.onAssembly(new FlowableLastSingle<T>(this, defaultItem));
89138914
}
89148915

89158916
/**
@@ -9937,7 +9938,7 @@ public final Single<T> reduce(BiFunction<T, T, T> reducer) {
99379938
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
99389939
@SchedulerSupport(SchedulerSupport.NONE)
99399940
public final <R> Flowable<R> reduce(R seed, BiFunction<R, ? super T, R> reducer) {
9940-
return scan(seed, reducer).last();
9941+
return scan(seed, reducer).takeLast(1).single(); // TODO
99419942
}
99429943

99439944
/**
@@ -9987,7 +9988,7 @@ public final <R> Flowable<R> reduce(R seed, BiFunction<R, ? super T, R> reducer)
99879988
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
99889989
@SchedulerSupport(SchedulerSupport.NONE)
99899990
public final <R> Flowable<R> reduceWith(Callable<R> seedSupplier, BiFunction<R, ? super T, R> reducer) {
9990-
return scanWith(seedSupplier, reducer).last();
9991+
return scanWith(seedSupplier, reducer).takeLast(1).single();
99919992
}
99929993

99939994
/**

src/main/java/io/reactivex/Observable.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7659,8 +7659,8 @@ public final <TRight, TLeftEnd, TRightEnd, R> Observable<R> join(
76597659
* @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX operators documentation: Last</a>
76607660
*/
76617661
@SchedulerSupport(SchedulerSupport.NONE)
7662-
public final Observable<T> last() {
7663-
return takeLast(1).single();
7662+
public final Single<T> last() {
7663+
return RxJavaPlugins.onAssembly(new ObservableLastSingle<T>(this, null));
76647664
}
76657665

76667666
/**
@@ -7680,8 +7680,9 @@ public final Observable<T> last() {
76807680
* @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX operators documentation: Last</a>
76817681
*/
76827682
@SchedulerSupport(SchedulerSupport.NONE)
7683-
public final Observable<T> last(T defaultItem) {
7684-
return takeLast(1).single(defaultItem);
7683+
public final Single<T> last(T defaultItem) {
7684+
ObjectHelper.requireNonNull(defaultItem, "defaultItem is null");
7685+
return RxJavaPlugins.onAssembly(new ObservableLastSingle<T>(this, defaultItem));
76857686
}
76867687

76877688
/**
@@ -8202,7 +8203,7 @@ public final ConnectableObservable<T> publish(int bufferSize) {
82028203
*/
82038204
@SchedulerSupport(SchedulerSupport.NONE)
82048205
public final Observable<T> reduce(BiFunction<T, T, T> reducer) {
8205-
return scan(reducer).last();
8206+
return scan(reducer).takeLast(1).single();
82068207
}
82078208

82088209
/**
@@ -8248,7 +8249,7 @@ public final Observable<T> reduce(BiFunction<T, T, T> reducer) {
82488249
*/
82498250
@SchedulerSupport(SchedulerSupport.NONE)
82508251
public final <R> Observable<R> reduce(R seed, BiFunction<R, ? super T, R> reducer) {
8251-
return scan(seed, reducer).last();
8252+
return scan(seed, reducer).takeLast(1).single();
82528253
}
82538254

82548255
/**
@@ -8294,7 +8295,7 @@ public final <R> Observable<R> reduce(R seed, BiFunction<R, ? super T, R> reduce
82948295
*/
82958296
@SchedulerSupport(SchedulerSupport.NONE)
82968297
public final <R> Observable<R> reduceWith(Callable<R> seedSupplier, BiFunction<R, ? super T, R> reducer) {
8297-
return scanWith(seedSupplier, reducer).last();
8298+
return scanWith(seedSupplier, reducer).takeLast(1).single();
82988299
}
82998300

83008301
/**
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.flowable;
15+
16+
import java.util.NoSuchElementException;
17+
18+
import org.reactivestreams.*;
19+
20+
import io.reactivex.*;
21+
import io.reactivex.disposables.Disposable;
22+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
23+
24+
/**
25+
* Consumes the source Publisher and emits its last item, the defaultItem
26+
* if empty or a NoSuchElementException if even the defaultItem is null.
27+
*
28+
* @param <T> the value type
29+
*/
30+
public final class FlowableLastSingle<T> extends Single<T> {
31+
32+
final Publisher<T> source;
33+
34+
final T defaultItem;
35+
36+
public FlowableLastSingle(Publisher<T> source, T defaultItem) {
37+
this.source = source;
38+
this.defaultItem = defaultItem;
39+
}
40+
41+
// TODO fuse back to Flowable
42+
43+
@Override
44+
protected void subscribeActual(SingleObserver<? super T> observer) {
45+
source.subscribe(new LastSubscriber<T>(observer, defaultItem));
46+
}
47+
48+
static final class LastSubscriber<T> implements Subscriber<T>, Disposable {
49+
50+
final SingleObserver<? super T> actual;
51+
52+
final T defaultItem;
53+
54+
Subscription s;
55+
56+
T item;
57+
58+
public LastSubscriber(SingleObserver<? super T> actual, T defaultItem) {
59+
this.actual = actual;
60+
this.defaultItem = defaultItem;
61+
}
62+
63+
@Override
64+
public void dispose() {
65+
s.cancel();
66+
s = SubscriptionHelper.CANCELLED;
67+
}
68+
69+
@Override
70+
public boolean isDisposed() {
71+
return s == SubscriptionHelper.CANCELLED;
72+
}
73+
74+
@Override
75+
public void onSubscribe(Subscription s) {
76+
if (SubscriptionHelper.validate(this.s, s)) {
77+
this.s = s;
78+
79+
actual.onSubscribe(this);
80+
81+
s.request(Long.MAX_VALUE);
82+
}
83+
}
84+
85+
@Override
86+
public void onNext(T t) {
87+
item = t;
88+
}
89+
90+
@Override
91+
public void onError(Throwable t) {
92+
s = SubscriptionHelper.CANCELLED;
93+
item = null;
94+
actual.onError(t);
95+
}
96+
97+
@Override
98+
public void onComplete() {
99+
s = SubscriptionHelper.CANCELLED;
100+
T v = item;
101+
if (v != null) {
102+
item = null;
103+
actual.onSuccess(v);
104+
} else {
105+
v = defaultItem;
106+
if (v == null) {
107+
actual.onError(new NoSuchElementException());
108+
} else {
109+
actual.onSuccess(v);
110+
}
111+
}
112+
}
113+
114+
115+
}
116+
}
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.observable;
15+
16+
import java.util.NoSuchElementException;
17+
18+
import io.reactivex.*;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.internal.disposables.DisposableHelper;
21+
22+
/**
23+
* Consumes the source ObservableSource and emits its last item, the defaultItem
24+
* if empty or a NoSuchElementException if even the defaultItem is null.
25+
*
26+
* @param <T> the value type
27+
*/
28+
public final class ObservableLastSingle<T> extends Single<T> {
29+
30+
final ObservableSource<T> source;
31+
32+
final T defaultItem;
33+
34+
public ObservableLastSingle(ObservableSource<T> source, T defaultItem) {
35+
this.source = source;
36+
this.defaultItem = defaultItem;
37+
}
38+
39+
// TODO fuse back to Observable
40+
41+
@Override
42+
protected void subscribeActual(SingleObserver<? super T> observer) {
43+
source.subscribe(new LastObserver<T>(observer, defaultItem));
44+
}
45+
46+
static final class LastObserver<T> implements Observer<T>, Disposable {
47+
48+
final SingleObserver<? super T> actual;
49+
50+
final T defaultItem;
51+
52+
Disposable s;
53+
54+
T item;
55+
56+
public LastObserver(SingleObserver<? super T> actual, T defaultItem) {
57+
this.actual = actual;
58+
this.defaultItem = defaultItem;
59+
}
60+
61+
@Override
62+
public void dispose() {
63+
s.dispose();
64+
s = DisposableHelper.DISPOSED;
65+
}
66+
67+
@Override
68+
public boolean isDisposed() {
69+
return s == DisposableHelper.DISPOSED;
70+
}
71+
72+
@Override
73+
public void onSubscribe(Disposable s) {
74+
if (DisposableHelper.validate(this.s, s)) {
75+
this.s = s;
76+
77+
actual.onSubscribe(this);
78+
}
79+
}
80+
81+
@Override
82+
public void onNext(T t) {
83+
item = t;
84+
}
85+
86+
@Override
87+
public void onError(Throwable t) {
88+
s = DisposableHelper.DISPOSED;
89+
item = null;
90+
actual.onError(t);
91+
}
92+
93+
@Override
94+
public void onComplete() {
95+
s = DisposableHelper.DISPOSED;
96+
T v = item;
97+
if (v != null) {
98+
item = null;
99+
actual.onSuccess(v);
100+
} else {
101+
v = defaultItem;
102+
if (v == null) {
103+
actual.onError(new NoSuchElementException());
104+
} else {
105+
actual.onSuccess(v);
106+
}
107+
}
108+
}
109+
110+
111+
}
112+
}

src/test/java/io/reactivex/InternalWrongNaming.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,9 @@ public void observableNoUnsubscrib() throws Exception {
158158

159159
@Test
160160
public void flowableNoObserver() throws Exception {
161-
checkInternalOperatorNaming("Flowable", "Observer", "FlowableFromObservable");
161+
checkInternalOperatorNaming("Flowable", "Observer",
162+
"FlowableFromObservable",
163+
"FlowableLastSingle"
164+
);
162165
}
163166
}

0 commit comments

Comments
 (0)