Skip to content

Commit 751326e

Browse files
authored
GH-1201: Native BatchMessageListener Support
Resolves #1201 Support de-batching producer batches to a `List<Meessage>` with native `BatchMessageListener`s; previously this was only possible with `@RabbitListener` which does the debatching itself. * Fix race in test. #1202 (comment) The test has a very short timeout; there is a race such that the channel won't be physically closed if we don't get the `ConsumeOK` in time. This won't really cause a problem in a real application, but this change will prevent the test from sporadically failing.
1 parent e1580d2 commit 751326e

File tree

6 files changed

+74
-19
lines changed

6 files changed

+74
-19
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/core/RabbitTemplate.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2631,6 +2631,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, BasicPropertie
26312631
future.completeExceptionally(
26322632
new ConsumeOkNotReceivedException("Blocking receive, consumer failed to consume within "
26332633
+ timeoutMillis + " ms: " + consumer));
2634+
RabbitUtils.setPhysicalCloseRequired(channel, true);
26342635
}
26352636
return consumer;
26362637
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/AbstractMessageListenerContainer.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
4343
import org.springframework.amqp.core.AcknowledgeMode;
4444
import org.springframework.amqp.core.AmqpAdmin;
45+
import org.springframework.amqp.core.BatchMessageListener;
4546
import org.springframework.amqp.core.Message;
4647
import org.springframework.amqp.core.MessageListener;
4748
import org.springframework.amqp.core.MessagePostProcessor;
@@ -1920,6 +1921,17 @@ private void checkPossibleAuthenticationFailureFatalFromProperty() {
19201921
}
19211922
}
19221923

