Skip to content

Commit aedea7a

Browse files
committed
Feature/mono processor rework (#723)
* reworking unicast mono processor impl This reverts commit c5684eb. Signed-off-by: Oleh Dokuka <[email protected]> * reworking unicast mono processor impl This reverts commit c5684eb. Signed-off-by: Oleh Dokuka <[email protected]> * provides perf comparison Signed-off-by: Oleh Dokuka <[email protected]> * provides more strict tests Signed-off-by: Oleh Dokuka <[email protected]> * provides formatting Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 2f7b705 commit aedea7a

File tree

8 files changed

+3074
-110
lines changed

8 files changed

+3074
-110
lines changed

benchmarks/src/main/java/io/rsocket/MaxPerfSubscriber.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@
55
import org.reactivestreams.Subscription;
66
import reactor.core.CoreSubscriber;
77

8-
public class MaxPerfSubscriber implements CoreSubscriber<Payload> {
8+
public class MaxPerfSubscriber<T> extends CountDownLatch implements CoreSubscriber<T> {
99

10-
final CountDownLatch latch = new CountDownLatch(1);
1110
final Blackhole blackhole;
1211

1312
public MaxPerfSubscriber(Blackhole blackhole) {
13+
super(1);
1414
this.blackhole = blackhole;
1515
}
1616

@@ -20,19 +20,18 @@ public void onSubscribe(Subscription s) {
2020
}
2121

2222
@Override
23-
public void onNext(Payload payload) {
24-
payload.release();
23+
public void onNext(T payload) {
2524
blackhole.consume(payload);
2625
}
2726

2827
@Override
2928
public void onError(Throwable t) {
3029
blackhole.consume(t);
31-
latch.countDown();
30+
countDown();
3231
}
3332

3433
@Override
3534
public void onComplete() {
36-
latch.countDown();
35+
countDown();
3736
}
3837
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.rsocket;
2+
3+
import org.openjdk.jmh.infra.Blackhole;
4+
5+
public class PayloadsMaxPerfSubscriber extends MaxPerfSubscriber<Payload> {
6+
7+
public PayloadsMaxPerfSubscriber(Blackhole blackhole) {
8+
super(blackhole);
9+
}
10+
11+
@Override
12+
public void onNext(Payload payload) {
13+
payload.release();
14+
super.onNext(payload);
15+
}
16+
}

benchmarks/src/main/java/io/rsocket/PerfSubscriber.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@
55
import org.reactivestreams.Subscription;
66
import reactor.core.CoreSubscriber;
77

8-
public class PerfSubscriber<T> implements CoreSubscriber<T> {
8+
public class PerfSubscriber<T> extends CountDownLatch implements CoreSubscriber<T> {
99

10-
public final CountDownLatch latch = new CountDownLatch(1);
1110
final Blackhole blackhole;
1211

1312
Subscription s;
1413

1514
public PerfSubscriber(Blackhole blackhole) {
15+
super(1);
1616
this.blackhole = blackhole;
1717
}
1818

@@ -31,11 +31,11 @@ public void onNext(T payload) {
3131
@Override
3232
public void onError(Throwable t) {
3333
blackhole.consume(t);
34-
latch.countDown();
34+
countDown();
3535
}
3636

3737
@Override
3838
public void onComplete() {
39-
latch.countDown();
39+
countDown();
4040
}
4141
}

benchmarks/src/main/java/io/rsocket/RSocketPerf.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
112112
public PayloadsPerfSubscriber fireAndForget(Blackhole blackhole) throws InterruptedException {
113113
PayloadsPerfSubscriber subscriber = new PayloadsPerfSubscriber(blackhole);
114114
client.fireAndForget(PAYLOAD).subscribe((CoreSubscriber) subscriber);
115-
subscriber.latch.await();
115+
subscriber.await();
116116

117117
return subscriber;
118118
}
@@ -121,7 +121,7 @@ public PayloadsPerfSubscriber fireAndForget(Blackhole blackhole) throws Interrup
121121
public PayloadsPerfSubscriber requestResponse(Blackhole blackhole) throws InterruptedException {
122122
PayloadsPerfSubscriber subscriber = new PayloadsPerfSubscriber(blackhole);
123123
client.requestResponse(PAYLOAD).subscribe(subscriber);
124-
subscriber.latch.await();
124+
subscriber.await();
125125

126126
return subscriber;
127127
}
@@ -131,17 +131,17 @@ public PayloadsPerfSubscriber requestStreamWithRequestByOneStrategy(Blackhole bl
131131
throws InterruptedException {
132132
PayloadsPerfSubscriber subscriber = new PayloadsPerfSubscriber(blackhole);
133133
client.requestStream(PAYLOAD).subscribe(subscriber);
134-
subscriber.latch.await();
134+
subscriber.await();
135135

136136
return subscriber;
137137
}
138138

139139
@Benchmark
140-
public MaxPerfSubscriber requestStreamWithRequestAllStrategy(Blackhole blackhole)
140+
public PayloadsMaxPerfSubscriber requestStreamWithRequestAllStrategy(Blackhole blackhole)
141141
throws InterruptedException {
142-
MaxPerfSubscriber subscriber = new MaxPerfSubscriber(blackhole);
142+
PayloadsMaxPerfSubscriber subscriber = new PayloadsMaxPerfSubscriber(blackhole);
143143
client.requestStream(PAYLOAD).subscribe(subscriber);
144-
subscriber.latch.await();
144+
subscriber.await();
145145

146146
return subscriber;
147147
}
@@ -151,17 +151,17 @@ public PayloadsPerfSubscriber requestChannelWithRequestByOneStrategy(Blackhole b
151151
throws InterruptedException {
152152
PayloadsPerfSubscriber subscriber = new PayloadsPerfSubscriber(blackhole);
153153
client.requestChannel(PAYLOAD_FLUX).subscribe(subscriber);
154-
subscriber.latch.await();
154+
subscriber.await();
155155

156156
return subscriber;
157157
}
158158

159159
@Benchmark
160-
public MaxPerfSubscriber requestChannelWithRequestAllStrategy(Blackhole blackhole)
160+
public PayloadsMaxPerfSubscriber requestChannelWithRequestAllStrategy(Blackhole blackhole)
161161
throws InterruptedException {
162-
MaxPerfSubscriber subscriber = new MaxPerfSubscriber(blackhole);
162+
PayloadsMaxPerfSubscriber subscriber = new PayloadsMaxPerfSubscriber(blackhole);
163163
client.requestChannel(PAYLOAD_FLUX).subscribe(subscriber);
164-
subscriber.latch.await();
164+
subscriber.await();
165165

166166
return subscriber;
167167
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package io.rsocket.internal;
2+
3+
import io.rsocket.MaxPerfSubscriber;
4+
import java.util.concurrent.TimeUnit;
5+
import org.openjdk.jmh.annotations.Benchmark;
6+
import org.openjdk.jmh.annotations.BenchmarkMode;
7+
import org.openjdk.jmh.annotations.Fork;
8+
import org.openjdk.jmh.annotations.Measurement;
9+
import org.openjdk.jmh.annotations.Mode;
10+
import org.openjdk.jmh.annotations.OutputTimeUnit;
11+
import org.openjdk.jmh.annotations.Scope;
12+
import org.openjdk.jmh.annotations.State;
13+
import org.openjdk.jmh.annotations.Warmup;
14+
import org.openjdk.jmh.infra.Blackhole;
15+
import reactor.core.publisher.MonoProcessor;
16+
17+
@BenchmarkMode({Mode.Throughput, Mode.SampleTime})
18+
@Fork(1)
19+
@Warmup(iterations = 10)
20+
@Measurement(iterations = 10, time = 20)
21+
@State(Scope.Benchmark)
22+
@OutputTimeUnit(TimeUnit.MICROSECONDS)
23+
public class UnicastVsDefaultMonoProcessorPerf {
24+
25+
@Benchmark
26+
public void monoProcessorPerf(Blackhole bh) {
27+
MaxPerfSubscriber<Integer> subscriber = new MaxPerfSubscriber<>(bh);
28+
MonoProcessor<Integer> monoProcessor = MonoProcessor.create();
29+
monoProcessor.onNext(1);
30+
monoProcessor.subscribe(subscriber);
31+
32+
bh.consume(monoProcessor);
33+
bh.consume(subscriber);
34+
}
35+
36+
@Benchmark
37+
public void unicastMonoProcessorPerf(Blackhole bh) {
38+
MaxPerfSubscriber<Integer> subscriber = new MaxPerfSubscriber<>(bh);
39+
UnicastMonoProcessor<Integer> monoProcessor = UnicastMonoProcessor.create();
40+
monoProcessor.onNext(1);
41+
monoProcessor.subscribe(subscriber);
42+
43+
bh.consume(monoProcessor);
44+
bh.consume(subscriber);
45+
}
46+
}

0 commit comments

Comments
 (0)