Skip to content

Commit 8115ef3

Browse files
committed
Experiment with consumer flow strategy
Rough draft.
1 parent 6df7fda commit 8115ef3

31 files changed

+601
-295
lines changed

src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,8 @@ interface FlowConfiguration {
235235
*/
236236
FlowConfiguration initialCredits(int initialCredits);
237237

238+
FlowConfiguration strategy(ConsumerFlowStrategy strategy);
239+
238240
/**
239241
* Go back to the builder.
240242
*
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
// Copyright (c) 2023 VMware, Inc. or its affiliates. All rights reserved.
2+
//
3+
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
4+
// Mozilla Public License 2.0 ("MPL"), and the Apache License version 2 ("ASL").
5+
// For the MPL, please see LICENSE-MPL-RabbitMQ. For the ASL,
6+
// please see LICENSE-APACHE2.
7+
//
8+
// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,
9+
// either express or implied. See the LICENSE file for specific language governing
10+
// rights and limitations of this software.
11+
//
12+
// If you have any questions regarding licensing, please contact us at
13+
14+
package com.rabbitmq.stream;
15+
16+
import java.util.concurrent.atomic.AtomicLong;
17+
import java.util.function.LongConsumer;
18+
19+
public interface ConsumerFlowStrategy {
20+
21+
int initialCredits();
22+
23+
LongConsumer start(Context context);
24+
25+
interface Context {
26+
27+
void credits(int credits);
28+
29+
long messageCount();
30+
}
31+
32+
class DefaultConsumerFlowStrategy implements ConsumerFlowStrategy {
33+
34+
private final int initialCredits;
35+
36+
public DefaultConsumerFlowStrategy(int initialCredits) {
37+
this.initialCredits = initialCredits;
38+
}
39+
40+
@Override
41+
public int initialCredits() {
42+
return this.initialCredits;
43+
}
44+
45+
@Override
46+
public LongConsumer start(Context context) {
47+
context.credits(1);
48+
return value -> {};
49+
}
50+
}
51+
52+
class MessageCountConsumerFlowStrategy implements ConsumerFlowStrategy {
53+
54+
private final int initialCredits;
55+
56+
public MessageCountConsumerFlowStrategy(int initialCredits) {
57+
this.initialCredits = initialCredits;
58+
}
59+
60+
@Override
61+
public int initialCredits() {
62+
return this.initialCredits;
63+
}
64+
65+
@Override
66+
public LongConsumer start(Context context) {
67+
AtomicLong processedMessages = new AtomicLong(0);
68+
long limit = context.messageCount() == 1 ? 1 : context.messageCount() / 2;
69+
return messageOffset -> {
70+
if (processedMessages.incrementAndGet() == limit) {
71+
context.credits(1);
72+
}
73+
};
74+
}
75+
}
76+
}

src/main/java/com/rabbitmq/stream/MessageHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,5 +84,7 @@ interface Context {
8484
* @see Consumer#store(long)
8585
*/
8686
Consumer consumer();
87+
88+
void processed();
8789
}
8890
}

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1657,7 +1657,8 @@ public interface ChunkListener {
16571657
* @param messageCount the total number of messages in the chunk
16581658
* @param dataSize the size in bytes of the data in the chunk
16591659
*/
1660-
void handle(Client client, byte subscriptionId, long offset, long messageCount, long dataSize);
1660+
Object handle(
1661+
Client client, byte subscriptionId, long offset, long messageCount, long dataSize);
16611662
}
16621663

16631664
public interface MessageListener {
@@ -1667,6 +1668,7 @@ void handle(
16671668
long offset,
16681669
long chunkTimestamp,
16691670
long committedChunkId,
1671+
Object chunkContext,
16701672
Message message);
16711673
}
16721674

