Skip to content

Commit 56e357d

Browse files
committed
change the from specification
to queue for consistency Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent ac4c7ef commit 56e357d

File tree

3 files changed

+220
-130
lines changed

3 files changed

+220
-130
lines changed

src/main/java/com/rabbitmq/client/amqp/Management.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ interface StreamSpecification {
126126

127127
StreamSpecification initialClusterSize(int initialClusterSize);
128128

129-
QueueSpecification specification();
129+
QueueSpecification queue();
130130
}
131131

132132
enum QuorumQueueDeadLetterStrategy {

src/main/java/com/rabbitmq/client/amqp/impl/AmqpQueueSpecification.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ public Management.StreamSpecification initialClusterSize(int initialClusterSize)
356356
}
357357

358358
@Override
359-
public Management.QueueSpecification specification() {
359+
public Management.QueueSpecification queue() {
360360
return this.parent;
361361
}
362362
}

src/test/java/com/rabbitmq/client/amqp/perf/AmqpPerfTest.java

Lines changed: 218 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package com.rabbitmq.client.amqp.perf;
1919

2020
import static com.rabbitmq.client.amqp.Management.ExchangeType.DIRECT;
21-
import static com.rabbitmq.client.amqp.Management.QueueType.QUORUM;
21+
import static com.rabbitmq.client.amqp.Management.QueueType.*;
2222
import static com.rabbitmq.client.amqp.impl.TestUtils.environmentBuilder;
2323

2424
import com.rabbitmq.client.amqp.*;
@@ -28,146 +28,236 @@
2828
import com.sun.net.httpserver.HttpServer;
2929
import io.micrometer.prometheusmetrics.PrometheusConfig;
3030
import io.micrometer.prometheusmetrics.PrometheusMeterRegistry;
31+
import org.jboss.forge.roaster._shade.org.apache.felix.resolver.util.ArrayMap;
32+
3133
import java.io.IOException;
3234
import java.io.OutputStream;
3335
import java.io.PrintWriter;
3436
import java.net.InetSocketAddress;
3537
import java.nio.charset.StandardCharsets;
38+
import java.util.*;
3639
import java.util.concurrent.*;
3740
import java.util.concurrent.atomic.AtomicBoolean;
41+
import java.util.concurrent.atomic.AtomicInteger;
3842

