Skip to content

Commit 9db4b52

Browse files
committed
Add consumer flow strategy creation helpers
1 parent a40da6f commit 9db4b52

File tree

5 files changed

+52
-10
lines changed

5 files changed

+52
-10
lines changed

src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,31 @@ interface Context {
2929
long messageCount();
3030
}
3131

32-
class DefaultConsumerFlowStrategy implements ConsumerFlowStrategy {
32+
static ConsumerFlowStrategy creditOnChunkArrival() {
33+
return creditOnChunkArrival(1);
34+
}
35+
36+
static ConsumerFlowStrategy creditOnChunkArrival(int initialCredits) {
37+
return new CreditOnChunkArrivalConsumerFlowStrategy(initialCredits);
38+
}
39+
40+
static ConsumerFlowStrategy creditWhenHalfMessagesProcessed() {
41+
return creditOnProcessedMessageCount(1, 0.5);
42+
}
43+
44+
static ConsumerFlowStrategy creditWhenHalfMessagesProcessed(int initialCredits) {
45+
return creditOnProcessedMessageCount(initialCredits, 0.5);
46+
}
47+
48+
static ConsumerFlowStrategy creditOnProcessedMessageCount(int initialCredits, double ratio) {
49+
return new MessageCountConsumerFlowStrategy(initialCredits, ratio);
50+
}
51+
52+
class CreditOnChunkArrivalConsumerFlowStrategy implements ConsumerFlowStrategy {
3353

3454
private final int initialCredits;
3555

36-
public DefaultConsumerFlowStrategy(int initialCredits) {
56+
private CreditOnChunkArrivalConsumerFlowStrategy(int initialCredits) {
3757
this.initialCredits = initialCredits;
3858
}
3959

@@ -52,9 +72,11 @@ public LongConsumer start(Context context) {
5272
class MessageCountConsumerFlowStrategy implements ConsumerFlowStrategy {
5373

5474
private final int initialCredits;
75+
private final double ratio;
5576

56-
public MessageCountConsumerFlowStrategy(int initialCredits) {
77+
private MessageCountConsumerFlowStrategy(int initialCredits, double ratio) {
5778
this.initialCredits = initialCredits;
79+
this.ratio = ratio;
5880
}
5981

6082
@Override
@@ -64,8 +86,9 @@ public int initialCredits() {
6486

6587
@Override
6688
public LongConsumer start(Context context) {
89+
long l = (long) (context.messageCount() * ratio);
90+
long limit = Math.max(1, l);
6791
AtomicLong processedMessages = new AtomicLong(0);
68-
long limit = context.messageCount() == 1 ? 1 : context.messageCount() / 2;
6992
return messageOffset -> {
7093
if (processedMessages.incrementAndGet() == limit) {
7194
context.credits(1);

src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -425,14 +425,14 @@ private DefaultFlowConfiguration(ConsumerBuilder consumerBuilder) {
425425
this.consumerBuilder = consumerBuilder;
426426
}
427427

428-
private ConsumerFlowStrategy strategy = new ConsumerFlowStrategy.DefaultConsumerFlowStrategy(1);
428+
private ConsumerFlowStrategy strategy = ConsumerFlowStrategy.creditOnChunkArrival(1);
429429

430430
@Override
431431
public FlowConfiguration initialCredits(int initialCredits) {
432432
if (initialCredits <= 0) {
433433
throw new IllegalArgumentException("Credits must be positive");
434434
}
435-
this.strategy = new ConsumerFlowStrategy.DefaultConsumerFlowStrategy(initialCredits);
435+
this.strategy = ConsumerFlowStrategy.creditOnChunkArrival(initialCredits);
436436
return this;
437437
}
438438

src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1678,6 +1678,6 @@ private static Response responseOk() {
16781678
}
16791679

16801680
private static ConsumerFlowStrategy flowStrategy() {
1681-
return new ConsumerFlowStrategy.DefaultConsumerFlowStrategy(10);
1681+
return ConsumerFlowStrategy.creditOnChunkArrival(10);
16821682
}
16831683
}

src/test/java/com/rabbitmq/stream/impl/MessageCountConsumerFlowStrategyTest.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import static com.rabbitmq.stream.ConsumerFlowStrategy.creditOnProcessedMessageCount;
1617
import static java.util.stream.LongStream.range;
1718
import static org.assertj.core.api.Assertions.assertThat;
1819

@@ -26,8 +27,8 @@ public class MessageCountConsumerFlowStrategyTest {
2627
AtomicInteger requestedCredits = new AtomicInteger();
2728

2829
@Test
29-
void test() {
30-
ConsumerFlowStrategy strategy = new ConsumerFlowStrategy.MessageCountConsumerFlowStrategy(1);
30+
void shouldCreditOnceLimitIsReached() {
31+
ConsumerFlowStrategy strategy = build(0.5);
3132
long messageCount = 1000;
3233
LongConsumer processedCallback = strategy.start(context(messageCount));
3334
range(0, messageCount / 2 - 1).forEach(ignored -> processedCallback.accept(42));
@@ -40,6 +41,23 @@ void test() {
4041
assertThat(requestedCredits).hasValue(1);
4142
}
4243

44+
@Test
45+
void smallChunksAndSmallRatiosShouldCredit() {
46+
ConsumerFlowStrategy strategy = build(0.5);
47+
LongConsumer processedCallback = strategy.start(context(1));
48+
processedCallback.accept(42);
49+
assertThat(requestedCredits).hasValue(1);
50+
51+
strategy = build(0.05);
52+
processedCallback = strategy.start(context(15));
53+
processedCallback.accept(42);
54+
assertThat(requestedCredits).hasValue(1);
55+
}
56+
57+
ConsumerFlowStrategy build(double ratio) {
58+
return creditOnProcessedMessageCount(1, ratio);
59+
}
60+
4361
ConsumerFlowStrategy.Context context(long messageCount) {
4462
requestedCredits.set(0);
4563
return new ConsumerFlowStrategy.Context() {

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
1414
package com.rabbitmq.stream.impl;
1515

16+
import static com.rabbitmq.stream.ConsumerFlowStrategy.creditWhenHalfMessagesProcessed;
1617
import static com.rabbitmq.stream.impl.TestUtils.*;
1718
import static java.lang.String.format;
1819
import static java.util.Collections.synchronizedList;
@@ -199,7 +200,7 @@ void consumeWithAsyncConsumerFlowControl() throws Exception {
199200
environment.consumerBuilder().stream(stream)
200201
.offset(OffsetSpecification.first())
201202
.flow()
202-
.strategy(new ConsumerFlowStrategy.MessageCountConsumerFlowStrategy(1))
203+
.strategy(creditWhenHalfMessagesProcessed())
203204
.builder();
204205

205206
List<MessageHandler.Context> messageContexts = synchronizedList(new ArrayList<>());

0 commit comments

Comments
 (0)