File tree Expand file tree Collapse file tree 1 file changed +7
-1
lines changed
src/main/java/rx/internal/operators Expand file tree Collapse file tree 1 file changed +7
-1
lines changed Original file line number Diff line number Diff line change 46
46
import rx .functions .Action0 ;
47
47
import rx .functions .Func1 ;
48
48
import rx .functions .Func2 ;
49
+ import rx .observers .Subscribers ;
49
50
import rx .schedulers .Schedulers ;
51
+ import rx .subjects .BehaviorSubject ;
50
52
import rx .subjects .PublishSubject ;
51
53
import rx .subscriptions .SerialSubscription ;
52
54
@@ -199,7 +201,11 @@ public void call(final Subscriber<? super T> child) {
199
201
final SerialSubscription sourceSubscriptions = new SerialSubscription ();
200
202
child .add (sourceSubscriptions );
201
203
202
- final PublishSubject <Notification <?>> terminals = PublishSubject .create ();
204
+ final BehaviorSubject <Notification <?>> terminals = BehaviorSubject .create ();
205
+ final Subscriber <Notification <?>> dummySubscriber = Subscribers .empty ();
206
+ // subscribe immediately so the last emission will be replayed to the next
207
+ // subscriber (which is the one we care about)
208
+ terminals .subscribe (dummySubscriber );
203
209
204
210
final ProducerArbiter arbiter = new ProducerArbiter ();
205
211
You can’t perform that action at this time.
0 commit comments