Skip to content

Commit 38d07c1

Browse files
garyrussellartembilan
authored andcommitted
GH-1339: Fix RLErrorHandler with Conversion Ex. (#1346)
Resolves #1339 Error handler was not called for conversion exceptions, preventing the application from returning some error to the caller for request/reply processing. **cherry-pick to 2.2.x**
1 parent 7299583 commit 38d07c1

File tree

4 files changed

+117
-44
lines changed

4 files changed

+117
-44
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 54 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -131,8 +131,50 @@ public void setMessageConverter(MessageConverter messageConverter) {
131131

132132
@Override
133133
public void onMessage(org.springframework.amqp.core.Message amqpMessage, Channel channel) throws Exception { // NOSONAR
134-
Message<?> message = toMessagingMessage(amqpMessage);
135-
invokeHandlerAndProcessResult(amqpMessage, channel, message);
134+
Message<?> message = null;
135+
try {
136+
message = toMessagingMessage(amqpMessage);
137+
invokeHandlerAndProcessResult(amqpMessage, channel, message);
138+
}
139+
catch (ListenerExecutionFailedException ex) {
140+
handleException(amqpMessage, channel, message, ex);
141+
}
142+
catch (ReplyFailureException ex) {
143+
throw ex;
144+
}
145+
catch (Exception ex) {
146+
handleException(amqpMessage, channel, message, new ListenerExecutionFailedException(
147+
"Failed to convert message", ex, amqpMessage));
148+
}
149+
}
150+
151+
private void handleException(org.springframework.amqp.core.Message amqpMessage, Channel channel,
152+
@Nullable Message<?> message, ListenerExecutionFailedException e) throws Exception {
153+
154+
if (this.errorHandler != null) {
155+
try {
156+
Message<?> messageWithChannel = null;
157+
if (message != null) {
158+
messageWithChannel = MessageBuilder.fromMessage(message)
159+
.setHeader(AmqpHeaders.CHANNEL, channel)
160+
.build();
161+
}
162+
Object errorResult = this.errorHandler.handleError(amqpMessage, messageWithChannel, e);
163+
if (errorResult != null) {
164+
handleResult(this.handlerAdapter.getInvocationResultFor(errorResult, message.getPayload()),
165+
amqpMessage, channel, message);
166+
}
167+
else {
168+
logger.trace("Error handler returned no result");
169+
}
170+
}
171+
catch (Exception ex) {
172+
returnOrThrow(amqpMessage, channel, message, ex, ex);
173+
}
174+
}
175+
else {
176+
returnOrThrow(amqpMessage, channel, message, e.getCause(), e);
177+
}
136178
}
137179

138180
protected void invokeHandlerAndProcessResult(@Nullable org.springframework.amqp.core.Message amqpMessage,
@@ -142,41 +184,16 @@ protected void invokeHandlerAndProcessResult(@Nullable org.springframework.amqp.
142184
logger.debug("Processing [" + message + "]");
143185
}
144186
InvocationResult result = null;
145-
try {
146-
if (this.messagingMessageConverter.method == null && amqpMessage != null) {
147-
amqpMessage.getMessageProperties()
148-
.setTargetMethod(this.handlerAdapter.getMethodFor(message.getPayload()));
149-
}
150-
result = invokeHandler(amqpMessage, channel, message);
151-
if (result.getReturnValue() != null) {
152-
handleResult(result, amqpMessage, channel, message);
153-
}
154-
else {
155-
logger.trace("No result object given - no result to handle");
156-
}
187+
if (this.messagingMessageConverter.method == null && amqpMessage != null) {
188+
amqpMessage.getMessageProperties()
189+
.setTargetMethod(this.handlerAdapter.getMethodFor(message.getPayload()));
157190
}
158-
catch (ListenerExecutionFailedException e) {
159-
if (this.errorHandler != null) {
160-
try {
161-
Message<?> messageWithChannel = MessageBuilder.fromMessage(message)
162-
.setHeader(AmqpHeaders.CHANNEL, channel)
163-
.build();
164-
Object errorResult = this.errorHandler.handleError(amqpMessage, messageWithChannel, e);
165-
if (errorResult != null) {
166-
handleResult(this.handlerAdapter.getInvocationResultFor(errorResult, message.getPayload()),
167-
amqpMessage, channel, message);
168-
}
169-
else {
170-
logger.trace("Error handler returned no result");
171-
}
172-
}
173-
catch (Exception ex) {
174-
returnOrThrow(amqpMessage, channel, message, ex, ex);
175-
}
176-
}
177-
else {
178-
returnOrThrow(amqpMessage, channel, message, e.getCause(), e);
179-
}
191+
result = invokeHandler(amqpMessage, channel, message);
192+
if (result.getReturnValue() != null) {
193+
handleResult(result, amqpMessage, channel, message);
194+
}
195+
else {
196+
logger.trace("No result object given - no result to handle");
180197
}
181198
}
182199

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/api/RabbitListenerErrorHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import org.springframework.amqp.core.Message;
2020
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
21+
import org.springframework.lang.Nullable;
2122

2223
/**
2324
* An error handler which is called when a {code @RabbitListener} method
@@ -35,13 +36,13 @@ public interface RabbitListenerErrorHandler {
3536
* Handle the error. If an exception is not thrown, the return value is returned to
3637
* the sender using normal {@code replyTo/@SendTo} semantics.
3738
* @param amqpMessage the raw message received.
38-
* @param message the converted spring-messaging message.
39+
* @param message the converted spring-messaging message (if available).
3940
* @param exception the exception the listener threw, wrapped in a
4041
* {@link ListenerExecutionFailedException}.
4142
* @return the return value to be sent to the sender.
4243
* @throws Exception an exception which may be the original or different.
4344
*/
44-
Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message,
45+
Object handleError(Message amqpMessage, @Nullable org.springframework.messaging.Message<?> message,
4546
ListenerExecutionFailedException exception) throws Exception; // NOSONAR
4647

4748
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/adapter/MessagingMessageListenerAdapterTests.java

Lines changed: 54 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2020 the original author or authors.
2+
* Copyright 2014-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,16 +29,20 @@
2929
import java.util.LinkedHashMap;
3030
import java.util.List;
3131
import java.util.Map;
32+
import java.util.concurrent.atomic.AtomicBoolean;
3233
import java.util.stream.Collectors;
3334

3435
import org.junit.jupiter.api.BeforeEach;
3536
import org.junit.jupiter.api.Test;
3637

3738
import org.springframework.amqp.core.MessageProperties;
39+
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
3840
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
3941
import org.springframework.amqp.rabbit.test.MessageTestUtils;
4042
import org.springframework.amqp.support.AmqpHeaders;
4143
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
44+
import org.springframework.amqp.support.converter.MessageConversionException;
45+
import org.springframework.amqp.support.converter.MessageConverter;
4246
import org.springframework.amqp.support.converter.SimpleMessageConverter;
4347
import org.springframework.amqp.utils.test.TestUtils;
4448
import org.springframework.beans.factory.support.StaticListableBeanFactory;
@@ -273,9 +277,50 @@ public void batchTypedObjectTest() {
273277
assertThat(this.sample.batchPayloads.get(0).getClass()).isEqualTo(Foo.class);
274278
}
275279

280+
@Test
281+
void errorHandlerAfterConversionEx() throws Exception {
282+
org.springframework.amqp.core.Message message = MessageTestUtils.createTextMessage("foo");
283+
Channel channel = mock(Channel.class);
284+
AtomicBoolean ehCalled = new AtomicBoolean();
285+
MessagingMessageListenerAdapter listener = getSimpleInstance("fail",
286+
new RabbitListenerErrorHandler() {
287+
288+
@Override
289+
public Object handleError(org.springframework.amqp.core.Message amqpMessage, Message<?> message,
290+
ListenerExecutionFailedException exception) throws Exception {
291+
292+
ehCalled.set(true);
293+
return null;
294+
}
295+
296+
}, String.class);
297+
listener.setMessageConverter(new MessageConverter() {
298+
299+
@Override
300+
public org.springframework.amqp.core.Message toMessage(Object object, MessageProperties messageProperties)
301+
throws MessageConversionException {
302+
303+
return null;
304+
}
305+
306+
@Override
307+
public Object fromMessage(org.springframework.amqp.core.Message message) throws MessageConversionException {
308+
throw new MessageConversionException("test");
309+
}
310+
});
311+
listener.onMessage(message, channel);
312+
assertThat(ehCalled.get()).isTrue();
313+
}
314+
276315
protected MessagingMessageListenerAdapter getSimpleInstance(String methodName, Class<?>... parameterTypes) {
316+
return getSimpleInstance(methodName, null, parameterTypes);
317+
}
318+
319+
protected MessagingMessageListenerAdapter getSimpleInstance(String methodName, RabbitListenerErrorHandler eh,
320+
Class<?>... parameterTypes) {
321+
277322
Method m = ReflectionUtils.findMethod(SampleBean.class, methodName, parameterTypes);
278-
return createInstance(m, false);
323+
return createInstance(m, false, eh);
279324
}
280325

281326
protected MessagingMessageListenerAdapter getSimpleInstance(String methodName, boolean returnExceptions,
@@ -285,7 +330,13 @@ protected MessagingMessageListenerAdapter getSimpleInstance(String methodName, b
285330
}
286331

287332
protected MessagingMessageListenerAdapter createInstance(Method m, boolean returnExceptions) {
288-
MessagingMessageListenerAdapter adapter = new MessagingMessageListenerAdapter(null, m, returnExceptions, null);
333+
return createInstance(m, returnExceptions, null);
334+
}
335+
336+
protected MessagingMessageListenerAdapter createInstance(Method m, boolean returnExceptions,
337+
RabbitListenerErrorHandler eh) {
338+
339+
MessagingMessageListenerAdapter adapter = new MessagingMessageListenerAdapter(null, m, returnExceptions, eh);
289340
adapter.setHandlerAdapter(new HandlerAdapter(factory.createInvocableHandlerMethod(sample, m)));
290341
return adapter;
291342
}

src/reference/asciidoc/amqp.adoc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2848,7 +2848,7 @@ static class TxServiceImpl implements TxService<Foo> {
28482848
28492849
@Override
28502850
@RabbitListener(...)
2851-
public String handle(Foo foo, String rk) {
2851+
public String handle(Thing thing, String rk) {
28522852
...
28532853
}
28542854
@@ -2931,6 +2931,10 @@ public Object handleError(Message amqpMessage, org.springframework.messaging.Mes
29312931
----
29322932
====
29332933

2934+
Starting with version 2.2.18, if a message conversion exception is thrown, the error handler will be called, with `null` in the `message` argument.
2935+
This allows the application to send some result to the caller, indicating that a badly-formed message was received.
2936+
Previously, such errors were thrown and handled by the container.
2937+
29342938
====== Container Management
29352939

29362940
Containers created for annotations are not registered with the application context.

0 commit comments

Comments
 (0)