Skip to content

Commit 7500592

Browse files
committed
Expose the number of unsettled messages in consumer
To make it easy to close a consumer without unsettled messages.
1 parent 29db8b0 commit 7500592

File tree

6 files changed

+135
-9
lines changed

6 files changed

+135
-9
lines changed

src/docs/asciidoc/usage.adoc

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ include::{test-examples}/Api.java[tag=target-address-null]
9393

9494
=== Consuming
9595

96-
.Consuming from a queue (generic body)
96+
.Consuming from a queue
9797
[source,java,indent=0]
9898
--------
9999
include::{test-examples}/Api.java[tag=consumer-consume]
@@ -102,6 +102,42 @@ include::{test-examples}/Api.java[tag=consumer-consume]
102102
<2> Process the message
103103
<3> Accept (acknowledge) the message
104104

105+
A consumer can accept, discard, or requeue a message.
106+
We say the consumer _settles_ a message.
107+
108+
Unsettled messages are requeued when a consumer get closed.
109+
This can lead to duplicate processing of messages.
110+
Here is an example:
111+
112+
* A consumer executes a database operation for a given message.
113+
* The consumer gets closed before it accepts (settles) the message.
114+
* The message is requeued.
115+
* Another consumer gets the message and executes the database operation again.
116+
117+
It is difficult to completely avoid duplicate messages, this is why processing should be idempotent.
118+
The consumer API allows nevertheless to _pause_ the delivery of messages, get the number of unsettled messages to make sure it reaches 0 at some point, and then _close_ the consumer.
119+
This ensures the consumer has finally quiesced and all the received messages have been processed.
120+
121+
Here is an example of a consumer graceful shutdown:
122+
123+
.Closing a consumer gracefully
124+
[source,java,indent=0]
125+
--------
126+
include::{test-examples}/Api.java[tag=consumer-graceful-shutdown]
127+
--------
128+
<1> Pause the delivery of messages
129+
<2> Ensure the number of unsettled messages reaches 0
130+
<3> Close the consumer
131+
132+
It is also possible to simplify the consumer shutdown by just closing it, but this is likely to requeue unsettled messages.
133+
134+
.Closing a consumer abruptly (can lead to duplicate messages)
135+
[source,java,indent=0]
136+
--------
137+
include::{test-examples}/Api.java[tag=consumer-abrupt-shutdown]
138+
--------
139+
<1> Close the consumer with potential unsettled messages
140+
105141
=== Resource Management
106142

107143
The `Management` object is the entry point to deal with resources.

src/main/java/com/rabbitmq/client/amqp/Consumer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ public interface Consumer extends AutoCloseable {
2121

2222
void pause();
2323

24+
long unsettledCount();
25+
2426
void unpause();
2527

2628
@Override

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,11 @@ public void unpause() {
122122
}
123123
}
124124