1924+
@Nullable
1925+
protected List<Message> debatch(Message message) {
1926+
if (isDeBatchingEnabled() && getBatchingStrategy().canDebatch(message.getMessageProperties())
1927+
&& getMessageListener() instanceof BatchMessageListener) {
1928+
final List<Message> messageList = new ArrayList<>();
1929+
getBatchingStrategy().deBatch(message, fragment -> messageList.add(fragment));
1930+
return messageList;
1931+
}
1932+
return null;
1933+
}
1934+
19231935
@FunctionalInterface
19241936
private interface ContainerDelegate {
19251937

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/DirectMessageListenerContainer.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -974,9 +974,14 @@ public void handleDelivery(String consumerTag, Envelope envelope,
974974
this.logger.debug(this + " received " + message);
975975
}
976976
updateLastReceive();
977+
Object data = message;
978+
List<Message> debatched = debatch(message);
979+
if (debatched != null) {
980+
data = debatched;
981+
}
977982
if (this.transactionManager != null) {
978983
try {
979-
executeListenerInTransaction(message, deliveryTag);
984+
executeListenerInTransaction(data, deliveryTag);
980985
}
981986
catch (WrappedTransactionException e) {
982987
if (e.getCause() instanceof Error) {
@@ -994,15 +999,15 @@ public void handleDelivery(String consumerTag, Envelope envelope,
994999
}
9951000
else {
9961001
try {
997-
callExecuteListener(message, deliveryTag);
1002+
callExecuteListener(data, deliveryTag);
9981003
}
9991004
catch (Exception e) {
10001005
// NOSONAR
10011006
}
10021007
}
10031008
}
10041009

1005-
private void executeListenerInTransaction(Message message, long deliveryTag) {
1010+
private void executeListenerInTransaction(Object data, long deliveryTag) {
10061011
if (this.isRabbitTxManager) {
10071012
ConsumerChannelRegistry.registerConsumerChannel(getChannel(), this.connectionFactory);
10081013
}
@@ -1018,7 +1023,7 @@ private void executeListenerInTransaction(Message message, long deliveryTag) {
10181023
}
10191024
// unbound in ResourceHolderSynchronization.beforeCompletion()
10201025
try {
1021-
callExecuteListener(message, deliveryTag);
1026+
callExecuteListener(data, deliveryTag);
10221027
}
10231028
catch (RuntimeException e1) {
10241029
prepareHolderForRollback(resourceHolder, e1);
@@ -1031,10 +1036,10 @@ private void executeListenerInTransaction(Message message, long deliveryTag) {
10311036
});
10321037
}
10331038

1034-
private void callExecuteListener(Message message, long deliveryTag) {
1039+
private void callExecuteListener(Object data, long deliveryTag) {
10351040
boolean channelLocallyTransacted = isChannelLocallyTransacted();
10361041
try {
1037-
executeListener(getChannel(), message);
1042+
executeListener(getChannel(), data);
10381043
handleAck(deliveryTag, channelLocallyTransacted);
10391044
}
10401045
catch (ImmediateAcknowledgeAmqpException e) {

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -945,6 +945,10 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep
945945
}
946946
}
947947
else {
948+
messages = debatch(message);
949+
if (messages != null) {
950+
break;
951+
}
948952
try {
949953
executeListener(channel, message);
950954
}
@@ -994,7 +998,7 @@ private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Excep
994998
}
995999
}
9961000
}
997-
if (this.consumerBatchEnabled && messages != null) {
1001+
if (messages != null) {
9981002
executeWithList(channel, messages, deliveryTag, consumer);
9991003
}
10001004

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/BatchingRabbitTemplateTests.java

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.mockito.ArgumentCaptor;
4343

4444
import org.springframework.amqp.AmqpException;
45+
import org.springframework.amqp.core.BatchMessageListener;
4546
import org.springframework.amqp.core.Message;
4647
import org.springframework.amqp.core.MessageDeliveryMode;
4748
import org.springframework.amqp.core.MessageListener;
@@ -53,7 +54,9 @@
5354
import org.springframework.amqp.rabbit.junit.BrokerTestUtils;
5455
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
5556
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;
57+
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
5658
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
59+
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
5760
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
5861
import org.springframework.amqp.support.AmqpHeaders;
5962
import org.springframework.amqp.support.postprocessor.AbstractCompressingPostProcessor;
@@ -228,20 +231,47 @@ public void testSimpleBatchTwoEqualBufferLimit() throws Exception {
228231
}
229232

230233
@Test
231-
public void testDebatchByContainer() throws Exception {
232-
final List<Message> received = new ArrayList<Message>();
233-
final CountDownLatch latch = new CountDownLatch(2);
234+
void testDebatchSMLCSplit() throws Exception {
234235
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactory);
236+
container.setReceiveTimeout(100);
237+
testDebatchByContainer(container, false);
238+
}
239+
240+
@Test
241+
void testDebatchSMLC() throws Exception {
242+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactory);
243+
container.setReceiveTimeout(100);
244+
testDebatchByContainer(container, true);
245+
}
246+
247+
@Test
248+
void testDebatchDMLC() throws Exception {
249+
testDebatchByContainer(new DirectMessageListenerContainer(this.connectionFactory), true);
250+
}
251+
252+
private void testDebatchByContainer(AbstractMessageListenerContainer container, boolean asList) throws Exception {
253+
final List<Message> received = new ArrayList<Message>();
254+
final CountDownLatch latch = new CountDownLatch(asList ? 1 : 2);
235255
container.setQueueNames(ROUTE);
236256
List<Boolean> lastInBatch = new ArrayList<>();
237257
AtomicInteger batchSize = new AtomicInteger();
238-
container.setMessageListener((MessageListener) message -> {
239-
received.add(message);
240-
lastInBatch.add(message.getMessageProperties().isLastInBatch());
241-
batchSize.set(message.getMessageProperties().getHeader(AmqpHeaders.BATCH_SIZE));
242-
latch.countDown();
243-
});
244-
container.setReceiveTimeout(100);
258+
if (asList) {
259+
container.setMessageListener((BatchMessageListener) messages -> {
260+
received.addAll(messages);
261+
lastInBatch.add(false);
262+
lastInBatch.add(true);
263+
batchSize.set(messages.size());
264+
latch.countDown();
265+
});
266+
}
267+
else {
268+
container.setMessageListener((MessageListener) message -> {
269+
received.add(message);
270+
lastInBatch.add(message.getMessageProperties().isLastInBatch());
271+
batchSize.set(message.getMessageProperties().getHeader(AmqpHeaders.BATCH_SIZE));
272+
latch.countDown();
273+
});
274+
}
245275
container.afterPropertiesSet();
246276
container.start();
247277
try {

src/reference/asciidoc/amqp.adoc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2001,11 +2001,12 @@ Batched messages (created by a producer) are automatically de-batched by listene
20012001
Rejecting any message from a batch causes the entire batch to be rejected.
20022002
See <<template-batching>> for more information about batching.
20032003

2004-
Starting with version 2.2, the `SimpleMessageListeneContainer` can be use to create batches on the consumer side (where the producer sent discrete messages).
2004+
Starting with version 2.2, the `SimpleMessageListenerContainer` can be use to create batches on the consumer side (where the producer sent discrete messages).
20052005

20062006
Set the container property `consumerBatchEnabled` to enable this feature.
20072007
`deBatchingEnabled` must also be true so that the container is responsible for processing batches of both types.
20082008
Implement `BatchMessageListener` or `ChannelAwareBatchMessageListener` when `consumerBatchEnabled` is true.
2009+
Starting with version 2.2.7 both the `SimpleMessageListenerContainer` and `DirectMessageListenerContainer` can debatch <<template-batching,producer created batches>> as `List<Message>`.
20092010
See <<receiving-batch>> for information about using this feature with `@RabbitListener`.
20102011

20112012
[[consumer-events]]
@@ -5472,6 +5473,8 @@ a|image::images/tickmark.png[]
54725473
(N/A)
54735474

54745475
|When true, the listener container will debatch batched messages and invoke the listener with each message from the batch.
5476+
Starting with version 2.2.7, <<template-batching,producer created batches>> will be debatched as a `List<Message>` if the listener is a `BatchMessageListener` or `ChannelAwareBatchMessageListener`.
5477+
Otherwise messages from the batch are presented one-at-a-time.
54755478
Default true.
54765479
See <<template-batching>> and <<receiving-batch>>.
54775480

0 commit comments

Comments
 (0)