Skip to content

Commit 7bfeccc

Browse files
VeskoIakarnokd
authored andcommitted
Add doAfterTerminate callback to the Single type. (#5093)
* Add doAfterTerminate callback to the Single type. * Mark the doAfterTerminate() experimental since 2.0.6
1 parent 28d1352 commit 7bfeccc

File tree

3 files changed

+257
-0
lines changed

3 files changed

+257
-0
lines changed

src/main/java/io/reactivex/Single.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1798,6 +1798,32 @@ public final Single<T> doAfterSuccess(Consumer<? super T> onAfterSuccess) {
17981798
return RxJavaPlugins.onAssembly(new SingleDoAfterSuccess<T>(this, onAfterSuccess));
17991799
}
18001800

1801+
/**
1802+
* Registers an {@link Action} to be called after this Single invokes either onSuccess or onError.
1803+
* * <p>Note that the {@code doAfterSuccess} action is shared between subscriptions and as such
1804+
* should be thread-safe.</p>
1805+
* <p>
1806+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/doAfterTerminate.png" alt="">
1807+
* <dl>
1808+
* <dt><b>Scheduler:</b></dt>
1809+
* <dd>{@code doAfterTerminate} does not operate by default on a particular {@link Scheduler}.</dd>
1810+
* </dl>
1811+
*
1812+
* @param onAfterTerminate
1813+
* an {@link Action} to be invoked when the source Single finishes
1814+
* @return a Single that emits the same items as the source Single, then invokes the
1815+
* {@link Action}
1816+
* @see <a href="http://reactivex.io/documentation/operators/do.html">ReactiveX operators documentation: Do</a>
1817+
* @since 2.0.6 - experimental
1818+
*/
1819+
@CheckReturnValue
1820+
@SchedulerSupport(SchedulerSupport.NONE)
1821+
@Experimental
1822+
public final Single<T> doAfterTerminate(Action onAfterTerminate) {
1823+
ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null");
1824+
return RxJavaPlugins.onAssembly(new SingleDoAfterTerminate<T>(this, onAfterTerminate));
1825+
}
1826+
18011827
/**
18021828
* Calls the specified action after this Single signals onSuccess or onError or gets disposed by
18031829
* the downstream.
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.single;
15+
16+
import io.reactivex.Single;
17+
import io.reactivex.SingleObserver;
18+
import io.reactivex.SingleSource;
19+
import io.reactivex.disposables.Disposable;
20+
import io.reactivex.exceptions.Exceptions;
21+
import io.reactivex.functions.Action;
22+
import io.reactivex.internal.disposables.DisposableHelper;
23+
import io.reactivex.plugins.RxJavaPlugins;
24+
25+
/**
26+
* Calls an action after pushing the current item or an error to the downstream.
27+
* @param <T> the value type
28+
* @since 2.0.6 - experimental
29+
*/
30+
public final class SingleDoAfterTerminate<T> extends Single<T> {
31+
32+
final SingleSource<T> source;
33+
34+
final Action onAfterTerminate;
35+
36+
public SingleDoAfterTerminate(SingleSource<T> source, Action onAfterTerminate) {
37+
this.source = source;
38+
this.onAfterTerminate = onAfterTerminate;
39+
}
40+
41+
@Override
42+
protected void subscribeActual(SingleObserver<? super T> s) {
43+
source.subscribe(new DoAfterTerminateObserver<T>(s, onAfterTerminate));
44+
}
45+
46+
static final class DoAfterTerminateObserver<T> implements SingleObserver<T>, Disposable {
47+
48+
final SingleObserver<? super T> actual;
49+
50+
final Action onAfterTerminate;
51+
52+
Disposable d;
53+
54+
DoAfterTerminateObserver(SingleObserver<? super T> actual, Action onAfterTerminate) {
55+
this.actual = actual;
56+
this.onAfterTerminate = onAfterTerminate;
57+
}
58+
59+
@Override
60+
public void onSubscribe(Disposable d) {
61+
if (DisposableHelper.validate(this.d, d)) {
62+
this.d = d;
63+
64+
actual.onSubscribe(this);
65+
}
66+
}
67+
68+
@Override
69+
public void onSuccess(T t) {
70+
actual.onSuccess(t);
71+
72+
onAfterTerminate();
73+
}
74+
75+
@Override
76+
public void onError(Throwable e) {
77+
actual.onError(e);
78+
79+
onAfterTerminate();
80+
}
81+
82+
@Override
83+
public void dispose() {
84+
d.dispose();
85+
}
86+
87+
@Override
88+
public boolean isDisposed() {
89+
return d.isDisposed();
90+
}
91+
92+
private void onAfterTerminate() {
93+
try {
94+
onAfterTerminate.run();
95+
} catch (Throwable ex) {
96+
Exceptions.throwIfFatal(ex);
97+
RxJavaPlugins.onError(ex);
98+
}
99+
}
100+
}
101+
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.single;
15+
16+
import io.reactivex.Single;
17+
import io.reactivex.SingleSource;
18+
import io.reactivex.TestHelper;
19+
import io.reactivex.exceptions.TestException;
20+
import io.reactivex.functions.Action;
21+
import io.reactivex.functions.Function;
22+
import io.reactivex.internal.functions.Functions;
23+
import io.reactivex.observers.TestObserver;
24+
import io.reactivex.plugins.RxJavaPlugins;
25+
import io.reactivex.subjects.PublishSubject;
26+
import org.junit.Test;
27+
28+
import java.util.List;
29+
30+
import static org.junit.Assert.assertEquals;
31+
32+
public class SingleDoAfterTerminateTest {
33+
34+
private final int[] call = { 0 };
35+
36+
private final Action afterTerminate = new Action() {
37+
@Override
38+
public void run() throws Exception {
39+
call[0]++;
40+
}
41+
};
42+
43+
private final TestObserver<Integer> ts = new TestObserver<Integer>();
44+
45+
@Test
46+
public void just() {
47+
Single.just(1)
48+
.doAfterTerminate(afterTerminate)
49+
.subscribeWith(ts)
50+
.assertResult(1);
51+
52+
assertAfterTerminateCalledOnce();
53+
}
54+
55+
@Test
56+
public void error() {
57+
Single.<Integer>error(new TestException())
58+
.doAfterTerminate(afterTerminate)
59+
.subscribeWith(ts)
60+
.assertFailure(TestException.class);
61+
62+
assertAfterTerminateCalledOnce();
63+
}
64+
65+
@Test(expected = NullPointerException.class)
66+
public void afterTerminateActionNull() {
67+
Single.just(1).doAfterTerminate(null);
68+
}
69+
70+
@Test
71+
public void justConditional() {
72+
Single.just(1)
73+
.doAfterTerminate(afterTerminate)
74+
.filter(Functions.alwaysTrue())
75+
.subscribeWith(ts)
76+
.assertResult(1);
77+
78+
assertAfterTerminateCalledOnce();
79+
}
80+
81+
@Test
82+
public void errorConditional() {
83+
Single.<Integer>error(new TestException())
84+
.doAfterTerminate(afterTerminate)
85+
.filter(Functions.alwaysTrue())
86+
.subscribeWith(ts)
87+
.assertFailure(TestException.class);
88+
89+
assertAfterTerminateCalledOnce();
90+
}
91+
92+
@Test
93+
public void actionThrows() {
94+
List<Throwable> errors = TestHelper.trackPluginErrors();
95+
try {
96+
Single.just(1)
97+
.doAfterTerminate(new Action() {
98+
@Override
99+
public void run() throws Exception {
100+
throw new TestException();
101+
}
102+
})
103+
.test()
104+
.assertResult(1);
105+
106+
TestHelper.assertUndeliverable(errors, 0, TestException.class);
107+
} finally {
108+
RxJavaPlugins.reset();
109+
}
110+
}
111+
112+
@Test
113+
public void dispose() {
114+
TestHelper.checkDisposed(PublishSubject.<Integer>create().singleOrError().doAfterTerminate(afterTerminate));
115+
}
116+
117+
@Test
118+
public void doubleOnSubscribe() {
119+
TestHelper.checkDoubleOnSubscribeSingle(new Function<Single<Integer>, SingleSource<Integer>>() {
120+
@Override
121+
public SingleSource<Integer> apply(Single<Integer> m) throws Exception {
122+
return m.doAfterTerminate(afterTerminate);
123+
}
124+
});
125+
}
126+
127+
private void assertAfterTerminateCalledOnce() {
128+
assertEquals(1, call[0]);
129+
}
130+
}

0 commit comments

Comments
 (0)