Skip to content

Commit c9501a8

Browse files
committed
Make sure unsettled message goes back to queue on connection closing
1 parent 7891645 commit c9501a8

File tree

1 file changed

+57
-5
lines changed

1 file changed

+57
-5
lines changed

src/test/java/com/rabbitmq/client/amqp/impl/AmqpConsumerTest.java

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,12 @@
1717
1818
package com.rabbitmq.client.amqp.impl;
1919

20+
import static com.rabbitmq.client.amqp.Management.QueueType.QUORUM;
2021
import static com.rabbitmq.client.amqp.Management.QueueType.STREAM;
21-
import static com.rabbitmq.client.amqp.Resource.State.OPEN;
22-
import static com.rabbitmq.client.amqp.Resource.State.RECOVERING;
22+
import static com.rabbitmq.client.amqp.Resource.State.*;
2323
import static com.rabbitmq.client.amqp.impl.Assertions.*;
2424
import static com.rabbitmq.client.amqp.impl.Cli.closeConnection;
25-
import static com.rabbitmq.client.amqp.impl.TestUtils.name;
26-
import static com.rabbitmq.client.amqp.impl.TestUtils.sync;
25+
import static com.rabbitmq.client.amqp.impl.TestUtils.*;
2726
import static org.assertj.core.api.Assertions.*;
2827

2928
import com.rabbitmq.client.amqp.*;
@@ -41,6 +40,7 @@
4140
@ExtendWith(AmqpTestInfrastructureExtension.class)
4241
public class AmqpConsumerTest {
4342

43+
// used by the test extension
4444
BackOffDelayPolicy backOffDelayPolicy = BackOffDelayPolicy.fixed(Duration.ofMillis(100));
4545
Environment environment;
4646
Connection connection;
@@ -50,7 +50,6 @@ public class AmqpConsumerTest {
5050
@BeforeEach
5151
void init(TestInfo info) {
5252
this.q = name(info);
53-
connection.management().queue(this.q).type(STREAM).declare();
5453
this.connectionName = ((AmqpConnection) connection).name();
5554
}
5655

@@ -61,6 +60,7 @@ void tearDown() {
6160

6261
@Test
6362
void subscriptionListenerShouldBeCalledOnRecovery() {
63+
connection.management().queue(this.q).type(STREAM).declare();
6464
Sync subscriptionSync = sync();
6565
Sync recoveredSync = sync();
6666
connection
@@ -81,6 +81,7 @@ void subscriptionListenerShouldBeCalledOnRecovery() {
8181

8282
@Test
8383
void streamConsumerRestartsWhereItLeftOff() {
84+
connection.management().queue(this.q).type(STREAM).declare();
8485
Connection publisherConnection = environment.connectionBuilder().build();
8586
Publisher publisher = publisherConnection.publisherBuilder().queue(this.q).build();
8687
int messageCount = 100;
@@ -159,6 +160,57 @@ void streamConsumerRestartsWhereItLeftOff() {
159160
assertThat(lastOffsetProcessed).hasValueGreaterThan(offsetAfterClosing);
160161
}
161162

163+
@Test
164+
void unsettledMessageShouldGoBackToQueueIfConnectionIsClosed(TestInfo testInfo) {
165+
String cName = name(testInfo);
166+
connection.management().queue(this.q).type(QUORUM).declare();
167+
Sync connectionRecoveredSync = sync();
168+
Connection c =
169+
((AmqpConnectionBuilder) environment.connectionBuilder())
170+
.name(cName)
171+
.recovery()
172+
.backOffDelayPolicy(backOffDelayPolicy)
173+
.connectionBuilder()
174+
.listeners(recoveredListener(connectionRecoveredSync))
175+
.build();
176+
Publisher publisher = c.publisherBuilder().queue(this.q).build();
177+
178+
Sync deliveredSync = sync(2);
179+
Sync consumerClosedSync = sync();
180+
AtomicInteger deliveryCount = new AtomicInteger();
181+
c.consumerBuilder()
182+
.listeners(
183+
ctx -> {
184+
if (ctx.currentState() == CLOSED) {
185+
consumerClosedSync.down();
186+
}
187+
})
188+
.queue(this.q)
189+
.messageHandler(
190+
(ctx, msg) -> {
191+
if (deliveryCount.incrementAndGet() == 1) {
192+
closeConnection(cName);
193+
}
194+
deliveredSync.down();
195+
})
196+
.build();
197+
198+
publisher.publish(publisher.message(), ctx -> {});
199+
200+
assertThat(deliveredSync).completes();
201+
assertThat(deliveryCount).hasValue(2);
202+
assertThat(connectionRecoveredSync).completes();
203+
assertThat(consumerClosedSync).hasNotCompleted();
204+
c.close();
205+
assertThat(consumerClosedSync).completes();
206+
207+
waitAtMost(
208+
() -> {
209+
Management.QueueInfo info = connection.management().queueInfo(this.q);
210+
return info.messageCount() == 1 && info.consumerCount() == 0;
211+
});
212+
}
213+
162214
private static Resource.StateListener recoveredListener(Sync sync) {
163215
return context -> {
164216
if (context.previousState() == RECOVERING && context.currentState() == OPEN) {

0 commit comments

Comments
 (0)