125+
@Override
126+
public long unsettledCount() {
127+
return unsettledCount.get();
128+
}
129+
125130
@Override
126131
public void close() {
127132
this.close(null);

src/test/java/com/rabbitmq/client/amqp/docs/Api.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ void targetAddressNull() {
139139
void consuming() {
140140
Connection connection = null;
141141
// tag::consumer-consume[]
142-
connection.consumerBuilder()
142+
Consumer consumer = connection.consumerBuilder()
143143
.queue("some-queue")
144144
.messageHandler((context, message) -> {
145145
byte[] body = message.body(); // <1>
@@ -148,6 +148,16 @@ void consuming() {
148148
})
149149
.build();
150150
// end::consumer-consume[]
151+
152+
// tag::consumer-graceful-shutdown[]
153+
consumer.pause(); // <1>
154+
long unsettledCount = consumer.unsettledCount(); // <2>
155+
consumer.close(); // <3>
156+
// end::consumer-graceful-shutdown[]
157+
158+
// tag::consumer-abrupt-shutdown[]
159+
consumer.close(); // <1>
160+
// end::consumer-abrupt-shutdown[]
151161
}
152162

153163
void management() {

src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java

Lines changed: 72 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,7 @@
3535
import com.rabbitmq.client.amqp.impl.TestUtils.DisabledIfAddressV1Permitted;
3636
import com.rabbitmq.client.amqp.impl.TestUtils.Sync;
3737
import java.time.Duration;
38-
import java.util.ArrayList;
39-
import java.util.Map;
40-
import java.util.Set;
41-
import java.util.UUID;
38+
import java.util.*;
4239
import java.util.concurrent.ConcurrentHashMap;
4340
import java.util.concurrent.CountDownLatch;
4441
import java.util.concurrent.atomic.AtomicInteger;
@@ -446,7 +443,7 @@ void consumerShouldGetClosedWhenQueueIsDeleted() {
446443
}
447444

448445
@Test
449-
void consumerShouldNotCloseUntilAllMessagesAreSettled() {
446+
void consumerPauseThenClose() {
450447
connection.management().queue(name).exclusive(true).declare();
451448
int messageCount = 100;
452449
int initialCredits = messageCount / 10;
@@ -477,14 +474,82 @@ void consumerShouldNotCloseUntilAllMessagesAreSettled() {
477474
int unsettledCount = waitUntilStable(unsettledMessages::size);
478475
assertThat(unsettledCount).isNotZero();
479476
consumer.pause();
480-
int receivedCountBeforePausing = receivedCount.get();
477+
int receivedCountAfterPausing = receivedCount.get();
481478
unsettledMessages.forEach(com.rabbitmq.client.amqp.Consumer.Context::accept);
482479
consumer.close();
483-
assertThat(receivedCount).hasValue(receivedCountBeforePausing);
480+
assertThat(receivedCount).hasValue(receivedCountAfterPausing);
484481
assertThat(connection.management().queueInfo(name))
485482
.hasMessageCount(messageCount - receivedCount.get());
486483
}
487484

485+
@Test
486+
void consumerGracefulShutdownExample() {
487+
connection.management().queue(name).exclusive(true).declare();
488+
int messageCount = 100;
489+
int initialCredits = messageCount / 10;
490+
Publisher publisher = connection.publisherBuilder().queue(name).build();
491+
Sync publishSync = sync(messageCount);
492+
range(0, messageCount)
493+
.forEach(ignored -> publisher.publish(publisher.message(), ctx -> publishSync.down()));
494+
assertThat(publishSync).completes();
495+
496+
AtomicInteger receivedCount = new AtomicInteger(0);
497+
Random random = new Random();
498+
com.rabbitmq.client.amqp.Consumer consumer =
499+
connection
500+
.consumerBuilder()
501+
.queue(name)
502+
.initialCredits(initialCredits)
503+
.messageHandler(
504+
(ctx, msg) -> {
505+
receivedCount.incrementAndGet();
506+
int processTime = random.nextInt(10) + 1;
507+
TestUtils.simulateActivity(processTime);
508+
ctx.accept();
509+
})
510+
.build();
511+
512+
waitAtMost(() -> receivedCount.get() > initialCredits * 2);
513+
consumer.pause();
514+
waitAtMost(() -> consumer.unsettledCount() == 0);
515+
consumer.close();
516+
assertThat(connection.management().queueInfo(name))
517+
.hasMessageCount(messageCount - receivedCount.get());
518+
}
519+
520+
@Test
521+
void consumerUnsettledMessagesGoBackToQueueAfterClosing() {
522+
connection.management().queue(name).exclusive(true).declare();
523+
int messageCount = 100;
524+
int initialCredits = messageCount / 10;
525+
int settledCount = initialCredits * 2;
526+
Publisher publisher = connection.publisherBuilder().queue(name).build();
527+
Sync publishSync = sync(messageCount);
528+
range(0, messageCount)
529+
.forEach(ignored -> publisher.publish(publisher.message(), ctx -> publishSync.down()));
530+
assertThat(publishSync).completes();
531+
532+
AtomicInteger receivedCount = new AtomicInteger(0);
533+
com.rabbitmq.client.amqp.Consumer consumer =
534+
connection
535+
.consumerBuilder()
536+
.queue(name)
537+
.initialCredits(initialCredits)
538+
.messageHandler(
539+
(ctx, msg) -> {
540+
receivedCount.incrementAndGet();
541+
if (receivedCount.get() <= settledCount) {
542+
ctx.accept();
543+
}
544+
})
545+
.build();
546+
547+
waitAtMost(() -> receivedCount.get() > settledCount);
548+
consumer.close();
549+
assertThat(connection.management().queueInfo(name))
550+
.hasMessageCount(messageCount - settledCount);
551+
}
552+
488553
static <T> T waitUntilStable(Supplier<T> call) {
489554
Duration timeout = Duration.ofSeconds(10);
490555
Duration waitTime = Duration.ofMillis(200);

src/test/java/com/rabbitmq/client/amqp/impl/TestUtils.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,14 @@ public static Duration waitAtMost(
129129
}
130130
}
131131

132+
static void simulateActivity(long timeInMs) {
133+
try {
134+
Thread.sleep(timeInMs);
135+
} catch (InterruptedException e) {
136+
throw new RuntimeException(e);
137+
}
138+
}
139+
132140
public static class CountDownLatchReferenceConditions {
133141

134142
public static Condition<AtomicReference<CountDownLatch>> completed() {

0 commit comments

Comments
 (0)