Skip to content

Commit 0e74ac8

Browse files
Remove Multicast
The implementation is still there as `replay` is still using it, but the public API is removed.
1 parent 5962c76 commit 0e74ac8

File tree

2 files changed

+7
-115
lines changed

2 files changed

+7
-115
lines changed

src/main/java/rx/Observable.java

Lines changed: 4 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -5086,68 +5086,6 @@ public final Observable<T> mergeWith(Observable<? extends T> t1) {
50865086
return merge(this, t1);
50875087
}
50885088

5089-
/**
5090-
* Returns an Observable that emits items produced by multicasting the source Observable within a selector
5091-
* function.
5092-
* <p>
5093-
* This is largely a helper function used by RxJava for other forms of multicasting, such as
5094-
* {@link #publish} and {@link #publishLast}.
5095-
* <dl>
5096-
* <dt><b>Backpressure Support:</b></dt>
5097-
* <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
5098-
* multiple subscribers. Each child will need to manage backpressure independently using operators such
5099-
* as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
5100-
* <dt><b>Scheduler:</b></dt>
5101-
* <dd>{@code multicast} does not operate by default on a particular {@link Scheduler}.</dd>
5102-
* </dl>
5103-
*
5104-
* @warn javadocs incomplete: description needs improvement
5105-
* @param subjectFactory
5106-
* the {@link Subject} factory
5107-
* @warn javadocs incomplete: "subjectFactory" parameter described poorly
5108-
* @param selector
5109-
* the selector function, which can use the multicasted source Observable subject to the policies
5110-
* enforced by the created {@code Subject}
5111-
* @warn javadocs incomplete: "selector" parameter described poorly
5112-
* @return an Observable that emits the items produced by multicasting the source Observable within a
5113-
* selector function
5114-
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators#observablepublish-and-observablemulticast">RxJava: Observable.publish() and Observable.multicast</a>
5115-
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229708.aspx">MSDN: Observable.Multicast</a>
5116-
*/
5117-
public final <TIntermediate, TResult> Observable<TResult> multicast(
5118-
final Func0<? extends Subject<? super T, ? extends TIntermediate>> subjectFactory,
5119-
final Func1<? super Observable<TIntermediate>, ? extends Observable<TResult>> selector) {
5120-
return create(new OnSubscribeMulticastSelector<T, TIntermediate, TResult>(this, subjectFactory, selector));
5121-
}
5122-
5123-
/**
5124-
* Returns a {@link ConnectableObservable} that upon connection causes the source Observable to push results
5125-
* into the specified subject. A Connectable Observable resembles an ordinary Observable, except that it
5126-
* does not begin emitting items when it is subscribed to, but only when its {@code connect} method
5127-
* is called.
5128-
* <dl>
5129-
* <dt><b>Backpressure Support:</b></dt>
5130-
* <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
5131-
* multiple subscribers. Each child will need to manage backpressure independently using operators such
5132-
* as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
5133-
* <dt><b>Scheduler:</b></dt>
5134-
* <dd>{@code multicast} does not operate by default on a particular {@link Scheduler}.</dd>
5135-
* </dl>
5136-
*
5137-
* @param subjectFactory
5138-
* a function that creates a new {@link Subject} for the {@link ConnectableObservable} to push
5139-
* source items into
5140-
* @param <R>
5141-
* the type of items emitted by the resulting {@code ConnectableObservable}
5142-
* @return a {@link ConnectableObservable} that upon connection causes the source Observable to push results
5143-
* into the specified {@link Subject}
5144-
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators#observablepublish-and-observablemulticast">RxJava wiki: Observable.publish and Observable.multicast</a>
5145-
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229708.aspx">MSDN: Observable.Multicast</a>
5146-
*/
5147-
public final <R> ConnectableObservable<R> multicast(Func0<? extends Subject<? super T, ? extends R>> subjectFactory) {
5148-
return new OperatorMulticast<T, R>(this, subjectFactory);
5149-
}
5150-
51515089
/**
51525090
* Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler},
51535091
* asynchronously with an unbounded buffer.
@@ -5372,10 +5310,6 @@ public final Observable<T> onExceptionResumeNext(final Observable<? extends T> r
53725310
* <p>
53735311
* <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.png" alt="">
53745312
* <dl>
5375-
* <dt><b>Backpressure Support:</b></dt>
5376-
* <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
5377-
* multiple subscribers. Each child will need to manage backpressure independently using operators such
5378-
* as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
53795313
* <dt><b>Scheduler:</b></dt>
53805314
* <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
53815315
* </dl>
@@ -5395,10 +5329,6 @@ public final ConnectableObservable<T> publish() {
53955329
* <p>
53965330
* <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.f.png" alt="">
53975331
* <dl>
5398-
* <dt><b>Backpressure Support:</b></dt>
5399-
* <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
5400-
* multiple subscribers. Each child will need to manage backpressure independently using operators such
5401-
* as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
54025332
* <dt><b>Scheduler:</b></dt>
54035333
* <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
54045334
* </dl>
@@ -5415,12 +5345,6 @@ public final ConnectableObservable<T> publish() {
54155345
*/
54165346
public final <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
54175347
return OperatorPublish.create(this, selector);
5418-
// return multicast(new Func0<Subject<T, T>>() {
5419-
// @Override
5420-
// public final Subject<T, T> call() {
5421-
// return PublishSubject.create();
5422-
// }
5423-
// }, selector);
54245348
}
54255349

