Skip to content

Commit b917754

Browse files
authored
2.x: fix takeUntil() other triggering twice (#4962)
1 parent 71330c0 commit b917754

File tree

3 files changed

+34
-2
lines changed

3 files changed

+34
-2
lines changed

src/main/java/io/reactivex/internal/operators/single/SingleTakeUntil.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ public void onSubscribe(Subscription s) {
147147
@Override
148148
public void onNext(Object t) {
149149
if (SubscriptionHelper.cancel(this)) {
150-
onComplete();
150+
parent.otherError(new CancellationException());
151151
}
152152
}
153153

@@ -158,7 +158,10 @@ public void onError(Throwable t) {
158158

159159
@Override
160160
public void onComplete() {
161-
parent.otherError(new CancellationException());
161+
if (get() != SubscriptionHelper.CANCELLED) {
162+
lazySet(SubscriptionHelper.CANCELLED);
163+
parent.otherError(new CancellationException());
164+
}
162165
}
163166

164167
public void dispose() {

src/test/java/io/reactivex/internal/operators/maybe/MaybeTakeUntilPublisherTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,4 +184,18 @@ public void run() {
184184
to.assertResult();
185185
}
186186
}
187+
188+
@Test
189+
public void otherSignalsAndCompletes() {
190+
List<Throwable> errors = TestHelper.trackPluginErrors();
191+
try {
192+
Maybe.just(1).takeUntil(Flowable.just(1).take(1))
193+
.test()
194+
.assertResult();
195+
196+
assertTrue(errors.toString(), errors.isEmpty());
197+
} finally {
198+
RxJavaPlugins.reset();
199+
}
200+
}
187201
}

src/test/java/io/reactivex/internal/operators/single/SingleTakeUntilTest.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.List;
1717
import java.util.concurrent.CancellationException;
1818

19+
import static org.junit.Assert.*;
1920
import org.junit.Test;
2021

2122
import io.reactivex.*;
@@ -259,4 +260,18 @@ public void run() {
259260
}
260261
}
261262
}
263+
264+
@Test
265+
public void otherSignalsAndCompletes() {
266+
List<Throwable> errors = TestHelper.trackPluginErrors();
267+
try {
268+
Single.just(1).takeUntil(Flowable.just(1).take(1))
269+
.test()
270+
.assertFailure(CancellationException.class);
271+
272+
assertTrue(errors.toString(), errors.isEmpty());
273+
} finally {
274+
RxJavaPlugins.reset();
275+
}
276+
}
262277
}

0 commit comments

Comments
 (0)