Skip to content

Commit ce66ebb

Browse files
authored
Merge pull request #773 from rabbitmq/configure-dynamic-batch-with-max-unconfirmed
Consider max unconfirmed messages in dynamic batch configuration
2 parents 3bcea64 + a067613 commit ce66ebb

File tree

5 files changed

+23
-12
lines changed

5 files changed

+23
-12
lines changed

src/main/java/com/rabbitmq/stream/impl/DynamicBatch.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,22 @@
2828
final class DynamicBatch<T> implements AutoCloseable {
2929

3030
private static final Logger LOGGER = LoggerFactory.getLogger(DynamicBatch.class);
31-
private static final int MIN_BATCH_SIZE = 32;
32-
private static final int MAX_BATCH_SIZE = 8192;
31+
private static final int MIN_BATCH_SIZE = 16;
3332

3433
private final BlockingQueue<T> requests = new LinkedBlockingQueue<>();
3534
private final BatchConsumer<T> consumer;
36-
private final int configuredBatchSize;
35+
private final int configuredBatchSize, minBatchSize, maxBatchSize;
3736
private final Thread thread;
3837

39-
DynamicBatch(BatchConsumer<T> consumer, int batchSize) {
38+
DynamicBatch(BatchConsumer<T> consumer, int batchSize, int maxUnconfirmed) {
4039
this.consumer = consumer;
41-
this.configuredBatchSize = min(max(batchSize, MIN_BATCH_SIZE), MAX_BATCH_SIZE);
40+
if (batchSize < maxUnconfirmed) {
41+
this.minBatchSize = min(MIN_BATCH_SIZE, batchSize / 2);
42+
} else {
43+
this.minBatchSize = min(1, maxUnconfirmed / 2);
44+
}
45+
this.configuredBatchSize = batchSize;
46+
this.maxBatchSize = batchSize * 2;
4247
this.thread = ConcurrencyUtils.defaultThreadFactory().newThread(this::loop);
4348
this.thread.start();
4449
}
@@ -104,9 +109,9 @@ private void maybeCompleteBatch(State<T> state, boolean increaseIfCompleted) {
104109
boolean completed = this.consumer.process(state.items);
105110
if (completed) {
106111
if (increaseIfCompleted) {
107-
state.batchSize = min(state.batchSize * 2, MAX_BATCH_SIZE);
112+
state.batchSize = min(state.batchSize * 2, this.maxBatchSize);
108113
} else {
109-
state.batchSize = max(state.batchSize / 2, MIN_BATCH_SIZE);
114+
state.batchSize = max(state.batchSize / 2, this.minBatchSize);
110115
}
111116
state.items = new ArrayList<>(state.batchSize);
112117
}

src/main/java/com/rabbitmq/stream/impl/DynamicBatchMessageAccumulator.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
3838
DynamicBatchMessageAccumulator(
3939
int subEntrySize,
4040
int batchSize,
41+
int maxUnconfirmedMessages,
4142
Codec codec,
4243
int maxFrameSize,
4344
ToLongFunction<Message> publishSequenceFunction,
@@ -75,7 +76,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
7576
}
7677
return result;
7778
},
78-
batchSize);
79+
batchSize,
80+
maxUnconfirmedMessages);
7981
} else {
8082
byte compressionCode =
8183
compressionCodec == null ? Compression.NONE.code() : compressionCodec.code();
@@ -124,7 +126,8 @@ final class DynamicBatchMessageAccumulator implements MessageAccumulator {
124126
}
125127
return result;
126128
},
127-
batchSize * subEntrySize);
129+
batchSize * subEntrySize,
130+
maxUnconfirmedMessages);
128131
}
129132
}
130133

src/main/java/com/rabbitmq/stream/impl/ProducerUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ static MessageAccumulator createMessageAccumulator(
3030
boolean dynamicBatch,
3131
int subEntrySize,
3232
int batchSize,
33+
int maxUnconfirmedMessages,
3334
CompressionCodec compressionCodec,
3435
Codec codec,
3536
ByteBufAllocator byteBufAllocator,
@@ -44,6 +45,7 @@ static MessageAccumulator createMessageAccumulator(
4445
return new DynamicBatchMessageAccumulator(
4546
subEntrySize,
4647
batchSize,
48+
maxUnconfirmedMessages,
4749
codec,
4850
maxFrameSize,
4951
publishSequenceFunction,

src/main/java/com/rabbitmq/stream/impl/StreamProducer.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,7 @@ public int fragmentLength(Object entity) {
180180
dynamicBatch,
181181
subEntrySize,
182182
batchSize,
183+
maxUnconfirmedMessages,
183184
compressionCodec,
184185
environment.codec(),
185186
environment.byteBufAllocator(),

src/test/java/com/rabbitmq/stream/impl/DynamicBatchTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ void itemAreProcessed() {
7171
sync.down(items.size());
7272
return true;
7373
};
74-
try (DynamicBatch<String> batch = new DynamicBatch<>(action, 100)) {
74+
try (DynamicBatch<String> batch = new DynamicBatch<>(action, 100, 10_000)) {
7575
RateLimiter rateLimiter = RateLimiter.create(10000);
7676
IntStream.range(0, itemCount)
7777
.forEach(
@@ -102,7 +102,7 @@ void failedProcessingIsReplayed() throws Exception {
102102
}
103103
return result;
104104
};
105-
try (DynamicBatch<String> batch = new DynamicBatch<>(action, 100)) {
105+
try (DynamicBatch<String> batch = new DynamicBatch<>(action, 100, 10_000)) {
106106
int firstRoundCount = itemCount / 5;
107107
IntStream.range(0, firstRoundCount)
108108
.forEach(
@@ -132,7 +132,7 @@ void lowThrottlingValueShouldStillHighPublishingRate() throws Exception {
132132
return true;
133133
};
134134

135-
try (DynamicBatch<Long> batch = new DynamicBatch<>(action, batchSize)) {
135+
try (DynamicBatch<Long> batch = new DynamicBatch<>(action, batchSize, 10_000)) {
136136
MetricRegistry metrics = new MetricRegistry();
137137
Meter rate = metrics.meter("publishing-rate");
138138
AtomicBoolean keepGoing = new AtomicBoolean(true);

0 commit comments

Comments
 (0)