18
18
package com .rabbitmq .client .amqp .perf ;
19
19
20
20
import static com .rabbitmq .client .amqp .Management .ExchangeType .DIRECT ;
21
- import static com .rabbitmq .client .amqp .Management .QueueType .* ;
21
+ import static com .rabbitmq .client .amqp .Management .QueueType .QUORUM ;
22
22
import static com .rabbitmq .client .amqp .impl .TestUtils .environmentBuilder ;
23
23
24
24
import com .rabbitmq .client .amqp .*;
28
28
import com .sun .net .httpserver .HttpServer ;
29
29
import io .micrometer .prometheusmetrics .PrometheusConfig ;
30
30
import io .micrometer .prometheusmetrics .PrometheusMeterRegistry ;
31
- import org .jboss .forge .roaster ._shade .org .apache .felix .resolver .util .ArrayMap ;
32
-
33
31
import java .io .IOException ;
34
32
import java .io .OutputStream ;
35
33
import java .io .PrintWriter ;
36
34
import java .net .InetSocketAddress ;
37
35
import java .nio .charset .StandardCharsets ;
38
- import java .util .*;
39
36
import java .util .concurrent .*;
40
37
import java .util .concurrent .atomic .AtomicBoolean ;
41
- import java .util .concurrent .atomic .AtomicInteger ;
42
38
43
39
public class AmqpPerfTest {
44
40
@@ -60,18 +56,9 @@ public static void main(String[] args) throws IOException {
60
56
String q = TestUtils .name (AmqpPerfTest .class , "main" );
61
57
String rk = "foo" ;
62
58
Environment environment = environmentBuilder ().metricsCollector (collector ).build ();
63
- Connection connection = environment .
64
- connectionBuilder ().
65
- listeners (context -> {
66
- context .previousState ();
67
- context .currentState ();
68
- context .failureCause ();
69
- context .resource ();
70
- }).
71
- recovery ().
72
- activated (true ).connectionBuilder ().build ();
73
-
59
+ Connection connection = environment .connectionBuilder ().build ();
74
60
Management management = connection .management ();
61
+
75
62
int monitoringPort = 8080 ;
76
63
HttpServer monitoringServer = startMonitoringServer (monitoringPort , registry );
77
64
@@ -92,88 +79,11 @@ public static void main(String[] args) throws IOException {
92
79
};
93
80
94
81
Runtime .getRuntime ().addShutdownHook (new Thread (shutdownSequence ::run ));
95
- // management.queue().name("stream").type(STREAM).deadLetterExchange("aaaa").declare();
96
82
try {
97
- // management.queueDeletion().delete("my-first-queue-j");
98
- // management.queue().name("my-first-queue-j").type(QUORUM).declare();
99
- //
100
- // Publisher publisher1 = connection.publisherBuilder().queue("my-first-queue-j").build();
101
- //
102
- // long startTime = System.currentTimeMillis();
103
- // AtomicInteger confirmed = new AtomicInteger(0);
104
- // int total = 5_000_000;
105
- // for (int i = 0; i < total ; i++) {
106
- //
107
- // byte[] payload1 = new byte[10];
108
- // Message message1 = publisher1.message(payload1);
109
- // publisher1.publish(message1, context -> {
110
- // if (confirmed.incrementAndGet() % 200_000 == 0) {
111
- // long stopTime = System.currentTimeMillis();
112
- // long elapsedTime = (stopTime - startTime)/ 1000;
113
- // System.out.println("confirmed time:" + elapsedTime + " confirmed: " + confirmed.get() + " total: " + total);
114
- // }
115
- // });
116
- // }
117
-
118
- // long stopTime = System.currentTimeMillis();
119
- // long elapsedTime = (stopTime - startTime)/ 1000;
120
- // System.out.println("sent time: " + elapsedTime);
121
-
122
- // management.queueDeletion().delete("alone");
123
- // publisher1.close();
124
- // Thread.sleep(300000000);
125
-
126
- // try {
127
-
128
- // try {
129
- // management.queue().name("e1").type(QUORUM).declare();
130
- // management.queue().name("e1").type(STREAM).declare();
131
- // } catch (Exception e1) {
132
- // e1.printStackTrace();
133
- // }
134
-
135
-
136
- try {
137
-
138
- management .queue ().name ("stream" ).type (STREAM ).deadLetterExchange ("aaaa" ).declare ();
139
- } catch (Exception e1 ) {
140
- e1 .printStackTrace ();
141
- }
142
-
143
- management .queue ().name ("stream-1" ).type (STREAM ).declare ();
144
-
145
-
146
- management .queue ().name ("aaaaa" ).type (QUORUM ).declare ();
147
- List list = new ArrayList <>();
148
- list .add (1 );
149
- list .add (2 );
150
- list .add ("aaaa" );
151
- list .add (0.33 );
152
-
153
- Map s = new LinkedHashMap ();
154
- s .put ("v_8" , "p_8" );
155
- s .put ("v_1" , 1 );
156
- s .put ("list" , list );
157
-
158
- management .exchange ().name ("e" ).type (DIRECT ).declare ();
159
- management .binding ().sourceExchange ("e" ).destinationQueue ("q" ).
160
- key ("k" ).arguments (s ).bind ();
161
-
162
-
163
- management .unbind ().sourceExchange ("e" )
164
- .destinationQueue ("q" ).key ("k" ).arguments (s ).unbind ();
165
-
166
- try {
167
- management .queue ().name ("q_是英国数学家" ).type (CLASSIC ).declare ();
168
- } catch (Exception e1 ) {
169
- e1 .printStackTrace ();
170
- }
171
-
172
-
173
- management .exchange ().name ("是英国数学家" ).type (DIRECT ).declare ();
83
+ management .exchange ().name (e ).type (DIRECT ).declare ();
174
84
management .queue ().name (q ).type (QUORUM ).declare ();
175
85
management .binding ().sourceExchange (e ).destinationQueue (q ).key (rk ).bind ();
176
- /// { $"v_8", $"p_8" }, { $"v_1", 1 }, { $"v_r", 1000L },
86
+
177
87
connection
178
88
.consumerBuilder ()
179
89
.queue (q )
@@ -260,4 +170,4 @@ private static HttpServer startMonitoringServer(
260
170
server .start ();
261
171
return server ;
262
172
}
263
- }
173
+ }
0 commit comments