File tree Expand file tree Collapse file tree 4 files changed +67
-0
lines changed
src/main/java/io/reactivex/internal/operators Expand file tree Collapse file tree 4 files changed +67
-0
lines changed Original file line number Diff line number Diff line change
1
+ package io .reactivex .internal .operators .completable ;
2
+
3
+ import io .reactivex .Completable ;
4
+
5
+ public final class CompletableWrapper extends Completable {
6
+
7
+ final CompletableOnSubscribe onSubscribe ;
8
+
9
+ public CompletableWrapper (CompletableOnSubscribe onSubscribe ) {
10
+ this .onSubscribe = onSubscribe ;
11
+ }
12
+
13
+ @ Override
14
+ protected void subscribeActual (CompletableSubscriber s ) {
15
+ onSubscribe .accept (s );
16
+ }
17
+ }
Original file line number Diff line number Diff line change
1
+ package io .reactivex .internal .operators .flowable ;
2
+
3
+ import org .reactivestreams .*;
4
+
5
+ import io .reactivex .Flowable ;
6
+
7
+ public final class FlowableWrapper <T > extends Flowable <T > {
8
+ final Publisher <? extends T > publisher ;
9
+
10
+ public FlowableWrapper (Publisher <? extends T > publisher ) {
11
+ this .publisher = publisher ;
12
+ }
13
+
14
+ @ Override
15
+ protected void subscribeActual (Subscriber <? super T > s ) {
16
+ publisher .subscribe (s );
17
+ }
18
+ }
Original file line number Diff line number Diff line change
1
+ package io .reactivex .internal .operators .observable ;
2
+
3
+ import io .reactivex .*;
4
+
5
+ public final class ObservableWrapper <T > extends Observable <T > {
6
+ final NbpOnSubscribe <T > onSubscribe ;
7
+
8
+ public ObservableWrapper (NbpOnSubscribe <T > onSubscribe ) {
9
+ this .onSubscribe = onSubscribe ;
10
+ }
11
+
12
+ @ Override
13
+ protected void subscribeActual (Observer <? super T > observer ) {
14
+ onSubscribe .accept (observer );
15
+ }
16
+ }
Original file line number Diff line number Diff line change
1
+ package io .reactivex .internal .operators .single ;
2
+
3
+ import io .reactivex .Single ;
4
+
5
+ public final class SingleWrapper <T > extends Single <T > {
6
+ final SingleOnSubscribe <T > onSubscribe ;
7
+
8
+ public SingleWrapper (io .reactivex .Single .SingleOnSubscribe <T > onSubscribe ) {
9
+ this .onSubscribe = onSubscribe ;
10
+ }
11
+
12
+ @ Override
13
+ protected void subscribeActual (io .reactivex .Single .SingleSubscriber <? super T > subscriber ) {
14
+ onSubscribe .accept (subscriber );
15
+ }
16
+ }
You can’t perform that action at this time.
0 commit comments