Skip to content

Commit 0499cff

Browse files
committed
Multicast implemented
1 parent 98f6cbc commit 0499cff

File tree

2 files changed

+45
-38
lines changed

2 files changed

+45
-38
lines changed

rxjava-core/src/main/java/rx/observables/ConnectableObservable.java

Lines changed: 3 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3,47 +3,14 @@
33
import rx.Observable;
44
import rx.Observer;
55
import rx.Subscription;
6-
import rx.subjects.DefaultSubject;
7-
import rx.subjects.Subject;
86
import rx.util.functions.Func1;
97

10-
public class ConnectableObservable<T, R> extends Observable<T> {
11-
private final Observable<T> source;
12-
private final Subject<T, R> subject;
8+
public abstract class ConnectableObservable<T> extends Observable<T> {
139

14-
public static <T, R> ConnectableObservable create(final Observable<T> source, final Subject<T, R> subject) {
15-
return new ConnectableObservable<T, R>(source, subject, new Func1<Observer<T>, Subscription>() {
16-
@Override
17-
public Subscription call(Observer<T> observer) {
18-
return subject.subscribe(observer);
19-
}
20-
});
21-
}
22-
23-
protected ConnectableObservable(Observable<T> source, Subject<T, R> subject, Func1<Observer<T>, Subscription> onSubscribe) {
10+
protected ConnectableObservable(Func1<Observer<T>, Subscription> onSubscribe) {
2411
super(onSubscribe);
25-
this.source = source;
26-
this.subject = subject;
27-
}
28-
29-
public Subscription connect() {
30-
return source.subscribe(new Observer<T>() {
31-
@Override
32-
public void onCompleted() {
33-
subject.onCompleted();
34-
}
35-
36-
@Override
37-
public void onError(Exception e) {
38-
subject.onError(e);
39-
}
40-
41-
@Override
42-
public void onNext(T args) {
43-
subject.onNext(args);
44-
}
45-
});
4612
}
4713

14+
public abstract Subscription connect();
4815

4916
}

rxjava-core/src/main/java/rx/operators/OperatorMulticast.java

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,51 @@
33
import rx.Observable;
44
import rx.Observer;
55
import rx.Subscription;
6+
import rx.observables.ConnectableObservable;
7+
import rx.subjects.Subject;
68
import rx.util.functions.Func1;
79

810
public class OperatorMulticast {
9-
public static <T> Func1<Observer<T>, Subscription> multicast(Observable<T> source, Func1<T, Boolean> predicate) {
10-
return null;
11+
public static <T, R> ConnectableObservable<R> multicast(Observable<T> source, final Subject<T, R> subject) {
12+
return new MulticastConnectableObservable<T ,R>(source, subject);
1113
}
1214

15+
private static class MulticastConnectableObservable<T, R> extends ConnectableObservable<R> {
16+
private final Observable<T> source;
17+
private final Subject<T, R> subject;
18+
19+
public MulticastConnectableObservable(Observable<T> source, final Subject<T, R> subject) {
20+
super(new Func1<Observer<R>, Subscription>() {
21+
@Override
22+
public Subscription call(Observer<R> observer) {
23+
return subject.subscribe(observer);
24+
}
25+
});
26+
this.source = source;
27+
this.subject = subject;
28+
}
29+
30+
public Subscription connect() {
31+
return source.subscribe(new Observer<T>() {
32+
@Override
33+
public void onCompleted() {
34+
subject.onCompleted();
35+
}
36+
37+
@Override
38+
public void onError(Exception e) {
39+
subject.onError(e);
40+
}
41+
42+
@Override
43+
public void onNext(T args) {
44+
subject.onNext(args);
45+
}
46+
});
47+
}
48+
49+
50+
}
51+
52+
1353
}

0 commit comments

Comments
 (0)