Skip to content

Remove Multicast #1786

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 23, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 4 additions & 112 deletions src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5086,68 +5086,6 @@ public final Observable<T> mergeWith(Observable<? extends T> t1) {
return merge(this, t1);
}

/**
* Returns an Observable that emits items produced by multicasting the source Observable within a selector
* function.
* <p>
* This is largely a helper function used by RxJava for other forms of multicasting, such as
* {@link #publish} and {@link #publishLast}.
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
* multiple subscribers. Each child will need to manage backpressure independently using operators such
* as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code multicast} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @warn javadocs incomplete: description needs improvement
* @param subjectFactory
* the {@link Subject} factory
* @warn javadocs incomplete: "subjectFactory" parameter described poorly
* @param selector
* the selector function, which can use the multicasted source Observable subject to the policies
* enforced by the created {@code Subject}
* @warn javadocs incomplete: "selector" parameter described poorly
* @return an Observable that emits the items produced by multicasting the source Observable within a
* selector function
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators#observablepublish-and-observablemulticast">RxJava: Observable.publish() and Observable.multicast</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229708.aspx">MSDN: Observable.Multicast</a>
*/
public final <TIntermediate, TResult> Observable<TResult> multicast(
final Func0<? extends Subject<? super T, ? extends TIntermediate>> subjectFactory,
final Func1<? super Observable<TIntermediate>, ? extends Observable<TResult>> selector) {
return create(new OnSubscribeMulticastSelector<T, TIntermediate, TResult>(this, subjectFactory, selector));
}

/**
* Returns a {@link ConnectableObservable} that upon connection causes the source Observable to push results
* into the specified subject. A Connectable Observable resembles an ordinary Observable, except that it
* does not begin emitting items when it is subscribed to, but only when its {@code connect} method
* is called.
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
* multiple subscribers. Each child will need to manage backpressure independently using operators such
* as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code multicast} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param subjectFactory
* a function that creates a new {@link Subject} for the {@link ConnectableObservable} to push
* source items into
* @param <R>
* the type of items emitted by the resulting {@code ConnectableObservable}
* @return a {@link ConnectableObservable} that upon connection causes the source Observable to push results
* into the specified {@link Subject}
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Connectable-Observable-Operators#observablepublish-and-observablemulticast">RxJava wiki: Observable.publish and Observable.multicast</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229708.aspx">MSDN: Observable.Multicast</a>
*/
public final <R> ConnectableObservable<R> multicast(Func0<? extends Subject<? super T, ? extends R>> subjectFactory) {
return new OperatorMulticast<T, R>(this, subjectFactory);
}

/**
* Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler},
* asynchronously with an unbounded buffer.
Expand Down Expand Up @@ -5372,10 +5310,6 @@ public final Observable<T> onExceptionResumeNext(final Observable<? extends T> r
* <p>
* <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.png" alt="">
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
* multiple subscribers. Each child will need to manage backpressure independently using operators such
* as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
Expand All @@ -5395,10 +5329,6 @@ public final ConnectableObservable<T> publish() {
* <p>
* <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.f.png" alt="">
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
* multiple subscribers. Each child will need to manage backpressure independently using operators such
* as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
Expand All @@ -5415,12 +5345,6 @@ public final ConnectableObservable<T> publish() {
*/
public final <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
return OperatorPublish.create(this, selector);
// return multicast(new Func0<Subject<T, T>>() {
// @Override
// public final Subject<T, T> call() {
// return PublishSubject.create();
// }
// }, selector);
}

/**
Expand All @@ -5430,10 +5354,6 @@ public final <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Ob
* <p>
* <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.if.png" alt="">
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
* multiple subscribers. Each child will need to manage backpressure independently using operators such
* as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
Expand All @@ -5452,12 +5372,7 @@ public final <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Ob
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publish.aspx">MSDN: Observable.Publish</a>
*/
public final <R> Observable<R> publish(Func1<? super Observable<T>, ? extends Observable<R>> selector, final T initialValue) {
return multicast(new Func0<Subject<T, T>>() {
@Override
public final Subject<T, T> call() {
return BehaviorSubject.create(initialValue);
}
}, selector);
return concatWith(just(initialValue)).publish(selector);
}

/**
Expand All @@ -5467,10 +5382,6 @@ public final Subject<T, T> call() {
* <p>
* <img width="640" height="510" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/publishConnect.i.png" alt="">
* <dl>
* <dt><b>Backpressure Support:</b></dt>
* <dd>This operator does not support backpressure because multicasting means the stream is "hot" with
* multiple subscribers. Each child will need to manage backpressure independently using operators such
* as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code publish} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
Expand All @@ -5483,14 +5394,7 @@ public final Subject<T, T> call() {
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publish.aspx">MSDN: Observable.Publish</a>
*/
public final ConnectableObservable<T> publish(final T initialValue) {
return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {

@Override
public Subject<? super T, ? extends T> call() {
return BehaviorSubject.<T> create(initialValue);
}

});
return concatWith(just(initialValue)).publish();
}

/**
Expand All @@ -5512,14 +5416,7 @@ public final ConnectableObservable<T> publish(final T initialValue) {
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publishlast.aspx">MSDN: Observable.PublishLast</a>
*/
public final ConnectableObservable<T> publishLast() {
return new OperatorMulticast<T, T>(this, new Func0<Subject<? super T, ? extends T>>() {

@Override
public Subject<? super T, ? extends T> call() {
return AsyncSubject.<T> create();
}

});
return takeLast(1).publish();
}

/**
Expand All @@ -5546,12 +5443,7 @@ public final ConnectableObservable<T> publishLast() {
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.linq.observable.publishlast.aspx">MSDN: Observable.PublishLast</a>
*/
public final <R> Observable<R> publishLast(Func1<? super Observable<T>, ? extends Observable<R>> selector) {
return multicast(new Func0<Subject<T, T>>() {
@Override
public final Subject<T, T> call() {
return AsyncSubject.create();
}
}, selector);
return takeLast(1).publish(selector);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class OnSubscribeMulticastTest {
public void testMulticast() {
Subject<String, String> source = PublishSubject.create();

ConnectableObservable<String> multicasted = source.multicast(new PublishSubjectFactory());
ConnectableObservable<String> multicasted = new OperatorMulticast<String, String>(source, new PublishSubjectFactory());

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Expand All @@ -62,7 +62,7 @@ public void testMulticast() {
public void testMulticastConnectTwice() {
Subject<String, String> source = PublishSubject.create();

ConnectableObservable<String> multicasted = source.multicast(new PublishSubjectFactory());
ConnectableObservable<String> multicasted = new OperatorMulticast<String, String>(source, new PublishSubjectFactory());

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Expand All @@ -86,7 +86,7 @@ public void testMulticastConnectTwice() {
public void testMulticastDisconnect() {
Subject<String, String> source = PublishSubject.create();

ConnectableObservable<String> multicasted = source.multicast(new PublishSubjectFactory());
ConnectableObservable<String> multicasted = new OperatorMulticast<String, String>(source, new PublishSubjectFactory());

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Expand Down