File tree Expand file tree Collapse file tree 5 files changed +8
-8
lines changed
main/java/io/reactivex/internal/operators/flowable Expand file tree Collapse file tree 5 files changed +8
-8
lines changed Original file line number Diff line number Diff line change 27
27
import io .reactivex .internal .util .*;
28
28
29
29
public final class BlockingFlowableIterable <T > implements Iterable <T > {
30
- final Flowable <? extends T > source ;
30
+ final Flowable <T > source ;
31
31
32
32
final int bufferSize ;
33
33
34
- public BlockingFlowableIterable (Flowable <? extends T > source , int bufferSize ) {
34
+ public BlockingFlowableIterable (Flowable <T > source , int bufferSize ) {
35
35
this .source = source ;
36
36
this .bufferSize = bufferSize ;
37
37
}
Original file line number Diff line number Diff line change 29
29
*/
30
30
public final class BlockingFlowableMostRecent <T > implements Iterable <T > {
31
31
32
- final Flowable <? extends T > source ;
32
+ final Flowable <T > source ;
33
33
34
34
final T initialValue ;
35
35
36
- public BlockingFlowableMostRecent (Flowable <? extends T > source , T initialValue ) {
36
+ public BlockingFlowableMostRecent (Flowable <T > source , T initialValue ) {
37
37
this .source = source ;
38
38
this .initialValue = initialValue ;
39
39
}
Original file line number Diff line number Diff line change @@ -93,7 +93,7 @@ protected void subscribeActual(Subscriber<? super T> t) {
93
93
*/
94
94
static final class CacheState <T > extends LinkedArrayList implements FlowableSubscriber <T > {
95
95
/** The source observable to connect to. */
96
- final Flowable <? extends T > source ;
96
+ final Flowable <T > source ;
97
97
/** Holds onto the subscriber connected to source. */
98
98
final AtomicReference <Subscription > connection = new AtomicReference <Subscription >();
99
99
/** Guarded by connection (not this). */
@@ -114,7 +114,7 @@ static final class CacheState<T> extends LinkedArrayList implements FlowableSubs
114
114
boolean sourceDone ;
115
115
116
116
@ SuppressWarnings ("unchecked" )
117
- CacheState (Flowable <? extends T > source , int capacityHint ) {
117
+ CacheState (Flowable <T > source , int capacityHint ) {
118
118
super (capacityHint );
119
119
this .source = source ;
120
120
this .subscribers = new AtomicReference <ReplaySubscription <T >[]>(EMPTY );
Original file line number Diff line number Diff line change 32
32
* the value type
33
33
*/
34
34
public final class FlowableRefCount <T > extends AbstractFlowableWithUpstream <T , T > {
35
- final ConnectableFlowable <? extends T > source ;
35
+ final ConnectableFlowable <T > source ;
36
36
volatile CompositeDisposable baseDisposable = new CompositeDisposable ();
37
37
final AtomicInteger subscriptionCount = new AtomicInteger ();
38
38
Original file line number Diff line number Diff line change @@ -644,7 +644,7 @@ public static void doubleOnSubscribe(MaybeObserver<?> subscriber) {
644
644
* isCancelled properly before and after calling dispose.
645
645
* @param source the source to test
646
646
*/
647
- public static void checkDisposed (Flowable <? > source ) {
647
+ public static < T > void checkDisposed (Flowable <T > source ) {
648
648
final TestSubscriber <Object > ts = new TestSubscriber <Object >(0L );
649
649
source .subscribe (new FlowableSubscriber <Object >() {
650
650
@ Override
You can’t perform that action at this time.
0 commit comments