Skip to content

Commit 1d7e889

Browse files
committed
1.x: fix multiple chained Single.doAfterTerminate not calling actions (#3883)
1 parent dec05b2 commit 1d7e889

File tree

3 files changed

+184
-1
lines changed

3 files changed

+184
-1
lines changed

src/main/java/rx/Single.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2484,7 +2484,7 @@ public final Single<T> doOnUnsubscribe(final Action0 action) {
24842484
*/
24852485
@Experimental
24862486
public final Single<T> doAfterTerminate(Action0 action) {
2487-
return lift(new OperatorDoAfterTerminate<T>(action));
2487+
return create(new SingleDoAfterTerminate<T>(this, action));
24882488
}
24892489

24902490
/**
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import rx.*;
19+
import rx.exceptions.Exceptions;
20+
import rx.functions.Action0;
21+
import rx.internal.util.RxJavaPluginUtils;
22+
23+
/**
24+
* Execute an action after onSuccess or onError has been delivered.
25+
*
26+
* @param <T> the value type
27+
*/
28+
public final class SingleDoAfterTerminate<T> implements Single.OnSubscribe<T> {
29+
final Single<T> source;
30+
31+
final Action0 action;
32+
33+
public SingleDoAfterTerminate(Single<T> source, Action0 action) {
34+
this.source = source;
35+
this.action = action;
36+
}
37+
38+
@Override
39+
public void call(SingleSubscriber<? super T> t) {
40+
SingleDoAfterTerminateSubscriber<T> parent = new SingleDoAfterTerminateSubscriber<T>(t, action);
41+
t.add(parent);
42+
source.subscribe(parent);
43+
}
44+
45+
static final class SingleDoAfterTerminateSubscriber<T> extends SingleSubscriber<T> {
46+
final SingleSubscriber<? super T> actual;
47+
48+
final Action0 action;
49+
50+
public SingleDoAfterTerminateSubscriber(SingleSubscriber<? super T> actual, Action0 action) {
51+
this.actual = actual;
52+
this.action = action;
53+
}
54+
55+
@Override
56+
public void onSuccess(T value) {
57+
try {
58+
actual.onSuccess(value);
59+
} finally {
60+
doAction();
61+
}
62+
}
63+
64+
@Override
65+
public void onError(Throwable error) {
66+
try {
67+
actual.onError(error);
68+
} finally {
69+
doAction();
70+
}
71+
}
72+
73+
void doAction() {
74+
try {
75+
action.call();
76+
} catch (Throwable ex) {
77+
Exceptions.throwIfFatal(ex);
78+
RxJavaPluginUtils.handleException(ex);
79+
}
80+
}
81+
}
82+
83+
84+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/**
2+
* Copyright 2015 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import java.util.concurrent.atomic.AtomicInteger;
19+
20+
import org.junit.*;
21+
22+
import rx.Single;
23+
import rx.functions.*;
24+
import rx.observers.TestSubscriber;
25+
26+
public class SingleDoAfterTerminateTest {
27+
28+
@Test
29+
public void chainedCallsOuter() {
30+
for (int i = 2; i <= 5; i++) {
31+
final AtomicInteger counter = new AtomicInteger();
32+
33+
Single<String> source = Single.just("Test")
34+
.flatMap(new Func1<String, Single<String>>() {
35+
@Override
36+
public Single<String> call(String s) {
37+
return Single.just("Test2")
38+
.doAfterTerminate(new Action0() {
39+
@Override
40+
public void call() {
41+
counter.getAndIncrement();
42+
}
43+
});
44+
}
45+
}
46+
);
47+
Single<String> result = source;
48+
49+
for (int j = 1; j < i; j++) {
50+
result = result.doAfterTerminate(new Action0() {
51+
@Override
52+
public void call() {
53+
counter.getAndIncrement();
54+
}
55+
});
56+
}
57+
58+
result
59+
.subscribe(new TestSubscriber<String>());
60+
61+
Assert.assertEquals(i, counter.get());
62+
}
63+
}
64+
65+
@Test
66+
public void chainedCallsInner() {
67+
for (int i = 2; i <= 5; i++) {
68+
final AtomicInteger counter = new AtomicInteger();
69+
70+
final int fi = i;
71+
72+
Single.just("Test")
73+
.flatMap(new Func1<String, Single<String>>() {
74+
@Override
75+
public Single<String> call(String s) {
76+
Single<String> result = Single.just("Test2");
77+
for (int j = 1; j < fi; j++) {
78+
result = result.doAfterTerminate(new Action0() {
79+
@Override
80+
public void call() {
81+
counter.getAndIncrement();
82+
}
83+
});
84+
}
85+
return result;
86+
}
87+
})
88+
.doAfterTerminate(new Action0() {
89+
@Override
90+
public void call() {
91+
counter.getAndIncrement();
92+
}
93+
})
94+
.subscribe(new TestSubscriber<String>());
95+
96+
Assert.assertEquals(i, counter.get());
97+
}
98+
}
99+
}

0 commit comments

Comments
 (0)