Skip to content

Commit e521033

Browse files
zhangheng0027garyrussell
authored andcommitted
GH-2528: Handle Conversion Exception with Batch
Resolves #2528 Detect a conversion exception within a batch and reject just that message. formatting code Polishing.
1 parent 09c612c commit e521033

File tree

2 files changed

+143
-3
lines changed

2 files changed

+143
-3
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/BatchMessagingMessageListenerAdapter.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
2525
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;
2626
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
2727
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
28+
import org.springframework.amqp.support.converter.MessageConversionException;
2829
import org.springframework.lang.Nullable;
2930
import org.springframework.messaging.Message;
3031
import org.springframework.messaging.support.GenericMessage;
@@ -63,7 +64,20 @@ public void onMessageBatch(List<org.springframework.amqp.core.Message> messages,
6364
else {
6465
List<Message<?>> messagingMessages = new ArrayList<>();
6566
for (org.springframework.amqp.core.Message message : messages) {
66-
messagingMessages.add(toMessagingMessage(message));
67+
try {
68+
Message<?> messagingMessage = toMessagingMessage(message);
69+
messagingMessages.add(messagingMessage);
70+
}
71+
catch (MessageConversionException e) {
72+
this.logger.error("Could not convert incoming message", e);
73+
try {
74+
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
75+
}
76+
catch (Exception ex) {
77+
this.logger.error("Failed to reject message with conversion error", ex);
78+
throw e; // NOSONAR
79+
}
80+
}
6781
}
6882
if (this.converterAdapter.isMessageList()) {
6983
converted = new GenericMessage<>(messagingMessages);

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/adapter/BatchMessagingMessageListenerAdapterTests.java

Lines changed: 127 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2022 the original author or authors.
2+
* Copyright 2022-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,16 +21,42 @@
2121

2222
import java.lang.reflect.Method;
2323
import java.util.List;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.TimeUnit;
2426

2527
import org.junit.jupiter.api.Test;
2628

29+
import org.springframework.amqp.core.Message;
30+
import org.springframework.amqp.core.MessageBuilder;
31+
import org.springframework.amqp.core.MessagePropertiesBuilder;
32+
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
33+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
34+
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
35+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
36+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
37+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
38+
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
39+
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;
40+
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
41+
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
42+
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
2743
import org.springframework.amqp.utils.test.TestUtils;
44+
import org.springframework.beans.factory.annotation.Autowired;
45+
import org.springframework.context.annotation.Bean;
46+
import org.springframework.context.annotation.Configuration;
47+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
48+
49+
import com.fasterxml.jackson.databind.ObjectMapper;
2850

2951
/**
3052
* @author Gary Russell
53+
* @author heng zhang
54+
*
3155
* @since 3.0
3256
*
3357
*/
58+
@SpringJUnitConfig
59+
@RabbitAvailable(queues = "test.batchQueue")
3460
public class BatchMessagingMessageListenerAdapterTests {
3561

3662
@Test
@@ -52,4 +78,104 @@ public void listen(String in) {
5278
public void listen(List<String> in) {
5379
}
5480

81+
82+
@Test
83+
public void errorMsgConvert(@Autowired BatchMessagingMessageListenerAdapterTests.Config config,
84+
@Autowired RabbitTemplate template) throws Exception {
85+
86+
Message message = MessageBuilder.withBody("""
87+
{
88+
"name" : "Tom",
89+
"age" : 18
90+
}
91+
""".getBytes()).andProperties(
92+
MessagePropertiesBuilder.newInstance()
93+
.setContentType("application/json")
94+
.setReplyTo("nowhere")
95+
.build())
96+
.build();
97+
98+
Message errorMessage = MessageBuilder.withBody("".getBytes()).andProperties(
99+
MessagePropertiesBuilder.newInstance()
100+
.setContentType("application/json")
101+
.setReplyTo("nowhere")
102+
.build())
103+
.build();
104+
105+
for (int i = 0; i < config.count; i++) {
106+
template.send("test.batchQueue", message);
107+
template.send("test.batchQueue", errorMessage);
108+
}
109+
110+
assertThat(config.countDownLatch.await(config.count * 1000L, TimeUnit.SECONDS)).isTrue();
111+
}
112+
113+
114+
115+
@Configuration
116+
@EnableRabbit
117+
public static class Config {
118+
volatile int count = 5;
119+
volatile CountDownLatch countDownLatch = new CountDownLatch(count);
120+
121+
@RabbitListener(
122+
queues = "test.batchQueue",
123+
containerFactory = "batchListenerContainerFactory"
124+
)
125+
public void listen(List<Model> list) {
126+
for (Model model : list) {
127+
countDownLatch.countDown();
128+
}
129+
130+
}
131+
132+
@Bean
133+
ConnectionFactory cf() {
134+
return new CachingConnectionFactory(RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
135+
}
136+
137+
@Bean(name = "batchListenerContainerFactory")
138+
public RabbitListenerContainerFactory<SimpleMessageListenerContainer> rc(ConnectionFactory connectionFactory) {
139+
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
140+
factory.setConnectionFactory(connectionFactory);
141+
factory.setPrefetchCount(1);
142+
factory.setConcurrentConsumers(1);
143+
factory.setBatchListener(true);
144+
factory.setBatchSize(3);
145+
factory.setConsumerBatchEnabled(true);
146+
147+
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(new ObjectMapper());
148+
factory.setMessageConverter(jackson2JsonMessageConverter);
149+
150+
return factory;
151+
}
152+
153+
@Bean
154+
RabbitTemplate template(ConnectionFactory cf) {
155+
return new RabbitTemplate(cf);
156+
}
157+
158+
159+
}
160+
public static class Model {
161+
String name;
162+
String age;
163+
164+
public String getName() {
165+
return name;
166+
}
167+
168+
public void setName(String name) {
169+
this.name = name;
170+
}
171+
172+
public String getAge() {
173+
return age;
174+
}
175+
176+
public void setAge(String age) {
177+
this.age = age;
178+
}
179+
}
180+
55181
}

0 commit comments

Comments
 (0)