Skip to content

Commit 6362dfe

Browse files
committed
Merge pull request #3137 from akarnokd/FromIterablePerf
FromIterable overhead reduction.
2 parents 9a84006 + f6ea890 commit 6362dfe

File tree

2 files changed

+142
-48
lines changed

2 files changed

+142
-48
lines changed

src/main/java/rx/internal/operators/OnSubscribeFromIterable.java

Lines changed: 57 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,10 @@
1616
package rx.internal.operators;
1717

1818
import java.util.Iterator;
19-
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
19+
import java.util.concurrent.atomic.AtomicLong;
2020

21+
import rx.*;
2122
import rx.Observable.OnSubscribe;
22-
import rx.Producer;
23-
import rx.Subscriber;
2423

2524
/**
2625
* Converts an {@code Iterable} sequence into an {@code Observable}.
@@ -50,33 +49,54 @@ public void call(final Subscriber<? super T> o) {
5049
o.setProducer(new IterableProducer<T>(o, it));
5150
}
5251

53-
private static final class IterableProducer<T> implements Producer {
52+
private static final class IterableProducer<T> extends AtomicLong implements Producer {
53+
/** */
54+
private static final long serialVersionUID = -8730475647105475802L;
5455
private final Subscriber<? super T> o;
5556
private final Iterator<? extends T> it;
5657

57-
private volatile long requested = 0;
58-
@SuppressWarnings("rawtypes")
59-
private static final AtomicLongFieldUpdater<IterableProducer> REQUESTED_UPDATER = AtomicLongFieldUpdater.newUpdater(IterableProducer.class, "requested");
60-
6158
private IterableProducer(Subscriber<? super T> o, Iterator<? extends T> it) {
6259
this.o = o;
6360
this.it = it;
6461
}
6562

6663
@Override
6764
public void request(long n) {
68-
if (requested == Long.MAX_VALUE) {
65+
if (get() == Long.MAX_VALUE) {
6966
// already started with fast-path
7067
return;
7168
}
72-
if (n == Long.MAX_VALUE && REQUESTED_UPDATER.compareAndSet(this, 0, Long.MAX_VALUE)) {
73-
// fast-path without backpressure
69+
if (n == Long.MAX_VALUE && compareAndSet(0, Long.MAX_VALUE)) {
70+
fastpath();
71+
} else
72+
if (n > 0 && BackpressureUtils.getAndAddRequest(this, n) == 0L) {
73+
slowpath(n);
74+
}
75+
76+
}
77+
78+
void slowpath(long n) {
79+
// backpressure is requested
80+
final Subscriber<? super T> o = this.o;
81+
final Iterator<? extends T> it = this.it;
7482

83+
long r = n;
84+
while (true) {
85+
/*
86+
* This complicated logic is done to avoid touching the
87+
* volatile `requested` value during the loop itself. If
88+
* it is touched during the loop the performance is
89+
* impacted significantly.
90+
*/
91+
long numToEmit = r;
7592
while (true) {
7693
if (o.isUnsubscribed()) {
7794
return;
7895
} else if (it.hasNext()) {
79-
o.onNext(it.next());
96+
if (--numToEmit >= 0) {
97+
o.onNext(it.next());
98+
} else
99+
break;
80100
} else if (!o.isUnsubscribed()) {
81101
o.onCompleted();
82102
return;
@@ -85,45 +105,34 @@ public void request(long n) {
85105
return;
86106
}
87107
}
88-
} else if (n > 0) {
89-
// backpressure is requested
90-
long _c = BackpressureUtils.getAndAddRequest(REQUESTED_UPDATER, this, n);
91-
if (_c == 0) {
92-
while (true) {
93-
/*
94-
* This complicated logic is done to avoid touching the
95-
* volatile `requested` value during the loop itself. If
96-
* it is touched during the loop the performance is
97-
* impacted significantly.
98-
*/
99-
long r = requested;
100-
long numToEmit = r;
101-
while (true) {
102-
if (o.isUnsubscribed()) {
103-
return;
104-
} else if (it.hasNext()) {
105-
if (--numToEmit >= 0) {
106-
o.onNext(it.next());
107-
} else
108-
break;
109-
} else if (!o.isUnsubscribed()) {
110-
o.onCompleted();
111-
return;
112-
} else {
113-
// is unsubscribed
114-
return;
115-
}
116-
}
117-
if (REQUESTED_UPDATER.addAndGet(this, -r) == 0) {
118-
// we're done emitting the number requested so
119-
// return
120-
return;
121-
}
122-
123-
}
108+
r = addAndGet(-r);
109+
if (r == 0L) {
110+
// we're done emitting the number requested so
111+
// return
112+
return;
124113
}
114+
125115
}
116+
}
126117

118+
void fastpath() {
119+
// fast-path without backpressure
120+
final Subscriber<? super T> o = this.o;
121+
final Iterator<? extends T> it = this.it;
122+
123+
while (true) {
124+
if (o.isUnsubscribed()) {
125+
return;
126+
} else if (it.hasNext()) {
127+
o.onNext(it.next());
128+
} else if (!o.isUnsubscribed()) {
129+
o.onCompleted();
130+
return;
131+
} else {
132+
// is unsubscribed
133+
return;
134+
}
135+
}
127136
}
128137
}
129138

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.operators;
18+
19+
import java.util.Arrays;
20+
import java.util.concurrent.TimeUnit;
21+
22+
import org.openjdk.jmh.annotations.*;
23+
import org.openjdk.jmh.infra.Blackhole;
24+
25+
import rx.*;
26+
import rx.internal.operators.OnSubscribeFromIterable;
27+
import rx.jmh.LatchedObserver;
28+
29+
/**
30+
* Benchmark from(Iterable).
31+
* <p>
32+
* gradlew benchmarks "-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 1 .*FromIterablePerf.*"
33+
* <p>
34+
* gradlew benchmarks "-Pjmh=-f 1 -tu ns -bm avgt -wi 5 -i 5 -r 1 .*FromIterablePerf.*"
35+
*/
36+
@BenchmarkMode(Mode.Throughput)
37+
@OutputTimeUnit(TimeUnit.SECONDS)
38+
@State(Scope.Thread)
39+
public class FromIterablePerf {
40+
Observable<Integer> from;
41+
OnSubscribeFromIterable<Integer> direct;
42+
@Param({"1", "1000", "1000000"})
43+
public int size;
44+
45+
@Setup
46+
public void setup() {
47+
Integer[] array = new Integer[size];
48+
for (int i = 0; i < size; i++) {
49+
array[i] = i;
50+
}
51+
from = Observable.from(Arrays.asList(array));
52+
direct = new OnSubscribeFromIterable<Integer>(Arrays.asList(array));
53+
}
54+
55+
@Benchmark
56+
public void from(Blackhole bh) {
57+
from.subscribe(new LatchedObserver<Integer>(bh));
58+
}
59+
@Benchmark
60+
public void fromUnsafe(final Blackhole bh) {
61+
from.unsafeSubscribe(createSubscriber(bh));
62+
}
63+
64+
@Benchmark
65+
public void direct(final Blackhole bh) {
66+
direct.call(createSubscriber(bh));
67+
}
68+
69+
Subscriber<Integer> createSubscriber(final Blackhole bh) {
70+
return new Subscriber<Integer>() {
71+
@Override
72+
public void onNext(Integer t) {
73+
bh.consume(t);
74+
}
75+
@Override
76+
public void onError(Throwable e) {
77+
e.printStackTrace();
78+
}
79+
@Override
80+
public void onCompleted() {
81+
82+
}
83+
};
84+
}
85+
}

0 commit comments

Comments
 (0)