@@ -84,7 +84,7 @@ void connectionsShouldBeMemberLocalReplicatedQueues(Management.QueueType type) {
84
84
}
85
85
86
86
@ Test
87
- void connectionShouldRecoverToNewQuorumQueueLeaderAfterAfterItHasMoved () {
87
+ void connectionShouldRecoverToNewQuorumQueueLeaderAfterItHasMoved () {
88
88
try {
89
89
management .queue (q ).type (Management .QueueType .QUORUM ).declare ();
90
90
Management .QueueInfo info = queueInfo ();
@@ -212,6 +212,65 @@ void consumeFromMovingQq() {
212
212
}
213
213
}
214
214
215
+ @ Test
216
+ void publishConsumeQqWhenLeaderChanges () {
217
+ try {
218
+ management .queue (q ).type (Management .QueueType .QUORUM ).declare ();
219
+
220
+ AmqpConnection consumeConnection = connection (b -> b .affinity ().queue (q ).operation (CONSUME ));
221
+ assertThat (consumeConnection ).isOnFollower (queueInfo ());
222
+
223
+ Set <Long > messageIds = ConcurrentHashMap .newKeySet ();
224
+ Sync consumeSync = sync ();
225
+ consumeConnection
226
+ .consumerBuilder ()
227
+ .queue (q )
228
+ .messageHandler (
229
+ (ctx , msg ) -> {
230
+ messageIds .add (msg .messageIdAsLong ());
231
+ consumeSync .down ();
232
+ ctx .accept ();
233
+ })
234
+ .build ();
235
+
236
+ AmqpConnection publishConnection = connection (b -> b .affinity ().queue (q ).operation (PUBLISH ));
237
+ Publisher publisher = publishConnection .publisherBuilder ().queue (q ).build ();
238
+ Sync publishSync = sync ();
239
+ publisher .publish (publisher .message ().messageId (1L ), ctx -> publishSync .down ());
240
+ assertThat (publishSync ).completes ();
241
+ publishSync .reset ();
242
+
243
+ assertThat (consumeSync ).completes ();
244
+ assertThat (messageIds ).containsExactlyInAnyOrder (1L );
245
+ consumeSync .reset ();
246
+
247
+ String initialLeader = queueInfo ().leader ();
248
+
249
+ deleteQqMember (initialLeader );
250
+ assertThat (queueInfo ()).doesNotHaveLeader (initialLeader );
251
+
252
+ publisher .publish (publisher .message ().messageId (2L ), ctx -> publishSync .down ());
253
+ assertThat (publishSync ).completes ();
254
+ publishSync .reset ();
255
+
256
+ assertThat (consumeSync ).completes ();
257
+ assertThat (messageIds ).containsExactlyInAnyOrder (1L , 2L );
258
+ consumeSync .reset ();
259
+
260
+ addQqMember (initialLeader );
261
+
262
+ publisher .publish (publisher .message ().messageId (3L ), ctx -> publishSync .down ());
263
+ assertThat (publishSync ).completes ();
264
+ publishSync .reset ();
265
+
266
+ assertThat (consumeSync ).completes ();
267
+ assertThat (messageIds ).containsExactlyInAnyOrder (1L , 2L , 3L );
268
+ consumeSync .reset ();
269
+ } finally {
270
+ management .queueDeletion ().delete (q );
271
+ }
272
+ }
273
+
215
274
@ Test
216
275
void publishToRestartedStream () {
217
276
try {
0 commit comments