Skip to content

Commit 479f89f

Browse files
authored
2.x: fix LambdaObserver calling dispose when terminating (#4957)
1 parent 19fac95 commit 479f89f

File tree

6 files changed

+190
-11
lines changed

6 files changed

+190
-11
lines changed

src/main/java/io/reactivex/internal/observers/LambdaObserver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public void onNext(T t) {
6767
@Override
6868
public void onError(Throwable t) {
6969
if (!isDisposed()) {
70-
dispose();
70+
lazySet(DisposableHelper.DISPOSED);
7171
try {
7272
onError.accept(t);
7373
} catch (Throwable e) {
@@ -80,7 +80,7 @@ public void onError(Throwable t) {
8080
@Override
8181
public void onComplete() {
8282
if (!isDisposed()) {
83-
dispose();
83+
lazySet(DisposableHelper.DISPOSED);
8484
try {
8585
onComplete.run();
8686
} catch (Throwable e) {

src/test/java/io/reactivex/internal/operators/flowable/FlowableDoFinallyTest.java

Lines changed: 81 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
import static org.junit.Assert.*;
1717

18-
import java.util.List;
18+
import java.util.*;
1919

2020
import org.junit.Test;
2121
import org.reactivestreams.*;
@@ -438,4 +438,84 @@ public void onComplete() {
438438

439439
assertEquals(1, calls);
440440
}
441+
442+
@Test
443+
public void eventOrdering() {
444+
final List<String> list = new ArrayList<String>();
445+
446+
Flowable.error(new TestException())
447+
.doOnCancel(new Action() {
448+
@Override
449+
public void run() throws Exception {
450+
list.add("cancel");
451+
}
452+
})
453+
.doFinally(new Action() {
454+
@Override
455+
public void run() throws Exception {
456+
list.add("finally");
457+
}
458+
})
459+
.subscribe(
460+
new Consumer<Object>() {
461+
@Override
462+
public void accept(Object v) throws Exception {
463+
list.add("onNext");
464+
}
465+
},
466+
new Consumer<Throwable>() {
467+
@Override
468+
public void accept(Throwable e) throws Exception {
469+
list.add("onError");
470+
}
471+
},
472+
new Action() {
473+
@Override
474+
public void run() throws Exception {
475+
list.add("onComplete");
476+
}
477+
});
478+
479+
assertEquals(Arrays.asList("onError", "finally"), list);
480+
}
481+
482+
@Test
483+
public void eventOrdering2() {
484+
final List<String> list = new ArrayList<String>();
485+
486+
Flowable.just(1)
487+
.doOnCancel(new Action() {
488+
@Override
489+
public void run() throws Exception {
490+
list.add("cancel");
491+
}
492+
})
493+
.doFinally(new Action() {
494+
@Override
495+
public void run() throws Exception {
496+
list.add("finally");
497+
}
498+
})
499+
.subscribe(
500+
new Consumer<Object>() {
501+
@Override
502+
public void accept(Object v) throws Exception {
503+
list.add("onNext");
504+
}
505+
},
506+
new Consumer<Throwable>() {
507+
@Override
508+
public void accept(Throwable e) throws Exception {
509+
list.add("onError");
510+
}
511+
},
512+
new Action() {
513+
@Override
514+
public void run() throws Exception {
515+
list.add("onComplete");
516+
}
517+
});
518+
519+
assertEquals(Arrays.asList("onNext", "onComplete", "finally"), list);
520+
}
441521
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableIgnoreElementsTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ public void testUnsubscribesFromUpstreamFlowable() {
9191
public void run() {
9292
unsub.set(true);
9393
}})
94+
.ignoreElements()
95+
.toFlowable()
9496
.subscribe().dispose();
9597

9698
assertTrue(unsub.get());
@@ -207,6 +209,7 @@ public void testUnsubscribesFromUpstream() {
207209
public void run() {
208210
unsub.set(true);
209211
}})
212+
.ignoreElements()
210213
.subscribe().dispose();
211214

212215
assertTrue(unsub.get());

src/test/java/io/reactivex/internal/operators/observable/ObservableDoFinallyTest.java

Lines changed: 86 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,13 @@
1515

1616
import static org.junit.Assert.*;
1717

18-
import java.util.List;
18+
import java.util.*;
1919

2020
import org.junit.Test;
2121

22-
import io.reactivex.*;
22+
import io.reactivex.Observable;
23+
import io.reactivex.Observer;
24+
import io.reactivex.TestHelper;
2325
import io.reactivex.disposables.Disposable;
2426
import io.reactivex.exceptions.TestException;
2527
import io.reactivex.functions.*;
@@ -442,4 +444,86 @@ public void onComplete() {
442444

443445
assertEquals(1, calls);
444446
}
447+
448+
449+
@Test
450+
public void eventOrdering() {
451+
final List<String> list = new ArrayList<String>();
452+
453+
Observable.error(new TestException())
454+
.doOnDispose(new Action() {
455+
@Override
456+
public void run() throws Exception {
457+
list.add("dispose");
458+
}
459+
})
460+
.doFinally(new Action() {
461+
@Override
462+
public void run() throws Exception {
463+
list.add("finally");
464+
}
465+
})
466+
.subscribe(
467+
new Consumer<Object>() {
468+
@Override
469+
public void accept(Object v) throws Exception {
470+
list.add("onNext");
471+
}
472+
},
473+
new Consumer<Throwable>() {
474+
@Override
475+
public void accept(Throwable e) throws Exception {
476+
list.add("onError");
477+
}
478+
},
479+
new Action() {
480+
@Override
481+
public void run() throws Exception {
482+
list.add("onComplete");
483+
}
484+
});
485+
486+
assertEquals(Arrays.asList("onError", "finally"), list);
487+
}
488+
489+
@Test
490+
public void eventOrdering2() {
491+
final List<String> list = new ArrayList<String>();
492+
493+
Observable.just(1)
494+
.doOnDispose(new Action() {
495+
@Override
496+
public void run() throws Exception {
497+
list.add("dispose");
498+
}
499+
})
500+
.doFinally(new Action() {
501+
@Override
502+
public void run() throws Exception {
503+
list.add("finally");
504+
}
505+
})
506+
.subscribe(
507+
new Consumer<Object>() {
508+
@Override
509+
public void accept(Object v) throws Exception {
510+
list.add("onNext");
511+
}
512+
},
513+
new Consumer<Throwable>() {
514+
@Override
515+
public void accept(Throwable e) throws Exception {
516+
list.add("onError");
517+
}
518+
},
519+
new Action() {
520+
@Override
521+
public void run() throws Exception {
522+
list.add("onComplete");
523+
}
524+
});
525+
526+
assertEquals(Arrays.asList("onNext", "onComplete", "finally"), list);
527+
}
528+
445529
}

src/test/java/io/reactivex/internal/operators/observable/ObservableIgnoreElementsTest.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,16 @@ public void testErrorReceivedObservable() {
8282
@Test
8383
public void testUnsubscribesFromUpstreamObservable() {
8484
final AtomicBoolean unsub = new AtomicBoolean();
85-
Observable.range(1, 10).doOnDispose(new Action() {
85+
Observable.range(1, 10).concatWith(Observable.<Integer>never())
86+
.doOnDispose(new Action() {
8687
@Override
8788
public void run() {
8889
unsub.set(true);
8990
}})
90-
.subscribe();
91+
.ignoreElements()
92+
.toObservable()
93+
.subscribe()
94+
.dispose();
9195
assertTrue(unsub.get());
9296
}
9397

@@ -145,12 +149,15 @@ public void testErrorReceived() {
145149
@Test
146150
public void testUnsubscribesFromUpstream() {
147151
final AtomicBoolean unsub = new AtomicBoolean();
148-
Observable.range(1, 10).doOnDispose(new Action() {
152+
Observable.range(1, 10).concatWith(Observable.<Integer>never())
153+
.doOnDispose(new Action() {
149154
@Override
150155
public void run() {
151156
unsub.set(true);
152157
}})
153-
.subscribe();
158+
.ignoreElements()
159+
.subscribe()
160+
.dispose();
154161
assertTrue(unsub.get());
155162
}
156163

src/test/java/io/reactivex/internal/operators/observable/ObservableTakeLastOneTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,13 @@ public void run() {
6767
unsubscribed.set(true);
6868
}
6969
};
70-
Observable.just(1).doOnDispose(unsubscribeAction)
71-
.takeLast(1).subscribe();
70+
Observable.just(1)
71+
.concatWith(Observable.<Integer>never())
72+
.doOnDispose(unsubscribeAction)
73+
.takeLast(1)
74+
.subscribe()
75+
.dispose();
76+
7277
assertTrue(unsubscribed.get());
7378
}
7479

0 commit comments

Comments
 (0)