Skip to content

Commit ccb7d5f

Browse files
committed
add Subscribers.wrap
1 parent 9549363 commit ccb7d5f

File tree

7 files changed

+50
-87
lines changed

7 files changed

+50
-87
lines changed

src/main/java/rx/internal/operators/OnSubscribeDefer.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import rx.Observable.OnSubscribe;
2020
import rx.Subscriber;
2121
import rx.functions.Func0;
22+
import rx.observers.Subscribers;
2223

2324
/**
2425
* Do not create the Observable until an Observer subscribes; create a fresh Observable on each
@@ -46,20 +47,7 @@ public void call(final Subscriber<? super T> s) {
4647
s.onError(t);
4748
return;
4849
}
49-
o.unsafeSubscribe(new Subscriber<T>(s) {
50-
@Override
51-
public void onNext(T t) {
52-
s.onNext(t);
53-
}
54-
@Override
55-
public void onError(Throwable e) {
56-
s.onError(e);
57-
}
58-
@Override
59-
public void onCompleted() {
60-
s.onCompleted();
61-
}
62-
});
50+
o.unsafeSubscribe(Subscribers.wrap(s));
6351
}
6452

6553
}

src/main/java/rx/internal/operators/OnSubscribeDelaySubscription.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import rx.Observable.OnSubscribe;
2222
import rx.Scheduler.Worker;
2323
import rx.functions.Action0;
24+
import rx.observers.Subscribers;
2425

2526
/**
2627
* Delays the subscription to the source by the given amount, running on the given scheduler.
@@ -49,20 +50,7 @@ public void call(final Subscriber<? super T> s) {
4950
@Override
5051
public void call() {
5152
if (!s.isUnsubscribed()) {
52-
source.unsafeSubscribe(new Subscriber<T>(s) {
53-
@Override
54-
public void onNext(T t) {
55-
s.onNext(t);
56-
}
57-
@Override
58-
public void onError(Throwable e) {
59-
s.onError(e);
60-
}
61-
@Override
62-
public void onCompleted() {
63-
s.onCompleted();
64-
}
65-
});
53+
source.unsafeSubscribe(Subscribers.wrap(s));
6654
}
6755
}
6856
}, time, unit);

src/main/java/rx/internal/operators/OnSubscribeDelaySubscriptionWithSelector.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import rx.*;
1919
import rx.Observable.OnSubscribe;
2020
import rx.functions.Func0;
21+
import rx.observers.Subscribers;
2122

2223
/**
2324
* Delays the subscription until the Observable<U> emits an event.
@@ -42,20 +43,7 @@ public void call(final Subscriber<? super T> child) {
4243
@Override
4344
public void onCompleted() {
4445
// subscribe to actual source
45-
source.unsafeSubscribe(new Subscriber<T>(child) {
46-
@Override
47-
public void onNext(T t) {
48-
child.onNext(t);
49-
}
50-
@Override
51-
public void onError(Throwable e) {
52-
child.onError(e);
53-
}
54-
@Override
55-
public void onCompleted() {
56-
child.onCompleted();
57-
}
58-
});
46+
source.unsafeSubscribe(Subscribers.wrap(child));
5947
}
6048

6149
@Override

src/main/java/rx/internal/operators/OnSubscribeUsing.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import rx.Observable.OnSubscribe;
2323
import rx.exceptions.CompositeException;
2424
import rx.functions.*;
25+
import rx.observers.Subscribers;
2526

2627
/**
2728
* Constructs an observable sequence that depends on a resource object.
@@ -68,20 +69,7 @@ public void call(final Subscriber<? super T> subscriber) {
6869
observable = source;
6970
try {
7071
// start
71-
observable.unsafeSubscribe(new Subscriber<T>(subscriber) {
72-
@Override
73-
public void onNext(T t) {
74-
subscriber.onNext(t);
75-
}
76-
@Override
77-
public void onError(Throwable e) {
78-
subscriber.onError(e);
79-
}
80-
@Override
81-
public void onCompleted() {
82-
subscriber.onCompleted();
83-
}
84-
});
72+
observable.unsafeSubscribe(Subscribers.from(subscriber));
8573
} catch (Throwable e) {
8674
Throwable disposeError = disposeEagerlyIfRequested(disposeOnceOnly);
8775
if (disposeError != null)

src/main/java/rx/internal/operators/OperatorDoOnSubscribe.java

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import rx.Observable.Operator;
1919
import rx.Subscriber;
2020
import rx.functions.Action0;
21+
import rx.observers.Subscribers;
2122

2223
/**
2324
* This operator modifies an {@link rx.Observable} so a given action is invoked when the {@link rx.Observable} is subscribed.
@@ -39,19 +40,6 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
3940
subscribe.call();
4041
// Pass through since this operator is for notification only, there is
4142
// no change to the stream whatsoever.
42-
return new Subscriber<T>(child) {
43-
@Override
44-
public void onNext(T t) {
45-
child.onNext(t);
46-
}
47-
@Override
48-
public void onError(Throwable e) {
49-
child.onError(e);
50-
}
51-
@Override
52-
public void onCompleted() {
53-
child.onCompleted();
54-
}
55-
};
43+
return Subscribers.wrap(child);
5644
}
5745
}

src/main/java/rx/internal/operators/OperatorDoOnUnsubscribe.java

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import rx.Observable.Operator;
1919
import rx.*;
2020
import rx.functions.Action0;
21+
import rx.observers.Subscribers;
2122
import rx.subscriptions.Subscriptions;
2223

2324
/**
@@ -41,22 +42,6 @@ public Subscriber<? super T> call(final Subscriber<? super T> child) {
4142

4243
// Pass through since this operator is for notification only, there is
4344
// no change to the stream whatsoever.
44-
return new Subscriber<T>(child) {
45-
@Override
46-
public void onStart() {
47-
}
48-
@Override
49-
public void onNext(T t) {
50-
child.onNext(t);
51-
}
52-
@Override
53-
public void onError(Throwable e) {
54-
child.onError(e);
55-
}
56-
@Override
57-
public void onCompleted() {
58-
child.onCompleted();
59-
}
60-
};
45+
return Subscribers.wrap(child);
6146
}
6247
}

src/main/java/rx/observers/Subscribers.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import rx.Observer;
1919
import rx.Subscriber;
20+
import rx.annotations.Experimental;
2021
import rx.exceptions.OnErrorNotImplementedException;
2122
import rx.functions.Action0;
2223
import rx.functions.Action1;
@@ -198,4 +199,41 @@ public final void onNext(T args) {
198199
};
199200
}
200201

202+
/**
203+
* Returns a new {@link Subscriber} that passes all events to
204+
* <code>subscriber</code>, has backpressure controlled by
205+
* <code>subscriber</code> and uses the subscription list of
206+
* <code>subscriber</code> when {@link Subscriber#add(rx.Subscription)} is
207+
* called.
208+
*
209+
* @param subscriber
210+
* the Subscriber to wrap.
211+
*
212+
* @return a new Subscriber that passes all events to
213+
* <code>subscriber</code>, has backpressure controlled by
214+
* <code>subscriber</code> and uses <code>subscriber</code> to
215+
* manage unsubscription.
216+
*
217+
*/
218+
@Experimental
219+
public static <T> Subscriber<T> wrap(final Subscriber<? super T> subscriber) {
220+
return new Subscriber<T>(subscriber) {
221+
222+
@Override
223+
public void onCompleted() {
224+
subscriber.onCompleted();
225+
}
226+
227+
@Override
228+
public void onError(Throwable e) {
229+
subscriber.onError(e);
230+
}
231+
232+
@Override
233+
public void onNext(T t) {
234+
subscriber.onNext(t);
235+
}
236+
237+
};
238+
}
201239
}

0 commit comments

Comments
 (0)