Skip to content

Commit 3ac1f7e

Browse files
committed
Add Prometheus support in performance tool
1 parent f7e8435 commit 3ac1f7e

File tree

4 files changed

+383
-185
lines changed

4 files changed

+383
-185
lines changed

pom.xml

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@
4949
<assertj.version>3.25.3</assertj.version>
5050
<mockito.version>5.12.0</mockito.version>
5151
<amqp-client.version>5.20.0</amqp-client.version>
52-
<metrics.version>4.2.25</metrics.version>
5352
<micrometer.version>1.13.0</micrometer.version>
5453
<maven.compiler.plugin.version>3.13.0</maven.compiler.plugin.version>
5554
<maven-surefire-plugin.version>3.2.5</maven-surefire-plugin.version>
@@ -143,13 +142,6 @@
143142
<scope>test</scope>
144143
</dependency>
145144

146-
<dependency>
147-
<groupId>io.dropwizard.metrics</groupId>
148-
<artifactId>metrics-core</artifactId>
149-
<version>${metrics.version}</version>
150-
<scope>test</scope>
151-
</dependency>
152-
153145
<dependency>
154146
<groupId>eu.rekawek.toxiproxy</groupId>
155147
<artifactId>toxiproxy-java</artifactId>

src/test/java/com/rabbitmq/model/AmqpPerfTest.java

Lines changed: 0 additions & 177 deletions
This file was deleted.
Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.model.perf;
19+
20+
import static com.rabbitmq.model.Management.ExchangeType.DIRECT;
21+
import static com.rabbitmq.model.Management.QueueType.QUORUM;
22+
import static com.rabbitmq.model.amqp.TestUtils.environmentBuilder;
23+
24+
import com.rabbitmq.model.*;
25+
import com.rabbitmq.model.amqp.TestUtils;
26+
import com.rabbitmq.model.metrics.MetricsCollector;
27+
import com.rabbitmq.model.metrics.MicrometerMetricsCollector;
28+
import com.sun.net.httpserver.HttpServer;
29+
import io.micrometer.prometheusmetrics.PrometheusConfig;
30+
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry;
31+
import java.io.IOException;
32+
import java.io.OutputStream;
33+
import java.io.PrintWriter;
34+
import java.net.InetSocketAddress;
35+
import java.nio.charset.StandardCharsets;
36+
import java.util.concurrent.*;
37+
import java.util.concurrent.atomic.AtomicBoolean;
38+
39+
public class AmqpPerfTest {
40+
41+
/*
42+
./mvnw -q clean test-compile exec:java \
43+
-Dexec.mainClass=com.rabbitmq.model.perf.AmqpPerfTest \
44+
-Dexec.classpathScope="test"
45+
*/
46+
public static void main(String[] args) throws IOException {
47+
PrometheusMeterRegistry registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
48+
MetricsCollector collector = new MicrometerMetricsCollector(registry);
49+
50+
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
51+
52+
PrintWriter out = new PrintWriter(System.out, true);
53+
PerformanceMetrics metrics = new PerformanceMetrics(registry, executorService, out);
54+
55+
String e = TestUtils.name(AmqpPerfTest.class, "main");
56+
String q = TestUtils.name(AmqpPerfTest.class, "main");
57+
String rk = "foo";
58+
Environment environment = environmentBuilder().metricsCollector(collector).build();
59+
Connection connection = environment.connectionBuilder().build();
60+
Management management = connection.management();
61+
62+
int monitoringPort = 8080;
63+
HttpServer monitoringServer = startMonitoringServer(monitoringPort, registry);
64+
65+
CountDownLatch shutdownLatch = new CountDownLatch(1);
66+
67+
AtomicBoolean hasShutDown = new AtomicBoolean(false);
68+
Runnable shutdownSequence =
69+
() -> {
70+
if (hasShutDown.compareAndSet(false, true)) {
71+
monitoringServer.stop(0);
72+
metrics.close();
73+
executorService.shutdownNow();
74+
shutdownLatch.countDown();
75+
management.queueDeletion().delete(q);
76+
management.exchangeDeletion().delete(e);
77+
management.close();
78+
}
79+
};
80+
81+
Runtime.getRuntime().addShutdownHook(new Thread(shutdownSequence::run));
82+
try {
83+
management.exchange().name(e).type(DIRECT).declare();
84+
management.queue().name(q).type(QUORUM).declare();
85+
management.binding().sourceExchange(e).destinationQueue(q).key(rk).bind();
86+
87+
connection
88+
.consumerBuilder()
89+
.queue(q)
90+
.initialCredits(1000)
91+
.messageHandler(
92+
(context, message) -> {
93+
context.accept();
94+
try {
95+
long time = readLong(message.body());
96+
metrics.latency(System.currentTimeMillis() - time, TimeUnit.MILLISECONDS);
97+
} catch (Exception ex) {
98+
// not able to read the body, maybe not a message from the
99+
// tool
100+
}
101+
})
102+
.build();
103+
104+
executorService.submit(
105+
() -> {
106+
Publisher publisher = connection.publisherBuilder().exchange(e).key(rk).build();
107+
Publisher.Callback callback =
108+
context -> {
109+
try {
110+
long time = readLong(context.message().body());
111+
metrics.publishedAcceptedlatency(
112+
System.currentTimeMillis() - time, TimeUnit.MILLISECONDS);
113+
} catch (Exception ex) {
114+
// not able to read the body, should not happen
115+
}
116+
};
117+
int msgSize = 10;
118+
while (!Thread.currentThread().isInterrupted()) {
119+
long creationTime = System.currentTimeMillis();
120+
byte[] payload = new byte[msgSize];
121+
writeLong(payload, creationTime);
122+
Message message = publisher.message(payload);
123+
publisher.publish(message, callback);
124+
}
125+
});
126+
out.println("Prometheus endpoint started on http://localhost:" + monitoringPort + "/metrics");
127+
metrics.start();
128+
shutdownLatch.await();
129+
} catch (InterruptedException ex) {
130+
Thread.currentThread().interrupt();
131+
} finally {
132+
shutdownSequence.run();
133+
}
134+
}
135+
136+
static void writeLong(byte[] array, long value) {
137+
// from Guava Longs
138+
for (int i = 7; i >= 0; i--) {
139+
array[i] = (byte) (value & 0xffL);
140+
value >>= 8;
141+
}
142+
}
143+
144+
static long readLong(byte[] array) {
145+
// from Guava Longs
146+
return (array[0] & 0xFFL) << 56
147+
| (array[1] & 0xFFL) << 48
148+
| (array[2] & 0xFFL) << 40
149+
| (array[3] & 0xFFL) << 32
150+
| (array[4] & 0xFFL) << 24
151+
| (array[5] & 0xFFL) << 16
152+
| (array[6] & 0xFFL) << 8
153+
| (array[7] & 0xFFL);
154+
}
155+
156+
private static HttpServer startMonitoringServer(
157+
int monitoringPort, PrometheusMeterRegistry registry) throws IOException {
158+
HttpServer server = HttpServer.create(new InetSocketAddress(monitoringPort), 0);
159+
160+
server.createContext(
161+
"/metrics",
162+
exchange -> {
163+
exchange.getResponseHeaders().set("Content-Type", "text/plain");
164+
byte[] content = registry.scrape().getBytes(StandardCharsets.UTF_8);
165+
exchange.sendResponseHeaders(200, content.length);
166+
try (OutputStream out = exchange.getResponseBody()) {
167+
out.write(content);
168+
}
169+
});
170+
server.start();
171+
return server;
172+
}
173+
}

0 commit comments

Comments
 (0)