Skip to content

Commit f4399f5

Browse files
authored
Merge pull request #374 from rabbitmq/flow-control
Consumer flow strategy (take two)
2 parents 5d00247 + 9b1e626 commit f4399f5

34 files changed

+1145
-337
lines changed

pom.xml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,9 @@
383383
<excludes>
384384
<exclude>**/*TestSuite.java</exclude>
385385
</excludes>
386+
<systemProperties>
387+
<rabbitmqctl.bin>DOCKER:rabbitmq</rabbitmqctl.bin>
388+
</systemProperties>
386389
</configuration>
387390
</plugin>
388391

@@ -875,6 +878,32 @@
875878

876879
</profile>
877880

881+
<profile>
882+
<!-- this avoids a compiler warning on Java 9+ -->
883+
<!-- the compiler setting is not available on Java 8 -->
884+
<id>use-release-compiler-argument-on-java-9-or-more</id>
885+
<activation>
886+
<jdk>[9,)</jdk>
887+
</activation>
888+
<build>
889+
<plugins>
890+
<plugin>
891+
<artifactId>maven-compiler-plugin</artifactId>
892+
<version>${maven.compiler.plugin.version}</version>
893+
<configuration>
894+
<source>1.8</source>
895+
<target>1.8</target>
896+
<release>8</release>
897+
<compilerArgs>
898+
<arg>-Xlint:deprecation</arg>
899+
<arg>-Xlint:unchecked</arg>
900+
</compilerArgs>
901+
</configuration>
902+
</plugin>
903+
</plugins>
904+
</build>
905+
</profile>
906+
878907
</profiles>
879908

880909
</project>

src/docs/asciidoc/api.adoc

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -851,6 +851,10 @@ Useful when using an external store for offset tracking.
851851
|Number of credits when the subscription is created.
852852
Increase for higher throughput at the expense of memory usage.
853853
|1
854+
855+
|`flow#strategy`
856+
|The `ConsumerFlowStrategy` to use.
857+
|`ConsumerFlowStrategy#creditOnChunkArrival(1)`
854858
|===
855859

856860
[NOTE]
@@ -1099,6 +1103,48 @@ When a glitch happens and triggers the re-subscription, the server-side stored o
10991103
Using this server-side stored offset can lead to duplicates, whereas using the in-memory, application-specific offset tracking variable is more accurate.
11001104
A custom `SubscriptionListener` lets the application developer uses what's best for the application if the computed value is not optimal.
11011105

1106+
===== Flow Control
1107+
1108+
This section covers how a consumer can tell the broker when to send more messages.
1109+
1110+
By default, the broker keeps sending messages as long as messages are processed and the `MessageHandler#handle(Context, Message)` method returns.
1111+
This strategy works fine if message processing is fast enough.
1112+
If message processing takes longer, one can be tempted to process messages in parallel with an `ExecutorService`.
1113+
This will make the `handle` method return immediately and the broker will keep sending messages, potentially overflowing the consumer.
1114+
1115+
What we miss in the parallel processing case is a way to tell the library we are done processing a message and that we are ready at some point to handle more messages.
1116+
This is the goal of the `MessageHandler.Context#processed()` method.
1117+
1118+
This method is by default a no-op because the default flow control strategy keeps asking for more messages as soon as message processing is done.
1119+
This method gets some real behavior to control the flow of messages when an appropriate `ConsumerFlowStrategy` is set `ConsumerBuilder#flow()`.
1120+
The following code snippet shows how to set a handy consumer flow strategy:
1121+
1122+
.Setting a consumer flow control strategy
1123+
[source,java,indent=0]
1124+
--------
1125+
include::{test-examples}/ConsumerUsage.java[tag=flow-control]
1126+
--------
1127+
<1> Set the flow control strategy
1128+
<2> Make sure to call `Context#processed()`
1129+
1130+
In the example we set up the `creditWhenHalfMessagesProcessed` strategy which asks for more messages once half of the current messages have been marked as processed.
1131+
The broker does not send messages one by one, it sends <<chunk-definition,chunks>> of messages.
1132+
A chunk of messages can contain 1 to several thousands of messages.
1133+
So with the strategy set above, once `processed()` has been called for half of the messages of the current chunk, the library will ask the broker for another one (it will provide a _credit_ for the subscription).
1134+
By doing this, the next chunk should arrive by the time we are done with the other half of the current chunk.
1135+
This way the consumer is neither overwhelmed nor idle.
1136+
1137+
The `ConsumerFlowStrategy` interface provides some static helpers to configure the appropriate strategy.
1138+
1139+
Additional notes on consumer flow control:
1140+
1141+
* Make sure to **call the `processed()` method** once you set up a `ConsumerFlowStrategy`.
1142+
The method is a no-op by default, but it is essential to call it with count-based strategies like `creditWhenHalfMessagesProcessed` or `creditOnProcessedMessageCount`.
1143+
No calling it will stop the dispatching of messages.
1144+
* Make sure to call `processed()` only once.
1145+
Whether the method is idempotent depends on the flow strategy implementation.
1146+
Apart from the default one, the implementations the library provides does not make `processed()` idempotent.
1147+
11021148
[[single-active-consumer]]
11031149
===== Single Active Consumer
11041150

src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ public interface ConsumerBuilder {
166166
*
167167
* @return the flow configuration
168168
* @since 0.11.0
169+
* @see ConsumerFlowStrategy#creditOnChunkArrival()
170+
* @see MessageHandler.Context#processed()
169171
*/
170172
FlowConfiguration flow();
171173

