File tree Expand file tree Collapse file tree 2 files changed +4
-34
lines changed
src/main/java/rx/internal/operators Expand file tree Collapse file tree 2 files changed +4
-34
lines changed Original file line number Diff line number Diff line change 20
20
import rx .Subscriber ;
21
21
import rx .functions .Func1 ;
22
22
import rx .observers .SerializedSubscriber ;
23
+ import rx .observers .Subscribers ;
23
24
import rx .subjects .PublishSubject ;
24
25
25
26
/**
@@ -44,24 +45,7 @@ public Subscriber<? super T> call(final Subscriber<? super T> _child) {
44
45
final SerializedSubscriber <T > child = new SerializedSubscriber <T >(_child );
45
46
final PublishSubject <Observable <T >> delayedEmissions = PublishSubject .create ();
46
47
47
- _child .add (Observable .merge (delayedEmissions ).unsafeSubscribe (new Subscriber <T >() {
48
-
49
- @ Override
50
- public void onCompleted () {
51
- child .onCompleted ();
52
- }
53
-
54
- @ Override
55
- public void onError (Throwable e ) {
56
- child .onError (e );
57
- }
58
-
59
- @ Override
60
- public void onNext (T t ) {
61
- child .onNext (t );
62
- }
63
-
64
- }));
48
+ _child .add (Observable .merge (delayedEmissions ).unsafeSubscribe (Subscribers .from (child )));
65
49
66
50
return new Subscriber <T >(_child ) {
67
51
Original file line number Diff line number Diff line change 26
26
import rx .functions .Action1 ;
27
27
import rx .functions .Func0 ;
28
28
import rx .observables .ConnectableObservable ;
29
+ import rx .observers .Subscribers ;
29
30
import rx .subjects .Subject ;
30
31
import rx .subscriptions .Subscriptions ;
31
32
@@ -90,22 +91,7 @@ public void connect(Action1<? super Subscription> connection) {
90
91
final Subject <? super T , ? extends R > subject = subjectFactory .call ();
91
92
// create new Subscriber that will pass-thru to the subject we just created
92
93
// we do this since it is also a Subscription whereas the Subject is not
93
- subscription = new Subscriber <T >() {
94
- @ Override
95
- public void onCompleted () {
96
- subject .onCompleted ();
97
- }
98
-
99
- @ Override
100
- public void onError (Throwable e ) {
101
- subject .onError (e );
102
- }
103
-
104
- @ Override
105
- public void onNext (T args ) {
106
- subject .onNext (args );
107
- }
108
- };
94
+ subscription = Subscribers .from (subject );
109
95
final AtomicReference <Subscription > gs = new AtomicReference <Subscription >();
110
96
gs .set (Subscriptions .create (new Action0 () {
111
97
@ Override
You can’t perform that action at this time.
0 commit comments