33
33
import java .time .Duration ;
34
34
import java .util .Collections ;
35
35
import java .util .List ;
36
- import java .util .concurrent .ExecutorService ;
37
36
import java .util .concurrent .ThreadFactory ;
38
37
import java .util .concurrent .atomic .AtomicBoolean ;
39
38
import java .util .concurrent .atomic .AtomicInteger ;
40
39
import java .util .concurrent .atomic .AtomicReference ;
41
40
import java .util .function .Consumer ;
41
+ import java .util .function .Function ;
42
+ import java .util .function .UnaryOperator ;
42
43
import java .util .stream .IntStream ;
44
+ import java .util .stream .Stream ;
43
45
import org .junit .jupiter .api .*;
44
46
import org .slf4j .Logger ;
45
47
import org .slf4j .LoggerFactory ;
@@ -54,7 +56,6 @@ public class RecoveryClusterTest {
54
56
new String [] {"amqp://localhost:5672" , "amqp://localhost:5673" , "amqp://localhost:5674" };
55
57
static final BackOffDelayPolicy BACK_OFF_DELAY_POLICY = BackOffDelayPolicy .fixed (ofSeconds (3 ));
56
58
static List <String > nodes ;
57
- ExecutorService executorService ;
58
59
Environment environment ;
59
60
AmqpConnection connection ;
60
61
Management management ;
@@ -78,48 +79,105 @@ void init(TestInfo info) {
78
79
@ AfterEach
79
80
void tearDown () {
80
81
environment .close ();
81
- if (executorService != null ) {
82
- executorService .shutdownNow ();
82
+ }
83
+
84
+ private static class QueueConfiguration {
85
+
86
+ private final String name ;
87
+ private final Management .QueueType type ;
88
+ private final boolean exclusive ;
89
+ private final UnaryOperator <Management .QueueSpecification > configurationCallback ;
90
+
91
+ private QueueConfiguration (
92
+ String name ,
93
+ Management .QueueType type ,
94
+ boolean exclusive ,
95
+ UnaryOperator <Management .QueueSpecification > configurationCallback ) {
96
+ this .name = name ;
97
+ this .type = type ;
98
+ this .exclusive = exclusive ;
99
+ this .configurationCallback = configurationCallback ;
83
100
}
84
101
}
85
102
86
103
@ Test
87
104
void clusterRestart () {
88
- int qqCount = 20 ;
89
- List <String > qqNames =
90
- IntStream .range (0 , qqCount ).mapToObj (ignored -> name ("qq-" )).collect (toList ());
105
+ int queueCount = 10 ;
106
+ List <Management .QueueType > queueTypes =
107
+ List .of (
108
+ Management .QueueType .QUORUM ,
109
+ Management .QueueType .CLASSIC ,
110
+ Management .QueueType .CLASSIC );
111
+ AtomicInteger classicQueueCount = new AtomicInteger (0 );
112
+ List <QueueConfiguration > queueConfigurations =
113
+ queueTypes .stream ()
114
+ .flatMap (
115
+ (Function <Management .QueueType , Stream <QueueConfiguration >>)
116
+ type ->
117
+ IntStream .range (0 , queueCount )
118
+ .mapToObj (
119
+ ignored -> {
120
+ boolean exclusive =
121
+ type == Management .QueueType .CLASSIC
122
+ && classicQueueCount .incrementAndGet () > queueCount ;
123
+ String prefix =
124
+ type .name ().toLowerCase () + (exclusive ? "-ex-" : "-" );
125
+ String n = name (prefix );
126
+ UnaryOperator <Management .QueueSpecification > c =
127
+ s -> s .type (type ).exclusive (exclusive );
128
+ return new QueueConfiguration (n , type , exclusive , c );
129
+ }))
130
+ .collect (toList ());
131
+ List <String > queueNames = queueConfigurations .stream ().map (c -> c .name ).collect (toList ());
91
132
List <PublisherState > publisherStates = Collections .emptyList ();
92
133
List <ConsumerState > consumerStates = Collections .emptyList ();
93
134
try {
94
- qqNames .forEach (n -> management .queue (n ).type (Management .QueueType .QUORUM ).declare ());
135
+ queueConfigurations .stream ()
136
+ .filter (c -> !c .exclusive )
137
+ .forEach (c -> c .configurationCallback .apply (management .queue (c .name )).declare ());
95
138
AtomicInteger counter = new AtomicInteger (0 );
96
139
consumerStates =
97
- qqNames .stream ()
140
+ queueConfigurations .stream ()
98
141
.map (
99
- n ->
100
- new ConsumerState (
101
- n ,
142
+ conf -> {
143
+ AmqpConnection c ;
144
+ String cName = "consumer-" + counter .getAndIncrement ();
145
+ if (conf .exclusive ) {
146
+ c = connection (b -> b .name (cName ));
147
+ conf .configurationCallback .apply (c .management ().queue (conf .name )).declare ();
148
+ c .management ()
149
+ .binding ()
150
+ .sourceExchange ("amq.direct" )
151
+ .key (conf .name )
152
+ .destinationQueue (conf .name )
153
+ .bind ();
154
+ } else {
155
+ c =
102
156
connection (
103
157
b ->
104
- b .name ("consumer-" + counter . getAndIncrement () )
158
+ b .name (cName )
105
159
.affinity ()
106
- .queue (n )
160
+ .queue (conf . name )
107
161
.operation (CONSUME )
108
- .connection ())))
162
+ .connection ());
163
+ }
164
+ return new ConsumerState (conf .name , c );
165
+ })
109
166
.collect (toList ());
110
167
111
168
counter .set (0 );
112
169
publisherStates =
113
- qqNames .stream ()
170
+ queueConfigurations .stream ()
114
171
.map (
115
- n ->
172
+ c ->
116
173
new PublisherState (
117
- n ,
174
+ c .name ,
175
+ c .exclusive ,
118
176
connection (
119
177
b ->
120
178
b .name ("publisher-" + counter .getAndIncrement ())
121
179
.affinity ()
122
- .queue (n )
180
+ .queue (c . name )
123
181
.operation (PUBLISH )
124
182
.connection ())))
125
183
.collect (toList ());
@@ -146,17 +204,25 @@ void clusterRestart() {
146
204
() -> format ("Test connection state is %s, expecting %s" , connection .state (), OPEN ));
147
205
LOGGER .info ("Test connection has recovered" );
148
206
149
- qqNames .forEach (
207
+ queueNames .forEach (
150
208
n -> {
151
209
LOGGER .info ("Getting info for queue {}" , n );
152
210
waitAtMostNoException (TIMEOUT , () -> management .queueInfo (n ));
153
211
});
154
212
LOGGER .info ("Retrieved info for each queue." );
155
- qqNames .forEach (
156
- n ->
157
- assertThat (management .queueInfo (n ).replicas ())
213
+ queueConfigurations .forEach (
214
+ c -> {
215
+ if (c .type == Management .QueueType .QUORUM || c .type == Management .QueueType .STREAM ) {
216
+ assertThat (management .queueInfo (c .name ).replicas ())
158
217
.hasSameSizeAs (nodes )
159
- .containsExactlyInAnyOrderElementsOf (nodes ));
218
+ .containsExactlyInAnyOrderElementsOf (nodes );
219
+ } else {
220
+ assertThat (management .queueInfo (c .name ).replicas ())
221
+ .hasSize (1 )
222
+ .containsAnyElementsOf (nodes );
223
+ }
224
+ });
225
+
160
226
LOGGER .info ("Checked replica info for each queue." );
161
227
162
228
syncs = publisherStates .stream ().map (s -> s .waitForNewMessages (10 )).collect (toList ());
@@ -171,7 +237,7 @@ void clusterRestart() {
171
237
assertThat (consumerStates ).allMatch (s -> s .state () == OPEN );
172
238
173
239
System .out .println ("Queues:" );
174
- qqNames .forEach (
240
+ queueNames .forEach (
175
241
q -> {
176
242
Management .QueueInfo queueInfo = management .queueInfo (q );
177
243
System .out .printf (
@@ -200,7 +266,9 @@ void clusterRestart() {
200
266
} finally {
201
267
publisherStates .forEach (PublisherState ::close );
202
268
consumerStates .forEach (ConsumerState ::close );
203
- qqNames .forEach (n -> management .queueDeletion ().delete (n ));
269
+ queueConfigurations .stream ()
270
+ .filter (c -> !c .exclusive )
271
+ .forEach (c -> management .queueDeletion ().delete (c .name ));
204
272
}
205
273
}
206
274
@@ -225,16 +293,13 @@ private static class PublisherState implements AutoCloseable {
225
293
final RateLimiter limiter = RateLimiter .create (10 );
226
294
final AtomicReference <Runnable > postAccepted = new AtomicReference <>(() -> {});
227
295
228
- private PublisherState (String queue , AmqpConnection connection ) {
296
+ private PublisherState (String queue , boolean exclusive , AmqpConnection connection ) {
229
297
this .queue = queue ;
230
298
this .connection = connection ;
231
- this .publisher =
232
- (AmqpPublisher )
233
- connection
234
- .publisherBuilder ()
235
- .queue (queue )
236
- .listeners (context -> state .set (context .currentState ()))
237
- .build ();
299
+ PublisherBuilder builder =
300
+ connection .publisherBuilder ().listeners (context -> state .set (context .currentState ()));
301
+ builder = exclusive ? builder .exchange ("amq.direct" ).key (queue ) : builder .queue (queue );
302
+ this .publisher = (AmqpPublisher ) builder .build ();
238
303
}
239
304
240
305
void start () {
0 commit comments