3943
public class AmqpPerfTest {
4044

41-
/*
42-
./mvnw -q clean test-compile exec:java \
43-
-Dexec.mainClass=com.rabbitmq.client.amqp.perf.AmqpPerfTest \
44-
-Dexec.classpathScope="test"
45-
*/
46-
public static void main(String[] args) throws IOException {
47-
PrometheusMeterRegistry registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
48-
MetricsCollector collector = new MicrometerMetricsCollector(registry);
49-
50-
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
51-
52-
PrintWriter out = new PrintWriter(System.out, true);
53-
PerformanceMetrics metrics = new PerformanceMetrics(registry, executorService, out);
54-
55-
String e = TestUtils.name(AmqpPerfTest.class, "main");
56-
String q = TestUtils.name(AmqpPerfTest.class, "main");
57-
String rk = "foo";
58-
Environment environment = environmentBuilder().metricsCollector(collector).build();
59-
Connection connection = environment.connectionBuilder().build();
60-
Management management = connection.management();
61-
62-
int monitoringPort = 8080;
63-
HttpServer monitoringServer = startMonitoringServer(monitoringPort, registry);
64-
65-
CountDownLatch shutdownLatch = new CountDownLatch(1);
66-
67-
AtomicBoolean hasShutDown = new AtomicBoolean(false);
68-
Runnable shutdownSequence =
69-
() -> {
70-
if (hasShutDown.compareAndSet(false, true)) {
71-
monitoringServer.stop(0);
72-
metrics.close();
73-
executorService.shutdownNow();
74-
shutdownLatch.countDown();
75-
management.queueDeletion().delete(q);
76-
management.exchangeDeletion().delete(e);
77-
management.close();
78-
}
79-
};
80-
81-
Runtime.getRuntime().addShutdownHook(new Thread(shutdownSequence::run));
82-
try {
83-
management.exchange().name(e).type(DIRECT).declare();
84-
management.queue().name(q).type(QUORUM).declare();
85-
management.binding().sourceExchange(e).destinationQueue(q).key(rk).bind();
86-
87-
connection
88-
.consumerBuilder()
89-
.queue(q)
90-
.initialCredits(1000)
91-
.messageHandler(
92-
(context, message) -> {
93-
context.accept();
94-
try {
95-
long time = readLong(message.body());
96-
metrics.latency(System.currentTimeMillis() - time, TimeUnit.MILLISECONDS);
97-
} catch (Exception ex) {
98-
// not able to read the body, maybe not a message from the
99-
// tool
100-
}
101-
})
102-
.build();
103-
104-
executorService.submit(
105-
() -> {
106-
Publisher publisher = connection.publisherBuilder().exchange(e).key(rk).build();
107-
Publisher.Callback callback =
108-
context -> {
109-
try {
110-
long time = readLong(context.message().body());
111-
metrics.publishedAcceptedlatency(
112-
System.currentTimeMillis() - time, TimeUnit.MILLISECONDS);
113-
} catch (Exception ex) {
114-
// not able to read the body, should not happen
115-
}
45+
/*
46+
./mvnw -q clean test-compile exec:java \
47+
-Dexec.mainClass=com.rabbitmq.client.amqp.perf.AmqpPerfTest \
48+
-Dexec.classpathScope="test"
49+
*/
50+
public static void main(String[] args) throws IOException {
51+
PrometheusMeterRegistry registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
52+
MetricsCollector collector = new MicrometerMetricsCollector(registry);
53+
54+
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(10);
55+
56+
PrintWriter out = new PrintWriter(System.out, true);
57+
PerformanceMetrics metrics = new PerformanceMetrics(registry, executorService, out);
58+
59+
String e = TestUtils.name(AmqpPerfTest.class, "main");
60+
String q = TestUtils.name(AmqpPerfTest.class, "main");
61+
String rk = "foo";
62+
Environment environment = environmentBuilder().metricsCollector(collector).build();
63+
Connection connection = environment.
64+
connectionBuilder().
65+
listeners(context -> {
66+
context.previousState();
67+
context.currentState();
68+
context.failureCause();
69+
context.resource();
70+
}).
71+
recovery().
72+
activated(true).connectionBuilder().build();
73+
74+
Management management = connection.management();
75+
int monitoringPort = 8080;
76+
HttpServer monitoringServer = startMonitoringServer(monitoringPort, registry);
77+
78+
CountDownLatch shutdownLatch = new CountDownLatch(1);
79+
80+
AtomicBoolean hasShutDown = new AtomicBoolean(false);
81+
Runnable shutdownSequence =
82+
() -> {
83+
if (hasShutDown.compareAndSet(false, true)) {
84+
monitoringServer.stop(0);
85+
metrics.close();
86+
executorService.shutdownNow();
87+
shutdownLatch.countDown();
88+
management.queueDeletion().delete(q);
89+
management.exchangeDeletion().delete(e);
90+
management.close();
91+
}
11692
};
117-
int msgSize = 10;
118-
while (!Thread.currentThread().isInterrupted()) {
119-
long creationTime = System.currentTimeMillis();
120-
byte[] payload = new byte[msgSize];
121-
writeLong(payload, creationTime);
122-
Message message = publisher.message(payload);
123-
publisher.publish(message, callback);
93+
94+
Runtime.getRuntime().addShutdownHook(new Thread(shutdownSequence::run));
95+
// management.queue().name("stream").type(STREAM).deadLetterExchange("aaaa").declare();
96+
try {
97+
// management.queueDeletion().delete("my-first-queue-j");
98+
// management.queue().name("my-first-queue-j").type(QUORUM).declare();
99+
//
100+
// Publisher publisher1 = connection.publisherBuilder().queue("my-first-queue-j").build();
101+
//
102+
// long startTime = System.currentTimeMillis();
103+
// AtomicInteger confirmed = new AtomicInteger(0);
104+
// int total = 5_000_000;
105+
// for (int i = 0; i < total ; i++) {
106+
//
107+
// byte[] payload1 = new byte[10];
108+
// Message message1 = publisher1.message(payload1);
109+
// publisher1.publish(message1, context -> {
110+
// if (confirmed.incrementAndGet() % 200_000 == 0) {
111+
// long stopTime = System.currentTimeMillis();
112+
// long elapsedTime = (stopTime - startTime)/ 1000;
113+
// System.out.println("confirmed time:" + elapsedTime + " confirmed: " + confirmed.get() + " total: " + total);
114+
// }
115+
// });
116+
// }
117+
118+
// long stopTime = System.currentTimeMillis();
119+
// long elapsedTime = (stopTime - startTime)/ 1000;
120+
// System.out.println("sent time: " + elapsedTime);
121+
122+
// management.queueDeletion().delete("alone");
123+
// publisher1.close();
124+
// Thread.sleep(300000000);
125+
126+
// try {
127+
128+
// try {
129+
// management.queue().name("e1").type(QUORUM).declare();
130+
// management.queue().name("e1").type(STREAM).declare();
131+
// } catch (Exception e1) {
132+
// e1.printStackTrace();
133+
// }
134+
135+
136+
try {
137+
138+
management.queue().name("stream").type(STREAM).deadLetterExchange("aaaa").declare();
139+
} catch (Exception e1) {
140+
e1.printStackTrace();
124141
}
125-
});
126-
out.println("Prometheus endpoint started on http://localhost:" + monitoringPort + "/metrics");
127-
metrics.start();
128-
shutdownLatch.await();
129-
} catch (InterruptedException ex) {
130-
Thread.currentThread().interrupt();
131-
} finally {
132-
shutdownSequence.run();
142+
143+
management.queue().name("stream-1").type(STREAM).declare();
144+
145+
146+
management.queue().name("aaaaa").type(QUORUM).declare();
147+
List list = new ArrayList<>();
148+
list.add(1);
149+
list.add(2);
150+
list.add("aaaa");
151+
list.add(0.33);
152+
153+
Map s = new LinkedHashMap();
154+
s.put("v_8", "p_8");
155+
s.put("v_1", 1);
156+
s.put("list", list);
157+
158+
management.exchange().name("e").type(DIRECT).declare();
159+
management.binding().sourceExchange("e").destinationQueue("q").
160+
key("k").arguments(s).bind();
161+
162+
163+
management.unbind().sourceExchange("e")
164+
.destinationQueue("q").key("k").arguments(s).unbind();
165+
166+
try {
167+
management.queue().name("q_是英国数学家").type(CLASSIC).declare();
168+
} catch (Exception e1) {
169+
e1.printStackTrace();
170+
}
171+
172+
173+
management.exchange().name("是英国数学家").type(DIRECT).declare();
174+
management.queue().name(q).type(QUORUM).declare();
175+
management.binding().sourceExchange(e).destinationQueue(q).key(rk).bind();
176+
/// { $"v_8", $"p_8" }, { $"v_1", 1 }, { $"v_r", 1000L },
177+
connection
178+
.consumerBuilder()
179+
.queue(q)
180+
.initialCredits(1000)
181+
.messageHandler(
182+
(context, message) -> {
183+
context.accept();
184+
try {
185+
long time = readLong(message.body());
186+
metrics.latency(System.currentTimeMillis() - time, TimeUnit.MILLISECONDS);
187+
} catch (Exception ex) {
188+
// not able to read the body, maybe not a message from the
189+
// tool
190+
}
191+
})
192+
.build();
193+
194+
executorService.submit(
195+
() -> {
196+
Publisher publisher = connection.publisherBuilder().exchange(e).key(rk).build();
197+
Publisher.Callback callback =
198+
context -> {
199+
try {
200+
long time = readLong(context.message().body());
201+
metrics.publishedAcceptedlatency(
202+
System.currentTimeMillis() - time, TimeUnit.MILLISECONDS);
203+
} catch (Exception ex) {
204+
// not able to read the body, should not happen
205+
}
206+
};
207+
int msgSize = 10;
208+
while (!Thread.currentThread().isInterrupted()) {
209+
long creationTime = System.currentTimeMillis();
210+
byte[] payload = new byte[msgSize];
211+
writeLong(payload, creationTime);
212+
Message message = publisher.message(payload);
213+
publisher.publish(message, callback);
214+
}
215+
});
216+
out.println("Prometheus endpoint started on http://localhost:" + monitoringPort + "/metrics");
217+
metrics.start();
218+
shutdownLatch.await();
219+
} catch (InterruptedException ex) {
220+
Thread.currentThread().interrupt();
221+
} finally {
222+
shutdownSequence.run();
223+
}
224+
}
225+
226+
static void writeLong(byte[] array, long value) {
227+
// from Guava Longs
228+
for (int i = 7; i >= 0; i--) {
229+
array[i] = (byte) (value & 0xffL);
230+
value >>= 8;
231+
}
232+
}
233+
234+
static long readLong(byte[] array) {
235+
// from Guava Longs
236+
return (array[0] & 0xFFL) << 56
237+
| (array[1] & 0xFFL) << 48
238+
| (array[2] & 0xFFL) << 40
239+
| (array[3] & 0xFFL) << 32
240+
| (array[4] & 0xFFL) << 24
241+
| (array[5] & 0xFFL) << 16
242+
| (array[6] & 0xFFL) << 8
243+
| (array[7] & 0xFFL);
133244
}
134-
}
135245

136-
static void writeLong(byte[] array, long value) {
137-
// from Guava Longs
138-
for (int i = 7; i >= 0; i--) {
139-
array[i] = (byte) (value & 0xffL);
140-
value >>= 8;
246+
private static HttpServer startMonitoringServer(
247+
int monitoringPort, PrometheusMeterRegistry registry) throws IOException {
248+
HttpServer server = HttpServer.create(new InetSocketAddress(monitoringPort), 0);
249+
250+
server.createContext(
251+
"/metrics",
252+
exchange -> {
253+
exchange.getResponseHeaders().set("Content-Type", "text/plain");
254+
byte[] content = registry.scrape().getBytes(StandardCharsets.UTF_8);
255+
exchange.sendResponseHeaders(200, content.length);
256+
try (OutputStream out = exchange.getResponseBody()) {
257+
out.write(content);
258+
}
259+
});
260+
server.start();
261+
return server;
141262
}
142-
}
143-
144-
static long readLong(byte[] array) {
145-
// from Guava Longs
146-
return (array[0] & 0xFFL) << 56
147-
| (array[1] & 0xFFL) << 48
148-
| (array[2] & 0xFFL) << 40
149-
| (array[3] & 0xFFL) << 32
150-
| (array[4] & 0xFFL) << 24
151-
| (array[5] & 0xFFL) << 16
152-
| (array[6] & 0xFFL) << 8
153-
| (array[7] & 0xFFL);
154-
}
155-
156-
private static HttpServer startMonitoringServer(
157-
int monitoringPort, PrometheusMeterRegistry registry) throws IOException {
158-
HttpServer server = HttpServer.create(new InetSocketAddress(monitoringPort), 0);
159-
160-
server.createContext(
161-
"/metrics",
162-
exchange -> {
163-
exchange.getResponseHeaders().set("Content-Type", "text/plain");
164-
byte[] content = registry.scrape().getBytes(StandardCharsets.UTF_8);
165-
exchange.sendResponseHeaders(200, content.length);
166-
try (OutputStream out = exchange.getResponseBody()) {
167-
out.write(content);
168-
}
169-
});
170-
server.start();
171-
return server;
172-
}
173263
}

0 commit comments

Comments
 (0)