Skip to content

Commit aa507b3

Browse files
authored
GH-1675: Validation on @KafkaHandler Methods
Resolves #1675 Add support for payload validation with `@KafkaHandler` methods. * Fix javadoc.
1 parent 4781d81 commit aa507b3

File tree

6 files changed

+164
-21
lines changed

6 files changed

+164
-21
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2020 the original author or authors.
2+
* Copyright 2014-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -187,6 +187,10 @@ public void afterPropertiesSet() {
187187
protected void registerAllEndpoints() {
188188
synchronized (this.endpointDescriptors) {
189189
for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
190+
if (descriptor.endpoint instanceof MultiMethodKafkaListenerEndpoint
191+
&& this.validator != null) {
192+
((MultiMethodKafkaListenerEndpoint) descriptor.endpoint).setValidator(this.validator);
193+
}
190194
this.endpointRegistry.registerListenerContainer(
191195
descriptor.endpoint, resolveContainerFactory(descriptor));
192196
}

spring-kafka/src/main/java/org/springframework/kafka/config/MultiMethodKafkaListenerEndpoint.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
2525
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
2626
import org.springframework.lang.Nullable;
2727
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
28+
import org.springframework.validation.Validator;
2829

2930
/**
3031
* The {@link MethodKafkaListenerEndpoint} extension for several POJO methods
@@ -44,11 +45,16 @@ public class MultiMethodKafkaListenerEndpoint<K, V> extends MethodKafkaListenerE
4445

4546
private final Method defaultMethod;
4647

48+
private Validator validator;
49+
4750
/**
4851
* Construct an instance for the provided methods and bean with no default method.
4952
* @param methods the methods.
5053
* @param bean the bean.
54+
* @deprecated in favor of
55+
* {@link #MultiMethodKafkaListenerEndpoint(List, Method, Object)}.
5156
*/
57+
@Deprecated
5258
public MultiMethodKafkaListenerEndpoint(List<Method> methods, Object bean) {
5359
this(methods, null, bean);
5460
}
@@ -66,6 +72,15 @@ public MultiMethodKafkaListenerEndpoint(List<Method> methods, @Nullable Method d
6672
setBean(bean);
6773
}
6874

75+
/**
76+
* Set a payload validator.
77+
* @param validator the validator.
78+
* @since 2.5.11
79+
*/
80+
public void setValidator(Validator validator) {
81+
this.validator = validator;
82+
}
83+
6984
@Override
7085
protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapter<K, V> messageListener) {
7186
List<InvocableHandlerMethod> invocableHandlerMethods = new ArrayList<InvocableHandlerMethod>();
@@ -79,7 +94,7 @@ protected HandlerAdapter configureListenerAdapter(MessagingMessageListenerAdapte
7994
}
8095
}
8196
DelegatingInvocableHandler delegatingHandler = new DelegatingInvocableHandler(invocableHandlerMethods,
82-
defaultHandler, getBean(), getResolver(), getBeanExpressionContext(), getBeanFactory());
97+
defaultHandler, getBean(), getResolver(), getBeanExpressionContext(), getBeanFactory(), this.validator);
8398
return new HandlerAdapter(delegatingHandler);
8499
}
85100

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/DelegatingInvocableHandler.java

Lines changed: 86 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -39,11 +39,15 @@
3939
import org.springframework.kafka.support.KafkaUtils;
4040
import org.springframework.lang.Nullable;
4141
import org.springframework.messaging.Message;
42+
import org.springframework.messaging.MessageHeaders;
43+
import org.springframework.messaging.converter.MessageConverter;
4244
import org.springframework.messaging.handler.HandlerMethod;
4345
import org.springframework.messaging.handler.annotation.Header;
4446
import org.springframework.messaging.handler.annotation.SendTo;
47+
import org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver;
4548
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
4649
import org.springframework.util.Assert;
50+
import org.springframework.validation.Validator;
4751

4852

