Skip to content

Commit a067613

Browse files
committed
Consider max unconfirmed messages in dynamic batch configuration
A small value for max unconfirmed messages can impact the dynamic batch mechanism. This commit sets the min batch size to half the max unconfirmed messages value if it is less than the configured batch size. References #757
1 parent 3bcea64 commit a067613

File tree

5 files changed

+23
-12
lines changed

5 files changed

+23
-12
lines changed

src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,22 @@
2828
final class DynamicBatch<T> implements AutoCloseable {
2929

3030
private static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatch.class);
31-
private static final int MIN_BATCH_SIZE = 32;
32-
private static final int MAX_BATCH_SIZE = 8192;
31+
private static final int MIN_BATCH_SIZE = 16;
3332

3433
private final BlockingQueue<T> requests = new LinkedBlockingQueue<>();
3534
private final BatchConsumer<T> consumer;
36-
private final int configuredBatchSize;
35+
private final int configuredBatchSize, minBatchSize, maxBatchSize;
3736
private final Thread thread;
3837

39-
DynamicBatch(BatchConsumer<T> consumer, int batchSize) {
38+
DynamicBatch(BatchConsumer<T> consumer, int batchSize, int maxUnconfirmed) {
4039
this.consumer = consumer;
41-
this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE);
40+
if (batchSize < maxUnconfirmed) {
41+
this.minBatchSize = min(MIN_BATCH_SIZE, batchSize / 2);
42+
} else {
43+
this.minBatchSize = min(1, maxUnconfirmed / 2);
44+
}
45+
this.configuredBatchSize = batchSize;
46+
this.maxBatchSize = batchSize * 2;
4247
this.thread = ConcurrencyUtils.defaultThreadFactory().newThread(this::loop);
4348
this.thread.start();
4449
}
@@ -104,9 +109,9 @@ private void maybeCompleteBatch(State<T> state, boolean increaseIfCompleted) {
104109
boolean completed = this.consumer.process(state.items);
105110
if (completed) {
106111
if (increaseIfCompleted) {
107-
state.batchSize = min(state.batchSize * 2, MAX_BATCH_SIZE);
112+
state.batchSize = min(state.batchSize * 2, this.maxBatchSize);
108113
} else {
109-
state.batchSize = max(state.batchSize / 2, MIN_BATCH_SIZE);
114+
state.batchSize = max(state.batchSize / 2, this.minBatchSize);
110115
}
111116
state.items = new ArrayList<>(state.batchSize);
112117
}

src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
3838
DynamicBatchMessageAccumulator(
3939
int subEntrySize,
4040
int batchSize,
41+
int maxUnconfirmedMessages,
4142
Codec codec,
4243
int maxFrameSize,
4344
ToLongFunction<Message> publishSequenceFunction,
@@ -75,7 +76,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
7576
}
7677
return result;
7778
},
78-
batchSize);
79+
batchSize,
80+
maxUnconfirmedMessages);
7981
} else {
8082
byte compressionCode =
8183
compressionCodec == null ? Compression.NONE.code() : compressionCodec.code();
@@ -124,7 +126,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
124126
}
125127
return result;
126128
},
127-
batchSize * subEntrySize);
129+
batchSize * subEntrySize,
130+
maxUnconfirmedMessages);
128131
}
129132
}
130133

src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ static MessageAccumulator createMessageAccumulator(
3030
boolean dynamicBatch,
3131
int subEntrySize,
3232
int batchSize,
33+
int maxUnconfirmedMessages,
3334
CompressionCodec compressionCodec,
3435
Codec codec,
3536
ByteBufAllocator byteBufAllocator,
@@ -44,6 +45,7 @@ static MessageAccumulator createMessageAccumulator(
4445
return new DynamicBatchMessageAccumulator(
4546
subEntrySize,
4647
batchSize,
48+
maxUnconfirmedMessages,
4749
codec,
4850
maxFrameSize,
4951
publishSequenceFunction,

src/main/java/com/rabbitmq/stream/impl/StreamProducer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ public int fragmentLength(Object entity) {
180180
dynamicBatch,
181181
subEntrySize,
182182
batchSize,
183+
maxUnconfirmedMessages,
183184
compressionCodec,
184185
environment.codec(),
185186
environment.byteBufAllocator(),

src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ void itemAreProcessed() {
7171
sync.down(items.size());
7272
return true;
7373
};
74-
try (DynamicBatch<String> batch = new DynamicBatch<>(action, 100)) {
74+
try (DynamicBatch<String> batch = new DynamicBatch<>(action, 100, 10_000)) {
7575
RateLimiter rateLimiter = RateLimiter.create(10000);
7676
IntStream.range(0, itemCount)
7777
.forEach(
@@ -102,7 +102,7 @@ void failedProcessingIsReplayed() throws Exception {
102102
}
103103
return result;
104104
};
105-
try (DynamicBatch<String> batch = new DynamicBatch<>(action, 100)) {
105+
try (DynamicBatch<String> batch = new DynamicBatch<>(action, 100, 10_000)) {
106106
int firstRoundCount = itemCount / 5;
107107
IntStream.range(0, firstRoundCount)
108108
.forEach(
@@ -132,7 +132,7 @@ void lowThrottlingValueShouldStillHighPublishingRate() throws Exception {
132132
return true;
133133
};
134134

135-
try (DynamicBatch<Long> batch = new DynamicBatch<>(action, batchSize)) {
135+
try (DynamicBatch<Long> batch = new DynamicBatch<>(action, batchSize, 10_000)) {
136136
MetricRegistry metrics = new MetricRegistry();
137137
Meter rate = metrics.meter("publishing-rate");
138138
AtomicBoolean keepGoing = new AtomicBoolean(true);

0 commit comments

Comments
 (0)