Skip to content

Commit 8a78c74

Browse files
authored
2.x: Single.subscribe(BiConsumer) consistent isDisposed (#5277)
1 parent 80d9b90 commit 8a78c74

File tree

2 files changed

+39
-0
lines changed

2 files changed

+39
-0
lines changed

src/main/java/io/reactivex/internal/observers/BiConsumerSingleObserver.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public BiConsumerSingleObserver(BiConsumer<? super T, ? super Throwable> onCallb
3737
@Override
3838
public void onError(Throwable e) {
3939
try {
40+
lazySet(DisposableHelper.DISPOSED);
4041
onCallback.accept(null, e);
4142
} catch (Throwable ex) {
4243
Exceptions.throwIfFatal(ex);
@@ -52,6 +53,7 @@ public void onSubscribe(Disposable d) {
5253
@Override
5354
public void onSuccess(T value) {
5455
try {
56+
lazySet(DisposableHelper.DISPOSED);
5557
onCallback.accept(value, null);
5658
} catch (Throwable ex) {
5759
Exceptions.throwIfFatal(ex);

src/test/java/io/reactivex/single/SingleSubscribeTest.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import static org.junit.Assert.*;
1717

18+
import java.io.IOException;
1819
import java.util.List;
1920

2021
import org.junit.Test;
@@ -227,4 +228,40 @@ public void successIsDisposed() {
227228
public void errorIsDisposed() {
228229
assertTrue(Single.error(new TestException()).subscribe(Functions.emptyConsumer(), Functions.emptyConsumer()).isDisposed());
229230
}
231+
232+
@Test
233+
public void biConsumerIsDisposedOnSuccess() {
234+
final Object[] result = { null, null };
235+
236+
Disposable d = Single.just(1)
237+
.subscribe(new BiConsumer<Integer, Throwable>() {
238+
@Override
239+
public void accept(Integer t1, Throwable t2) throws Exception {
240+
result[0] = t1;
241+
result[1] = t2;
242+
}
243+
});
244+
245+
assertTrue("Not disposed?!", d.isDisposed());
246+
assertEquals(1, result[0]);
247+
assertNull(result[1]);
248+
}
249+
250+
@Test
251+
public void biConsumerIsDisposedOnError() {
252+
final Object[] result = { null, null };
253+
254+
Disposable d = Single.<Integer>error(new IOException())
255+
.subscribe(new BiConsumer<Integer, Throwable>() {
256+
@Override
257+
public void accept(Integer t1, Throwable t2) throws Exception {
258+
result[0] = t1;
259+
result[1] = t2;
260+
}
261+
});
262+
263+
assertTrue("Not disposed?!", d.isDisposed());
264+
assertNull(result[0]);
265+
assertTrue("" + result[1], result[1] instanceof IOException);
266+
}
230267
}

0 commit comments

Comments
 (0)