4953
/**
@@ -63,6 +67,9 @@ public class DelegatingInvocableHandler {
6367

6468
private final ConcurrentMap<Class<?>, InvocableHandlerMethod> cachedHandlers = new ConcurrentHashMap<>();
6569

70+
private final ConcurrentMap<InvocableHandlerMethod, MethodParameter> payloadMethodParameters =
71+
new ConcurrentHashMap<>();
72+
6673
private final InvocableHandlerMethod defaultHandler;
6774

6875
private final Map<InvocableHandlerMethod, Expression> handlerSendTo = new HashMap<>();
@@ -77,17 +84,22 @@ public class DelegatingInvocableHandler {
7784

7885
private final ConfigurableListableBeanFactory beanFactory;
7986

87+
private final PayloadValidator validator;
88+
8089
/**
8190
* Construct an instance with the supplied handlers for the bean.
8291
* @param handlers the handlers.
8392
* @param bean the bean.
8493
* @param beanExpressionResolver the expression resolver.
8594
* @param beanExpressionContext the expression context.
95+
* @deprecated in favor of
96+
* {@link #DelegatingInvocableHandler(List, InvocableHandlerMethod, Object, BeanExpressionResolver, BeanExpressionContext, BeanFactory, Validator)}
8697
*/
98+
@Deprecated
8799
public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers, Object bean,
88100
BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext) {
89101

90-
this(handlers, null, bean, beanExpressionResolver, beanExpressionContext);
102+
this(handlers, null, bean, beanExpressionResolver, beanExpressionContext, null, null);
91103
}
92104

93105
/**
@@ -97,13 +109,16 @@ public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers, Object
97109
* @param bean the bean.
98110
* @param beanExpressionResolver the resolver.
99111
* @param beanExpressionContext the context.
112+
* @deprecated in favor of
113+
* {@link #DelegatingInvocableHandler(List, InvocableHandlerMethod, Object, BeanExpressionResolver, BeanExpressionContext, BeanFactory, Validator)}
100114
* @since 2.1.3
101115
*/
116+
@Deprecated
102117
public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
103118
@Nullable InvocableHandlerMethod defaultHandler,
104119
Object bean, BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext) {
105120

106-
this(handlers, defaultHandler, bean, beanExpressionResolver, beanExpressionContext, null);
121+
this(handlers, defaultHandler, bean, beanExpressionResolver, beanExpressionContext, null, null);
107122
}
108123