54265350
/**
@@ -5430,10 +5354,6 @@ public final <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Ob
54305354
* <p>
54315355
* <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.if.png" alt="">
54325356
* <dl>
5433-
* <dt><b>Backpressure Support:</b></dt>
5434-
* <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
5435-
* multiple subscribers. Each child will need to manage backpressure independently using operators such
5436-
* as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
54375357
* <dt><b>Scheduler:</b></dt>
54385358
* <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
54395359
* </dl>
@@ -5452,12 +5372,7 @@ public final <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Ob
54525372
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publish.aspx">MSDN: Observable.Publish</a>
54535373
*/
54545374
public final <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Observable<R>> selector, final T initialValue) {
5455-
return multicast(new Func0<Subject<T, T>>() {
5456-
@Override
5457-
public final Subject<T, T> call() {
5458-
return BehaviorSubject.create(initialValue);
5459-
}
5460-
}, selector);
5375+
return concatWith(just(initialValue)).publish(selector);
54615376
}
54625377

54635378
/**
@@ -5467,10 +5382,6 @@ public final Subject<T, T> call() {
54675382
* <p>
54685383
* <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.i.png" alt="">
54695384
* <dl>
5470-
* <dt><b>Backpressure Support:</b></dt>
5471-
* <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
5472-
* multiple subscribers. Each child will need to manage backpressure independently using operators such
5473-
* as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
54745385
* <dt><b>Scheduler:</b></dt>
54755386
* <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
54765387
* </dl>
@@ -5483,14 +5394,7 @@ public final Subject<T, T> call() {
54835394
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publish.aspx">MSDN: Observable.Publish</a>
54845395
*/
54855396
public final ConnectableObservable<T> publish(final T initialValue) {
5486-
return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
5487-
5488-
@Override
5489-
public Subject<? super T, ? extends T> call() {
5490-
return BehaviorSubject.<T> create(initialValue);
5491-
}
5492-
5493-
});
5397+
return concatWith(just(initialValue)).publish();
54945398
}
54955399

54965400
/**
@@ -5512,14 +5416,7 @@ public final ConnectableObservable<T> publish(final T initialValue) {
55125416
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publishlast.aspx">MSDN: Observable.PublishLast</a>
55135417
*/
55145418
public final ConnectableObservable<T> publishLast() {
5515-
return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {
5516-
5517-
@Override
5518-
public Subject<? super T, ? extends T> call() {
5519-
return AsyncSubject.<T> create();
5520-
}
5521-
5522-
});
5419+
return takeLast(1).publish();
55235420
}
55245421

55255422
/**
@@ -5546,12 +5443,7 @@ public final ConnectableObservable<T> publishLast() {
55465443
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publishlast.aspx">MSDN: Observable.PublishLast</a>
55475444
*/
55485445
public final <R> Observable<R> publishLast(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
5549-
return multicast(new Func0<Subject<T, T>>() {
5550-
@Override
5551-
public final Subject<T, T> call() {
5552-
return AsyncSubject.create();
5553-
}
5554-
}, selector);
5446+
return takeLast(1).publish(selector);
55555447
}
55565448

55575449
/**

src/test/java/rx/internal/operators/OnSubscribeMulticastTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class OnSubscribeMulticastTest {
3535
public void testMulticast() {
3636
Subject<String, String> source = PublishSubject.create();
3737

38-
ConnectableObservable<String> multicasted = source.multicast(new PublishSubjectFactory());
38+
ConnectableObservable<String> multicasted = new OperatorMulticast<String, String>(source, new PublishSubjectFactory());
3939

4040
@SuppressWarnings("unchecked")
4141
Observer<String> observer = mock(Observer.class);
@@ -62,7 +62,7 @@ public void testMulticast() {
6262
public void testMulticastConnectTwice() {
6363
Subject<String, String> source = PublishSubject.create();
6464

65-
ConnectableObservable<String> multicasted = source.multicast(new PublishSubjectFactory());
65+
ConnectableObservable<String> multicasted = new OperatorMulticast<String, String>(source, new PublishSubjectFactory());
6666

6767
@SuppressWarnings("unchecked")
6868
Observer<String> observer = mock(Observer.class);
@@ -86,7 +86,7 @@ public void testMulticastConnectTwice() {
8686
public void testMulticastDisconnect() {
8787
Subject<String, String> source = PublishSubject.create();
8888

89-
ConnectableObservable<String> multicasted = source.multicast(new PublishSubjectFactory());
89+
ConnectableObservable<String> multicasted = new OperatorMulticast<String, String>(source, new PublishSubjectFactory());
9090

9191
@SuppressWarnings("unchecked")
9292
Observer<String> observer = mock(Observer.class);

0 commit comments

Comments
 (0)