Skip to content

Commit 0bdff66

Browse files
committed
Merge branch '1.x' of https://github.com/ReactiveX/RxJava.git into 1.x
2 parents c566a40 + 3467685 commit 0bdff66

File tree

6 files changed

+124
-83
lines changed

6 files changed

+124
-83
lines changed

src/main/java/rx/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9203,7 +9203,7 @@ public final Observable<T> skip(long time, TimeUnit unit) {
92039203
* @see <a href="http://reactivex.io/documentation/operators/skip.html">ReactiveX operators documentation: Skip</a>
92049204
*/
92059205
public final Observable<T> skip(long time, TimeUnit unit, Scheduler scheduler) {
9206-
return lift(new OperatorSkipTimed<T>(time, unit, scheduler));
9206+
return create(new OnSubscribeSkipTimed<T>(this, time, unit, scheduler));
92079207
}
92089208

92099209
/**
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
5+
* use this file except in compliance with the License. You may obtain a copy of
6+
* the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13+
* License for the specific language governing permissions and limitations under
14+
* the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import java.util.concurrent.TimeUnit;
19+
20+
import rx.Observable.OnSubscribe;
21+
import rx.Observable;
22+
import rx.Scheduler;
23+
import rx.Scheduler.Worker;
24+
import rx.Subscriber;
25+
import rx.functions.Action0;
26+
27+
/**
28+
* Skips elements until a specified time elapses.
29+
* @param <T> the value type
30+
*/
31+
public final class OnSubscribeSkipTimed<T> implements OnSubscribe<T> {
32+
final long time;
33+
final TimeUnit unit;
34+
final Scheduler scheduler;
35+
final Observable<T> source;
36+
37+
public OnSubscribeSkipTimed(Observable<T> source, long time, TimeUnit unit, Scheduler scheduler) {
38+
this.source = source;
39+
this.time = time;
40+
this.unit = unit;
41+
this.scheduler = scheduler;
42+
}
43+
44+
@Override
45+
public void call(final Subscriber<? super T> child) {
46+
final Worker worker = scheduler.createWorker();
47+
SkipTimedSubscriber<T> subscriber = new SkipTimedSubscriber<T>(child);
48+
subscriber.add(worker);
49+
child.add(subscriber);
50+
worker.schedule(subscriber, time, unit);
51+
source.unsafeSubscribe(subscriber);
52+
}
53+
54+
final static class SkipTimedSubscriber<T> extends Subscriber<T> implements Action0 {
55+
56+
final Subscriber<? super T> child;
57+
volatile boolean gate;
58+
59+
SkipTimedSubscriber(Subscriber<? super T> child) {
60+
this.child = child;
61+
}
62+
63+
@Override
64+
public void call() {
65+
gate = true;
66+
}
67+
68+
@Override
69+
public void onNext(T t) {
70+
if (gate) {
71+
child.onNext(t);
72+
}
73+
}
74+
75+
@Override
76+
public void onError(Throwable e) {
77+
try {
78+
child.onError(e);
79+
} finally {
80+
unsubscribe();
81+
}
82+
}
83+
84+
@Override
85+
public void onCompleted() {
86+
try {
87+
child.onCompleted();
88+
} finally {
89+
unsubscribe();
90+
}
91+
}
92+
93+
}
94+
}

src/main/java/rx/internal/operators/OperatorSkipTimed.java

Lines changed: 0 additions & 80 deletions
This file was deleted.

src/test/java/rx/internal/operators/OperatorFilterTest.java renamed to src/test/java/rx/internal/operators/OnSubscribeFilterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import rx.observers.TestSubscriber;
3131
import rx.subjects.PublishSubject;
3232

33-
public class OperatorFilterTest {
33+
public class OnSubscribeFilterTest {
3434

3535
@Test
3636
public void testFilter() {

src/test/java/rx/internal/operators/OperatorMapTest.java renamed to src/test/java/rx/internal/operators/OnSubscribeMapTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import rx.schedulers.Schedulers;
3333
import rx.subjects.PublishSubject;
3434

35-
public class OperatorMapTest {
35+
public class OnSubscribeMapTest {
3636

3737
@Mock
3838
Observer<String> stringObserver;

src/test/java/rx/internal/operators/OperatorSkipTimedTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,24 @@
1515
*/
1616
package rx.internal.operators;
1717

18+
import static org.junit.Assert.assertTrue;
1819
import static org.mockito.Matchers.any;
1920
import static org.mockito.Mockito.inOrder;
2021
import static org.mockito.Mockito.mock;
2122
import static org.mockito.Mockito.never;
2223
import static org.mockito.Mockito.verify;
2324

2425
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2527

2628
import org.junit.Test;
2729
import org.mockito.InOrder;
2830

2931
import rx.Observable;
3032
import rx.Observer;
3133
import rx.exceptions.TestException;
34+
import rx.functions.Action0;
35+
import rx.observers.TestSubscriber;
3236
import rx.schedulers.TestScheduler;
3337
import rx.subjects.PublishSubject;
3438

@@ -166,4 +170,27 @@ public void testSkipTimedErrorAfterTime() {
166170
verify(o, never()).onCompleted();
167171

168172
}
173+
174+
@Test
175+
public void testSkipTimedUnsubscribePropagatesToUpstream() {
176+
TestScheduler scheduler = new TestScheduler();
177+
178+
PublishSubject<Integer> source = PublishSubject.create();
179+
180+
final AtomicBoolean unsub = new AtomicBoolean();
181+
Observable<Integer> result = source.doOnUnsubscribe(new Action0() {
182+
183+
@Override
184+
public void call() {
185+
unsub.set(true);
186+
}
187+
}).skip(1, TimeUnit.SECONDS, scheduler);
188+
189+
TestSubscriber<Object> ts = TestSubscriber.create();
190+
191+
result.subscribe(ts);
192+
source.onNext(1);
193+
ts.unsubscribe();
194+
assertTrue(unsub.get());
195+
}
169196
}

0 commit comments

Comments
 (0)