Skip to content

Commit cdd00c5

Browse files
committed
Use message context in flow strategy callback
1 parent 9db4b52 commit cdd00c5

File tree

4 files changed

+23
-21
lines changed

4 files changed

+23
-21
lines changed

src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,12 @@
1414
package com.rabbitmq.stream;
1515

1616
import java.util.concurrent.atomic.AtomicLong;
17-
import java.util.function.LongConsumer;
1817

1918
public interface ConsumerFlowStrategy {
2019

2120
int initialCredits();
2221

23-
LongConsumer start(Context context);
22+
MessageProcessedCallback start(Context context);
2423

2524
interface Context {
2625

@@ -29,6 +28,12 @@ interface Context {
2928
long messageCount();
3029
}
3130

31+
@FunctionalInterface
32+
interface MessageProcessedCallback {
33+
34+
void processed(MessageHandler.Context messageContext);
35+
}
36+
3237
static ConsumerFlowStrategy creditOnChunkArrival() {
3338
return creditOnChunkArrival(1);
3439
}
@@ -63,7 +68,7 @@ public int initialCredits() {
6368
}
6469

6570
@Override
66-
public LongConsumer start(Context context) {
71+
public MessageProcessedCallback start(Context context) {
6772
context.credits(1);
6873
return value -> {};
6974
}
@@ -85,7 +90,7 @@ public int initialCredits() {
8590
}
8691

8792
@Override
88-
public LongConsumer start(Context context) {
93+
public MessageProcessedCallback start(Context context) {
8994
long l = (long) (context.messageCount() * ratio);
9095
long limit = Math.max(1, l);
9196
AtomicLong processedMessages = new AtomicLong(0);

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -484,14 +484,14 @@ private static final class MessageHandlerContext implements Context {
484484
private final long timestamp;
485485
private final long committedOffset;
486486
private final StreamConsumer consumer;
487-
private final LongConsumer processedCallback;
487+
private final ConsumerFlowStrategy.MessageProcessedCallback processedCallback;
488488

489489
private MessageHandlerContext(
490490
long offset,
491491
long timestamp,
492492
long committedOffset,
493493
StreamConsumer consumer,
494-
LongConsumer processedCallback) {
494+
ConsumerFlowStrategy.MessageProcessedCallback processedCallback) {
495495
this.offset = offset;
496496
this.timestamp = timestamp;
497497
this.committedOffset = committedOffset;
@@ -530,7 +530,7 @@ public Consumer consumer() {
530530

531531
@Override
532532
public void processed() {
533-
this.processedCallback.accept(this.offset);
533+
this.processedCallback.processed(this);
534534
}
535535
}
536536

@@ -567,7 +567,7 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
567567
(client, subscriptionId, offset, messageCount, dataSize) -> {
568568
SubscriptionTracker subscriptionTracker =
569569
subscriptionTrackers.get(subscriptionId & 0xFF);
570-
LongConsumer processCallback;
570+
ConsumerFlowStrategy.MessageProcessedCallback processCallback;
571571
if (subscriptionTracker != null && subscriptionTracker.consumer.isOpen()) {
572572
ConsumerFlowStrategy.Context chunkContext =
573573
new ConsumerFlowStrategy.Context() {
@@ -616,7 +616,7 @@ public long messageCount() {
616616
chunkTimestamp,
617617
committedOffset,
618618
subscriptionTracker.consumer,
619-
(LongConsumer) chunkContext),
619+
(ConsumerFlowStrategy.MessageProcessedCallback) chunkContext),
620620
message);
621621
} else {
622622
LOGGER.debug(

src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,6 @@ public class ConsumersCoordinatorTest {
7171

7272
private static final SubscriptionListener NO_OP_SUBSCRIPTION_LISTENER = subscriptionContext -> {};
7373
private static final Runnable NO_OP_TRACKING_CLOSING_CALLBACK = () -> {};
74-
private int initialCredits = 10;
75-
private int additionalCredits = 1;
7674

7775
@Mock StreamEnvironment environment;
7876
@Mock StreamConsumer consumer;

src/test/java/com/rabbitmq/stream/impl/MessageCountConsumerFlowStrategyTest.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import com.rabbitmq.stream.ConsumerFlowStrategy;
2121
import java.util.concurrent.atomic.AtomicInteger;
22-
import java.util.function.LongConsumer;
2322
import org.junit.jupiter.api.Test;
2423

2524
public class MessageCountConsumerFlowStrategyTest {
@@ -30,27 +29,27 @@ public class MessageCountConsumerFlowStrategyTest {
3029
void shouldCreditOnceLimitIsReached() {
3130
ConsumerFlowStrategy strategy = build(0.5);
3231
long messageCount = 1000;
33-
LongConsumer processedCallback = strategy.start(context(messageCount));
34-
range(0, messageCount / 2 - 1).forEach(ignored -> processedCallback.accept(42));
32+
ConsumerFlowStrategy.MessageProcessedCallback callback = strategy.start(context(messageCount));
33+
range(0, messageCount / 2 - 1).forEach(ignored -> callback.processed(null));
3534
assertThat(requestedCredits).hasValue(0);
36-
processedCallback.accept(42);
35+
callback.processed(null);
3736
assertThat(requestedCredits).hasValue(1);
38-
processedCallback.accept(42);
37+
callback.processed(null);
3938
assertThat(requestedCredits).hasValue(1);
40-
range(0, messageCount).forEach(ignored -> processedCallback.accept(42));
39+
range(0, messageCount).forEach(ignored -> callback.processed(null));
4140
assertThat(requestedCredits).hasValue(1);
4241
}
4342

4443
@Test
4544
void smallChunksAndSmallRatiosShouldCredit() {
4645
ConsumerFlowStrategy strategy = build(0.5);
47-
LongConsumer processedCallback = strategy.start(context(1));
48-
processedCallback.accept(42);
46+
ConsumerFlowStrategy.MessageProcessedCallback callback = strategy.start(context(1));
47+
callback.processed(null);
4948
assertThat(requestedCredits).hasValue(1);
5049

5150
strategy = build(0.05);
52-
processedCallback = strategy.start(context(15));
53-
processedCallback.accept(42);
51+
callback = strategy.start(context(15));
52+
callback.processed(null);
5453
assertThat(requestedCredits).hasValue(1);
5554
}
5655

0 commit comments

Comments
 (0)