Skip to content

Commit bb36c28

Browse files
committed
Make subscription initial credits configurable
Default is 1, but making it higher can help in some cases, depending on message size, network speed, etc.
1 parent f118d0d commit bb36c28

File tree

6 files changed

+78
-101
lines changed

6 files changed

+78
-101
lines changed

src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,13 @@ public interface ConsumerBuilder {
151151
*/
152152
ConsumerBuilder noTrackingStrategy();
153153

154+
/**
155+
* Configure flow of messages.
156+
*
157+
* @return the flow configuration
158+
*/
159+
FlowConfiguration flow();
160+
154161
/**
155162
* Create the configured {@link Consumer}
156163
*
@@ -209,4 +216,25 @@ interface AutoTrackingStrategy {
209216
*/
210217
ConsumerBuilder builder();
211218
}
219+
220+
/** Message flow configuration. */
221+
interface FlowConfiguration {
222+
223+
/**
224+
* The number of initial credits for the subscription.
225+
*
226+
* <p>Default is 1.
227+
*
228+
* @param initialCredits the number of initial credits
229+
* @return this configuration instance
230+
*/
231+
FlowConfiguration initialCredits(int initialCredits);
232+
233+
/**
234+
* Go back to the builder.
235+
*
236+
* @return the consumer builder
237+
*/
238+
ConsumerBuilder builder();
239+
}
212240
}

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1113,17 +1113,17 @@ public Response subscribe(
11131113
* @param subscriptionId identifier to correlate inbound messages to this subscription
11141114
* @param stream the stream to consume from
11151115
* @param offsetSpecification the specification of the offset to consume from
1116-
* @param credit the initial number of credits
1116+
* @param initialCredits the initial number of credits
11171117
* @param properties some optional properties to describe the subscription
11181118
* @return the subscription confirmation
11191119
*/
11201120
public Response subscribe(
11211121
byte subscriptionId,
11221122
String stream,
11231123
OffsetSpecification offsetSpecification,
1124-
int credit,
1124+
int initialCredits,
11251125
Map<String, String> properties) {
1126-
if (credit < 0 || credit > Short.MAX_VALUE) {
1126+
if (initialCredits < 0 || initialCredits > Short.MAX_VALUE) {
11271127
throw new IllegalArgumentException("Credit value must be between 0 and " + Short.MAX_VALUE);
11281128
}
11291129
int length = 2 + 2 + 4 + 1 + 2 + stream.length() + 2 + 2; // misses the offset
@@ -1152,7 +1152,7 @@ public Response subscribe(
11521152
if (offsetSpecification.isOffset() || offsetSpecification.isTimestamp()) {
11531153
bb.writeLong(offsetSpecification.getOffset());
11541154
}
1155-
bb.writeShort(credit);
1155+
bb.writeShort(initialCredits);
11561156
if (properties != null && !properties.isEmpty()) {
11571157
bb.writeInt(properties.size());
11581158
for (Map.Entry<String, String> entry : properties.entrySet()) {

src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,9 @@ class StreamConsumerBuilder implements ConsumerBuilder {
4040
private boolean noTrackingStrategy = false;
4141
private boolean lazyInit = false;
4242
private SubscriptionListener subscriptionListener = subscriptionContext -> {};
43-
private Map<String, String> subscriptionProperties = new ConcurrentHashMap<>();
43+
private final Map<String, String> subscriptionProperties = new ConcurrentHashMap<>();
4444
private ConsumerUpdateListener consumerUpdateListener;
45-
private int initialCredits = 1;
46-
private int additionalCredits = 1;
45+
private final DefaultFlowConfiguration flowConfiguration = new DefaultFlowConfiguration(this);
4746

4847
public StreamConsumerBuilder(StreamEnvironment environment) {
4948
this.environment = environment;
@@ -132,13 +131,9 @@ public ConsumerBuilder noTrackingStrategy() {
132131
return this;
133132
}
134133

135-
public ConsumerBuilder credits(int initial, int onChunkDelivery) {
136-
if (initial <= 0 || onChunkDelivery <= 0) {
137-
throw new IllegalArgumentException("Credits must be positive");
138-
}
139-
this.initialCredits = initial;
140-
this.additionalCredits = onChunkDelivery;
141-
return this;
134+
@Override
135+
public FlowConfiguration flow() {
136+
return this.flowConfiguration;
142137
}
143138

144139
StreamConsumerBuilder lazyInit(boolean lazyInit) {
@@ -204,8 +199,8 @@ public Consumer build() {
204199
this.subscriptionListener,
205200
this.subscriptionProperties,
206201
this.consumerUpdateListener,
207-
this.initialCredits,
208-
this.additionalCredits);
202+
this.flowConfiguration.initialCredits,
203+
this.flowConfiguration.additionalCredits);
209204
environment.addConsumer((StreamConsumer) consumer);
210205
} else {
211206
if (Utils.isSac(this.subscriptionProperties)) {
@@ -342,6 +337,32 @@ StreamConsumerBuilder duplicate() {
342337
return duplicate;
343338
}
344339

340+
private static class DefaultFlowConfiguration implements FlowConfiguration {
341+
342+
private final ConsumerBuilder consumerBuilder;
343+
344+
private DefaultFlowConfiguration(ConsumerBuilder consumerBuilder) {
345+
this.consumerBuilder = consumerBuilder;
346+
}
347+
348+
private int initialCredits = 1;
349+
private final int additionalCredits = 1;
350+
351+
@Override
352+
public FlowConfiguration initialCredits(int initialCredits) {
353+
if (initialCredits <= 0) {
354+
throw new IllegalArgumentException("Credits must be positive");
355+
}
356+
this.initialCredits = initialCredits;
357+
return this;
358+
}
359+
360+
@Override
361+
public ConsumerBuilder builder() {
362+
return this.consumerBuilder;
363+
}
364+
}
365+
345366
// to help testing
346367
public ConsumerUpdateListener consumerUpdateListener() {
347368
return consumerUpdateListener;

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import com.rabbitmq.stream.impl.Client;
4747
import com.rabbitmq.stream.metrics.MetricsCollector;
4848
import com.rabbitmq.stream.perf.ShutdownService.CloseCallback;
49-
import com.rabbitmq.stream.perf.Utils.CreditSettings;
5049
import com.rabbitmq.stream.perf.Utils.NamedThreadFactory;
5150
import com.rabbitmq.stream.perf.Utils.PerformanceMicrometerMetricsCollector;
5251
import io.micrometer.core.instrument.Counter;
@@ -415,13 +414,6 @@ public class StreamPerfTest implements Callable<Integer> {
415414
defaultValue = "false")
416415
private boolean metricsCommandLineArguments;
417416

418-
@CommandLine.Option(
419-
names = {"--credits", "-cr"},
420-
description = "initial and additional credits for subscriptions",
421-
defaultValue = "1:1",
422-
converter = Utils.CreditsTypeConverter.class)
423-
private CreditSettings credits;
424-
425417
@CommandLine.Option(
426418
names = {"--requested-max-frame-size", "-rmfs"},
427419
description = "maximum frame size to request",
@@ -472,6 +464,13 @@ static class InstanceSyncOptions {
472464
private String instanceSyncNamespace;
473465
}
474466

467+
@CommandLine.Option(
468+
names = {"--initial-credits", "-ic"},
469+
description = "initial credits for subscription",
470+
defaultValue = "1",
471+
converter = Utils.NotNegativeIntegerTypeConverter.class)
472+
private int initialCredits;
473+
475474
private MetricsCollector metricsCollector;
476475
private PerformanceMetrics performanceMetrics;
477476
private List<Monitoring> monitorings;
@@ -994,7 +993,12 @@ public Integer call() throws Exception {
994993
AtomicLong messageCount = new AtomicLong(0);
995994
String stream = stream(streams, i);
996995
ConsumerBuilder consumerBuilder =
997-
environment.consumerBuilder().offset(this.offset);
996+
environment
997+
.consumerBuilder()
998+
.offset(this.offset)
999+
.flow()
1000+
.initialCredits(this.initialCredits)
1001+
.builder();
9981002

9991003
if (this.superStreams) {
10001004
consumerBuilder.superStream(stream);

src/main/java/com/rabbitmq/stream/perf/Utils.java

Lines changed: 0 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -649,65 +649,6 @@ public Integer convert(String input) {
649649
}
650650
}
651651

652-
static class CreditsTypeConverter implements CommandLine.ITypeConverter<CreditSettings> {
653-
654-
@Override
655-
public CreditSettings convert(String input) {
656-
String errorMessage =
657-
input + " is not a valid credits setting, " + "valid example values: 20:1, 15";
658-
if (input == null || input.trim().isEmpty()) {
659-
typeConversionException(errorMessage);
660-
}
661-
if (input.contains(":") || input.contains("-")) {
662-
String separator = input.contains(":") ? ":" : "-";
663-
String[] split = input.split(separator);
664-
if (split.length != 2) {
665-
typeConversionException(errorMessage);
666-
} else {
667-
int[] credits =
668-
Arrays.stream(split)
669-
.mapToInt(Integer::valueOf)
670-
.peek(
671-
c -> {
672-
if (c <= 0) {
673-
typeConversionException("credit values must be positive");
674-
}
675-
})
676-
.toArray();
677-
return new CreditSettings(credits[0], credits[1]);
678-
}
679-
}
680-
try {
681-
int value = Integer.parseInt(input);
682-
if (value <= 0) {
683-
typeConversionException("credit values must be positive");
684-
}
685-
return new CreditSettings(value, 1);
686-
} catch (Exception e) {
687-
typeConversionException(errorMessage);
688-
}
689-
return new CreditSettings(10, 1);
690-
}
691-
}
692-
693-
static class CreditSettings {
694-
695-
private final int initial, additional;
696-
697-
CreditSettings(int initial, int additional) {
698-
this.initial = initial;
699-
this.additional = additional;
700-
}
701-
702-
int initial() {
703-
return this.initial;
704-
}
705-
706-
int additional() {
707-
return this.additional;
708-
}
709-
}
710-
711652
private static void typeConversionException(String message) {
712653
throw new TypeConversionException(message);
713654
}

src/test/java/com/rabbitmq/stream/perf/UtilsTest.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@
2424
import com.rabbitmq.stream.OffsetSpecification;
2525
import com.rabbitmq.stream.compression.Compression;
2626
import com.rabbitmq.stream.perf.Utils.CompressionTypeConverter;
27-
import com.rabbitmq.stream.perf.Utils.CreditSettings;
28-
import com.rabbitmq.stream.perf.Utils.CreditsTypeConverter;
2927
import com.rabbitmq.stream.perf.Utils.MetricsTagsTypeConverter;
3028
import com.rabbitmq.stream.perf.Utils.NameStrategyConverter;
3129
import com.rabbitmq.stream.perf.Utils.PatternNameStrategy;
@@ -370,21 +368,6 @@ void commandLineMetricsTest() {
370368
.isEqualTo("-x 1 -y 2");
371369
}
372370

373-
@ParameterizedTest
374-
@CsvSource({"10:1,10,1", "20:10,20,10", "20,20,1", "20-10,20,10"})
375-
void creditsConverterOk(String input, int expectedInitial, int expectedAdditional) {
376-
CreditSettings credits = new CreditsTypeConverter().convert(input);
377-
assertThat(credits.initial()).isEqualTo(expectedInitial);
378-
assertThat(credits.additional()).isEqualTo(expectedAdditional);
379-
}
380-
381-
@ParameterizedTest
382-
@ValueSource(strings = {"foo", "-20:10", "20:-1"})
383-
void creditsConverterKo(String input) {
384-
assertThatThrownBy(() -> new CreditsTypeConverter().convert(input))
385-
.isInstanceOf(TypeConversionException.class);
386-
}
387-
388371
@Command(name = "test-command")
389372
static class TestCommand {
390373

0 commit comments

Comments
 (0)