@@ -5086,68 +5086,6 @@ public final Observable<T> mergeWith(Observable<? extends T> t1) {
5086
5086
return merge (this , t1 );
5087
5087
}
5088
5088
5089
- /**
5090
- * Returns an Observable that emits items produced by multicasting the source Observable within a selector
5091
- * function.
5092
- * <p>
5093
- * This is largely a helper function used by RxJava for other forms of multicasting, such as
5094
- * {@link #publish} and {@link #publishLast}.
5095
- * <dl>
5096
- * <dt><b>Backpressure Support:</b></dt>
5097
- * <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
5098
- * multiple subscribers. Each child will need to manage backpressure independently using operators such
5099
- * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
5100
- * <dt><b>Scheduler:</b></dt>
5101
- * <dd>{@code multicast} does not operate by default on a particular {@link Scheduler}.</dd>
5102
- * </dl>
5103
- *
5104
- * @warn javadocs incomplete: description needs improvement
5105
- * @param subjectFactory
5106
- * the {@link Subject} factory
5107
- * @warn javadocs incomplete: "subjectFactory" parameter described poorly
5108
- * @param selector
5109
- * the selector function, which can use the multicasted source Observable subject to the policies
5110
- * enforced by the created {@code Subject}
5111
- * @warn javadocs incomplete: "selector" parameter described poorly
5112
- * @return an Observable that emits the items produced by multicasting the source Observable within a
5113
- * selector function
5114
- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators#observablepublish-and-observablemulticast">RxJava: Observable.publish() and Observable.multicast</a>
5115
- * @see <a href="http://msdn.microsoft.com/en-us/library/hh229708.aspx">MSDN: Observable.Multicast</a>
5116
- */
5117
- public final <TIntermediate , TResult > Observable <TResult > multicast (
5118
- final Func0 <? extends Subject <? super T , ? extends TIntermediate >> subjectFactory ,
5119
- final Func1 <? super Observable <TIntermediate >, ? extends Observable <TResult >> selector ) {
5120
- return create (new OnSubscribeMulticastSelector <T , TIntermediate , TResult >(this , subjectFactory , selector ));
5121
- }
5122
-
5123
- /**
5124
- * Returns a {@link ConnectableObservable} that upon connection causes the source Observable to push results
5125
- * into the specified subject. A Connectable Observable resembles an ordinary Observable, except that it
5126
- * does not begin emitting items when it is subscribed to, but only when its {@code connect} method
5127
- * is called.
5128
- * <dl>
5129
- * <dt><b>Backpressure Support:</b></dt>
5130
- * <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
5131
- * multiple subscribers. Each child will need to manage backpressure independently using operators such
5132
- * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
5133
- * <dt><b>Scheduler:</b></dt>
5134
- * <dd>{@code multicast} does not operate by default on a particular {@link Scheduler}.</dd>
5135
- * </dl>
5136
- *
5137
- * @param subjectFactory
5138
- * a function that creates a new {@link Subject} for the {@link ConnectableObservable} to push
5139
- * source items into
5140
- * @param <R>
5141
- * the type of items emitted by the resulting {@code ConnectableObservable}
5142
- * @return a {@link ConnectableObservable} that upon connection causes the source Observable to push results
5143
- * into the specified {@link Subject}
5144
- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators#observablepublish-and-observablemulticast">RxJava wiki: Observable.publish and Observable.multicast</a>
5145
- * @see <a href="http://msdn.microsoft.com/en-us/library/hh229708.aspx">MSDN: Observable.Multicast</a>
5146
- */
5147
- public final <R > ConnectableObservable <R > multicast (Func0 <? extends Subject <? super T , ? extends R >> subjectFactory ) {
5148
- return new OperatorMulticast <T , R >(this , subjectFactory );
5149
- }
5150
-
5151
5089
/**
5152
5090
* Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler},
5153
5091
* asynchronously with an unbounded buffer.
@@ -5372,10 +5310,6 @@ public final Observable<T> onExceptionResumeNext(final Observable<? extends T> r
5372
5310
* <p>
5373
5311
* <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.png" alt="">
5374
5312
* <dl>
5375
- * <dt><b>Backpressure Support:</b></dt>
5376
- * <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
5377
- * multiple subscribers. Each child will need to manage backpressure independently using operators such
5378
- * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
5379
5313
* <dt><b>Scheduler:</b></dt>
5380
5314
* <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
5381
5315
* </dl>
@@ -5395,10 +5329,6 @@ public final ConnectableObservable<T> publish() {
5395
5329
* <p>
5396
5330
* <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.f.png" alt="">
5397
5331
* <dl>
5398
- * <dt><b>Backpressure Support:</b></dt>
5399
- * <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
5400
- * multiple subscribers. Each child will need to manage backpressure independently using operators such
5401
- * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
5402
5332
* <dt><b>Scheduler:</b></dt>
5403
5333
* <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
5404
5334
* </dl>
@@ -5415,12 +5345,6 @@ public final ConnectableObservable<T> publish() {
5415
5345
*/
5416
5346
public final <R > Observable <R > publish (Func1 <? super Observable <T >, ? extends Observable <R >> selector ) {
5417
5347
return OperatorPublish .create (this , selector );
5418
- // return multicast(new Func0<Subject<T, T>>() {
5419
- // @Override
5420
- // public final Subject<T, T> call() {
5421
- // return PublishSubject.create();
5422
- // }
5423
- // }, selector);
5424
5348
}
5425
5349
5426
5350
/**
@@ -5430,10 +5354,6 @@ public final <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Ob
5430
5354
* <p>
5431
5355
* <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.if.png" alt="">
5432
5356
* <dl>
5433
- * <dt><b>Backpressure Support:</b></dt>
5434
- * <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
5435
- * multiple subscribers. Each child will need to manage backpressure independently using operators such
5436
- * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
5437
5357
* <dt><b>Scheduler:</b></dt>
5438
5358
* <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
5439
5359
* </dl>
@@ -5452,12 +5372,7 @@ public final <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Ob
5452
5372
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publish.aspx">MSDN: Observable.Publish</a>
5453
5373
*/
5454
5374
public final <R > Observable <R > publish (Func1 <? super Observable <T >, ? extends Observable <R >> selector , final T initialValue ) {
5455
- return multicast (new Func0 <Subject <T , T >>() {
5456
- @ Override
5457
- public final Subject <T , T > call () {
5458
- return BehaviorSubject .create (initialValue );
5459
- }
5460
- }, selector );
5375
+ return concatWith (just (initialValue )).publish (selector );
5461
5376
}
5462
5377
5463
5378
/**
@@ -5467,10 +5382,6 @@ public final Subject<T, T> call() {
5467
5382
* <p>
5468
5383
* <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.i.png" alt="">
5469
5384
* <dl>
5470
- * <dt><b>Backpressure Support:</b></dt>
5471
- * <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
5472
- * multiple subscribers. Each child will need to manage backpressure independently using operators such
5473
- * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
5474
5385
* <dt><b>Scheduler:</b></dt>
5475
5386
* <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
5476
5387
* </dl>
@@ -5483,14 +5394,7 @@ public final Subject<T, T> call() {
5483
5394
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publish.aspx">MSDN: Observable.Publish</a>
5484
5395
*/
5485
5396
public final ConnectableObservable <T > publish (final T initialValue ) {
5486
- return new OperatorMulticast <T , T >(this , new Func0 <Subject <? super T , ? extends T >>() {
5487
-
5488
- @ Override
5489
- public Subject <? super T , ? extends T > call () {
5490
- return BehaviorSubject .<T > create (initialValue );
5491
- }
5492
-
5493
- });
5397
+ return concatWith (just (initialValue )).publish ();
5494
5398
}
5495
5399
5496
5400
/**
@@ -5512,14 +5416,7 @@ public final ConnectableObservable<T> publish(final T initialValue) {
5512
5416
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publishlast.aspx">MSDN: Observable.PublishLast</a>
5513
5417
*/
5514
5418
public final ConnectableObservable <T > publishLast () {
5515
- return new OperatorMulticast <T , T >(this , new Func0 <Subject <? super T , ? extends T >>() {
5516
-
5517
- @ Override
5518
- public Subject <? super T , ? extends T > call () {
5519
- return AsyncSubject .<T > create ();
5520
- }
5521
-
5522
- });
5419
+ return takeLast (1 ).publish ();
5523
5420
}
5524
5421
5525
5422
/**
@@ -5546,12 +5443,7 @@ public final ConnectableObservable<T> publishLast() {
5546
5443
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publishlast.aspx">MSDN: Observable.PublishLast</a>
5547
5444
*/
5548
5445
public final <R > Observable <R > publishLast (Func1 <? super Observable <T >, ? extends Observable <R >> selector ) {
5549
- return multicast (new Func0 <Subject <T , T >>() {
5550
- @ Override
5551
- public final Subject <T , T > call () {
5552
- return AsyncSubject .create ();
5553
- }
5554
- }, selector );
5446
+ return takeLast (1 ).publish (selector );
5555
5447
}
5556
5448
5557
5449
/**
0 commit comments