Skip to content

Commit b617bf2

Browse files
committed
Wait until all messages are settled on consumer closing
1 parent 859114d commit b617bf2

File tree

6 files changed

+203
-74
lines changed

6 files changed

+203
-74
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,6 @@ private org.apache.qpid.protonj2.client.Connection connect(
204204
.client()
205205
.connect(
206206
this.connectionAddress.host(), this.connectionAddress.port(), connectionOptions);
207-
208207
ExceptionUtils.wrapGet(connection.openFuture());
209208
LOGGER.debug("Connection attempt succeeded");
210209
checkBrokerVersion(connection);

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 61 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.rabbitmq.client.amqp.AmqpException;
2323
import com.rabbitmq.client.amqp.Consumer;
2424
import com.rabbitmq.client.amqp.metrics.MetricsCollector;
25+
import java.time.Duration;
2526
import java.util.concurrent.*;
2627
import java.util.concurrent.atomic.AtomicBoolean;
2728
import java.util.concurrent.atomic.AtomicLong;
@@ -52,10 +53,12 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
5253
private final Long id;
5354
private final String address;
5455
private final AmqpConnection connection;
55-
private final AtomicBoolean paused = new AtomicBoolean(false);
56+
private final AtomicReference<PauseStatus> pauseStatus =
57+
new AtomicReference<>(PauseStatus.UNPAUSED);
5658
private final AtomicReference<CountDownLatch> echoedFlowAfterPauseLatch = new AtomicReference<>();
5759
private final MetricsCollector metricsCollector;
5860
private final SessionHandler sessionHandler;
61+
private final AtomicLong unsettledCount = new AtomicLong(0);
5962
// native receiver internal state, accessed only in the native executor/scheduler
6063
private ProtonReceiver protonReceiver;
6164
private Scheduler protonExecutor;
@@ -115,6 +118,7 @@ private Runnable createReceiveTask(Receiver receiver, MessageHandler messageHand
115118
while (!Thread.currentThread().isInterrupted()) {
116119
Delivery delivery = receiver.receive(100, TimeUnit.MILLISECONDS);
117120
if (delivery != null) {
121+
this.unsettledCount.incrementAndGet();
118122
this.metricsCollector.consume();
119123
AmqpMessage message = new AmqpMessage(delivery.message());
120124
AtomicBoolean disposed = new AtomicBoolean(false);
@@ -127,6 +131,7 @@ public void accept() {
127131
try {
128132
protonExecutor.execute(() -> replenishCreditIfNeeded());
129133
delivery.disposition(DeliveryState.accepted(), true);
134+
unsettledCount.decrementAndGet();
130135
metricsCollector.consumeDisposition(
131136
MetricsCollector.ConsumeDisposition.ACCEPTED);
132137
} catch (ClientIllegalStateException | ClientIOException e) {
@@ -143,6 +148,7 @@ public void discard() {
143148
try {
144149
protonExecutor.execute(() -> replenishCreditIfNeeded());
145150
delivery.disposition(DeliveryState.rejected("", ""), true);
151+
unsettledCount.decrementAndGet();
146152
metricsCollector.consumeDisposition(
147153
MetricsCollector.ConsumeDisposition.DISCARDED);
148154
} catch (ClientIllegalStateException | ClientIOException e) {
@@ -159,6 +165,7 @@ public void requeue() {
159165
try {
160166
protonExecutor.execute(() -> replenishCreditIfNeeded());
161167
delivery.disposition(DeliveryState.released(), true);
168+
unsettledCount.decrementAndGet();
162169
metricsCollector.consumeDisposition(
163170
MetricsCollector.ConsumeDisposition.REQUEUED);
164171
} catch (ClientIllegalStateException | ClientIOException e) {
@@ -196,16 +203,23 @@ private void startReceivingLoop() {
196203
void recoverAfterConnectionFailure() {
197204
this.nativeReceiver = createNativeReceiver(this.sessionHandler.sessionNoCheck(), this.address);
198205
this.initStateFromNativeReceiver(this.nativeReceiver);
199-
this.paused.set(false);
206+
this.pauseStatus.set(PauseStatus.UNPAUSED);
200207
startReceivingLoop();
201208
}
202209

203210
private void close(Throwable cause) {
204211
if (this.closed.compareAndSet(false, true)) {
205212
this.state(CLOSING, cause);
206213
if (cause == null) {
214+
LOGGER.debug("Pausing receiver link before detaching it");
207215
// normal closing, pausing message dispatching
208216
this.pause();
217+
LOGGER.debug("Receiver link paused. Unsettled message(s): {}", this.unsettledCount.get());
218+
LOGGER.debug("Waiting for unsettled messages to get settled if necessary");
219+
waitForUnsettledMessagesToSettle();
220+
if (this.unsettledCount.get() > 0) {
221+
LOGGER.debug("Closing receiver link with {} unsettled message(s)", this.unsettledCount);
222+
}
209223
}
210224
this.connection.removeConsumer(this);
211225
if (this.receiveLoop != null) {
@@ -222,6 +236,21 @@ private void close(Throwable cause) {
222236
}
223237
}
224238

239+
private void waitForUnsettledMessagesToSettle() {
240+
Duration timeout = Duration.ofSeconds(10);
241+
Duration waitTime = Duration.ofMillis(10);
242+
Duration waitedTime = Duration.ZERO;
243+
while (this.unsettledCount.get() > 0 && waitedTime.compareTo(timeout) <= 0) {
244+
try {
245+
Thread.sleep(waitTime.toMillis());
246+
waitedTime = waitedTime.plus(waitTime);
247+
} catch (InterruptedException e) {
248+
Thread.currentThread().interrupt();
249+
return;
250+
}
251+
}
252+
}
253+
225254
long id() {
226255
return this.id;
227256
}
@@ -265,7 +294,7 @@ private void initStateFromNativeReceiver(ClientReceiver receiver) {
265294
}
266295

267296
private void replenishCreditIfNeeded() {
268-
if (!this.paused()) {
297+
if (!this.pausedOrPausing()) {
269298
int creditWindow = this.initialCredits;
270299
int currentCredit = protonReceiver.getCredit();
271300
if (currentCredit <= creditWindow * 0.5) {
@@ -283,16 +312,24 @@ private void replenishCreditIfNeeded() {
283312
}
284313

285314
void pause() {
286-
if (this.paused.compareAndSet(false, true)) {
287-
CountDownLatch latch = new CountDownLatch(1);
288-
this.echoedFlowAfterPauseLatch.set(latch);
289-
this.protonExecutor.execute(this::doPause);
315+
if (this.pauseStatus.compareAndSet(PauseStatus.UNPAUSED, PauseStatus.PAUSING)) {
290316
try {
291-
if (!latch.await(10, TimeUnit.SECONDS)) {
292-
LOGGER.warn("Did not receive echoed flow to pause receiver");
317+
CountDownLatch latch = new CountDownLatch(1);
318+
this.echoedFlowAfterPauseLatch.set(latch);
319+
this.protonExecutor.execute(this::doPause);
320+
try {
321+
boolean echoed = latch.await(10, TimeUnit.SECONDS);
322+
if (echoed) {
323+
this.pauseStatus.set(PauseStatus.PAUSED);
324+
} else {
325+
LOGGER.warn("Did not receive echoed flow to pause receiver");
326+
this.pauseStatus.set(PauseStatus.UNPAUSED);
327+
}
328+
} catch (InterruptedException e) {
329+
Thread.currentThread().interrupt();
293330
}
294-
} catch (InterruptedException e) {
295-
Thread.currentThread().interrupt();
331+
} catch (Exception e) {
332+
this.pauseStatus.set(PauseStatus.UNPAUSED);
296333
}
297334
}
298335
}
@@ -305,7 +342,7 @@ private void doPause() {
305342

306343
void unpause() {
307344
checkOpen();
308-
if (this.paused.compareAndSet(true, false)) {
345+
if (this.pauseStatus.compareAndSet(PauseStatus.PAUSED, PauseStatus.UNPAUSED)) {
309346
try {
310347
this.nativeReceiver.addCredit(this.initialCredits);
311348
} catch (ClientException e) {
@@ -314,7 +351,17 @@ void unpause() {
314351
}
315352
}
316353

317-
private boolean paused() {
318-
return this.paused.get();
354+
boolean pausedOrPausing() {
355+
return this.pauseStatus.get() != PauseStatus.UNPAUSED;
356+
}
357+
358+
boolean paused() {
359+
return this.pauseStatus.get() == PauseStatus.PAUSED;
360+
}
361+
362+
enum PauseStatus {
363+
UNPAUSED,
364+
PAUSING,
365+
PAUSED
319366
}
320367
}

src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java

Lines changed: 90 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,9 @@
2020
import static com.rabbitmq.client.amqp.Management.ExchangeType.DIRECT;
2121
import static com.rabbitmq.client.amqp.Management.ExchangeType.FANOUT;
2222
import static com.rabbitmq.client.amqp.Management.QueueType.QUORUM;
23+
import static com.rabbitmq.client.amqp.impl.TestUtils.*;
2324
import static com.rabbitmq.client.amqp.impl.TestUtils.CountDownLatchConditions.completed;
2425
import static com.rabbitmq.client.amqp.impl.TestUtils.assertThat;
25-
import static com.rabbitmq.client.amqp.impl.TestUtils.sync;
26-
import static com.rabbitmq.client.amqp.impl.TestUtils.waitAtMost;
2726
import static java.nio.charset.StandardCharsets.*;
2827
import static java.util.Collections.emptyMap;
2928
import static java.util.Collections.singletonMap;
@@ -35,15 +34,17 @@
3534
import com.rabbitmq.client.amqp.*;
3635
import com.rabbitmq.client.amqp.impl.TestUtils.DisabledIfAddressV1Permitted;
3736
import com.rabbitmq.client.amqp.impl.TestUtils.Sync;
37+
import java.time.Duration;
3838
import java.util.ArrayList;
3939
import java.util.Map;
4040
import java.util.Set;
4141
import java.util.UUID;
4242
import java.util.concurrent.ConcurrentHashMap;
4343
import java.util.concurrent.CountDownLatch;
44+
import java.util.concurrent.atomic.AtomicInteger;
4445
import java.util.concurrent.atomic.AtomicReference;
4546
import java.util.function.Consumer;
46-
import java.util.stream.IntStream;
47+
import java.util.function.Supplier;
4748
import org.assertj.core.api.Assertions;
4849
import org.junit.jupiter.api.*;
4950
import org.junit.jupiter.api.extension.ExtendWith;
@@ -233,40 +234,42 @@ void sameTypeMessagesInQueue() {
233234
}
234235

235236
@Test
236-
void pauseShouldStopMessageArrivalUnpauseShouldResumeIt() throws Exception {
237+
void pauseShouldStopMessageArrivalUnpauseShouldResumeIt() {
237238
String q = connection.management().queue().exclusive(true).declare().name();
238239
Publisher publisher = connection.publisherBuilder().queue(q).build();
239240
int messageCount = 100;
240241
CountDownLatch publishLatch = new CountDownLatch(messageCount);
241242
Publisher.Callback callback = ctx -> publishLatch.countDown();
242-
IntStream.range(0, messageCount)
243-
.forEach(ignored -> publisher.publish(publisher.message(), callback));
243+
range(0, messageCount).forEach(ignored -> publisher.publish(publisher.message(), callback));
244244

245245
assertThat(publishLatch).completes();
246246

247247
int initialCredits = 10;
248248
Set<com.rabbitmq.client.amqp.Consumer.Context> messageContexts = ConcurrentHashMap.newKeySet();
249-
com.rabbitmq.client.amqp.Consumer consumer =
250-
connection
251-
.consumerBuilder()
252-
.queue(q)
253-
.initialCredits(initialCredits)
254-
.messageHandler((ctx, msg) -> messageContexts.add(ctx))
255-
.build();
249+
AmqpConsumer consumer =
250+
(AmqpConsumer)
251+
connection
252+
.consumerBuilder()
253+
.queue(q)
254+
.initialCredits(initialCredits)
255+
.messageHandler((ctx, msg) -> messageContexts.add(ctx))
256+
.build();
256257

257258
waitAtMost(() -> messageContexts.size() == initialCredits);
258259

259260
assertThat(connection.management().queueInfo(q)).hasMessageCount(messageCount - initialCredits);
260261

261262
assertThat(Cli.queueInfo(q).unackedMessageCount()).isEqualTo(initialCredits);
262263

263-
((AmqpConsumer) consumer).pause();
264+
consumer.pause();
264265
new ArrayList<>(messageContexts).forEach(com.rabbitmq.client.amqp.Consumer.Context::accept);
265266

266267
waitAtMost(() -> Cli.queueInfo(q).unackedMessageCount() == 0);
267268
waitAtMost(() -> messageContexts.size() == initialCredits);
268-
((AmqpConsumer) consumer).unpause();
269+
consumer.unpause();
269270
waitAtMost(() -> messageContexts.size() == initialCredits * 2);
271+
consumer.pause();
272+
messageContexts.forEach(com.rabbitmq.client.amqp.Consumer.Context::accept);
270273
}
271274

272275
@Test
@@ -366,8 +369,7 @@ void publisherSendingShouldThrowWhenQueueHasBeenDeleted() throws Exception {
366369
}
367370

368371
@Test
369-
void publisherSendingShouldThrowWhenPublishingToNonExistingExchangeWithToProperty()
370-
throws Exception {
372+
void publisherSendingShouldThrowWhenPublishingToNonExistingExchangeWithToProperty() {
371373
String doesNotExist = uuid();
372374
Sync closedSync = sync();
373375
AtomicReference<Throwable> closedException = new AtomicReference<>();
@@ -444,6 +446,77 @@ void consumerShouldGetClosedWhenQueueIsDeleted() {
444446
.hasMessageContaining(ExceptionUtils.ERROR_RESOURCE_DELETED);
445447
}
446448

449+
@Test
450+
void consumerShouldNotCloseUntilAllMessagesAreSettled() throws Exception {
451+
connection.management().queue(name).exclusive(true).declare();
452+
int messageCount = 100;
453+
int messageToConsumeCount = messageCount / 2;
454+
int initialCredits = messageCount / 10;
455+
Publisher publisher = connection.publisherBuilder().queue(name).build();
456+
Sync publishSync = sync(messageCount);
457+
range(0, messageCount)
458+
.forEach(ignored -> publisher.publish(publisher.message(), ctx -> publishSync.down()));
459+
assertThat(publishSync).completes();
460+
461+
AtomicInteger receivedCount = new AtomicInteger(0);
462+
Set<com.rabbitmq.client.amqp.Consumer.Context> unsettledMessages =
463+
ConcurrentHashMap.newKeySet();
464+
AmqpConsumer consumer =
465+
(AmqpConsumer)
466+
connection
467+
.consumerBuilder()
468+
.queue(name)
469+
.initialCredits(messageCount / 10)
470+
.messageHandler(
471+
(ctx, msg) -> {
472+
if (receivedCount.incrementAndGet() <= initialCredits) {
473+
ctx.accept();
474+
} else {
475+
unsettledMessages.add(ctx);
476+
}
477+
})
478+
.build();
479+
480+
int unsettledCount = waitUntilStable(unsettledMessages::size);
481+
assertThat(unsettledCount).isNotZero();
482+
int receivedCountBeforeClosing = receivedCount.get();
483+
484+
Sync consumerClosedSync = sync();
485+
submitTask(
486+
() -> {
487+
consumer.close();
488+
consumerClosedSync.down();
489+
});
490+
waitAtMost(consumer::paused);
491+
submitTask(() -> unsettledMessages.forEach(com.rabbitmq.client.amqp.Consumer.Context::accept));
492+
assertThat(consumerClosedSync).completes();
493+
assertThat(receivedCount).hasValue(receivedCountBeforeClosing);
494+
assertThat(connection.management().queueInfo(name))
495+
.hasMessageCount(messageCount - receivedCount.get());
496+
}
497+
498+
static <T> T waitUntilStable(Supplier<T> call) {
499+
Duration timeout = Duration.ofSeconds(10);
500+
Duration waitTime = Duration.ofMillis(200);
501+
Duration waitedTime = Duration.ZERO;
502+
T newValue = null;
503+
while (waitedTime.compareTo(timeout) <= 0) {
504+
T previousValue = call.get();
505+
try {
506+
Thread.sleep(waitTime.toMillis());
507+
} catch (InterruptedException e) {
508+
Thread.currentThread().interrupt();
509+
throw new RuntimeException(e);
510+
}
511+
newValue = call.get();
512+
if (newValue.equals(previousValue)) {
513+
return newValue;
514+
}
515+
}
516+
Assertions.fail("Value did not stabilize in %s, last value was %s", timeout, newValue);
517+
return null;
518+
}
519+
447520
private static String uuid() {
448521
return UUID.randomUUID().toString();
449522
}

0 commit comments

Comments
 (0)