109124
/**
@@ -114,13 +129,35 @@ public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
114129
* @param beanExpressionResolver the resolver.
115130
* @param beanExpressionContext the context.
116131
* @param beanFactory the bean factory.
117-
* @since 2.1.11
132+
* @deprecated in favor of
133+
* {@link #DelegatingInvocableHandler(List, InvocableHandlerMethod, Object, BeanExpressionResolver, BeanExpressionContext, BeanFactory, Validator)}
134+
* @since 2.5.11
118135
*/
136+
@Deprecated
119137
public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
120138
@Nullable InvocableHandlerMethod defaultHandler,
121139
Object bean, BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext,
122140
@Nullable BeanFactory beanFactory) {
123141

142+
this(handlers, defaultHandler, bean, beanExpressionResolver, beanExpressionContext, beanFactory, null);
143+
}
144+
145+
/**
146+
* Construct an instance with the supplied handlers for the bean.
147+
* @param handlers the handlers.
148+
* @param defaultHandler the default handler.
149+
* @param bean the bean.
150+
* @param beanExpressionResolver the resolver.
151+
* @param beanExpressionContext the context.
152+
* @param beanFactory the bean factory.
153+
* @param validator the validator.
154+
* @since 2.5.11
155+
*/
156+
public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
157+
@Nullable InvocableHandlerMethod defaultHandler,
158+
Object bean, BeanExpressionResolver beanExpressionResolver, BeanExpressionContext beanExpressionContext,
159+
@Nullable BeanFactory beanFactory, @Nullable Validator validator) {
160+
124161
this.handlers = new ArrayList<>();
125162
for (InvocableHandlerMethod handler : handlers) {
126163
this.handlers.add(wrapIfNecessary(handler));
@@ -132,6 +169,7 @@ public DelegatingInvocableHandler(List<InvocableHandlerMethod> handlers,
132169
this.beanFactory = beanFactory instanceof ConfigurableListableBeanFactory
133170
? (ConfigurableListableBeanFactory) beanFactory
134171
: null;
172+
this.validator = validator == null ? null : new PayloadValidator(validator);
135173
}
136174

137175
private InvocableHandlerMethod wrapIfNecessary(InvocableHandlerMethod handler) {
@@ -166,6 +204,12 @@ public Object getBean() {
166204
public Object invoke(Message<?> message, Object... providedArgs) throws Exception { //NOSONAR
167205
Class<? extends Object> payloadClass = message.getPayload().getClass();
168206
InvocableHandlerMethod handler = getHandlerForPayload(payloadClass);
207+
if (this.validator != null) {
208+
MethodParameter parameter = this.payloadMethodParameters.get(handler);
209+
if (parameter != null) {
210+
this.validator.validate(message, parameter, message.getPayload());
211+
}
212+
}
169213
Object result;
170214
if (handler instanceof MetadataAwareInvocableHandlerMethod) {
171215
Object[] args = new Object[providedArgs.length + 1];
@@ -279,23 +323,29 @@ protected boolean matchHandlerMethod(Class<? extends Object> payloadClass, Invoc
279323
if ((methodParameter.getParameterAnnotations().length == 0
280324
|| !methodParameter.hasParameterAnnotation(Header.class))
281325
&& methodParameter.getParameterType().isAssignableFrom(payloadClass)) {
326+
if (this.validator != null) {
327+
this.payloadMethodParameters.put(handler, methodParameter);
328+
}
282329
return true;
283330
}
284331
}
285332

286-
boolean foundCandidate = false;
333+
MethodParameter foundCandidate = null;
287334
for (int i = 0; i < parameterAnnotations.length; i++) {
288335
MethodParameter methodParameter = new MethodParameter(method, i);
289336
if ((methodParameter.getParameterAnnotations().length == 0
290337
|| !methodParameter.hasParameterAnnotation(Header.class))
291338
&& methodParameter.getParameterType().isAssignableFrom(payloadClass)) {
292-
if (foundCandidate) {
339+
if (foundCandidate != null) {
293340
throw new KafkaException("Ambiguous payload parameter for " + method.toGenericString());
294341
}
295-
foundCandidate = true;
342+
foundCandidate = methodParameter;
296343
}
297344
}
298-
return foundCandidate;
345+
if (foundCandidate != null && this.validator != null) {
346+
this.payloadMethodParameters.put(handler, foundCandidate);
347+
}
348+
return foundCandidate != null;
299349
}
300350

301351
/**
@@ -325,4 +375,32 @@ private static final class MetadataAwareInvocableHandlerMethod extends Invocable
325375

326376
}
327377

378+
private static final class PayloadValidator extends PayloadMethodArgumentResolver {
379+
380+
PayloadValidator(Validator validator) {
381+
super(new MessageConverter() { // Required but never used
382+
383+
@Override
384+
@Nullable
385+
public Message<?> toMessage(Object payload, @Nullable
386+
MessageHeaders headers) {
387+
return null;
388+
}
389+
390+
@Override
391+
@Nullable
392+
public Object fromMessage(Message<?> message, Class<?> targetClass) {
393+
return null;
394+
}
395+
396+
}, validator);
397+
}
398+
399+
@Override
400+
public void validate(Message<?> message, MethodParameter parameter, Object target) { // NOSONAR - public
401+
super.validate(message, parameter, target);
402+
}
403+
404+
}
405+
328406
}

0 commit comments

Comments
 (0)