38
38
39
39
public class AmqpPerfTest {
40
40
41
- /*
42
- ./mvnw -q clean test-compile exec:java \
43
- -Dexec.mainClass=com.rabbitmq.client.amqp.perf.AmqpPerfTest \
44
- -Dexec.classpathScope="test"
45
- */
46
- public static void main (String [] args ) throws IOException {
47
- PrometheusMeterRegistry registry = new PrometheusMeterRegistry (PrometheusConfig .DEFAULT );
48
- MetricsCollector collector = new MicrometerMetricsCollector (registry );
49
-
50
- ScheduledExecutorService executorService = Executors .newScheduledThreadPool (10 );
51
-
52
- PrintWriter out = new PrintWriter (System .out , true );
53
- PerformanceMetrics metrics = new PerformanceMetrics (registry , executorService , out );
54
-
55
- String e = TestUtils .name (AmqpPerfTest .class , "main" );
56
- String q = TestUtils .name (AmqpPerfTest .class , "main" );
57
- String rk = "foo" ;
58
- Environment environment = environmentBuilder ().metricsCollector (collector ).build ();
59
- Connection connection = environment .connectionBuilder ().build ();
60
- Management management = connection .management ();
61
-
62
- int monitoringPort = 8080 ;
63
- HttpServer monitoringServer = startMonitoringServer (monitoringPort , registry );
64
-
65
- CountDownLatch shutdownLatch = new CountDownLatch (1 );
66
-
67
- AtomicBoolean hasShutDown = new AtomicBoolean (false );
68
- Runnable shutdownSequence =
69
- () -> {
70
- if (hasShutDown .compareAndSet (false , true )) {
71
- monitoringServer .stop (0 );
72
- metrics .close ();
73
- executorService .shutdownNow ();
74
- shutdownLatch .countDown ();
75
- management .queueDeletion ().delete (q );
76
- management .exchangeDeletion ().delete (e );
77
- management .close ();
78
- }
41
+ /*
42
+ ./mvnw -q clean test-compile exec:java \
43
+ -Dexec.mainClass=com.rabbitmq.client.amqp.perf.AmqpPerfTest \
44
+ -Dexec.classpathScope="test"
45
+ */
46
+ public static void main (String [] args ) throws IOException {
47
+ PrometheusMeterRegistry registry = new PrometheusMeterRegistry (PrometheusConfig .DEFAULT );
48
+ MetricsCollector collector = new MicrometerMetricsCollector (registry );
49
+
50
+ ScheduledExecutorService executorService = Executors .newScheduledThreadPool (10 );
51
+
52
+ PrintWriter out = new PrintWriter (System .out , true );
53
+ PerformanceMetrics metrics = new PerformanceMetrics (registry , executorService , out );
54
+
55
+ String e = TestUtils .name (AmqpPerfTest .class , "main" );
56
+ String q = TestUtils .name (AmqpPerfTest .class , "main" );
57
+ String rk = "foo" ;
58
+ Environment environment = environmentBuilder ().metricsCollector (collector ).build ();
59
+ Connection connection = environment .connectionBuilder ().build ();
60
+ Management management = connection .management ();
61
+
62
+ int monitoringPort = 8080 ;
63
+ HttpServer monitoringServer = startMonitoringServer (monitoringPort , registry );
64
+
65
+ CountDownLatch shutdownLatch = new CountDownLatch (1 );
66
+
67
+ AtomicBoolean hasShutDown = new AtomicBoolean (false );
68
+ Runnable shutdownSequence =
69
+ () -> {
70
+ if (hasShutDown .compareAndSet (false , true )) {
71
+ monitoringServer .stop (0 );
72
+ metrics .close ();
73
+ executorService .shutdownNow ();
74
+ shutdownLatch .countDown ();
75
+ management .queueDeletion ().delete (q );
76
+ management .exchangeDeletion ().delete (e );
77
+ management .close ();
78
+ }
79
+ };
80
+
81
+ Runtime .getRuntime ().addShutdownHook (new Thread (shutdownSequence ::run ));
82
+ try {
83
+ management .exchange ().name (e ).type (DIRECT ).declare ();
84
+ management .queue ().name (q ).type (QUORUM ).declare ();
85
+ management .binding ().sourceExchange (e ).destinationQueue (q ).key (rk ).bind ();
86
+
87
+ connection
88
+ .consumerBuilder ()
89
+ .queue (q )
90
+ .initialCredits (1000 )
91
+ .messageHandler (
92
+ (context , message ) -> {
93
+ context .accept ();
94
+ try {
95
+ long time = readLong (message .body ());
96
+ metrics .latency (System .currentTimeMillis () - time , TimeUnit .MILLISECONDS );
97
+ } catch (Exception ex ) {
98
+ // not able to read the body, maybe not a message from the
99
+ // tool
100
+ }
101
+ })
102
+ .build ();
103
+
104
+ executorService .submit (
105
+ () -> {
106
+ Publisher publisher = connection .publisherBuilder ().exchange (e ).key (rk ).build ();
107
+ Publisher .Callback callback =
108
+ context -> {
109
+ try {
110
+ long time = readLong (context .message ().body ());
111
+ metrics .publishedAcceptedlatency (
112
+ System .currentTimeMillis () - time , TimeUnit .MILLISECONDS );
113
+ } catch (Exception ex ) {
114
+ // not able to read the body, should not happen
115
+ }
79
116
};
80
-
81
- Runtime .getRuntime ().addShutdownHook (new Thread (shutdownSequence ::run ));
82
- try {
83
- management .exchange ().name (e ).type (DIRECT ).declare ();
84
- management .queue ().name (q ).type (QUORUM ).declare ();
85
- management .binding ().sourceExchange (e ).destinationQueue (q ).key (rk ).bind ();
86
-
87
- connection
88
- .consumerBuilder ()
89
- .queue (q )
90
- .initialCredits (1000 )
91
- .messageHandler (
92
- (context , message ) -> {
93
- context .accept ();
94
- try {
95
- long time = readLong (message .body ());
96
- metrics .latency (System .currentTimeMillis () - time , TimeUnit .MILLISECONDS );
97
- } catch (Exception ex ) {
98
- // not able to read the body, maybe not a message from the
99
- // tool
100
- }
101
- })
102
- .build ();
103
-
104
- executorService .submit (
105
- () -> {
106
- Publisher publisher = connection .publisherBuilder ().exchange (e ).key (rk ).build ();
107
- Publisher .Callback callback =
108
- context -> {
109
- try {
110
- long time = readLong (context .message ().body ());
111
- metrics .publishedAcceptedlatency (
112
- System .currentTimeMillis () - time , TimeUnit .MILLISECONDS );
113
- } catch (Exception ex ) {
114
- // not able to read the body, should not happen
115
- }
116
- };
117
- int msgSize = 10 ;
118
- while (!Thread .currentThread ().isInterrupted ()) {
119
- long creationTime = System .currentTimeMillis ();
120
- byte [] payload = new byte [msgSize ];
121
- writeLong (payload , creationTime );
122
- Message message = publisher .message (payload );
123
- publisher .publish (message , callback );
124
- }
125
- });
126
- out .println ("Prometheus endpoint started on http://localhost:" + monitoringPort + "/metrics" );
127
- metrics .start ();
128
- shutdownLatch .await ();
129
- } catch (InterruptedException ex ) {
130
- Thread .currentThread ().interrupt ();
131
- } finally {
132
- shutdownSequence .run ();
133
- }
134
- }
135
-
136
- static void writeLong (byte [] array , long value ) {
137
- // from Guava Longs
138
- for (int i = 7 ; i >= 0 ; i --) {
139
- array [i ] = (byte ) (value & 0xffL );
140
- value >>= 8 ;
141
- }
142
- }
143
-
144
- static long readLong (byte [] array ) {
145
- // from Guava Longs
146
- return (array [0 ] & 0xFFL ) << 56
147
- | (array [1 ] & 0xFFL ) << 48
148
- | (array [2 ] & 0xFFL ) << 40
149
- | (array [3 ] & 0xFFL ) << 32
150
- | (array [4 ] & 0xFFL ) << 24
151
- | (array [5 ] & 0xFFL ) << 16
152
- | (array [6 ] & 0xFFL ) << 8
153
- | (array [7 ] & 0xFFL );
117
+ int msgSize = 10 ;
118
+ while (!Thread .currentThread ().isInterrupted ()) {
119
+ long creationTime = System .currentTimeMillis ();
120
+ byte [] payload = new byte [msgSize ];
121
+ writeLong (payload , creationTime );
122
+ Message message = publisher .message (payload );
123
+ publisher .publish (message , callback );
124
+ }
125
+ });
126
+ out .println ("Prometheus endpoint started on http://localhost:" + monitoringPort + "/metrics" );
127
+ metrics .start ();
128
+ shutdownLatch .await ();
129
+ } catch (InterruptedException ex ) {
130
+ Thread .currentThread ().interrupt ();
131
+ } finally {
132
+ shutdownSequence .run ();
154
133
}
134
+ }
155
135
156
- private static HttpServer startMonitoringServer (
157
- int monitoringPort , PrometheusMeterRegistry registry ) throws IOException {
158
- HttpServer server = HttpServer .create (new InetSocketAddress (monitoringPort ), 0 );
159
-
160
- server .createContext (
161
- "/metrics" ,
162
- exchange -> {
163
- exchange .getResponseHeaders ().set ("Content-Type" , "text/plain" );
164
- byte [] content = registry .scrape ().getBytes (StandardCharsets .UTF_8 );
165
- exchange .sendResponseHeaders (200 , content .length );
166
- try (OutputStream out = exchange .getResponseBody ()) {
167
- out .write (content );
168
- }
169
- });
170
- server .start ();
171
- return server ;
136
+ static void writeLong (byte [] array , long value ) {
137
+ // from Guava Longs
138
+ for (int i = 7 ; i >= 0 ; i --) {
139
+ array [i ] = (byte ) (value & 0xffL );
140
+ value >>= 8 ;
172
141
}
173
- }
142
+ }
143
+
144
+ static long readLong (byte [] array ) {
145
+ // from Guava Longs
146
+ return (array [0 ] & 0xFFL ) << 56
147
+ | (array [1 ] & 0xFFL ) << 48
148
+ | (array [2 ] & 0xFFL ) << 40
149
+ | (array [3 ] & 0xFFL ) << 32
150
+ | (array [4 ] & 0xFFL ) << 24
151
+ | (array [5 ] & 0xFFL ) << 16
152
+ | (array [6 ] & 0xFFL ) << 8
153
+ | (array [7 ] & 0xFFL );
154
+ }
155
+
156
+ private static HttpServer startMonitoringServer (
157
+ int monitoringPort , PrometheusMeterRegistry registry ) throws IOException {
158
+ HttpServer server = HttpServer .create (new InetSocketAddress (monitoringPort ), 0 );
159
+
160
+ server .createContext (
161
+ "/metrics" ,
162
+ exchange -> {
163
+ exchange .getResponseHeaders ().set ("Content-Type" , "text/plain" );
164
+ byte [] content = registry .scrape ().getBytes (StandardCharsets .UTF_8 );
165
+ exchange .sendResponseHeaders (200 , content .length );
166
+ try (OutputStream out = exchange .getResponseBody ()) {
167
+ out .write (content );
168
+ }
169
+ });
170
+ server .start ();
171
+ return server ;
172
+ }
173
+ }
0 commit comments