Skip to content

Commit 4ac551d

Browse files
garyrussellartembilan
authored andcommitted
GH-1126: Add delegating Serializer/Deserializer
Resolves #1126 * * Doc polishing per PR comments.
1 parent ceaf8d8 commit 4ac551d

File tree

5 files changed

+501
-1
lines changed

5 files changed

+501
-1
lines changed
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support.serializer;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
22+
import org.apache.kafka.common.header.Headers;
23+
import org.apache.kafka.common.serialization.Deserializer;
24+
25+
import org.springframework.lang.Nullable;
26+
import org.springframework.util.Assert;
27+
import org.springframework.util.ClassUtils;
28+
import org.springframework.util.StringUtils;
29+
30+
/**
31+
* A {@link Deserializer} that delegates to other deserializers based on a serialization
32+
* selector header.
33+
*
34+
* @author Gary Russell
35+
* @since 2.3
36+
*
37+
*/
38+
public class DelegatingDeserializer implements Deserializer<Object> {
39+
40+
/**
41+
* Name of the configuration property containing the serialization selector map with
42+
* format {@code selector:class,...}.
43+
*/
44+
public static final String SERIALIZATION_SELECTOR_CONFIG = DelegatingSerializer.SERIALIZATION_SELECTOR_CONFIG;
45+
46+
47+
private final Map<String, Deserializer<?>> delegates = new HashMap<>();
48+
49+
/**
50+
* Construct an instance that will be configured in {@link #configure(Map, boolean)}
51+
* with a consumer property
52+
* {@link SERIALIZATION_SELECTOR_CONFIG}.
53+
*/
54+
public DelegatingDeserializer() {
55+
super();
56+
}
57+
58+
/**
59+
* Construct an instance with the supplied mapping of selectors to delegate
60+
* deserializers. The selector must be supplied in the
61+
* {@link DelegatingSerializer#SERIALIZATION_SELECTOR} header.
62+
* @param delegates the map of delegates.
63+
*/
64+
public DelegatingDeserializer(Map<String, Deserializer<?>> delegates) {
65+
this.delegates.putAll(delegates);
66+
}
67+
68+
@SuppressWarnings("unchecked")
69+
@Override
70+
public void configure(Map<String, ?> configs, boolean isKey) {
71+
Object value = configs.get(SERIALIZATION_SELECTOR_CONFIG);
72+
if (value == null) {
73+
return;
74+
}
75+
if (value instanceof Map) {
76+
((Map<String, Object>) value).forEach((selector, deser) -> {
77+
if (deser instanceof Deserializer) {
78+
this.delegates.put(selector, (Deserializer<?>) deser);
79+
}
80+
else if (deser instanceof Class) {
81+
instantiateAndConfigure(configs, isKey, this.delegates, selector, (Class<?>) deser);
82+
}
83+
else if (deser instanceof String) {
84+
createInstanceAndConfigure(configs, isKey, this.delegates, selector, (String) deser);
85+
}
86+
else {
87+
throw new IllegalStateException(SERIALIZATION_SELECTOR_CONFIG
88+
+ " map entries must be Serializers or class names, not " + value.getClass());
89+
}
90+
});
91+
}
92+
else if (value instanceof String) {
93+
this.delegates.putAll(createDelegates((String) value, configs, isKey));
94+
}
95+
else {
96+
throw new IllegalStateException(
97+
SERIALIZATION_SELECTOR_CONFIG + " must be a map or String, not " + value.getClass());
98+
}
99+
}
100+
101+
protected static Map<String, Deserializer<?>> createDelegates(String mappings, Map<String, ?> configs,
102+
boolean isKey) {
103+
104+
Map<String, Deserializer<?>> delegateMap = new HashMap<>();
105+
String[] array = StringUtils.commaDelimitedListToStringArray(mappings);
106+
for (String entry : array) {
107+
String[] split = entry.split(":");
108+
Assert.isTrue(split.length == 2, "Each comma-delimited selector entry must have exactly one ':'");
109+
createInstanceAndConfigure(configs, isKey, delegateMap, split[0], split[1]);
110+
}
111+
return delegateMap;
112+
}
113+
114+
protected static void createInstanceAndConfigure(Map<String, ?> configs, boolean isKey,
115+
Map<String, Deserializer<?>> delegateMap, String selector, String className) {
116+
117+
try {
118+
Class<?> clazz = ClassUtils.forName(className.trim(), ClassUtils.getDefaultClassLoader());
119+
instantiateAndConfigure(configs, isKey, delegateMap, selector, clazz);
120+
}
121+
catch (ClassNotFoundException | LinkageError e) {
122+
throw new IllegalArgumentException(e);
123+
}
124+
}
125+
126+
protected static void instantiateAndConfigure(Map<String, ?> configs, boolean isKey,
127+
Map<String, Deserializer<?>> delegateMap, String selector, Class<?> clazz) {
128+
129+
try {
130+
Deserializer<?> delegate = (Deserializer<?>) clazz.newInstance();
131+
delegate.configure(configs, isKey);
132+
delegateMap.put(selector.trim(), delegate);
133+
}
134+
catch (InstantiationException | IllegalAccessException e) {
135+
throw new IllegalArgumentException(e);
136+
}
137+
}
138+
139+
public void addDelegate(String selector, Deserializer<?> deserializer) {
140+
this.delegates.put(selector, deserializer);
141+
}
142+
143+
@Nullable
144+
public Deserializer<?> removeDelegate(String selector) {
145+
return this.delegates.remove(selector);
146+
}
147+
148+
@Override
149+
public Object deserialize(String topic, byte[] data) {
150+
throw new UnsupportedOperationException();
151+
}
152+
153+
@Override
154+
public Object deserialize(String topic, Headers headers, byte[] data) {
155+
byte[] value = headers.lastHeader(DelegatingSerializer.SERIALIZATION_SELECTOR).value();
156+
if (value == null) {
157+
throw new IllegalStateException("No '" + DelegatingSerializer.SERIALIZATION_SELECTOR + "' header present");
158+
}
159+
String selector = new String(value).replaceAll("\"", "");
160+
@SuppressWarnings("unchecked")
161+
Deserializer<Object> deserializer = (Deserializer<Object>) this.delegates.get(selector);
162+
if (deserializer == null) {
163+
return data;
164+
}
165+
else {
166+
return deserializer.deserialize(topic, headers, data);
167+
}
168+
}
169+
170+
@Override
171+
public void close() {
172+
this.delegates.values().forEach(deser -> deser.close());
173+
}
174+
175+
}
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.support.serializer;
18+
19+
import java.util.HashMap;
20+
import java.util.Map;
21+
22+
import org.apache.kafka.common.header.Headers;
23+
import org.apache.kafka.common.serialization.Serializer;
24+
25+
import org.springframework.lang.Nullable;
26+
import org.springframework.util.Assert;
27+
import org.springframework.util.ClassUtils;
28+
import org.springframework.util.StringUtils;
29+
30+
/**
31+
* A {@link Serializer} that delegates to other serializers based on a serialization
32+
* selector header.
33+
*
34+
* @author Gary Russell
35+
* @since 2.3
36+
*
37+
*/
38+
public class DelegatingSerializer implements Serializer<Object> {
39+
40+
/**
41+
* Name of the header containing the serialization selector.
42+
*/
43+
public static final String SERIALIZATION_SELECTOR = "spring.kafka.serialization.selector";
44+
45+
/**
46+
* Name of the configuration property containing the serialization selector map with
47+
* format {@code selector:class,...}.
48+
*/
49+
public static final String SERIALIZATION_SELECTOR_CONFIG = "spring.kafka.serialization.selector.config";
50+
51+
private final Map<String, Serializer<?>> delegates = new HashMap<>();
52+
53+
/**
54+
* Construct an instance that will be configured in {@link #configure(Map, boolean)}
55+
* with a producer property
56+
* {@link DelegatingSerializer#SERIALIZATION_SELECTOR_CONFIG}.
57+
*/
58+
public DelegatingSerializer() {
59+
super();
60+
}
61+
62+
/**
63+
* Construct an instance with the supplied mapping of selectors to delegate
64+
* serializers. The selector must be supplied in the
65+
* {@link DelegatingSerializer#SERIALIZATION_SELECTOR} header.
66+
* @param delegates the map of delegates.
67+
*/
68+
public DelegatingSerializer(Map<String, Serializer<?>> delegates) {
69+
this.delegates.putAll(delegates);
70+
}
71+
72+
@SuppressWarnings("unchecked")
73+
@Override
74+
public void configure(Map<String, ?> configs, boolean isKey) {
75+
Object value = configs.get(SERIALIZATION_SELECTOR_CONFIG);
76+
if (value == null) {
77+
return;
78+
}
79+
else if (value instanceof Map) {
80+
((Map<String, Object>) value).forEach((selector, serializer) -> {
81+
if (serializer instanceof Serializer) {
82+
this.delegates.put(selector, (Serializer<?>) serializer);
83+
}
84+
else if (serializer instanceof Class) {
85+
instantiateAndConfigure(configs, isKey, this.delegates, selector, (Class<?>) serializer);
86+
}
87+
else if (serializer instanceof String) {
88+
createInstanceAndConfigure(configs, isKey, this.delegates, selector, (String) serializer);
89+
}
90+
else {
91+
throw new IllegalStateException(SERIALIZATION_SELECTOR_CONFIG
92+
+ " map entries must be Serializers or class names, not " + value.getClass());
93+
}
94+
});
95+
}
96+
else if (value instanceof String) {
97+
this.delegates.putAll(createDelegates((String) value, configs, isKey));
98+
}
99+
else {
100+
throw new IllegalStateException(
101+
SERIALIZATION_SELECTOR_CONFIG + " must be a map or String, not " + value.getClass());
102+
}
103+
}
104+
105+
protected static Map<String, Serializer<?>> createDelegates(String mappings, Map<String, ?> configs,
106+
boolean isKey) {
107+
108+
Map<String, Serializer<?>> delegateMap = new HashMap<>();
109+
String[] array = StringUtils.commaDelimitedListToStringArray(mappings);
110+
for (String entry : array) {
111+
String[] split = entry.split(":");
112+
Assert.isTrue(split.length == 2, "Each comma-delimited selector entry must have exactly one ':'");
113+
createInstanceAndConfigure(configs, isKey, delegateMap, split[0], split[1]);
114+
}
115+
return delegateMap;
116+
}
117+
118+
protected static void createInstanceAndConfigure(Map<String, ?> configs, boolean isKey,
119+
Map<String, Serializer<?>> delegateMap, String selector, String className) {
120+
121+
try {
122+
Class<?> clazz = ClassUtils.forName(className.trim(), ClassUtils.getDefaultClassLoader());
123+
instantiateAndConfigure(configs, isKey, delegateMap, selector, clazz);
124+
}
125+
catch (ClassNotFoundException | LinkageError e) {
126+
throw new IllegalArgumentException(e);
127+
}
128+
}
129+
130+
protected static void instantiateAndConfigure(Map<String, ?> configs, boolean isKey,
131+
Map<String, Serializer<?>> delegateMap, String selector, Class<?> clazz) {
132+
133+
try {
134+
Serializer<?> delegate = (Serializer<?>) clazz.newInstance();
135+
delegate.configure(configs, isKey);
136+
delegateMap.put(selector.trim(), delegate);
137+
}
138+
catch (InstantiationException | IllegalAccessException e) {
139+
throw new IllegalArgumentException(e);
140+
}
141+
}
142+
143+
public void addDelegate(String selector, Serializer<?> serializer) {
144+
this.delegates.put(selector, serializer);
145+
}
146+
147+
@Nullable
148+
public Serializer<?> removeDelegate(String selector) {
149+
return this.delegates.remove(selector);
150+
}
151+
152+
@Override
153+
public byte[] serialize(String topic, Object data) {
154+
throw new UnsupportedOperationException();
155+
}
156+
157+
158+
@Override
159+
public byte[] serialize(String topic, Headers headers, Object data) {
160+
byte[] value = headers.lastHeader(SERIALIZATION_SELECTOR).value();
161+
if (value == null) {
162+
throw new IllegalStateException("No '" + SERIALIZATION_SELECTOR + "' header present");
163+
}
164+
String selector = new String(value).replaceAll("\"", "");
165+
@SuppressWarnings("unchecked")
166+
Serializer<Object> serializer = (Serializer<Object>) this.delegates.get(selector);
167+
if (serializer == null) {
168+
throw new IllegalStateException(
169+
"No serializer found for '" + SERIALIZATION_SELECTOR + "' header with value '" + selector + "'");
170+
}
171+
return serializer.serialize(topic, headers, data);
172+
}
173+
174+
@Override
175+
public void close() {
176+
this.delegates.values().forEach(ser -> ser.close());
177+
}
178+
179+
}

0 commit comments

Comments
 (0)