Skip to content

Commit 6db98f8

Browse files
committed
improve perf of OperatorIgnoreElements
1 parent 1cff8bf commit 6db98f8

File tree

4 files changed

+191
-2
lines changed

4 files changed

+191
-2
lines changed

src/main/java/rx/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4941,7 +4941,7 @@ public final <T2, D1, D2, R> Observable<R> groupJoin(Observable<T2> right, Func1
49414941
* @see <a href="http://reactivex.io/documentation/operators/ignoreelements.html">ReactiveX operators documentation: IgnoreElements</a>
49424942
*/
49434943
public final Observable<T> ignoreElements() {
4944-
return filter(UtilityFunctions.alwaysFalse());
4944+
return lift(OperatorIgnoreElements.<T> instance());
49454945
}
49464946

49474947
/**
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import rx.Observable.Operator;
19+
import rx.Subscriber;
20+
21+
public class OperatorIgnoreElements<T> implements Operator<T, T> {
22+
23+
private static class Holder {
24+
static final OperatorIgnoreElements<?> INSTANCE = new OperatorIgnoreElements<Object>();
25+
}
26+
27+
@SuppressWarnings("unchecked")
28+
public static <T> OperatorIgnoreElements<T> instance() {
29+
return (OperatorIgnoreElements<T>) Holder.INSTANCE;
30+
}
31+
32+
private OperatorIgnoreElements() {
33+
34+
}
35+
36+
@Override
37+
public Subscriber<? super T> call(final Subscriber<? super T> child) {
38+
Subscriber<T> parent = new Subscriber<T>() {
39+
40+
@Override
41+
public void onCompleted() {
42+
child.onCompleted();
43+
}
44+
45+
@Override
46+
public void onError(Throwable e) {
47+
child.onError(e);
48+
}
49+
50+
@Override
51+
public void onNext(T t) {
52+
// ignore element
53+
}
54+
55+
};
56+
child.add(parent);
57+
return parent;
58+
}
59+
60+
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package rx.internal.operators;
2+
3+
import static org.junit.Assert.assertEquals;
4+
import static org.junit.Assert.assertTrue;
5+
6+
import java.util.Arrays;
7+
import java.util.concurrent.atomic.AtomicBoolean;
8+
import java.util.concurrent.atomic.AtomicInteger;
9+
10+
import org.junit.Test;
11+
12+
import rx.Observable;
13+
import rx.Subscriber;
14+
import rx.functions.Action0;
15+
import rx.functions.Action1;
16+
import rx.observers.TestSubscriber;
17+
18+
public class OperatorIgnoreElementsTest {
19+
20+
@Test
21+
public void testWithEmpty() {
22+
assertTrue(Observable.empty().ignoreElements().isEmpty().toBlocking().single());
23+
}
24+
25+
@Test
26+
public void testWithNonEmpty() {
27+
assertTrue(Observable.just(1, 2, 3).ignoreElements().isEmpty().toBlocking().single());
28+
}
29+
30+
@Test
31+
public void testUpstreamIsProcessedButIgnored() {
32+
final int num = 10;
33+
final AtomicInteger upstreamCount = new AtomicInteger();
34+
int count = Observable.range(1, num)
35+
.doOnNext(new Action1<Integer>() {
36+
@Override
37+
public void call(Integer t) {
38+
upstreamCount.incrementAndGet();
39+
}
40+
})
41+
.ignoreElements()
42+
.count().toBlocking().single();
43+
assertEquals(num, upstreamCount.get());
44+
assertEquals(0, count);
45+
}
46+
47+
@Test
48+
public void testCompletedOk() {
49+
TestSubscriber<Object> ts = new TestSubscriber<Object>();
50+
Observable.range(1, 10).ignoreElements().subscribe(ts);
51+
ts.assertNoErrors();
52+
ts.assertReceivedOnNext(Arrays.asList());
53+
ts.assertTerminalEvent();
54+
ts.assertUnsubscribed();
55+
}
56+
57+
@Test
58+
public void testErrorReceived() {
59+
TestSubscriber<Object> ts = new TestSubscriber<Object>();
60+
RuntimeException ex = new RuntimeException("boo");
61+
Observable.error(ex).ignoreElements().subscribe(ts);
62+
ts.assertReceivedOnNext(Arrays.asList());
63+
ts.assertTerminalEvent();
64+
ts.assertUnsubscribed();
65+
assertEquals(1, ts.getOnErrorEvents().size());
66+
assertEquals("boo", ts.getOnErrorEvents().get(0).getMessage());
67+
}
68+
69+
@Test
70+
public void testUnsubscribesFromUpstream() {
71+
final AtomicBoolean unsub = new AtomicBoolean();
72+
Observable.range(1, 10).doOnUnsubscribe(new Action0() {
73+
@Override
74+
public void call() {
75+
unsub.set(true);
76+
}})
77+
.subscribe();
78+
assertTrue(unsub.get());
79+
}
80+
81+
@Test(timeout = 10000)
82+
public void testDoesNotHangAndProcessesAllUsingBackpressure() {
83+
final AtomicInteger upstreamCount = new AtomicInteger();
84+
final AtomicInteger count = new AtomicInteger(0);
85+
int num = 10;
86+
Observable.range(1, num)
87+
//
88+
.doOnNext(new Action1<Integer>() {
89+
@Override
90+
public void call(Integer t) {
91+
upstreamCount.incrementAndGet();
92+
}
93+
})
94+
//
95+
.ignoreElements()
96+
//
97+
.doOnNext(new Action1<Integer>() {
98+
99+
@Override
100+
public void call(Integer t) {
101+
upstreamCount.incrementAndGet();
102+
}
103+
})
104+
//
105+
.subscribe(new Subscriber<Integer>() {
106+
107+
@Override
108+
public void onStart() {
109+
request(1);
110+
}
111+
112+
@Override
113+
public void onCompleted() {
114+
115+
}
116+
117+
@Override
118+
public void onError(Throwable e) {
119+
}
120+
121+
@Override
122+
public void onNext(Integer t) {
123+
count.incrementAndGet();
124+
}
125+
});
126+
assertEquals(num, upstreamCount.get());
127+
assertEquals(0, count.get());
128+
}
129+
130+
}

src/test/java/rx/internal/operators/OperatorTakeLastOneTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414

1515
import rx.Observable;
1616
import rx.Subscriber;
17-
import rx.Subscription;
1817
import rx.functions.Action0;
1918
import rx.functions.Action1;
2019
import rx.observers.TestSubscriber;

0 commit comments

Comments
 (0)