Skip to content

Commit 3bf68ff

Browse files
committed
convert to OnSubscribe
1 parent 6f2e8af commit 3bf68ff

File tree

2 files changed

+9
-6
lines changed

2 files changed

+9
-6
lines changed

src/main/java/rx/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9203,7 +9203,7 @@ public final Observable<T> skip(long time, TimeUnit unit) {
92039203
* @see <a href="http://reactivex.io/documentation/operators/skip.html">ReactiveX operators documentation: Skip</a>
92049204
*/
92059205
public final Observable<T> skip(long time, TimeUnit unit, Scheduler scheduler) {
9206-
return lift(new OperatorSkipTimed<T>(time, unit, scheduler));
9206+
return create(new OnSubscribeSkipTimed<T>(this, time, unit, scheduler));
92079207
}
92089208

92099209
/**

src/main/java/rx/internal/operators/OperatorSkipTimed.java renamed to src/main/java/rx/internal/operators/OnSubscribeSkipTimed.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
import java.util.concurrent.TimeUnit;
1919

20-
import rx.Observable.Operator;
20+
import rx.Observable.OnSubscribe;
21+
import rx.Observable;
2122
import rx.Scheduler;
2223
import rx.Scheduler.Worker;
2324
import rx.Subscriber;
@@ -27,24 +28,26 @@
2728
* Skips elements until a specified time elapses.
2829
* @param <T> the value type
2930
*/
30-
public final class OperatorSkipTimed<T> implements Operator<T, T> {
31+
public final class OnSubscribeSkipTimed<T> implements OnSubscribe<T> {
3132
final long time;
3233
final TimeUnit unit;
3334
final Scheduler scheduler;
35+
final Observable<T> source;
3436

35-
public OperatorSkipTimed(long time, TimeUnit unit, Scheduler scheduler) {
37+
public OnSubscribeSkipTimed(Observable<T> source, long time, TimeUnit unit, Scheduler scheduler) {
38+
this.source = source;
3639
this.time = time;
3740
this.unit = unit;
3841
this.scheduler = scheduler;
3942
}
4043

4144
@Override
42-
public Subscriber<? super T> call(final Subscriber<? super T> child) {
45+
public void call(final Subscriber<? super T> child) {
4346
final Worker worker = scheduler.createWorker();
4447
child.add(worker);
4548
SkipTimedSubscriber<T> subscriber = new SkipTimedSubscriber<T>(child);
4649
worker.schedule(subscriber, time, unit);
47-
return subscriber;
50+
source.unsafeSubscribe(subscriber);
4851
}
4952

5053
final static class SkipTimedSubscriber<T> extends Subscriber<T> implements Action0 {

0 commit comments

Comments
 (0)