Skip to content

Commit d6c7038

Browse files
committed
Document consumer flow control
1 parent 4ddea50 commit d6c7038

File tree

2 files changed

+63
-0
lines changed

2 files changed

+63
-0
lines changed

src/docs/asciidoc/api.adoc

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -851,6 +851,10 @@ Useful when using an external store for offset tracking.
851851
|Number of credits when the subscription is created.
852852
Increase for higher throughput at the expense of memory usage.
853853
|1
854+
855+
|`flow#strategy`
856+
|The `ConsumerFlowStrategy` to use.
857+
|`ConsumerFlowStrategy#creditOnChunkArrival(1)`
854858
|===
855859

856860
[NOTE]
@@ -1099,6 +1103,48 @@ When a glitch happens and triggers the re-subscription, the server-side stored o
10991103
Using this server-side stored offset can lead to duplicates, whereas using the in-memory, application-specific offset tracking variable is more accurate.
11001104
A custom `SubscriptionListener` lets the application developer uses what's best for the application if the computed value is not optimal.
11011105

1106+
===== Flow Control
1107+
1108+
This section covers how a consumer can tell the broker when to send more messages.
1109+
1110+
By default, the broker keeps sending messages as long as messages are processed and the `MessageHandler#handle(Context, Message)` method returns.
1111+
This strategy works fine if message processing is fast enough.
1112+
If message processing takes longer, one can be tempted to process messages in parallel with an `ExecutorService`.
1113+
This will make the `handle` method return immediately and the broker will keep sending messages, potentially overflowing the consumer.
1114+
1115+
What we miss in the parallel processing case is a way to tell the library we are done processing a message and that we are ready at some point to handle more messages.
1116+
This is the goal of the `MessageHandler.Context#processed()` method.
1117+
1118+
This method is by default a no-op because the default flow control strategy keeps asking for more messages as soon as message processing is done.
1119+
This method gets some real behavior to control the flow of messages when an appropriate `ConsumerFlowStrategy` is set `ConsumerBuilder#flow()`.
1120+
The following code snippet shows how to set a handy consumer flow strategy:
1121+
1122+
.Setting a consumer flow control strategy
1123+
[source,java,indent=0]
1124+
--------
1125+
include::{test-examples}/ConsumerUsage.java[tag=flow-control]
1126+
--------
1127+
<1> Set the flow control strategy
1128+
<2> Make sure to call `Context#processed()`
1129+
1130+
In the example we set up the `creditWhenHalfMessagesProcessed` strategy which asks for more messages once half of the current messages have been marked as processed.
1131+
The broker does not send messages one by one, it sends <<chunk-definition,chunks>> of messages.
1132+
A chunk of messages can contain 1 to several thousands of messages.
1133+
So with the strategy set above, once `processed()` has been called for half of the messages of the current chunk, the library will ask the broker for another one (it will provide a _credit_ for the subscription).
1134+
By doing this, the next chunk should arrive by the time we are done with the other half of the current chunk.
1135+
This way the consumer is neither overwhelmed nor idle.
1136+
1137+
The `ConsumerFlowStrategy` interface provides some static helpers to configure the appropriate strategy.
1138+
1139+
Additional notes on consumer flow control:
1140+
1141+
* Make sure to **call the `processed()` method** once you set up a `ConsumerFlowStrategy`.
1142+
The method is a no-op by default, but it is essential to call it with count-based strategies like `creditWhenHalfMessagesProcessed` or `creditOnProcessedMessageCount`.
1143+
No calling it will stop the dispatching of messages.
1144+
* Make sure to call `processed()` only once.
1145+
Whether the method is idempotent depends on the flow strategy implementation.
1146+
Apart from the default one, the implementations the library provides does not make `processed()` idempotent.
1147+
11021148
[[single-active-consumer]]
11031149
===== Single Active Consumer
11041150

src/test/java/com/rabbitmq/stream/docs/ConsumerUsage.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,22 @@ long getOffsetFromExternalStore() {
154154
return 0L;
155155
}
156156

157+
void flowControl() {
158+
Environment environment = Environment.builder().build();
159+
// tag::flow-control[]
160+
Consumer consumer = environment.consumerBuilder()
161+
.stream("my-stream")
162+
.flow()
163+
.strategy(ConsumerFlowStrategy.creditWhenHalfMessagesProcessed()) // <1>
164+
.builder()
165+
.messageHandler((context, message) -> {
166+
// message handling code (possibly asynchronous)...
167+
context.processed(); // <2>
168+
})
169+
.build();
170+
// end::flow-control[]
171+
}
172+
157173
void enablingSingleActiveConsumer() {
158174
Environment environment = Environment.builder().build();
159175
// tag::enabling-single-active-consumer[]
@@ -188,4 +204,5 @@ void sacConsumerUpdateListener() {
188204
.build();
189205
// end::sac-consumer-update-listener[]
190206
}
207+
191208
}

0 commit comments

Comments
 (0)