Skip to content

Commit db54e24

Browse files
garyrussellartembilan
authored andcommitted
GH-2274: Option to Not Configure (De)serializers
Resolves #2274 Provide an option to suppress configuring programmatically provided `Serializer`s and `Deserializer`s. Also fix the DKPF, which did not configure serializers provided by the supplier setter. # Conflicts: # spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java
1 parent db0f958 commit db54e24

File tree

2 files changed

+182
-21
lines changed

2 files changed

+182
-21
lines changed

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java

Lines changed: 72 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2021 the original author or authors.
2+
* Copyright 2016-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -89,6 +89,7 @@ public class DefaultKafkaConsumerFactory<K, V> extends KafkaResourceFactory
8989

9090
private String beanName = "not.managed.by.Spring";
9191

92+
private boolean configureDeserializers = true;
9293

9394
/**
9495
* Construct a factory with the provided configuration.
@@ -113,6 +114,23 @@ public DefaultKafkaConsumerFactory(Map<String, Object> configs,
113114
this(configs, () -> keyDeserializer, () -> valueDeserializer);
114115
}
115116

117+
/**
118+
* Construct a factory with the provided configuration and deserializers.
119+
* The deserializers' {@code configure()} methods will be called with the
120+
* configuration map unless {@code configureDeserializers} is false.
121+
* @param configs the configuration.
122+
* @param keyDeserializer the key {@link Deserializer}.
123+
* @param valueDeserializer the value {@link Deserializer}.
124+
* @param configureDeserializers false to not configure the deserializers.
125+
* @since 2.8.7
126+
*/
127+
public DefaultKafkaConsumerFactory(Map<String, Object> configs,
128+
@Nullable Deserializer<K> keyDeserializer,
129+
@Nullable Deserializer<V> valueDeserializer, boolean configureDeserializers) {
130+
131+
this(configs, () -> keyDeserializer, () -> valueDeserializer, configureDeserializers);
132+
}
133+
116134
/**
117135
* Construct a factory with the provided configuration and deserializer suppliers.
118136
* When the suppliers are invoked to get an instance, the deserializers'
@@ -126,14 +144,36 @@ public DefaultKafkaConsumerFactory(Map<String, Object> configs,
126144
@Nullable Supplier<Deserializer<K>> keyDeserializerSupplier,
127145
@Nullable Supplier<Deserializer<V>> valueDeserializerSupplier) {
128146

147+
this(configs, keyDeserializerSupplier, valueDeserializerSupplier, true);
148+
}
149+
150+
/**
151+
* Construct a factory with the provided configuration and deserializer suppliers.
152+
* When the suppliers are invoked to get an instance, the deserializers'
153+
* {@code configure()} methods will be called with the configuration map unless
154+
* {@code configureDeserializers} is false.
155+
* @param configs the configuration.
156+
* @param keyDeserializerSupplier the key {@link Deserializer} supplier function.
157+
* @param valueDeserializerSupplier the value {@link Deserializer} supplier function.
158+
* @param configureDeserializers false to not configure the deserializers.
159+
* @since 2.8.7
160+
*/
161+
public DefaultKafkaConsumerFactory(Map<String, Object> configs,
162+
@Nullable Supplier<Deserializer<K>> keyDeserializerSupplier,
163+
@Nullable Supplier<Deserializer<V>> valueDeserializerSupplier, boolean configureDeserializers) {
164+
129165
this.configs = new ConcurrentHashMap<>(configs);
166+
this.configureDeserializers = configureDeserializers;
130167
this.keyDeserializerSupplier = keyDeserializerSupplier(keyDeserializerSupplier);
131168
this.valueDeserializerSupplier = valueDeserializerSupplier(valueDeserializerSupplier);
132169
}
133170