@@ -231,7 +233,11 @@ interface AutoTrackingStrategy {
231233
/**
232234
* Message flow configuration.
233235
*
236+
* <p>The default configuration uses {@link ConsumerFlowStrategy#creditOnChunkArrival()}.
237+
*
234238
* @since 0.11.0
239+
* @see ConsumerFlowStrategy#creditOnChunkArrival()
240+
* @see MessageHandler.Context#processed()
235241
*/
236242
interface FlowConfiguration {
237243

@@ -240,11 +246,29 @@ interface FlowConfiguration {
240246
*
241247
* <p>Default is 1.
242248
*
249+
* <p>This calls uses {@link ConsumerFlowStrategy#creditOnChunkArrival(int)}.
250+
*
243251
* @param initialCredits the number of initial credits
244252
* @return this configuration instance
253+
* @see ConsumerFlowStrategy#creditOnChunkArrival(int)
245254
*/
246255
FlowConfiguration initialCredits(int initialCredits);
247256

257+
/**
258+
* Flow strategy to use
259+
*
260+
* @param strategy the strategy to use
261+
* @return this configuration instance
262+
* @since 0.12.0
263+
* @see ConsumerFlowStrategy
264+
* @see ConsumerFlowStrategy#creditOnChunkArrival()
265+
* @see ConsumerFlowStrategy#creditOnChunkArrival(int)
266+
* @see ConsumerFlowStrategy#creditWhenHalfMessagesProcessed()
267+
* @see ConsumerFlowStrategy#creditWhenHalfMessagesProcessed(int)
268+
* @see ConsumerFlowStrategy#creditOnProcessedMessageCount(int, double)
269+
*/
270+
FlowConfiguration strategy(ConsumerFlowStrategy strategy);
271+
248272
/**
249273
* Go back to the builder.
250274
*
Lines changed: 227 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,227 @@
1+
// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream;
15+
16+
import java.util.concurrent.atomic.AtomicLong;
17+
18+
/**
19+
* Contract to determine when a subscription provides credits to get more messages.
20+
*
21+
* <p>The broker delivers "chunks" of messages to consumers. A chunk can contain from 1 to several
22+
* thousands of messages. The broker send chunks as long as the subscription has <em>credits</em>. A
23+
* client connection can provide credits for a given subscription and the broker will send the
24+
* corresponding number of chunks (1 credit = 1 chunk).
25+
*
26+
* <p>This credit mechanism avoids overwhelming a consumer with messages. A consumer does not want
27+
* to provide a credit only when it is done with messages of a chunk, because it will be idle
28+
* between its credit request and the arrival of the next chunk. The idea is to keep consumers busy
29+
* as much as possible, without accumulating an in-memory backlog on the client side. There is no
30+
* ideal solution, it depends on the use cases and several parameters (processing time, network,
31+
* etc).
32+
*
33+
* <p>This is an experimental API, subject to change.
34+
*
35+
* @since 0.12.0
36+
* @see MessageHandler.Context#processed()
37+
* @see ConsumerBuilder#flow()
38+
*/
39+
public interface ConsumerFlowStrategy {
40+
41+
/**
42+
* The initial number of credits for a subscription.
43+
*
44+
* <p>It must be greater than 0. Values are usually between 1 and 10.
45+
*
46+
* @return initial number of credits
47+
*/
48+
int initialCredits();
49+
50+
/**
51+
* Return the behavior for {@link MessageHandler.Context#processed()} calls.
52+
*
53+
* <p>This method is called for each chunk of messages. Implementations return a callback that
54+
* will be called when applications consider a message dealt with and call {@link
55+
* MessageHandler.Context#processed()}. The callback can count messages and provide credits
56+
* accordingly.
57+
*
58+
* @param context chunk context
59+
* @return the message processed callback
60+
*/
61+
MessageProcessedCallback start(Context context);
62+
63+
/** Chunk context. */
64+
interface Context {
65+
66+
/**
67+
* Provide credits for the subscription.
68+
*
69+
* <p>{@link ConsumerFlowStrategy} implementation should always provide 1 credit a given chunk.
70+
*
71+
* @param credits the number of credits provided, usually 1
72+
*/
73+
void credits(int credits);
74+
75+
/**
76+
* The number of messages in the chunk.
77+
*
78+
* @return number of messages in the chunk
79+
*/
80+
long messageCount();
81+
}
82+
83+
/** Behavior for {@link MessageHandler.Context#processed()} calls. */
84+
@FunctionalInterface
85+
interface MessageProcessedCallback {
86+
87+
/**
88+
* Method called when {@link MessageHandler.Context#processed()} is called.
89+
*
90+
* <p>There is one instance of this class for a given chunk and it is called for the <code>
91+
* processed()</code> calls of the message of this chunk.
92+
*
93+
* <p>Implementations can count messages and call {@link Context#credits(int)} when appropriate.
94+
*
95+
* <p>Note calls to {@link MessageHandler.Context#processed()} are not idempotent: an
96+
* application can call the method several times for the same message and implementations must
97+
* deal with these multiple calls if they impact their logic.
98+
*
99+
* @param messageContext context of the message
100+
*/
101+
void processed(MessageHandler.Context messageContext);
102+
}
103+
104+
/**
105+
* Strategy that provides 1 initial credit and a credit on each new chunk.
106+
*
107+
* <p>Calls to {@link MessageHandler.Context#processed()} are ignored.
108+
*
109+
* @return flow strategy
110+
*/
111+
static ConsumerFlowStrategy creditOnChunkArrival() {
112+
return creditOnChunkArrival(1);
113+
}
114+
115+
/**
116+
* Strategy that provides the specified number of initial credits and a credit on each new chunk.
117+
*
118+
* <p>Calls to {@link MessageHandler.Context#processed()} are ignored.
119+
*
120+
* @param initialCredits number of initial credits
121+
* @return flow strategy
122+
*/
123+
static ConsumerFlowStrategy creditOnChunkArrival(int initialCredits) {
124+
return new CreditOnChunkArrivalConsumerFlowStrategy(initialCredits);
125+
}
126+
127+
/**
128+
* Strategy that provides 1 initial credit and a credit when half of the chunk messages are
129+
* processed.
130+
*
131+
* <p>Make sure to call {@link MessageHandler.Context#processed()} on every message when using
132+
* this strategy, otherwise the broker may stop sending messages to the consumer.
133+
*
134+
* @return flow strategy
135+
*/
136+
static ConsumerFlowStrategy creditWhenHalfMessagesProcessed() {
137+
return creditOnProcessedMessageCount(1, 0.5);
138+
}
139+
140+
/**
141+
* Strategy that provides the specified number of initial credits and a credit when half of the
142+
* chunk messages are processed.
143+
*
144+
* <p>Make sure to call {@link MessageHandler.Context#processed()} on every message when using
145+
* this strategy, otherwise the broker may stop sending messages to the consumer.
146+
*
147+
* @param initialCredits number of initial credits
148+
* @return flow strategy
149+
*/
150+
static ConsumerFlowStrategy creditWhenHalfMessagesProcessed(int initialCredits) {
151+
return creditOnProcessedMessageCount(initialCredits, 0.5);
152+
}
153+
154+
/**
155+
* Strategy that provides the specified number of initial credits and a credit when the specified
156+
* ratio of the chunk messages are processed.
157+
*
158+
* <p>Make sure to call {@link MessageHandler.Context#processed()} on every message when using
159+
* this strategy, otherwise the broker may stop sending messages to the consumer.
160+
*
161+
* @param initialCredits number of initial credits
162+
* @return flow strategy
163+
*/
164+
static ConsumerFlowStrategy creditOnProcessedMessageCount(int initialCredits, double ratio) {
165+
return new MessageCountConsumerFlowStrategy(initialCredits, ratio);
166+
}
167+
168+
/**
169+
* Strategy that provides the specified number of initial credits and a credit on each new chunk.
170+
*
171+
* <p>Calls to {@link MessageHandler.Context#processed()} are ignored.
172+
*/
173+
class CreditOnChunkArrivalConsumerFlowStrategy implements ConsumerFlowStrategy {
174+
175+
private final int initialCredits;
176+
177+
private CreditOnChunkArrivalConsumerFlowStrategy(int initialCredits) {
178+
this.initialCredits = initialCredits;
179+
}
180+
181+
@Override
182+
public int initialCredits() {
183+
return this.initialCredits;
184+
}
185+
186+
@Override
187+
public MessageProcessedCallback start(Context context) {
188+
context.credits(1);
189+
return value -> {};
190+
}
191+
}
192+
193+
/**
194+
* Strategy that provides the specified number of initial credits and a credit when the specified
195+
* ratio of the chunk messages are processed.
196+
*
197+
* <p>Make sure to call {@link MessageHandler.Context#processed()} on every message when using
198+
* this strategy, otherwise the broker may stop sending messages to the consumer.
199+
*/
200+
class MessageCountConsumerFlowStrategy implements ConsumerFlowStrategy {
201+
202+
private final int initialCredits;
203+
private final double ratio;
204+
205+
private MessageCountConsumerFlowStrategy(int initialCredits, double ratio) {
206+
this.initialCredits = initialCredits;
207+
this.ratio = ratio;
208+
}
209+
210+
@Override
211+
public int initialCredits() {
212+
return this.initialCredits;
213+
}
214+
215+
@Override
216+
public MessageProcessedCallback start(Context context) {
217+
long l = (long) (context.messageCount() * ratio);
218+
long limit = Math.max(1, l);
219+
AtomicLong processedMessages = new AtomicLong(0);
220+
return messageOffset -> {
221+
if (processedMessages.incrementAndGet() == limit) {
222+
context.credits(1);
223+
}
224+
};
225+
}
226+
}
227+
}

0 commit comments

Comments
 (0)