Skip to content

Commit d6f395e

Browse files
authored
Fix retryWhen/redoWhen potential concurrent reentry when resubscribing (#4241)
1 parent e697309 commit d6f395e

File tree

1 file changed

+2
-2
lines changed

1 file changed

+2
-2
lines changed

src/main/java/rx/internal/operators/OnSubscribeRedo.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
import rx.internal.producers.ProducerArbiter;
4242
import rx.observers.Subscribers;
4343
import rx.schedulers.Schedulers;
44-
import rx.subjects.BehaviorSubject;
44+
import rx.subjects.*;
4545
import rx.subscriptions.SerialSubscription;
4646

4747
public final class OnSubscribeRedo<T> implements OnSubscribe<T> {
@@ -202,7 +202,7 @@ public void call(final Subscriber<? super T> child) {
202202
// the source observable. We use a BehaviorSubject because subscribeToSource
203203
// may emit a terminal before the restarts observable (transformed terminals)
204204
// is subscribed
205-
final BehaviorSubject<Notification<?>> terminals = BehaviorSubject.create();
205+
final Subject<Notification<?>, Notification<?>> terminals = BehaviorSubject.<Notification<?>>create().toSerialized();
206206
final Subscriber<Notification<?>> dummySubscriber = Subscribers.empty();
207207
// subscribe immediately so the last emission will be replayed to the next
208208
// subscriber (which is the one we care about)

0 commit comments

Comments
 (0)