134171
private Supplier<Deserializer<K>> keyDeserializerSupplier(
135172
@Nullable Supplier<Deserializer<K>> keyDeserializerSupplier) {
136173

174+
if (!this.configureDeserializers) {
175+
return keyDeserializerSupplier;
176+
}
137177
return keyDeserializerSupplier == null
138178
? () -> null
139179
: () -> {
@@ -148,6 +188,9 @@ private Supplier<Deserializer<K>> keyDeserializerSupplier(
148188
private Supplier<Deserializer<V>> valueDeserializerSupplier(
149189
@Nullable Supplier<Deserializer<V>> valueDeserializerSupplier) {
150190

191+
if (!this.configureDeserializers) {
192+
return valueDeserializerSupplier;
193+
}
151194
return valueDeserializerSupplier == null
152195
? () -> null
153196
: () -> {
@@ -165,23 +208,29 @@ public void setBeanName(String name) {
165208
}
166209

167210
/**
168-
* Set the key deserializer.
211+
* Set the key deserializer. The deserializer will be configured using the consumer
212+
* configuration, unless {@link #setConfigureDeserializers(boolean)
213+
* configureDeserializers} is false.
169214
* @param keyDeserializer the deserializer.
170215
*/
171216
public void setKeyDeserializer(@Nullable Deserializer<K> keyDeserializer) {
172217
this.keyDeserializerSupplier = keyDeserializerSupplier(() -> keyDeserializer);
173218
}
174219

175220
/**
176-
* Set the value deserializer.
221+
* Set the value deserializer. The deserializer will be configured using the consumer
222+
* configuration, unless {@link #setConfigureDeserializers(boolean)
223+
* configureDeserializers} is false.
177224
* @param valueDeserializer the value deserializer.
178225
*/
179226
public void setValueDeserializer(@Nullable Deserializer<V> valueDeserializer) {
180227
this.valueDeserializerSupplier = valueDeserializerSupplier(() -> valueDeserializer);
181228
}
182229

183230
/**
184-
* Set a supplier to supply instances of the key deserializer.
231+
* Set a supplier to supply instances of the key deserializer. The deserializer will
232+
* be configured using the consumer configuration, unless
233+
* {@link #setConfigureDeserializers(boolean) configureDeserializers} is false.
185234
* @param keyDeserializerSupplier the supplier.
186235
* @since 2.8
187236
*/
@@ -190,14 +239,32 @@ public void setKeyDeserializerSupplier(Supplier<Deserializer<K>> keyDeserializer
190239
}
191240

192241
/**
193-
* Set a supplier to supply instances of the value deserializer.
242+
* Set a supplier to supply instances of the value deserializer. The deserializer will
243+
* be configured using the consumer configuration, unless
244+
* {@link #setConfigureDeserializers(boolean) configureDeserializers} is false.
194245
* @param valueDeserializerSupplier the supplier.
195246
* @since 2.8
196247
*/
197248
public void setValueDeserializerSupplier(Supplier<Deserializer<V>> valueDeserializerSupplier) {
198249
this.valueDeserializerSupplier = valueDeserializerSupplier(valueDeserializerSupplier);
199250
}
200251

252+
253+
/**
254+
* Set to false (default true) to prevent programmatically provided deserializers (via
255+
* constructor or setters) from being configured using the producer configuration,
256+
* e.g. if the deserializers are already fully configured.
257+
* @param configureDeserializers false to not configure.
258+
* @since 2.8.7
259+
* @see #setKeyDeserializer(Deserializer)
260+
* @see #setKeyDeserializerSupplier(Supplier)
261+
* @see #setValueDeserializer(Deserializer)
262+
* @see #setValueDeserializerSupplier(Supplier)
263+
**/
264+
public void setConfigureDeserializers(boolean configureDeserializers) {
265+
this.configureDeserializers = configureDeserializers;
266+
}
267+
201268
@Override
202269
public Map<String, Object> getConfigurationProperties() {
203270
Map<String, Object> configs2 = new HashMap<>(this.configs);

spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaProducerFactory.java

Lines changed: 110 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
149149

150150
private long maxAge;
151151

152+
private boolean configureSerializers = true;
153+
152154
private volatile String transactionIdPrefix;
153155

154156
private volatile String clientIdPrefix;
@@ -178,16 +180,37 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
178180
@Nullable Serializer<K> keySerializer,
179181
@Nullable Serializer<V> valueSerializer) {
180182

181-
this(configs, () -> keySerializer, () -> valueSerializer);
183+
this(configs, () -> keySerializer, () -> valueSerializer, true);
182184
}
183185

184186
/**
185-
* Construct a factory with the provided configuration and {@link Serializer} Suppliers.
186-
* Also configures a {@link #transactionIdPrefix} as a value from the
187-
* {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided.
188-
* This config is going to be overridden with a suffix for target {@link Producer} instance.
189-
* When the suppliers are invoked to get an instance, the serializers'
190-
* {@code configure()} methods will be called with the configuration map.
187+
* Construct a factory with the provided configuration and {@link Serializer}s. Also
188+
* configures a {@link #transactionIdPrefix} as a value from the
189+
* {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided. This config is going to
190+
* be overridden with a suffix for target {@link Producer} instance. The serializers'
191+
* {@code configure()} methods will be called with the configuration map unless
192+
* {@code configureSerializers} is false..
193+
* @param configs the configuration.
194+
* @param keySerializer the key {@link Serializer}.
195+
* @param valueSerializer the value {@link Serializer}.
196+
* @param configureSerializers set to false if serializers are already fully
197+
* configured.
198+
* @since 2.8.7
199+
*/
200+
public DefaultKafkaProducerFactory(Map<String, Object> configs,
201+
@Nullable Serializer<K> keySerializer,
202+
@Nullable Serializer<V> valueSerializer, boolean configureSerializers) {
203+
204+
this(configs, () -> keySerializer, () -> valueSerializer, configureSerializers);
205+
}
206+
207+
/**
208+
* Construct a factory with the provided configuration and {@link Serializer}
209+
* Suppliers. Also configures a {@link #transactionIdPrefix} as a value from the
210+
* {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided. This config is going to
211+
* be overridden with a suffix for target {@link Producer} instance. When the
212+
* suppliers are invoked to get an instance, the serializers' {@code configure()}
213+
* methods will be called with the configuration map.
191214
* @param configs the configuration.
192215
* @param keySerializerSupplier the key {@link Serializer} supplier function.
193216
* @param valueSerializerSupplier the value {@link Serializer} supplier function.
@@ -197,7 +220,30 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
197220
@Nullable Supplier<Serializer<K>> keySerializerSupplier,
198221
@Nullable Supplier<Serializer<V>> valueSerializerSupplier) {
199222

223+
this(configs, keySerializerSupplier, valueSerializerSupplier, true);
224+
}
225+
226+
/**
227+
* Construct a factory with the provided configuration and {@link Serializer}
228+
* Suppliers. Also configures a {@link #transactionIdPrefix} as a value from the
229+
* {@link ProducerConfig#TRANSACTIONAL_ID_CONFIG} if provided. This config is going to
230+
* be overridden with a suffix for target {@link Producer} instance. When the
231+
* suppliers are invoked to get an instance, the serializers' {@code configure()}
232+
* methods will be called with the configuration map unless
233+
* {@code configureSerializers} is false.
234+
* @param configs the configuration.
235+
* @param keySerializerSupplier the key {@link Serializer} supplier function.
236+
* @param valueSerializerSupplier the value {@link Serializer} supplier function.
237+
* @param configureSerializers set to false if serializers are already fully
238+
* configured.
239+
* @since 2.8.7
240+
*/
241+
public DefaultKafkaProducerFactory(Map<String, Object> configs,
242+
@Nullable Supplier<Serializer<K>> keySerializerSupplier,
243+
@Nullable Supplier<Serializer<V>> valueSerializerSupplier, boolean configureSerializers) {
244+
200245
this.configs = new ConcurrentHashMap<>(configs);
246+
this.configureSerializers = configureSerializers;
201247
this.keySerializerSupplier = keySerializerSupplier(keySerializerSupplier);
202248
this.valueSerializerSupplier = valueSerializerSupplier(valueSerializerSupplier);
203249
if (this.clientIdPrefix == null && configs.get(ProducerConfig.CLIENT_ID_CONFIG) instanceof String) {
@@ -212,6 +258,9 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
212258

213259
private Supplier<Serializer<K>> keySerializerSupplier(@Nullable Supplier<Serializer<K>> keySerializerSupplier) {
214260
this.rawKeySerializerSupplier = keySerializerSupplier;
261+
if (!this.configureSerializers) {
262+
return keySerializerSupplier;
263+
}
215264
return keySerializerSupplier == null
216265
? () -> null
217266
: () -> {
@@ -225,6 +274,9 @@ private Supplier<Serializer<K>> keySerializerSupplier(@Nullable Supplier<Seriali
225274

226275
private Supplier<Serializer<V>> valueSerializerSupplier(@Nullable Supplier<Serializer<V>> valueSerializerSupplier) {
227276
this.rawValueSerializerSupplier = valueSerializerSupplier;
277+
if (!this.configureSerializers) {
278+
return valueSerializerSupplier;
279+
}
228280
return valueSerializerSupplier == null
229281
? () -> null
230282
: () -> {
@@ -247,37 +299,79 @@ public void setBeanName(String name) {
247299
}
248300

249301
/**
250-
* Set a key serializer.
302+
* Set a key serializer. The serializer will be configured using the producer
303+
* configuration, unless {@link #setConfigureSerializers(boolean)
304+
* configureSerializers} is false.
251305
* @param keySerializer the key serializer.
306+
* @see #setConfigureSerializers(boolean)
252307
*/
253308
public void setKeySerializer(@Nullable Serializer<K> keySerializer) {
254309
this.keySerializerSupplier = keySerializerSupplier(() -> keySerializer);
255310
}
256311

257312
/**
258-
* Set a value serializer.
313+
* Set a value serializer. The serializer will be configured using the producer
314+
* configuration, unless {@link #setConfigureSerializers(boolean)
315+
* configureSerializers} is false.
259316
* @param valueSerializer the value serializer.
317+
* @see #setConfigureSerializers(boolean)
260318
*/
261319
public void setValueSerializer(@Nullable Serializer<V> valueSerializer) {
262320
this.valueSerializerSupplier = valueSerializerSupplier(() -> valueSerializer);
263321
}
264322

265323
/**
266-
* Set a supplier to supply instances of the key serializer.
324+
* Set a supplier to supply instances of the key serializer. The serializer will be
325+
* configured using the producer configuration, unless
326+
* {@link #setConfigureSerializers(boolean) configureSerializers} is false.
267327
* @param keySerializerSupplier the supplier.
268328
* @since 2.8
329+
* @see #setConfigureSerializers(boolean)
269330
*/
270331
public void setKeySerializerSupplier(Supplier<Serializer<K>> keySerializerSupplier) {
271-
this.keySerializerSupplier = keySerializerSupplier;
332+
this.keySerializerSupplier = keySerializerSupplier(keySerializerSupplier);
272333
}
273334

274335
/**
275336
* Set a supplier to supply instances of the value serializer.
276-
* @param valueSerializerSupplier the supplier.
337+
* @param valueSerializerSupplier the supplier. The serializer will be configured
338+
* using the producer configuration, unless {@link #setConfigureSerializers(boolean)
339+
* configureSerializers} is false.
277340
* @since 2.8
341+
* @see #setConfigureSerializers(boolean)
278342
*/
279343
public void setValueSerializerSupplier(Supplier<Serializer<V>> valueSerializerSupplier) {
280-
this.valueSerializerSupplier = valueSerializerSupplier;
344+
this.valueSerializerSupplier = valueSerializerSupplier(valueSerializerSupplier);
345+
}
346+
347+
/**
348+
* If true (default), programmatically provided serializers (via constructor or
349+
* setters) will be configured using the producer configuration. Set to false if the
350+
* serializers are already fully configured.
351+
* @return true to configure.
352+
* @since 2.8.7
353+
* @see #setKeySerializer(Serializer)
354+
* @see #setKeySerializerSupplier(Supplier)
355+
* @see #setValueSerializer(Serializer)
356+
* @see #setValueSerializerSupplier(Supplier)
357+
*/
358+
public boolean isConfigureSerializers() {
359+
return this.configureSerializers;
360+
}
361+
362+
/**
363+
* Set to false (default true) to prevent programmatically provided serializers (via
364+
* constructor or setters) from being configured using the producer configuration,
365+
* e.g. if the serializers are already fully configured.
366+
* @param configureSerializers false to not configure.
367+
* @since 2.8.7
368+
* @see #setKeySerializer(Serializer)
369+
* @see #setKeySerializerSupplier(Supplier)
370+
* @see #setValueSerializer(Serializer)
371+
* @see #setValueSerializerSupplier(Supplier)
372+
*/
373+
public void setConfigureSerializers(boolean configureSerializers) {
374+
this.configureSerializers = configureSerializers;
281375
}
282376

283377
/**
@@ -441,10 +535,10 @@ public ProducerFactory<K, V> copyWithConfigurationOverride(Map<String, Object> o
441535
Map<String, Object> producerProperties = new HashMap<>(getConfigurationProperties());
442536
producerProperties.putAll(overrideProperties);
443537
producerProperties = ensureExistingTransactionIdPrefixInProperties(producerProperties);
444-
DefaultKafkaProducerFactory<K, V> newFactory =
445-
new DefaultKafkaProducerFactory<>(producerProperties,
538+
DefaultKafkaProducerFactory<K, V> newFactory = new DefaultKafkaProducerFactory<>(producerProperties,
446539
getKeySerializerSupplier(),
447-
getValueSerializerSupplier());
540+
getValueSerializerSupplier(),
541+
isConfigureSerializers());
448542
newFactory.setPhysicalCloseTimeout((int) getPhysicalCloseTimeout().getSeconds());
449543
newFactory.setProducerPerThread(isProducerPerThread());
450544
for (ProducerPostProcessor<K, V> templatePostProcessor : getPostProcessors()) {

0 commit comments

Comments
 (0)