|
108 | 108 | import org.springframework.transaction.support.TransactionSynchronizationManager;
|
109 | 109 | import org.springframework.transaction.support.TransactionTemplate;
|
110 | 110 | import org.springframework.util.Assert;
|
| 111 | +import org.springframework.util.ClassUtils; |
111 | 112 | import org.springframework.util.StringUtils;
|
112 | 113 | import org.springframework.util.concurrent.ListenableFuture;
|
113 | 114 | import org.springframework.util.concurrent.ListenableFutureCallback;
|
@@ -658,8 +659,8 @@ else if (listener instanceof MessageListener) {
|
658 | 659 | this.logger.info(this.toString());
|
659 | 660 | }
|
660 | 661 | Map<String, Object> props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
|
661 |
| - this.checkNullKeyForExceptions = checkDeserializer(findDeserializerClass(props, false)); |
662 |
| - this.checkNullValueForExceptions = checkDeserializer(findDeserializerClass(props, true)); |
| 662 | + this.checkNullKeyForExceptions = checkDeserializer(findDeserializerClass(props, consumerProperties, false)); |
| 663 | + this.checkNullValueForExceptions = checkDeserializer(findDeserializerClass(props, consumerProperties, true)); |
663 | 664 | this.syncCommitTimeout = determineSyncCommitTimeout();
|
664 | 665 | if (this.containerProperties.getSyncCommitTimeout() == null) {
|
665 | 666 | // update the property so we can use it directly from code elsewhere
|
@@ -877,14 +878,21 @@ else if (timeout instanceof String) {
|
877 | 878 | }
|
878 | 879 | }
|
879 | 880 |
|
880 |
| - private Object findDeserializerClass(Map<String, Object> props, boolean isValue) { |
| 881 | + @Nullable |
| 882 | + private Object findDeserializerClass(Map<String, Object> props, Properties consumerOverrides, boolean isValue) { |
881 | 883 | Object configuredDeserializer = isValue
|
882 | 884 | ? KafkaMessageListenerContainer.this.consumerFactory.getValueDeserializer()
|
883 | 885 | : KafkaMessageListenerContainer.this.consumerFactory.getKeyDeserializer();
|
884 | 886 | if (configuredDeserializer == null) {
|
885 |
| - return props.get(isValue |
| 887 | + Object deser = consumerOverrides.get(isValue |
886 | 888 | ? ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
|
887 | 889 | : ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
|
| 890 | + if (deser == null) { |
| 891 | + deser = props.get(isValue |
| 892 | + ? ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG |
| 893 | + : ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); |
| 894 | + } |
| 895 | + return deser; |
888 | 896 | }
|
889 | 897 | else {
|
890 | 898 | return configuredDeserializer.getClass();
|
@@ -916,10 +924,23 @@ private void subscribeOrAssignTopics(final Consumer<? super K, ? super V> subscr
|
916 | 924 | }
|
917 | 925 | }
|
918 | 926 |
|
919 |
| - private boolean checkDeserializer(Object deser) { |
920 |
| - return deser instanceof Class |
921 |
| - ? ErrorHandlingDeserializer.class.isAssignableFrom((Class<?>) deser) |
922 |
| - : deser instanceof String && deser.equals(ErrorHandlingDeserializer.class.getName()); |
| 927 | + private boolean checkDeserializer(@Nullable Object deser) { |
| 928 | + Class<?> deserializer = null; |
| 929 | + if (deser instanceof Class) { |
| 930 | + deserializer = (Class<?>) deser; |
| 931 | + } |
| 932 | + else if (deser instanceof String) { |
| 933 | + try { |
| 934 | + deserializer = ClassUtils.forName((String) deser, getApplicationContext().getClassLoader()); |
| 935 | + } |
| 936 | + catch (ClassNotFoundException | LinkageError e) { |
| 937 | + throw new IllegalStateException(e); |
| 938 | + } |
| 939 | + } |
| 940 | + else if (deser != null) { |
| 941 | + throw new IllegalStateException("Deserializer must be a class or class name, not a " + deser.getClass()); |
| 942 | + } |
| 943 | + return deserializer == null ? false : ErrorHandlingDeserializer.class.isAssignableFrom(deserializer); |
923 | 944 | }
|
924 | 945 |
|
925 | 946 | protected void checkConsumer() {
|
|
0 commit comments