@@ -2209,9 +2211,9 @@ public static class ClientParameters {
22092211
private PublishConfirmListener publishConfirmListener = NO_OP_PUBLISH_CONFIRM_LISTENER;
22102212
private PublishErrorListener publishErrorListener = NO_OP_PUBLISH_ERROR_LISTENER;
22112213
private ChunkListener chunkListener =
2212-
(client, correlationId, offset, messageCount, dataSize) -> {};
2214+
(client, correlationId, offset, messageCount, dataSize) -> null;
22132215
private MessageListener messageListener =
2214-
(correlationId, offset, chunkTimestamp, committedOffset, message) -> {};
2216+
(correlationId, offset, chunkTimestamp, committedOffset, chunkContext, message) -> {};
22152217
private MetadataListener metadataListener = (stream, code) -> {};
22162218
private CreditNotification creditNotification =
22172219
(subscriptionId, responseCode) ->

src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java

Lines changed: 42 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,9 @@
2121
import static com.rabbitmq.stream.impl.Utils.namedRunnable;
2222
import static com.rabbitmq.stream.impl.Utils.quote;
2323

24-
import com.rabbitmq.stream.BackOffDelayPolicy;
25-
import com.rabbitmq.stream.Constants;
24+
import com.rabbitmq.stream.*;
2625
import com.rabbitmq.stream.Consumer;
27-
import com.rabbitmq.stream.MessageHandler;
2826
import com.rabbitmq.stream.MessageHandler.Context;
29-
import com.rabbitmq.stream.OffsetSpecification;
30-
import com.rabbitmq.stream.StreamDoesNotExistException;
31-
import com.rabbitmq.stream.StreamException;
32-
import com.rabbitmq.stream.StreamNotAvailableException;
33-
import com.rabbitmq.stream.SubscriptionListener;
3427
import com.rabbitmq.stream.SubscriptionListener.SubscriptionContext;
3528
import com.rabbitmq.stream.impl.Client.Broker;
3629
import com.rabbitmq.stream.impl.Client.ChunkListener;
@@ -61,8 +54,7 @@
6154
import java.util.concurrent.atomic.AtomicBoolean;
6255
import java.util.concurrent.atomic.AtomicLong;
6356
import java.util.concurrent.atomic.AtomicReference;
64-
import java.util.function.Function;
65-
import java.util.function.Predicate;
57+
import java.util.function.*;
6658
import java.util.stream.Collectors;
6759
import java.util.stream.IntStream;
6860
import org.slf4j.Logger;
@@ -122,8 +114,7 @@ Runnable subscribe(
122114
Runnable trackingClosingCallback,
123115
MessageHandler messageHandler,
124116
Map<String, String> subscriptionProperties,
125-
int initialCredits,
126-
int additionalCredits) {
117+
ConsumerFlowStrategy flowStrategy) {
127118
List<Client.Broker> candidates = findBrokersForStream(stream);
128119
Client.Broker newNode = pickBroker(candidates);
129120
if (newNode == null) {
@@ -143,8 +134,7 @@ Runnable subscribe(
143134
trackingClosingCallback,
144135
messageHandler,
145136
subscriptionProperties,
146-
initialCredits,
147-
additionalCredits);
137+
flowStrategy);
148138

149139
try {
150140
addToManager(newNode, subscriptionTracker, offsetSpecification, true);
@@ -403,8 +393,7 @@ private static class SubscriptionTracker {
403393
private volatile ClientSubscriptionsManager manager;
404394
private volatile AtomicReference<SubscriptionState> state =
405395
new AtomicReference<>(SubscriptionState.OPENING);
406-
private final int initialCredits;
407-
private final int additionalCredits;
396+
private final ConsumerFlowStrategy flowStrategy;
408397

409398
private SubscriptionTracker(
410399
long id,
@@ -416,8 +405,7 @@ private SubscriptionTracker(
416405
Runnable trackingClosingCallback,
417406
MessageHandler messageHandler,
418407
Map<String, String> subscriptionProperties,
419-
int initialCredits,
420-
int additionalCredits) {
408+
ConsumerFlowStrategy flowStrategy) {
421409
this.id = id;
422410
this.consumer = consumer;
423411
this.stream = stream;
@@ -426,8 +414,7 @@ private SubscriptionTracker(
426414
this.subscriptionListener = subscriptionListener;
427415
this.trackingClosingCallback = trackingClosingCallback;
428416
this.messageHandler = messageHandler;
429-
this.initialCredits = initialCredits;
430-
this.additionalCredits = additionalCredits;
417+
this.flowStrategy = flowStrategy;
431418
if (this.offsetTrackingReference == null) {
432419
this.subscriptionProperties = subscriptionProperties;
433420
} else {
@@ -497,13 +484,19 @@ private static final class MessageHandlerContext implements Context {
497484
private final long timestamp;
498485
private final long committedOffset;
499486
private final StreamConsumer consumer;
487+
private final LongConsumer processedCallback;
500488

501489
private MessageHandlerContext(
502-
long offset, long timestamp, long committedOffset, StreamConsumer consumer) {
490+
long offset,
491+
long timestamp,
492+
long committedOffset,
493+
StreamConsumer consumer,
494+
LongConsumer processedCallback) {
503495
this.offset = offset;
504496
this.timestamp = timestamp;
505497
this.committedOffset = committedOffset;
506498
this.consumer = consumer;
499+
this.processedCallback = processedCallback;
507500
}
508501

509502
@Override
@@ -534,6 +527,11 @@ public String stream() {
534527
public Consumer consumer() {
535528
return this.consumer;
536529
}
530+
531+
@Override
532+
public void processed() {
533+
this.processedCallback.accept(this.offset);
534+
}
537535
}
538536

539537
/**
@@ -569,13 +567,28 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
569567
(client, subscriptionId, offset, messageCount, dataSize) -> {
570568
SubscriptionTracker subscriptionTracker =
571569
subscriptionTrackers.get(subscriptionId & 0xFF);
570+
LongConsumer processCallback;
572571
if (subscriptionTracker != null && subscriptionTracker.consumer.isOpen()) {
573-
client.credit(subscriptionId, subscriptionTracker.additionalCredits);
572+
ConsumerFlowStrategy.Context chunkContext =
573+
new ConsumerFlowStrategy.Context() {
574+
@Override
575+
public void credits(int credits) {
576+
client.credit(subscriptionId, 1);
577+
}
578+
579+
@Override
580+
public long messageCount() {
581+
return messageCount;
582+
}
583+
};
584+
processCallback = subscriptionTracker.flowStrategy.start(chunkContext);
574585
} else {
575586
LOGGER.debug(
576587
"Could not find stream subscription {} or subscription closing, not providing credits",
577588
subscriptionId & 0xFF);
589+
processCallback = null;
578590
}
591+
return processCallback;
579592
};
580593

581594
CreditNotification creditNotification =
@@ -591,17 +604,20 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
591604
};
592605

593606
MessageListener messageListener =
594-
(subscriptionId, offset, chunkTimestamp, committedOffset, message) -> {
607+
(subscriptionId, offset, chunkTimestamp, committedOffset, chunkContext, message) -> {
595608
SubscriptionTracker subscriptionTracker =
596609
subscriptionTrackers.get(subscriptionId & 0xFF);
597610
if (subscriptionTracker != null) {
598611
subscriptionTracker.offset = offset;
599612
subscriptionTracker.hasReceivedSomething = true;
600613
subscriptionTracker.messageHandler.handle(
601614
new MessageHandlerContext(
602-
offset, chunkTimestamp, committedOffset, subscriptionTracker.consumer),
615+
offset,
616+
chunkTimestamp,
617+
committedOffset,
618+
subscriptionTracker.consumer,
619+
(LongConsumer) chunkContext),
603620
message);
604-
// FIXME set offset here as well, best effort to avoid duplicates?
605621
} else {
606622
LOGGER.debug(
607623
"Could not find stream subscription {} in manager {}, node {}",
@@ -969,7 +985,7 @@ synchronized void add(
969985
subId,
970986
subscriptionTracker.stream,
971987
subscriptionContext.offsetSpecification(),
972-
subscriptionTracker.initialCredits,
988+
subscriptionTracker.flowStrategy.initialCredits(),
973989
subscriptionTracker.subscriptionProperties),
974990
RETRY_ON_TIMEOUT,
975991
"Subscribe request for consumer %s on stream '%s'",

src/main/java/com/rabbitmq/stream/impl/ServerFrameHandler.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,8 @@ static int handleMessage(
352352
long committedChunkId,
353353
Codec codec,
354354
MessageListener messageListener,
355-
byte subscriptionId) {
355+
byte subscriptionId,
356+
Object chunkContext) {
356357
int entrySize = bb.readInt();
357358
read += 4;
358359
byte[] data = new byte[entrySize];
@@ -363,7 +364,8 @@ static int handleMessage(
363364
messageFiltered.set(true);
364365
} else {
365366
Message message = codec.decode(data);
366-
messageListener.handle(subscriptionId, offset, chunkTimestamp, committedChunkId, message);
367+
messageListener.handle(
368+
subscriptionId, offset, chunkTimestamp, committedChunkId, chunkContext, message);
367369
}
368370
return read;
369371
}
@@ -449,7 +451,8 @@ static int handleDeliver(
449451
message.readInt(); // 4 reserved bytes, unused here
450452
read += 4;
451453

452-
chunkListener.handle(client, subscriptionId, offset, numRecords, dataLength);
454+
Object chunkContext =
455+
chunkListener.handle(client, subscriptionId, offset, numRecords, dataLength);
453456

454457
long offsetLimit = -1;
455458
if (!subscriptionOffsets.isEmpty()) {
@@ -500,8 +503,11 @@ static int handleDeliver(
500503
committedOffset,
501504
codec,
502505
messageListener,
503-
subscriptionId);
506+
subscriptionId,
507+
chunkContext);
504508
if (messageFiltered.get()) {
509+
// TODO add listener and pass chunk context in
510+
// this will be used e.g. to mark messages as "processed"
505511
messageFiltered.set(false);
506512
} else {
507513
messagesRead++;
@@ -566,7 +572,8 @@ static int handleDeliver(
566572
committedOffset,
567573
codec,
568574
messageListener,
569-
subscriptionId);
575+
subscriptionId,
576+
chunkContext);
570577
if (messageFiltered.get()) {
571578
messageFiltered.set(false);
572579
} else {

src/main/java/com/rabbitmq/stream/impl/StreamConsumer.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,8 @@
1818
import static com.rabbitmq.stream.impl.Utils.offsetBefore;
1919
import static java.time.Duration.ofMillis;
2020

21-
import com.rabbitmq.stream.Constants;
22-
import com.rabbitmq.stream.Consumer;
23-
import com.rabbitmq.stream.ConsumerUpdateListener;
24-
import com.rabbitmq.stream.MessageHandler;
21+
import com.rabbitmq.stream.*;
2522
import com.rabbitmq.stream.MessageHandler.Context;
26-
import com.rabbitmq.stream.NoOffsetException;
27-
import com.rabbitmq.stream.OffsetSpecification;
28-
import com.rabbitmq.stream.StreamException;
29-
import com.rabbitmq.stream.SubscriptionListener;
3023
import com.rabbitmq.stream.impl.Client.QueryOffsetResponse;
3124
import com.rabbitmq.stream.impl.StreamConsumerBuilder.TrackingConfiguration;
3225
import com.rabbitmq.stream.impl.StreamEnvironment.TrackingConsumerRegistration;
@@ -81,8 +74,7 @@ class StreamConsumer implements Consumer {
8174
SubscriptionListener subscriptionListener,
8275
Map<String, String> subscriptionProperties,
8376
ConsumerUpdateListener consumerUpdateListener,
84-
int initialCredits,
85-
int additionalCredits) {
77+
ConsumerFlowStrategy flowStrategy) {
8678

8779
this.id = ID_SEQUENCE.getAndIncrement();
8880
Runnable trackingClosingCallback;
@@ -256,8 +248,7 @@ class StreamConsumer implements Consumer {
256248
trackingClosingCallback,
257249
closedAwareMessageHandler,
258250
Collections.unmodifiableMap(subscriptionProperties),
259-
initialCredits,
260-
additionalCredits);
251+
flowStrategy);
261252

262253
this.status = Status.RUNNING;
263254
};

0 commit comments

Comments
 (0)