Skip to content

Commit 8e5aae4

Browse files
authored
Merge pull request #372 from rabbitmq/configure-initial-credits
Make subscription initial credits configurable
2 parents f118d0d + bb36c28 commit 8e5aae4

File tree

6 files changed

+78
-101
lines changed

6 files changed

+78
-101
lines changed

src/main/java/com/rabbitmq/stream/ConsumerBuilder.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,13 @@ public interface ConsumerBuilder {
151151
*/
152152
ConsumerBuilder noTrackingStrategy();
153153

154+
/**
155+
* Configure flow of messages.
156+
*
157+
* @return the flow configuration
158+
*/
159+
FlowConfiguration flow();
160+
154161
/**
155162
* Create the configured {@link Consumer}
156163
*
@@ -209,4 +216,25 @@ interface AutoTrackingStrategy {
209216
*/
210217
ConsumerBuilder builder();
211218
}
219+
220+
/** Message flow configuration. */
221+
interface FlowConfiguration {
222+
223+
/**
224+
* The number of initial credits for the subscription.
225+
*
226+
* <p>Default is 1.
227+
*
228+
* @param initialCredits the number of initial credits
229+
* @return this configuration instance
230+
*/
231+
FlowConfiguration initialCredits(int initialCredits);
232+
233+
/**
234+
* Go back to the builder.
235+
*
236+
* @return the consumer builder
237+
*/
238+
ConsumerBuilder builder();
239+
}
212240
}

