Skip to content

Commit fd52305

Browse files
authored
Refactored StressTest for easier configuration (#204)
#### Problem `StressTest` is pretty useful to do blackbox testing of ReactiveSocket. It is hard to add behavior like leases, keep alive, timeouts, etc. #### Modification Refactored and did the following changes: - Introduced `TestConfig` that is all that needs to be altered to do behavior changes like lease, timeouts, etc. - Changed `StressTest` to use `TestConfig` and only contain the core logic of the test. - Introduced a `StressTestDriver` that contains the creation of config and invocation of `StressTest` #### Result More powerful `StressTest` which can be used for a variety of situations.
1 parent bc9c3e6 commit fd52305

File tree

6 files changed

+406
-203
lines changed

6 files changed

+406
-203
lines changed
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket.examples.transport.tcp.stress;
15+
16+
import io.reactivesocket.ReactiveSocket;
17+
import io.reactivesocket.client.LoadBalancingClient;
18+
import io.reactivesocket.exceptions.RejectedException;
19+
import io.reactivesocket.server.ReactiveSocketServer;
20+
import io.reactivesocket.transport.tcp.server.TcpTransportServer;
21+
import io.reactivex.Flowable;
22+
import io.reactivex.Single;
23+
import io.reactivex.disposables.Disposable;
24+
import org.HdrHistogram.Recorder;
25+
import org.reactivestreams.Publisher;
26+
27+
import java.net.SocketAddress;
28+
import java.time.Duration;
29+
import java.util.ArrayList;
30+
import java.util.Collection;
31+
import java.util.List;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.TimeoutException;
34+
import java.util.concurrent.atomic.AtomicInteger;
35+
import java.util.concurrent.atomic.AtomicLong;
36+
import java.util.function.Function;
37+
38+
class StressTest {
39+
40+
private final AtomicInteger serverCount = new AtomicInteger(0);
41+
private final TestConfig config;
42+
private final AtomicInteger successes;
43+
private final AtomicInteger failures;
44+
private final AtomicInteger leaseExhausted;
45+
private final AtomicInteger timeouts;
46+
private final AtomicInteger outstandings = new AtomicInteger();
47+
private final Recorder histogram;
48+
private volatile long testStartTime;
49+
private ReactiveSocket clientSocket;
50+
private Disposable printDisposable;
51+
52+
StressTest(TestConfig config) {
53+
this.config = config;
54+
successes = new AtomicInteger(0);
55+
failures = new AtomicInteger(0);
56+
leaseExhausted = new AtomicInteger();
57+
timeouts = new AtomicInteger();
58+
histogram = new Recorder(TimeUnit.MINUTES.toNanos(1), 4);
59+
}
60+
61+
public StressTest printStatsEvery(Duration duration) {
62+
printDisposable = Flowable.interval(duration.getSeconds(), TimeUnit.SECONDS)
63+
.forEach(aLong -> {
64+
printTestStats(false);
65+
});
66+
return this;
67+
}
68+
69+
public void printTestStats(boolean printLatencyDistribution) {
70+
System.out.println("==============================================================");
71+
long timeElapsed = (System.nanoTime() - testStartTime) / 1_000_000;
72+
System.out.println(successes.get() + " events in " + timeElapsed +
73+
" ms. Test time remaining(ms): " + (config.getTestDuration().toMillis() - timeElapsed));
74+
double rps = 1_000_000_000.0 * successes.get() / (System.nanoTime() - testStartTime);
75+
System.out.println(rps + " rps");
76+
double rate = (double) successes.get() / (successes.get() + failures.get());
77+
System.out.println("successes: " + successes.get()
78+
+ ", failures: " + failures.get()
79+
+ ", timeouts: " + timeouts.get()
80+
+ ", lease exhaustion: " + leaseExhausted.get()
81+
+ ", success rate: " + rate);
82+
if (printLatencyDistribution) {
83+
System.out.println("Latency distribution in us");
84+
histogram.getIntervalHistogram().outputPercentileDistribution(System.out, 1000.0);
85+
}
86+
System.out.println("==============================================================");
87+
System.out.flush();
88+
}
89+
90+
public StressTest startClient() {
91+
LoadBalancingClient client = LoadBalancingClient.create(getServerList(),
92+
address -> config.newClientForServer(address));
93+
clientSocket = Single.fromPublisher(client.connect()).blockingGet();
94+
System.out.println("Client ready!");
95+
return this;
96+
}
97+
98+
private Publisher<? extends Collection<SocketAddress>> getServerList() {
99+
return config.serverListChangeTicks()
100+
.map(aLong -> startServer())
101+
.map(new io.reactivex.functions.Function<SocketAddress, Collection<SocketAddress>>() {
102+
private final List<SocketAddress> addresses = new ArrayList<SocketAddress>();
103+
104+
@Override
105+
public Collection<SocketAddress> apply(SocketAddress socketAddress) {
106+
System.out.println("Adding server " + socketAddress);
107+
addresses.add(socketAddress);
108+
if (addresses.size() > 15) {
109+
SocketAddress address = addresses.remove(0);
110+
System.out.println("Removed server " + address);
111+
}
112+
return addresses;
113+
}
114+
});
115+
}
116+
117+
public void startTest(Function<ReactiveSocket, Publisher<?>> testFunction) {
118+
if (clientSocket == null) {
119+
System.err.println("Client not connected. Call startClient() first.");
120+
System.exit(-1);
121+
}
122+
testStartTime = System.nanoTime();
123+
while (System.nanoTime() - testStartTime < config.getTestDuration().toNanos()) {
124+
if (outstandings.get() <= config.getMaxConcurrency()) {
125+
AtomicLong startTime = new AtomicLong();
126+
Flowable.defer(() -> testFunction.apply(clientSocket))
127+
.doOnSubscribe(subscription -> {
128+
startTime.set(System.nanoTime());
129+
outstandings.incrementAndGet();
130+
})
131+
.doAfterTerminate(() -> {
132+
long elapsed = (System.nanoTime() - startTime.get()) / 1000;
133+
histogram.recordValue(elapsed);
134+
outstandings.decrementAndGet();
135+
})
136+
.doOnComplete(() -> {
137+
successes.incrementAndGet();
138+
})
139+
.onErrorResumeNext(e -> {
140+
failures.incrementAndGet();
141+
if (e instanceof RejectedException) {
142+
leaseExhausted.incrementAndGet();
143+
} else if (e instanceof TimeoutException) {
144+
timeouts.incrementAndGet();
145+
}
146+
if (failures.get() % 1000 == 0) {
147+
e.printStackTrace();
148+
}
149+
return Flowable.empty();
150+
})
151+
.subscribe();
152+
} else {
153+
try {
154+
Thread.sleep(1);
155+
} catch (InterruptedException e) {
156+
System.out.println("Interrupted while waiting for lowering concurrency.");
157+
Thread.currentThread().interrupt();
158+
}
159+
}
160+
}
161+
System.out.println("Stress test finished. Duration (minutes): "
162+
+ Duration.ofNanos(System.nanoTime() - testStartTime).toMinutes());
163+
printTestStats(true);
164+
Flowable.fromPublisher(clientSocket.close()).ignoreElements().blockingGet();
165+
166+
if (null != printDisposable) {
167+
printDisposable.dispose();
168+
}
169+
}
170+
171+
private SocketAddress startServer() {
172+
return ReactiveSocketServer.create(TcpTransportServer.create())
173+
.start((setup, sendingSocket) -> {
174+
return config.nextServerHandler(serverCount.incrementAndGet());
175+
})
176+
.getServerAddress();
177+
}
178+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket.examples.transport.tcp.stress;
15+
16+
import io.reactivesocket.util.PayloadImpl;
17+
18+
import java.time.Duration;
19+
import java.util.function.IntSupplier;
20+
21+
public final class StressTestDriver {
22+
23+
public static void main(String... args) throws Exception {
24+
Duration testDuration = Duration.ofMinutes(1);
25+
int maxConcurrency = 100;
26+
boolean enableLease = true;
27+
IntSupplier leaseSupplier = () -> 100_000;
28+
int leaseTtlMillis = 30_000;
29+
30+
TestConfig config;
31+
if (enableLease) {
32+
config = new TestConfig(testDuration, maxConcurrency, leaseSupplier, leaseTtlMillis);
33+
} else {
34+
config = new TestConfig(testDuration, maxConcurrency, enableLease);
35+
}
36+
37+
StressTest test = new StressTest(config);
38+
39+
test.printStatsEvery(Duration.ofSeconds(5))
40+
.startClient()
41+
.startTest(reactiveSocket -> reactiveSocket.requestResponse(new PayloadImpl("Hello", "META")));
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket.examples.transport.tcp.stress;
15+
16+
import io.reactivesocket.AbstractReactiveSocket;
17+
import io.reactivesocket.Payload;
18+
import io.reactivesocket.ReactiveSocket;
19+
import io.reactivesocket.util.PayloadImpl;
20+
import io.reactivex.Flowable;
21+
import org.reactivestreams.Publisher;
22+
23+
import java.util.concurrent.Callable;
24+
import java.util.concurrent.ThreadLocalRandom;
25+
26+
class StressTestHandler extends AbstractReactiveSocket {
27+
28+
private final Callable<Result> failureSelector;
29+
30+
private StressTestHandler(Callable<Result> failureSelector) {
31+
this.failureSelector = failureSelector;
32+
}
33+
34+
@Override
35+
public Publisher<Payload> requestResponse(Payload payload) {
36+
return Flowable.defer(() -> {
37+
Result result = failureSelector.call();
38+
switch (result) {
39+
case Fail:
40+
return Flowable.error(new Exception("SERVER EXCEPTION"));
41+
case DontReply:
42+
return Flowable.never(); // Cause timeout
43+
default:
44+
return Flowable.just(new PayloadImpl("Response"));
45+
}
46+
});
47+
}
48+
49+
public static ReactiveSocket alwaysPass() {
50+
return new StressTestHandler(() -> Result.Pass);
51+
}
52+
53+
public static ReactiveSocket randomFailuresAndDelays() {
54+
return new StressTestHandler(() -> {
55+
if (ThreadLocalRandom.current().nextInt(2) == 0) {
56+
return Result.Fail;
57+
}
58+
return Result.DontReply;
59+
});
60+
}
61+
62+
public enum Result {
63+
Fail,
64+
DontReply,
65+
Pass
66+
}
67+
}

0 commit comments

Comments
 (0)