Skip to content

Commit c55a539

Browse files
committed
improve perf of OperatorIgnoreElements
1 parent 1cff8bf commit c55a539

File tree

4 files changed

+139
-2
lines changed

4 files changed

+139
-2
lines changed

src/main/java/rx/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4941,7 +4941,7 @@ public final <T2, D1, D2, R> Observable<R> groupJoin(Observable<T2> right, Func1
49414941
* @see <a href="http://reactivex.io/documentation/operators/ignoreelements.html">ReactiveX operators documentation: IgnoreElements</a>
49424942
*/
49434943
public final Observable<T> ignoreElements() {
4944-
return filter(UtilityFunctions.alwaysFalse());
4944+
return lift(OperatorIgnoreElements.<T> instance());
49454945
}
49464946

49474947
/**
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import rx.Observable.Operator;
19+
import rx.Subscriber;
20+
21+
public class OperatorIgnoreElements<T> implements Operator<T, T> {
22+
23+
private static class Holder {
24+
static final OperatorIgnoreElements<?> INSTANCE = new OperatorIgnoreElements<Object>();
25+
}
26+
27+
@SuppressWarnings("unchecked")
28+
public static <T> OperatorIgnoreElements<T> instance() {
29+
return (OperatorIgnoreElements<T>) Holder.INSTANCE;
30+
}
31+
32+
private OperatorIgnoreElements() {
33+
34+
}
35+
36+
@Override
37+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
38+
return new Subscriber<T>(child) {
39+
40+
@Override
41+
public void onCompleted() {
42+
child.onCompleted();
43+
}
44+
45+
@Override
46+
public void onError(Throwable e) {
47+
child.onError(e);
48+
}
49+
50+
@Override
51+
public void onNext(T t) {
52+
// ignore element
53+
}
54+
55+
};
56+
}
57+
58+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package rx.internal.operators;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertTrue;
5+
6+
import java.util.Arrays;
7+
import java.util.concurrent.atomic.AtomicBoolean;
8+
import java.util.concurrent.atomic.AtomicInteger;
9+
10+
import org.junit.Test;
11+
12+
import rx.Observable;
13+
import rx.functions.Action0;
14+
import rx.functions.Action1;
15+
import rx.observers.TestSubscriber;
16+
17+
public class OperatorIgnoreElementsTest {
18+
19+
@Test
20+
public void testWithEmpty() {
21+
assertTrue(Observable.empty().ignoreElements().isEmpty().toBlocking().single());
22+
}
23+
24+
@Test
25+
public void testWithNonEmpty() {
26+
assertTrue(Observable.just(1, 2, 3).ignoreElements().isEmpty().toBlocking().single());
27+
}
28+
29+
@Test
30+
public void testUpstreamIsProcessedButIgnored() {
31+
final int num = 10;
32+
final AtomicInteger upstreamCount = new AtomicInteger();
33+
int count = Observable.range(1, num)
34+
.doOnNext(new Action1<Integer>() {
35+
@Override
36+
public void call(Integer t) {
37+
upstreamCount.incrementAndGet();
38+
}
39+
})
40+
.ignoreElements()
41+
.count().toBlocking().single();
42+
assertEquals(num, upstreamCount.get());
43+
assertEquals(0, count);
44+
}
45+
46+
@Test
47+
public void testCompletedOk() {
48+
TestSubscriber<Object> ts = new TestSubscriber<Object>();
49+
Observable.range(1, 10).ignoreElements().subscribe(ts);
50+
ts.assertNoErrors();
51+
ts.assertReceivedOnNext(Arrays.asList());
52+
ts.assertTerminalEvent();
53+
ts.assertUnsubscribed();
54+
}
55+
56+
@Test
57+
public void testErrorReceived() {
58+
TestSubscriber<Object> ts = new TestSubscriber<Object>();
59+
RuntimeException ex = new RuntimeException("boo");
60+
Observable.error(ex).ignoreElements().subscribe(ts);
61+
ts.assertReceivedOnNext(Arrays.asList());
62+
ts.assertTerminalEvent();
63+
ts.assertUnsubscribed();
64+
assertEquals(1, ts.getOnErrorEvents().size());
65+
assertEquals("boo", ts.getOnErrorEvents().get(0).getMessage());
66+
}
67+
68+
@Test
69+
public void testUnsubscribesFromUpstream() {
70+
final AtomicBoolean unsub = new AtomicBoolean();
71+
Observable.range(1, 10).doOnUnsubscribe(new Action0() {
72+
@Override
73+
public void call() {
74+
unsub.set(true);
75+
}})
76+
.subscribe();
77+
assertTrue(unsub.get());
78+
}
79+
80+
}

src/test/java/rx/internal/operators/OperatorTakeLastOneTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
import rx.Observable;
1616
import rx.Subscriber;
17-
import rx.Subscription;
1817
import rx.functions.Action0;
1918
import rx.functions.Action1;
2019
import rx.observers.TestSubscriber;

0 commit comments

Comments
 (0)