Skip to content

Commit 9b1e626

Browse files
committed
Refine some consumer flow control details
1 parent b450249 commit 9b1e626

File tree

3 files changed

+46
-23
lines changed

3 files changed

+46
-23
lines changed

src/main/java/com/rabbitmq/stream/ConsumerFlowStrategy.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
* ideal solution, it depends on the use cases and several parameters (processing time, network,
3131
* etc).
3232
*
33+
* <p>This is an experimental API, subject to change.
34+
*
3335
* @since 0.12.0
3436
* @see MessageHandler.Context#processed()
3537
* @see ConsumerBuilder#flow()

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -562,19 +562,9 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
562562
subscriptionTrackers.get(subscriptionId & 0xFF);
563563
ConsumerFlowStrategy.MessageProcessedCallback processCallback;
564564
if (subscriptionTracker != null && subscriptionTracker.consumer.isOpen()) {
565-
ConsumerFlowStrategy.Context chunkContext =
566-
new ConsumerFlowStrategy.Context() {
567-
@Override
568-
public void credits(int credits) {
569-
client.credit(subscriptionId, 1);
570-
}
571-
572-
@Override
573-
public long messageCount() {
574-
return messageCount;
575-
}
576-
};
577-
processCallback = subscriptionTracker.flowStrategy.start(chunkContext);
565+
processCallback =
566+
subscriptionTracker.flowStrategy.start(
567+
new DefaultConsumerFlowStrategyContext(subscriptionId, client, messageCount));
578568
} else {
579569
LOGGER.debug(
580570
"Could not find stream subscription {} or subscription closing, not providing credits",
@@ -1208,4 +1198,36 @@ public ClientClosedException() {
12081198
super("Client already closed");
12091199
}
12101200
}
1201+
1202+
private static class DefaultConsumerFlowStrategyContext implements ConsumerFlowStrategy.Context {
1203+
1204+
private final byte subscriptionId;
1205+
private final Client client;
1206+
private final long messageCount;
1207+
1208+
private DefaultConsumerFlowStrategyContext(
1209+
byte subscriptionId, Client client, long messageCount) {
1210+
this.subscriptionId = subscriptionId;
1211+
this.client = client;
1212+
this.messageCount = messageCount;
1213+
}
1214+
1215+
@Override
1216+
public void credits(int credits) {
1217+
try {
1218+
client.credit(subscriptionId, credits);
1219+
} catch (Exception e) {
1220+
LOGGER.info(
1221+
"Error while providing {} credit(s) to subscription {}: {}",
1222+
credits,
1223+
subscriptionId,
1224+
e.getMessage());
1225+
}
1226+
}
1227+
1228+
@Override
1229+
public long messageCount() {
1230+
return messageCount;
1231+
}
1232+
}
12111233
}

src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ void nameShouldBeSetIfTrackingStrategyIsSet() {
134134
@BrokerVersionAtLeast(BrokerVersion.RABBITMQ_3_11)
135135
void committedOffsetShouldBeSet() throws Exception {
136136
int messageCount = 20_000;
137-
TestUtils.publishAndWaitForConfirms(cf, messageCount, this.stream);
137+
publishAndWaitForConfirms(cf, messageCount, this.stream);
138138

139139
CountDownLatch consumeLatch = new CountDownLatch(messageCount);
140140
AtomicLong committedOffset = new AtomicLong();
@@ -196,7 +196,7 @@ void consume() throws Exception {
196196
@Test
197197
void consumeWithAsyncConsumerFlowControl() throws Exception {
198198
int messageCount = 100_000;
199-
TestUtils.publishAndWaitForConfirms(cf, messageCount, stream);
199+
publishAndWaitForConfirms(cf, messageCount, stream);
200200

201201
ConsumerBuilder consumerBuilder =
202202
environment.consumerBuilder().stream(stream)
@@ -241,7 +241,7 @@ void consumeWithAsyncConsumerFlowControl() throws Exception {
241241
@Test
242242
void asynchronousProcessingWithFlowControl() {
243243
int messageCount = 100_000;
244-
TestUtils.publishAndWaitForConfirms(cf, messageCount, stream);
244+
publishAndWaitForConfirms(cf, messageCount, stream);
245245

246246
ExecutorService executorService =
247247
Executors.newFixedThreadPool(getRuntime().availableProcessors());
@@ -253,13 +253,12 @@ void asynchronousProcessingWithFlowControl() {
253253
.strategy(creditWhenHalfMessagesProcessed())
254254
.builder()
255255
.messageHandler(
256-
(ctx, message) -> {
257-
executorService.submit(
258-
() -> {
259-
latch.countDown();
260-
ctx.processed();
261-
});
262-
})
256+
(ctx, message) ->
257+
executorService.submit(
258+
() -> {
259+
latch.countDown();
260+
ctx.processed();
261+
}))
263262
.build();
264263
assertThat(latch).is(completed());
265264
} finally {

0 commit comments

Comments
 (0)