Skip to content

Consider max unconfirmed messages in dynamic batch configuration #773

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,22 @@
final class DynamicBatch<T> implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatch.class);
private static final int MIN_BATCH_SIZE = 32;
private static final int MAX_BATCH_SIZE = 8192;
private static final int MIN_BATCH_SIZE = 16;

private final BlockingQueue<T> requests = new LinkedBlockingQueue<>();
private final BatchConsumer<T> consumer;
private final int configuredBatchSize;
private final int configuredBatchSize, minBatchSize, maxBatchSize;
private final Thread thread;

DynamicBatch(BatchConsumer<T> consumer, int batchSize) {
DynamicBatch(BatchConsumer<T> consumer, int batchSize, int maxUnconfirmed) {
this.consumer = consumer;
this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE);
if (batchSize < maxUnconfirmed) {
this.minBatchSize = min(MIN_BATCH_SIZE, batchSize / 2);
} else {
this.minBatchSize = min(1, maxUnconfirmed / 2);
}
this.configuredBatchSize = batchSize;
this.maxBatchSize = batchSize * 2;
this.thread = ConcurrencyUtils.defaultThreadFactory().newThread(this::loop);
this.thread.start();
}
Expand Down Expand Up @@ -104,9 +109,9 @@ private void maybeCompleteBatch(State<T> state, boolean increaseIfCompleted) {
boolean completed = this.consumer.process(state.items);
if (completed) {
if (increaseIfCompleted) {
state.batchSize = min(state.batchSize * 2, MAX_BATCH_SIZE);
state.batchSize = min(state.batchSize * 2, this.maxBatchSize);
} else {
state.batchSize = max(state.batchSize / 2, MIN_BATCH_SIZE);
state.batchSize = max(state.batchSize / 2, this.minBatchSize);
}
state.items = new ArrayList<>(state.batchSize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
DynamicBatchMessageAccumulator(
int subEntrySize,
int batchSize,
int maxUnconfirmedMessages,
Codec codec,
int maxFrameSize,
ToLongFunction<Message> publishSequenceFunction,
Expand Down Expand Up @@ -75,7 +76,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
}
return result;
},
batchSize);
batchSize,
maxUnconfirmedMessages);
} else {
byte compressionCode =
compressionCodec == null ? Compression.NONE.code() : compressionCodec.code();
Expand Down Expand Up @@ -124,7 +126,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
}
return result;
},
batchSize * subEntrySize);
batchSize * subEntrySize,
maxUnconfirmedMessages);
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ static MessageAccumulator createMessageAccumulator(
boolean dynamicBatch,
int subEntrySize,
int batchSize,
int maxUnconfirmedMessages,
CompressionCodec compressionCodec,
Codec codec,
ByteBufAllocator byteBufAllocator,
Expand All @@ -44,6 +45,7 @@ static MessageAccumulator createMessageAccumulator(
return new DynamicBatchMessageAccumulator(
subEntrySize,
batchSize,
maxUnconfirmedMessages,
codec,
maxFrameSize,
publishSequenceFunction,
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/rabbitmq/stream/impl/StreamProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ public int fragmentLength(Object entity) {
dynamicBatch,
subEntrySize,
batchSize,
maxUnconfirmedMessages,
compressionCodec,
environment.codec(),
environment.byteBufAllocator(),
Expand Down
6 changes: 3 additions & 3 deletions src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void itemAreProcessed() {
sync.down(items.size());
return true;
};
try (DynamicBatch<String> batch = new DynamicBatch<>(action, 100)) {
try (DynamicBatch<String> batch = new DynamicBatch<>(action, 100, 10_000)) {
RateLimiter rateLimiter = RateLimiter.create(10000);
IntStream.range(0, itemCount)
.forEach(
Expand Down Expand Up @@ -102,7 +102,7 @@ void failedProcessingIsReplayed() throws Exception {
}
return result;
};
try (DynamicBatch<String> batch = new DynamicBatch<>(action, 100)) {
try (DynamicBatch<String> batch = new DynamicBatch<>(action, 100, 10_000)) {
int firstRoundCount = itemCount / 5;
IntStream.range(0, firstRoundCount)
.forEach(
Expand Down Expand Up @@ -132,7 +132,7 @@ void lowThrottlingValueShouldStillHighPublishingRate() throws Exception {
return true;
};

try (DynamicBatch<Long> batch = new DynamicBatch<>(action, batchSize)) {
try (DynamicBatch<Long> batch = new DynamicBatch<>(action, batchSize, 10_000)) {
MetricRegistry metrics = new MetricRegistry();
Meter rate = metrics.meter("publishing-rate");
AtomicBoolean keepGoing = new AtomicBoolean(true);
Expand Down