Skip to content

Commit 737a514

Browse files
artembilangaryrussell
authored andcommitted
GH-1210: Add Kotlin suspend functions support (#2460)
* GH-1210: Add Kotlin suspend functions support Fixes #1210 Kotlin Coroutines are essentially `Future` wrapping. Therefore, it is natural to have `suspend` support on `@RabbitListener` methods as we do now for `CompletableFuture` and `Mono` * Introduce some utilities since we cannot reuse existing from Spring Messaging: they are there about Kotlin Coroutines only for reactive handlers * Some code clean up in the `RabbitListenerAnnotationBeanPostProcessor` for the latest Java * Add optional dep for `kotlinx-coroutines-reactor` and document the feature * * Remove unused import
1 parent d8e7bd3 commit 737a514

File tree

9 files changed

+284
-109
lines changed

9 files changed

+284
-109
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ ext {
5252
jaywayJsonPathVersion = '2.4.0'
5353
junit4Version = '4.13.2'
5454
junitJupiterVersion = '5.8.2'
55+
kotlinCoroutinesVersion = '1.6.4'
5556
log4jVersion = '2.17.2'
5657
logbackVersion = '1.2.3'
5758
lz4Version = '1.8.0'
@@ -379,6 +380,7 @@ project('spring-rabbit') {
379380
}
380381
optionalApi "com.jayway.jsonpath:json-path:$jaywayJsonPathVersion"
381382
optionalApi "org.apache.commons:commons-pool2:$commonsPoolVersion"
383+
optionalApi "org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$kotlinCoroutinesVersion"
382384

383385
testApi project(':spring-rabbit-junit')
384386
testImplementation("com.willowtreeapps.assertk:assertk-jvm:$assertkVersion")

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/RabbitListenerAnnotationBeanPostProcessor.java

Lines changed: 17 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
import java.lang.reflect.AnnotatedElement;
2020
import java.lang.reflect.Method;
21-
import java.lang.reflect.ParameterizedType;
22-
import java.lang.reflect.Type;
2321
import java.nio.charset.Charset;
2422
import java.nio.charset.StandardCharsets;
2523
import java.util.ArrayList;
@@ -30,7 +28,6 @@
3028
import java.util.HashSet;
3129
import java.util.List;
3230
import java.util.Map;
33-
import java.util.Optional;
3431
import java.util.Set;
3532
import java.util.concurrent.ConcurrentHashMap;
3633
import java.util.concurrent.ConcurrentMap;
@@ -56,6 +53,7 @@
5653
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
5754
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
5855
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistry;
56+
import org.springframework.amqp.rabbit.listener.adapter.AmqpMessageHandlerMethodFactory;
5957
import org.springframework.amqp.rabbit.listener.adapter.ReplyPostProcessor;
6058
import org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler;
6159
import org.springframework.amqp.support.converter.MessageConverter;
@@ -76,7 +74,6 @@
7674
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
7775
import org.springframework.context.EnvironmentAware;
7876
import org.springframework.context.expression.StandardBeanExpressionResolver;
79-
import org.springframework.core.MethodParameter;
8077
import org.springframework.core.Ordered;
8178
import org.springframework.core.annotation.AnnotationUtils;
8279
import org.springframework.core.annotation.MergedAnnotations;
@@ -88,21 +85,16 @@
8885
import org.springframework.core.task.TaskExecutor;
8986
import org.springframework.format.support.DefaultFormattingConversionService;
9087
import org.springframework.lang.Nullable;
91-
import org.springframework.messaging.Message;
9288
import org.springframework.messaging.converter.GenericMessageConverter;
9389
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
9490
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
95-
import org.springframework.messaging.handler.annotation.support.MethodArgumentNotValidException;
96-
import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver;
9791
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
9892
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
9993
import org.springframework.util.Assert;
10094
import org.springframework.util.ClassUtils;
10195
import org.springframework.util.CollectionUtils;
10296
import org.springframework.util.ReflectionUtils;
10397
import org.springframework.util.StringUtils;
104-
import org.springframework.validation.BindingResult;
105-
import org.springframework.validation.ObjectError;
10698
import org.springframework.validation.Validator;
10799

108100
/**
@@ -440,14 +432,10 @@ protected Collection<Declarable> processListener(MethodRabbitListenerEndpoint en
440432
List<Object> resolvedQueues = resolveQueues(rabbitListener, declarables);
441433
if (!resolvedQueues.isEmpty()) {
442434
if (resolvedQueues.get(0) instanceof String) {
443-
endpoint.setQueueNames(resolvedQueues.stream()
444-
.map(o -> (String) o)
445-
.collect(Collectors.toList()).toArray(new String[0]));
435+
endpoint.setQueueNames(resolvedQueues.stream().map(o -> (String) o).toArray(String[]::new));
446436
}
447437
else {
448-
endpoint.setQueues(resolvedQueues.stream()
449-
.map(o -> (Queue) o)
450-
.collect(Collectors.toList()).toArray(new Queue[0]));
438+
endpoint.setQueues(resolvedQueues.stream().map(o -> (Queue) o).toArray(Queue[]::new));
451439
}
452440
}
453441
endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
@@ -664,12 +652,10 @@ private List<Object> resolveQueues(RabbitListener rabbitListener, Collection<Dec
664652
String[] queues = rabbitListener.queues();
665653
QueueBinding[] bindings = rabbitListener.bindings();
666654
org.springframework.amqp.rabbit.annotation.Queue[] queuesToDeclare = rabbitListener.queuesToDeclare();
667-
List<String> queueNames = new ArrayList<String>();
668-
List<Queue> queueBeans = new ArrayList<Queue>();
669-
if (queues.length > 0) {
670-
for (int i = 0; i < queues.length; i++) {
671-
resolveQueues(queues[i], queueNames, queueBeans);
672-
}
655+
List<String> queueNames = new ArrayList<>();
656+
List<Queue> queueBeans = new ArrayList<>();
657+
for (String queue : queues) {
658+
resolveQueues(queue, queueNames, queueBeans);
673659
}
674660
if (!queueNames.isEmpty()) {
675661
// revert to the previous behavior of just using the name when there is mixture of String and Queue
@@ -681,8 +667,8 @@ private List<Object> resolveQueues(RabbitListener rabbitListener, Collection<Dec
681667
throw new BeanInitializationException(
682668
"@RabbitListener can have only one of 'queues', 'queuesToDeclare', or 'bindings'");
683669
}
684-
for (int i = 0; i < queuesToDeclare.length; i++) {
685-
queueNames.add(declareQueue(queuesToDeclare[i], declarables));
670+
for (org.springframework.amqp.rabbit.annotation.Queue queue : queuesToDeclare) {
671+
queueNames.add(declareQueue(queue, declarables));
686672
}
687673
}
688674
if (bindings.length > 0) {
@@ -752,7 +738,7 @@ private String[] registerBeansForDeclaration(RabbitListener rabbitListener, Coll
752738
declareExchangeAndBinding(binding, queueName, declarables);
753739
}
754740
}
755-
return queues.toArray(new String[queues.size()]);
741+
return queues.toArray(new String[0]);
756742
}
757743

758744
private String declareQueue(org.springframework.amqp.rabbit.annotation.Queue bindingQueue,
@@ -859,7 +845,7 @@ private void registerBindings(QueueBinding binding, String queueName, String exc
859845
}
860846

861847
private Map<String, Object> resolveArguments(Argument[] arguments) {
862-
Map<String, Object> map = new HashMap<String, Object>();
848+
Map<String, Object> map = new HashMap<>();
863849
for (Argument arg : arguments) {
864850
String key = resolveExpressionAsString(arg.name(), "@Argument.name");
865851
if (StringUtils.hasText(key)) {
@@ -1025,7 +1011,7 @@ private MessageHandlerMethodFactory getFactory() {
10251011
}
10261012

10271013
private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
1028-
DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
1014+
DefaultMessageHandlerMethodFactory defaultFactory = new AmqpMessageHandlerMethodFactory();
10291015
Validator validator = RabbitListenerAnnotationBeanPostProcessor.this.registrar.getValidator();
10301016
if (validator != null) {
10311017
defaultFactory.setValidator(validator);
@@ -1038,74 +1024,14 @@ private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
10381024
List<HandlerMethodArgumentResolver> customArgumentsResolver = new ArrayList<>(
10391025
RabbitListenerAnnotationBeanPostProcessor.this.registrar.getCustomMethodArgumentResolvers());
10401026
defaultFactory.setCustomArgumentResolvers(customArgumentsResolver);
1041-
GenericMessageConverter messageConverter = new GenericMessageConverter(
1042-
this.defaultFormattingConversionService);
1043-
defaultFactory.setMessageConverter(messageConverter);
1044-
// Has to be at the end - look at PayloadMethodArgumentResolver documentation
1045-
customArgumentsResolver.add(new OptionalEmptyAwarePayloadArgumentResolver(messageConverter, validator));
1027+
defaultFactory.setMessageConverter(new GenericMessageConverter(this.defaultFormattingConversionService));
1028+
10461029
defaultFactory.afterPropertiesSet();
10471030
return defaultFactory;
10481031
}
10491032

10501033
}
10511034

1052-
private static class OptionalEmptyAwarePayloadArgumentResolver extends PayloadMethodArgumentResolver {
1053-
1054-
OptionalEmptyAwarePayloadArgumentResolver(
1055-
org.springframework.messaging.converter.MessageConverter messageConverter,
1056-
@Nullable Validator validator) {
1057-
1058-
super(messageConverter, validator);
1059-
}
1060-
1061-
@Override
1062-
public Object resolveArgument(MethodParameter parameter, Message<?> message) throws Exception { // NOSONAR
1063-
Object resolved = null;
1064-
try {
1065-
resolved = super.resolveArgument(parameter, message);
1066-
}
1067-
catch (MethodArgumentNotValidException ex) {
1068-
Type type = parameter.getGenericParameterType();
1069-
if (isOptional(message, type)) {
1070-
BindingResult bindingResult = ex.getBindingResult();
1071-
if (bindingResult != null) {
1072-
List<ObjectError> allErrors = bindingResult.getAllErrors();
1073-
if (allErrors.size() == 1) {
1074-
String defaultMessage = allErrors.get(0).getDefaultMessage();
1075-
if ("Payload value must not be empty".equals(defaultMessage)) {
1076-
return Optional.empty();
1077-
}
1078-
}
1079-
}
1080-
}
1081-
throw ex;
1082-
}
1083-
/*
1084-
* Replace Optional.empty() list elements with null.
1085-
*/
1086-
if (resolved instanceof List) {
1087-
List<?> list = ((List<?>) resolved);
1088-
for (int i = 0; i < list.size(); i++) {
1089-
if (list.get(i).equals(Optional.empty())) {
1090-
list.set(i, null);
1091-
}
1092-
}
1093-
}
1094-
return resolved;
1095-
}
1096-
1097-
private boolean isOptional(Message<?> message, Type type) {
1098-
return (Optional.class.equals(type) || (type instanceof ParameterizedType
1099-
&& Optional.class.equals(((ParameterizedType) type).getRawType())))
1100-
&& message.getPayload().equals(Optional.empty());
1101-
}
1102-
1103-
@Override
1104-
protected boolean isEmptyPayload(Object payload) {
1105-
return payload == null || payload.equals(Optional.empty());
1106-
}
1107-
1108-
}
11091035
/**
11101036
* The metadata holder of the class with {@link RabbitListener}
11111037
* and {@link RabbitHandler} annotations.
@@ -1145,6 +1071,9 @@ private TypeMetadata() {
11451071

11461072
/**
11471073
* A method annotated with {@link RabbitListener}, together with the annotations.
1074+
*
1075+
* @param method the method with annotations
1076+
* @param annotations on the method
11481077
*/
11491078
private static class ListenerMethod {
11501079

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/AbstractAdaptableMessageListener.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ protected void handleResult(InvocationResult resultArg, Message request, Channel
365365
* response message back.
366366
* @param resultArg the result object to handle (never <code>null</code>)
367367
* @param request the original request message
368-
* @param channel the Rabbit channel to operate on (may be <code>null</code>)
368+
* @param channel the Rabbit channel to operate on (maybe <code>null</code>)
369369
* @param source the source data for the method invocation - e.g.
370370
* {@code o.s.messaging.Message<?>}; may be null
371371
* @see #buildMessage
@@ -404,8 +404,8 @@ else if (resultArg.getReturnValue() instanceof CompletableFuture) {
404404
}
405405
else if (monoPresent && MonoHandler.isMono(resultArg.getReturnValue())) {
406406
if (!this.isManualAck) {
407-
this.logger.warn("Container AcknowledgeMode must be MANUAL for a Mono<?> return type; "
408-
+ "otherwise the container will ack the message immediately");
407+
this.logger.warn("Container AcknowledgeMode must be MANUAL for a Mono<?> return type" +
408+
"(or Kotlin suspend function); otherwise the container will ack the message immediately");
409409
}
410410
MonoHandler.subscribe(resultArg.getReturnValue(),
411411
r -> asyncSuccess(resultArg, request, channel, source, r),
@@ -461,7 +461,7 @@ private void basicAck(Message request, Channel channel) {
461461
}
462462

463463
private void asyncFailure(Message request, Channel channel, Throwable t) {
464-
this.logger.error("Future or Mono was completed with an exception for " + request, t);
464+
this.logger.error("Future, Mono, or suspend function was completed with an exception for " + request, t);
465465
try {
466466
channel.basicNack(request.getMessageProperties().getDeliveryTag(), false,
467467
ContainerUtils.shouldRequeue(this.defaultRequeueRejected, t, this.logger));

0 commit comments

Comments
 (0)