src/main/java/com/rabbitmq/stream/impl/Client.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1113,17 +1113,17 @@ public Response subscribe(
11131113
* @param subscriptionId identifier to correlate inbound messages to this subscription
11141114
* @param stream the stream to consume from
11151115
* @param offsetSpecification the specification of the offset to consume from
1116-
* @param credit the initial number of credits
1116+
* @param initialCredits the initial number of credits
11171117
* @param properties some optional properties to describe the subscription
11181118
* @return the subscription confirmation
11191119
*/
11201120
public Response subscribe(
11211121
byte subscriptionId,
11221122
String stream,
11231123
OffsetSpecification offsetSpecification,
1124-
int credit,
1124+
int initialCredits,
11251125
Map<String, String> properties) {
1126-
if (credit < 0 || credit > Short.MAX_VALUE) {
1126+
if (initialCredits < 0 || initialCredits > Short.MAX_VALUE) {
11271127
throw new IllegalArgumentException("Credit value must be between 0 and " + Short.MAX_VALUE);
11281128
}
11291129
int length = 2 + 2 + 4 + 1 + 2 + stream.length() + 2 + 2; // misses the offset
@@ -1152,7 +1152,7 @@ public Response subscribe(
11521152
if (offsetSpecification.isOffset() || offsetSpecification.isTimestamp()) {
11531153
bb.writeLong(offsetSpecification.getOffset());
11541154
}
1155-
bb.writeShort(credit);
1155+
bb.writeShort(initialCredits);
11561156
if (properties != null && !properties.isEmpty()) {
11571157
bb.writeInt(properties.size());
11581158
for (Map.Entry<String, String> entry : properties.entrySet()) {

src/main/java/com/rabbitmq/stream/impl/StreamConsumerBuilder.java

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,9 @@ class StreamConsumerBuilder implements ConsumerBuilder {
4040
private boolean noTrackingStrategy = false;
4141
private boolean lazyInit = false;
4242
private SubscriptionListener subscriptionListener = subscriptionContext -> {};
43-
private Map<String, String> subscriptionProperties = new ConcurrentHashMap<>();
43+
private final Map<String, String> subscriptionProperties = new ConcurrentHashMap<>();
4444
private ConsumerUpdateListener consumerUpdateListener;
45-
private int initialCredits = 1;
46-
private int additionalCredits = 1;
45+
private final DefaultFlowConfiguration flowConfiguration = new DefaultFlowConfiguration(this);
4746

4847
public StreamConsumerBuilder(StreamEnvironment environment) {
4948
this.environment = environment;
@@ -132,13 +131,9 @@ public ConsumerBuilder noTrackingStrategy() {
132131
return this;
133132
}
134133

135-
public ConsumerBuilder credits(int initial, int onChunkDelivery) {
136-
if (initial <= 0 || onChunkDelivery <= 0) {
137-
throw new IllegalArgumentException("Credits must be positive");
138-
}
139-
this.initialCredits = initial;
140-
this.additionalCredits = onChunkDelivery;
141-
return this;
134+
@Override
135+
public FlowConfiguration flow() {
136+
return this.flowConfiguration;
142137
}
143138

144139
StreamConsumerBuilder lazyInit(boolean lazyInit) {
@@ -204,8 +199,8 @@ public Consumer build() {
204199
this.subscriptionListener,
205200
this.subscriptionProperties,
206201
this.consumerUpdateListener,
207-
this.initialCredits,
208-
this.additionalCredits);
202+
this.flowConfiguration.initialCredits,
203+
this.flowConfiguration.additionalCredits);
209204
environment.addConsumer((StreamConsumer) consumer);
210205
} else {
211206
if (Utils.isSac(this.subscriptionProperties)) {
@@ -342,6 +337,32 @@ StreamConsumerBuilder duplicate() {
342337
return duplicate;
343338
}
344339

340+
private static class DefaultFlowConfiguration implements FlowConfiguration {
341+
342+
private final ConsumerBuilder consumerBuilder;
343+
344+
private DefaultFlowConfiguration(ConsumerBuilder consumerBuilder) {
345+
this.consumerBuilder = consumerBuilder;
346+
}
347+
348+
private int initialCredits = 1;
349+
private final int additionalCredits = 1;
350+
351+
@Override
352+
public FlowConfiguration initialCredits(int initialCredits) {
353+
if (initialCredits <= 0) {
354+
throw new IllegalArgumentException("Credits must be positive");
355+
}
356+
this.initialCredits = initialCredits;
357+
return this;
358+
}
359+
360+
@Override
361+
public ConsumerBuilder builder() {
362+
return this.consumerBuilder;
363+
}
364+
}
365+
345366
// to help testing
346367
public ConsumerUpdateListener consumerUpdateListener() {
347368
return consumerUpdateListener;

src/main/java/com/rabbitmq/stream/perf/StreamPerfTest.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import com.rabbitmq.stream.impl.Client;
4747
import com.rabbitmq.stream.metrics.MetricsCollector;
4848
import com.rabbitmq.stream.perf.ShutdownService.CloseCallback;
49-
import com.rabbitmq.stream.perf.Utils.CreditSettings;
5049
import com.rabbitmq.stream.perf.Utils.NamedThreadFactory;
5150
import com.rabbitmq.stream.perf.Utils.PerformanceMicrometerMetricsCollector;
5251
import io.micrometer.core.instrument.Counter;
@@ -415,13 +414,6 @@ public class StreamPerfTest implements Callable<Integer> {
415414
defaultValue = "false")
416415
private boolean metricsCommandLineArguments;
417416

418-
@CommandLine.Option(
419-
names = {"--credits", "-cr"},
420-
description = "initial and additional credits for subscriptions",
421-
defaultValue = "1:1",
422-
converter = Utils.CreditsTypeConverter.class)
423-
private CreditSettings credits;
424-
425417
@CommandLine.Option(
426418
names = {"--requested-max-frame-size", "-rmfs"},
427419
description = "maximum frame size to request",
@@ -472,6 +464,13 @@ static class InstanceSyncOptions {
472464
private String instanceSyncNamespace;
473465
}
474466

467+
@CommandLine.Option(
468+
names = {"--initial-credits", "-ic"},
469+
description = "initial credits for subscription",
470+
defaultValue = "1",
471+
converter = Utils.NotNegativeIntegerTypeConverter.class)
472+
private int initialCredits;
473+
475474
private MetricsCollector metricsCollector;
476475
private PerformanceMetrics performanceMetrics;
477476
private List<Monitoring> monitorings;
@@ -994,7 +993,12 @@ public Integer call() throws Exception {
994993
AtomicLong messageCount = new AtomicLong(0);
995994
String stream = stream(streams, i);
996995
ConsumerBuilder consumerBuilder =
997-
environment.consumerBuilder().offset(this.offset);
996+
environment
997+
.consumerBuilder()
998+
.offset(this.offset)
999+
.flow()
1000+
.initialCredits(this.initialCredits)
1001+
.builder();
9981002

9991003
if (this.superStreams) {
10001004
consumerBuilder.superStream(stream);

src/main/java/com/rabbitmq/stream/perf/Utils.java

Lines changed: 0 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -649,65 +649,6 @@ public Integer convert(String input) {
649649
}
650650
}
651651

652-
static class CreditsTypeConverter implements CommandLine.ITypeConverter<CreditSettings> {
653-
654-
@Override
655-
public CreditSettings convert(String input) {
656-
String errorMessage =
657-
input + " is not a valid credits setting, " + "valid example values: 20:1, 15";
658-
if (input == null || input.trim().isEmpty()) {
659-
typeConversionException(errorMessage);
660-
}
661-
if (input.contains(":") || input.contains("-")) {
662-
String separator = input.contains(":") ? ":" : "-";
663-
String[] split = input.split(separator);
664-
if (split.length != 2) {
665-
typeConversionException(errorMessage);
666-
} else {
667-
int[] credits =
668-
Arrays.stream(split)
669-
.mapToInt(Integer::valueOf)
670-
.peek(
671-
c -> {
672-
if (c <= 0) {
673-
typeConversionException("credit values must be positive");
674-
}
675-
})
676-
.toArray();
677-
return new CreditSettings(credits[0], credits[1]);
678-
}
679-
}
680-
try {
681-
int value = Integer.parseInt(input);
682-
if (value <= 0) {
683-
typeConversionException("credit values must be positive");
684-
}
685-
return new CreditSettings(value, 1);
686-
} catch (Exception e) {
687-
typeConversionException(errorMessage);
688-
}
689-
return new CreditSettings(10, 1);
690-
}
691-
}
692-
693-
static class CreditSettings {
694-
695-
private final int initial, additional;
696-
697-
CreditSettings(int initial, int additional) {
698-
this.initial = initial;
699-
this.additional = additional;
700-
}
701-
702-
int initial() {
703-
return this.initial;
704-
}
705-
706-
int additional() {
707-
return this.additional;
708-
}
709-
}
710-
711652
private static void typeConversionException(String message) {
712653
throw new TypeConversionException(message);
713654
}

src/test/java/com/rabbitmq/stream/perf/UtilsTest.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@
2424
import com.rabbitmq.stream.OffsetSpecification;
2525
import com.rabbitmq.stream.compression.Compression;
2626
import com.rabbitmq.stream.perf.Utils.CompressionTypeConverter;
27-
import com.rabbitmq.stream.perf.Utils.CreditSettings;
28-
import com.rabbitmq.stream.perf.Utils.CreditsTypeConverter;
2927
import com.rabbitmq.stream.perf.Utils.MetricsTagsTypeConverter;
3028
import com.rabbitmq.stream.perf.Utils.NameStrategyConverter;
3129
import com.rabbitmq.stream.perf.Utils.PatternNameStrategy;
@@ -370,21 +368,6 @@ void commandLineMetricsTest() {
370368
.isEqualTo("-x 1 -y 2");
371369
}
372370

373-
@ParameterizedTest
374-
@CsvSource({"10:1,10,1", "20:10,20,10", "20,20,1", "20-10,20,10"})
375-
void creditsConverterOk(String input, int expectedInitial, int expectedAdditional) {
376-
CreditSettings credits = new CreditsTypeConverter().convert(input);
377-
assertThat(credits.initial()).isEqualTo(expectedInitial);
378-
assertThat(credits.additional()).isEqualTo(expectedAdditional);
379-
}
380-
381-
@ParameterizedTest
382-
@ValueSource(strings = {"foo", "-20:10", "20:-1"})
383-
void creditsConverterKo(String input) {
384-
assertThatThrownBy(() -> new CreditsTypeConverter().convert(input))
385-
.isInstanceOf(TypeConversionException.class);
386-
}
387-
388371
@Command(name = "test-command")
389372
static class TestCommand {
390373

0 commit comments

Comments
 (0)