Skip to content

Commit 4ddea50

Browse files
committed
Add consumer flow strategy Javadoc
1 parent 867e4ce commit 4ddea50

File tree

4 files changed

+160
-1
lines changed

4 files changed

+160
-1
lines changed

src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,8 @@ public interface ConsumerBuilder {
166166
*
167167
* @return the flow configuration
168168
* @since 0.11.0
169+
* @see ConsumerFlowStrategy#creditOnChunkArrival()
170+
* @see MessageHandler.Context#processed()
169171
*/
170172
FlowConfiguration flow();
171173

@@ -231,7 +233,11 @@ interface AutoTrackingStrategy {
231233
/**
232234
* Message flow configuration.
233235
*
236+
* <p>The default configuration uses {@link ConsumerFlowStrategy#creditOnChunkArrival()}.
237+
*
234238
* @since 0.11.0
239+
* @see ConsumerFlowStrategy#creditOnChunkArrival()
240+
* @see MessageHandler.Context#processed()
235241
*/
236242
interface FlowConfiguration {
237243

@@ -240,11 +246,27 @@ interface FlowConfiguration {
240246
*
241247
* <p>Default is 1.
242248
*
249+
* <p>This calls uses {@link ConsumerFlowStrategy#creditOnChunkArrival(int)}.
250+
*
243251
* @param initialCredits the number of initial credits
244252
* @return this configuration instance
253+
* @see ConsumerFlowStrategy#creditOnChunkArrival(int)
245254
*/
246255
FlowConfiguration initialCredits(int initialCredits);
247256

257+
/**
258+
* Flow strategy to use
259+
*
260+
* @param strategy the strategy to use
261+
* @return this configuration instance
262+
* @since 0.12.0
263+
* @see ConsumerFlowStrategy
264+
* @see ConsumerFlowStrategy#creditOnChunkArrival()
265+
* @see ConsumerFlowStrategy#creditOnChunkArrival(int)
266+
* @see ConsumerFlowStrategy#creditWhenHalfMessagesProcessed()
267+
* @see ConsumerFlowStrategy#creditWhenHalfMessagesProcessed(int)
268+
* @see ConsumerFlowStrategy#creditOnProcessedMessageCount(int, double)
269+
*/
248270
FlowConfiguration strategy(ConsumerFlowStrategy strategy);
249271

250272
/**

src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,45 +15,159 @@
1515

1616
import java.util.concurrent.atomic.AtomicLong;
1717

18+
/**
19+
* Contract to determine when a subscription provides credits to get more messages.
20+
*
21+
* <p>The broker delivers "chunks" of messages to consumers. A chunk can contain from 1 to several
22+
* thousands of messages. The broker send chunks as long as the subscription has <em>credits</em>. A
23+
* client connection can provide credits for a given subscription and the broker will send the
24+
* corresponding number of chunks (1 credit = 1 chunk).
25+
*
26+
* <p>This credit mechanism avoids overwhelming a consumer with messages. A consumer does not want
27+
* to provide a credit only when it is done with messages of a chunk, because it will be idle
28+
* between its credit request and the arrival of the next chunk. The idea is to keep consumers busy
29+
* as much as possible, without accumulating an in-memory backlog on the client side. There is no
30+
* ideal solution, it depends on the use cases and several parameters (processing time, network,
31+
* etc).
32+
*
33+
* @since 0.12.0
34+
* @see MessageHandler.Context#processed()
35+
* @see ConsumerBuilder#flow()
36+
*/
1837
public interface ConsumerFlowStrategy {
1938

39+
/**
40+
* The initial number of credits for a subscription.
41+
*
42+
* <p>It must be greater than 0. Values are usually between 1 and 10.
43+
*
44+
* @return initial number of credits
45+
*/
2046
int initialCredits();
2147

48+
/**
49+
* Return the behavior for {@link MessageHandler.Context#processed()} calls.
50+
*
51+
* <p>This method is called for each chunk of messages. Implementations return a callback that
52+
* will be called when applications consider a message dealt with and call {@link
53+
* MessageHandler.Context#processed()}. The callback can count messages and provide credits
54+
* accordingly.
55+
*
56+
* @param context chunk context
57+
* @return the message processed callback
58+
*/
2259
MessageProcessedCallback start(Context context);
2360

61+
/** Chunk context. */
2462
interface Context {
2563

64+
/**
65+
* Provide credits for the subscription.
66+
*
67+
* <p>{@link ConsumerFlowStrategy} implementation should always provide 1 credit a given chunk.
68+
*
69+
* @param credits the number of credits provided, usually 1
70+
*/
2671
void credits(int credits);
2772

73+
/**
74+
* The number of messages in the chunk.
75+
*
76+
* @return number of messages in the chunk
77+
*/
2878
long messageCount();
2979
}
3080

81+
/** Behavior for {@link MessageHandler.Context#processed()} calls. */
3182
@FunctionalInterface
3283
interface MessageProcessedCallback {
3384

85+
/**
86+
* Method called when {@link MessageHandler.Context#processed()} is called.
87+
*
88+
* <p>There is one instance of this class for a given chunk and it is called for the <code>
89+
* processed()</code> calls of the message of this chunk.
90+
*
91+
* <p>Implementations can count messages and call {@link Context#credits(int)} when appropriate.
92+
*
93+
* <p>Note calls to {@link MessageHandler.Context#processed()} are not idempotent: an
94+
* application can call the method several times for the same message and implementations must
95+
* deal with these multiple calls if they impact their logic.
96+
*
97+
* @param messageContext context of the message
98+
*/
3499
void processed(MessageHandler.Context messageContext);
35100
}
36101

102+
/**
103+
* Strategy that provides 1 initial credit and a credit on each new chunk.
104+
*
105+
* <p>Calls to {@link MessageHandler.Context#processed()} are ignored.
106+
*
107+
* @return flow strategy
108+
*/
37109
static ConsumerFlowStrategy creditOnChunkArrival() {
38110
return creditOnChunkArrival(1);
39111
}
40112

113+
/**
114+
* Strategy that provides the specified number of initial credits and a credit on each new chunk.
115+
*
116+
* <p>Calls to {@link MessageHandler.Context#processed()} are ignored.
117+
*
118+
* @param initialCredits number of initial credits
119+
* @return flow strategy
120+
*/
41121
static ConsumerFlowStrategy creditOnChunkArrival(int initialCredits) {
42122
return new CreditOnChunkArrivalConsumerFlowStrategy(initialCredits);
43123
}
44124

125+
/**
126+
* Strategy that provides 1 initial credit and a credit when half of the chunk messages are
127+
* processed.
128+
*
129+
* <p>Make sure to call {@link MessageHandler.Context#processed()} on every message when using
130+
* this strategy, otherwise the broker may stop sending messages to the consumer.
131+
*
132+
* @return flow strategy
133+
*/
45134
static ConsumerFlowStrategy creditWhenHalfMessagesProcessed() {
46135
return creditOnProcessedMessageCount(1, 0.5);
47136
}
48137

138+
/**
139+
* Strategy that provides the specified number of initial credits and a credit when half of the
140+
* chunk messages are processed.
141+
*
142+
* <p>Make sure to call {@link MessageHandler.Context#processed()} on every message when using
143+
* this strategy, otherwise the broker may stop sending messages to the consumer.
144+
*
145+
* @param initialCredits number of initial credits
146+
* @return flow strategy
147+
*/
49148
static ConsumerFlowStrategy creditWhenHalfMessagesProcessed(int initialCredits) {
50149
return creditOnProcessedMessageCount(initialCredits, 0.5);
51150
}
52151

152+
/**
153+
* Strategy that provides the specified number of initial credits and a credit when the specified
154+
* ratio of the chunk messages are processed.
155+
*
156+
* <p>Make sure to call {@link MessageHandler.Context#processed()} on every message when using
157+
* this strategy, otherwise the broker may stop sending messages to the consumer.
158+
*
159+
* @param initialCredits number of initial credits
160+
* @return flow strategy
161+
*/
53162
static ConsumerFlowStrategy creditOnProcessedMessageCount(int initialCredits, double ratio) {
54163
return new MessageCountConsumerFlowStrategy(initialCredits, ratio);
55164
}
56165

166+
/**
167+
* Strategy that provides the specified number of initial credits and a credit on each new chunk.
168+
*
169+
* <p>Calls to {@link MessageHandler.Context#processed()} are ignored.
170+
*/
57171
class CreditOnChunkArrivalConsumerFlowStrategy implements ConsumerFlowStrategy {
58172

59173
private final int initialCredits;
@@ -74,6 +188,13 @@ public MessageProcessedCallback start(Context context) {
74188
}
75189
}
76190

191+
/**
192+
* Strategy that provides the specified number of initial credits and a credit when the specified
193+
* ratio of the chunk messages are processed.
194+
*
195+
* <p>Make sure to call {@link MessageHandler.Context#processed()} on every message when using
196+
* this strategy, otherwise the broker may stop sending messages to the consumer.
197+
*/
77198
class MessageCountConsumerFlowStrategy implements ConsumerFlowStrategy {
78199

79200
private final int initialCredits;

src/main/java/com/rabbitmq/stream/MessageHandler.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,22 @@ interface Context {
8585
*/
8686
Consumer consumer();
8787

88+
/**
89+
* Mark the message as processed, potentially asking for more messages from the broker.
90+
*
91+
* <p>The exact behavior depends on the {@link ConsumerFlowStrategy} chosen when creating the
92+
* consumer with {@link ConsumerBuilder#flow()}.
93+
*
94+
* <p>The call is a no-op for strategies like {@link
95+
* ConsumerFlowStrategy#creditOnChunkArrival()} and {@link
96+
* ConsumerFlowStrategy#creditOnChunkArrival(int)}.
97+
*
98+
* <p>Calling this method for each message is mandatory for strategies like {@link
99+
* ConsumerFlowStrategy#creditWhenHalfMessagesProcessed()}, {@link
100+
* ConsumerFlowStrategy#creditWhenHalfMessagesProcessed(int)}, and {@link
101+
* ConsumerFlowStrategy#creditOnProcessedMessageCount(int, double)}, otherwise the broker may
102+
* stop sending messages to the consumer.
103+
*/
88104
void processed();
89105
}
90106
}

src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,7 @@ private DefaultFlowConfiguration(ConsumerBuilder consumerBuilder) {
425425
this.consumerBuilder = consumerBuilder;
426426
}
427427

428-
private ConsumerFlowStrategy strategy = ConsumerFlowStrategy.creditOnChunkArrival(1);
428+
private ConsumerFlowStrategy strategy = ConsumerFlowStrategy.creditOnChunkArrival();
429429

430430
@Override
431431
public FlowConfiguration initialCredits(int initialCredits) {

0 commit comments

Comments
 (0)