File tree Expand file tree Collapse file tree 6 files changed +38
-4
lines changed
main/java/com/rabbitmq/stream/impl
test/java/com/rabbitmq/stream/impl Expand file tree Collapse file tree 6 files changed +38
-4
lines changed Original file line number Diff line number Diff line change @@ -32,11 +32,19 @@ jobs:
32
32
run : ci/start-broker.sh
33
33
env :
34
34
RABBITMQ_IMAGE : ${{ matrix.rabbitmq-image }}
35
- - name : Test
35
+ - name : Test (no dynamic-batch publishing)
36
36
run : |
37
37
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
38
+ -Drabbitmq.stream.producer.dynamic.batch=false \
38
39
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
39
40
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
40
41
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
42
+ - name : Test (dynamic-batch publishing)
43
+ run : |
44
+ ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
45
+ -Drabbitmq.stream.producer.dynamic.batch=true \
46
+ -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
47
+ -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
48
+ -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
41
49
- name : Stop broker
42
50
run : docker stop rabbitmq && docker rm rabbitmq
Original file line number Diff line number Diff line change @@ -33,12 +33,21 @@ jobs:
33
33
run : ci/start-broker.sh
34
34
- name : Display Java version
35
35
run : ./mvnw --version
36
- - name : Test
36
+ - name : Test (no dynamic-batch publishing)
37
37
run : |
38
38
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
39
+ -Drabbitmq.stream.producer.dynamic.batch=false \
39
40
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
40
41
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
41
42
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem \
42
43
-Dnet.bytebuddy.experimental=true -Djacoco.skip=true -Dspotbugs.skip=true
44
+ - name : Test (dynamic-batch publishing)
45
+ run : |
46
+ ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
47
+ -Drabbitmq.stream.producer.dynamic.batch=true \
48
+ -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
49
+ -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
50
+ -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem \
51
+ -Dnet.bytebuddy.experimental=true -Djacoco.skip=true -Dspotbugs.skip=true
43
52
- name : Stop broker
44
53
run : docker stop rabbitmq && docker rm rabbitmq
Original file line number Diff line number Diff line change @@ -33,12 +33,20 @@ jobs:
33
33
gpg-passphrase : MAVEN_GPG_PASSPHRASE
34
34
- name : Start broker
35
35
run : ci/start-broker.sh
36
- - name : Test
36
+ - name : Test (no dynamic-batch publishing)
37
37
run : |
38
38
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
39
+ -Drabbitmq.stream.producer.dynamic.batch=false \
39
40
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
40
41
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
41
42
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
43
+ - name : Test (dynamic-batch publishing)
44
+ run : |
45
+ ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
46
+ -Drabbitmq.stream.producer.dynamic.batch=true \
47
+ -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
48
+ -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
49
+ -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
42
50
- name : Stop broker
43
51
run : docker stop rabbitmq && docker rm rabbitmq
44
52
- name : Upload Codecov report
Original file line number Diff line number Diff line change @@ -86,6 +86,7 @@ class StreamProducer implements Producer {
86
86
String stream ,
87
87
int subEntrySize ,
88
88
int batchSize ,
89
+ boolean dynamicBatch ,
89
90
Compression compression ,
90
91
Duration batchPublishingDelay ,
91
92
int maxUnconfirmedMessages ,
@@ -172,7 +173,6 @@ public int fragmentLength(Object entity) {
172
173
if (compression != null ) {
173
174
compressionCodec = environment .compressionCodecFactory ().get (compression );
174
175
}
175
- boolean dynamicBatch = true ;
176
176
this .accumulator =
177
177
ProducerUtils .createMessageAccumulator (
178
178
dynamicBatch ,
Original file line number Diff line number Diff line change 27
27
28
28
class StreamProducerBuilder implements ProducerBuilder {
29
29
30
+ static final boolean DEFAULT_DYNAMIC_BATCH =
31
+ Boolean .parseBoolean (System .getProperty ("rabbitmq.stream.producer.dynamic.batch" , "false" ));
32
+
30
33
private final StreamEnvironment environment ;
31
34
32
35
private String name ;
@@ -53,6 +56,8 @@ class StreamProducerBuilder implements ProducerBuilder {
53
56
54
57
private Function <Message , String > filterValueExtractor ;
55
58
59
+ private boolean dynamicBatch = DEFAULT_DYNAMIC_BATCH ;
60
+
56
61
StreamProducerBuilder (StreamEnvironment environment ) {
57
62
this .environment = environment ;
58
63
}
@@ -198,6 +203,7 @@ public Producer build() {
198
203
stream ,
199
204
subEntrySize ,
200
205
batchSize ,
206
+ dynamicBatch ,
201
207
compression ,
202
208
batchPublishingDelay ,
203
209
maxUnconfirmedMessages ,
Original file line number Diff line number Diff line change @@ -176,6 +176,7 @@ void confirmTimeoutTaskShouldFailMessagesAfterTimeout(
176
176
"stream" ,
177
177
subEntrySize ,
178
178
10 ,
179
+ StreamProducerBuilder .DEFAULT_DYNAMIC_BATCH ,
179
180
Compression .NONE ,
180
181
Duration .ofMillis (100 ),
181
182
messageCount * 10 ,
@@ -226,6 +227,7 @@ void enqueueTimeoutMessageShouldBeFailedWhenEnqueueTimeoutIsReached(int subEntry
226
227
"stream" ,
227
228
subEntrySize ,
228
229
10 ,
230
+ StreamProducerBuilder .DEFAULT_DYNAMIC_BATCH ,
229
231
Compression .NONE ,
230
232
Duration .ZERO ,
231
233
2 ,
@@ -266,6 +268,7 @@ void enqueueTimeoutSendingShouldBlockWhenEnqueueTimeoutIsZero(int subEntrySize)
266
268
"stream" ,
267
269
subEntrySize ,
268
270
10 ,
271
+ StreamProducerBuilder .DEFAULT_DYNAMIC_BATCH ,
269
272
Compression .NONE ,
270
273
Duration .ZERO ,
271
274
2 ,
You can’t perform that action at this time.
0 commit comments