Skip to content

Commit c53c68e

Browse files
garyrussellartembilan
authored andcommitted
GH-1026: Support KafkaHeaders.GROUP_ID
Resolves #1026 Also improve `DefaultKafkaHeaderMapper` with a `NeverMatchHeaderMatcher`. * Always add GROUP_ID header; polish header mapper per PR comments. * Support custom matchers in DefaultHeaderMapper.
1 parent 4f1c90e commit c53c68e

20 files changed

+455
-150
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/AbstractKafkaListenerContainerFactory.java

Lines changed: 26 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
4040
import org.springframework.kafka.listener.adapter.ReplyHeadersConfigurer;
4141
import org.springframework.kafka.requestreply.ReplyingKafkaOperations;
42+
import org.springframework.kafka.support.JavaUtils;
4243
import org.springframework.kafka.support.TopicPartitionInitialOffset;
4344
import org.springframework.kafka.support.converter.MessageConverter;
4445
import org.springframework.retry.RecoveryCallback;
@@ -286,10 +287,8 @@ public void afterPropertiesSet() {
286287
@Override
287288
public C createListenerContainer(KafkaListenerEndpoint endpoint) {
288289
C instance = createContainerInstance(endpoint);
289-
290-
if (endpoint.getId() != null) {
291-
instance.setBeanName(endpoint.getId());
292-
}
290+
JavaUtils.INSTANCE
291+
.acceptIfNotNull(endpoint.getId(), instance::setBeanName);
293292
if (endpoint instanceof AbstractKafkaListenerEndpoint) {
294293
configureEndpoint((AbstractKafkaListenerEndpoint<K, V>) endpoint);
295294
}
@@ -301,30 +300,15 @@ public C createListenerContainer(KafkaListenerEndpoint endpoint) {
301300
}
302301

303302
private void configureEndpoint(AbstractKafkaListenerEndpoint<K, V> aklEndpoint) {
304-
if (this.recordFilterStrategy != null) {
305-
aklEndpoint.setRecordFilterStrategy(this.recordFilterStrategy);
306-
}
307-
if (this.ackDiscarded != null) {
308-
aklEndpoint.setAckDiscarded(this.ackDiscarded);
309-
}
310-
if (this.retryTemplate != null) {
311-
aklEndpoint.setRetryTemplate(this.retryTemplate);
312-
}
313-
if (this.recoveryCallback != null) {
314-
aklEndpoint.setRecoveryCallback(this.recoveryCallback);
315-
}
316-
if (this.statefulRetry != null) {
317-
aklEndpoint.setStatefulRetry(this.statefulRetry);
318-
}
319-
if (this.batchListener != null) {
320-
aklEndpoint.setBatchListener(this.batchListener);
321-
}
322-
if (this.replyTemplate != null) {
323-
aklEndpoint.setReplyTemplate(this.replyTemplate);
324-
}
325-
if (this.replyHeadersConfigurer != null) {
326-
aklEndpoint.setReplyHeadersConfigurer(this.replyHeadersConfigurer);
327-
}
303+
JavaUtils.INSTANCE
304+
.acceptIfNotNull(this.recordFilterStrategy, aklEndpoint::setRecordFilterStrategy)
305+
.acceptIfNotNull(this.ackDiscarded, aklEndpoint::setAckDiscarded)
306+
.acceptIfNotNull(this.retryTemplate, aklEndpoint::setRetryTemplate)
307+
.acceptIfNotNull(this.recoveryCallback, aklEndpoint::setRecoveryCallback)
308+
.acceptIfNotNull(this.statefulRetry, aklEndpoint::setStatefulRetry)
309+
.acceptIfNotNull(this.batchListener, aklEndpoint::setBatchListener)
310+
.acceptIfNotNull(this.replyTemplate, aklEndpoint::setReplyTemplate)
311+
.acceptIfNotNull(this.replyHeadersConfigurer, aklEndpoint::setReplyHeadersConfigurer);
328312
}
329313

330314
/**
@@ -345,35 +329,26 @@ protected void initializeContainer(C instance, KafkaListenerEndpoint endpoint) {
345329
ContainerProperties properties = instance.getContainerProperties();
346330
BeanUtils.copyProperties(this.containerProperties, properties, "topics", "topicPartitions", "topicPattern",
347331
"messageListener", "ackCount", "ackTime");
348-
if (this.afterRollbackProcessor != null) {
349-
instance.setAfterRollbackProcessor(this.afterRollbackProcessor);
350-
}
351-
if (this.containerProperties.getAckCount() > 0) {
352-
properties.setAckCount(this.containerProperties.getAckCount());
353-
}
354-
if (this.containerProperties.getAckTime() > 0) {
355-
properties.setAckTime(this.containerProperties.getAckTime());
356-
}
357-
if (this.errorHandler != null) {
358-
instance.setGenericErrorHandler(this.errorHandler);
359-
}
332+
JavaUtils.INSTANCE
333+
.acceptIfNotNull(this.afterRollbackProcessor, instance::setAfterRollbackProcessor)
334+
.acceptIfCondition(this.containerProperties.getAckCount() > 0, this.containerProperties.getAckCount(),
335+
properties::setAckCount)
336+
.acceptIfCondition(this.containerProperties.getAckTime() > 0, this.containerProperties.getAckTime(),
337+
properties::setAckTime)
338+
.acceptIfNotNull(this.errorHandler, instance::setGenericErrorHandler);
360339
if (endpoint.getAutoStartup() != null) {
361340
instance.setAutoStartup(endpoint.getAutoStartup());
362341
}
363342
else if (this.autoStartup != null) {
364343
instance.setAutoStartup(this.autoStartup);
365344
}
366-
if (this.phase != null) {
367-
instance.setPhase(this.phase);
368-
}
369-
if (this.applicationEventPublisher != null) {
370-
instance.setApplicationEventPublisher(this.applicationEventPublisher);
371-
}
372-
instance.getContainerProperties().setGroupId(endpoint.getGroupId());
373-
instance.getContainerProperties().setClientId(endpoint.getClientIdPrefix());
374-
if (endpoint.getConsumerProperties() != null) {
375-
instance.getContainerProperties().setConsumerProperties(endpoint.getConsumerProperties());
376-
}
345+
JavaUtils.INSTANCE
346+
.acceptIfNotNull(this.phase, instance::setPhase)
347+
.acceptIfNotNull(this.applicationEventPublisher, instance::setApplicationEventPublisher)
348+
.acceptIfNotNull(endpoint.getGroupId(), instance.getContainerProperties()::setGroupId)
349+
.acceptIfNotNull(endpoint.getClientIdPrefix(), instance.getContainerProperties()::setClientId)
350+
.acceptIfNotNull(endpoint.getConsumerProperties(),
351+
instance.getContainerProperties()::setConsumerProperties);
377352
}
378353

379354
@Override

spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.springframework.kafka.listener.adapter.HandlerAdapter;
3131
import org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter;
3232
import org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter;
33+
import org.springframework.kafka.support.JavaUtils;
3334
import org.springframework.kafka.support.converter.BatchMessageConverter;
3435
import org.springframework.kafka.support.converter.MessageConverter;
3536
import org.springframework.kafka.support.converter.RecordMessageConverter;
@@ -147,19 +148,19 @@ protected MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
147148
@Override
148149
protected MessagingMessageListenerAdapter<K, V> createMessageListener(MessageListenerContainer container,
149150
MessageConverter messageConverter) {
151+
150152
Assert.state(this.messageHandlerMethodFactory != null,
151153
"Could not create message listener - MessageHandlerMethodFactory not set");
152154
MessagingMessageListenerAdapter<K, V> messageListener = createMessageListenerInstance(messageConverter);
153155
messageListener.setHandlerMethod(configureListenerAdapter(messageListener));
154-
String replyTopic = getReplyTopic();
155-
if (replyTopic != null) {
156-
Assert.state(getMethod().getReturnType().equals(void.class)
157-
|| getReplyTemplate() != null, "a KafkaTemplate is required to support replies");
158-
messageListener.setReplyTopic(replyTopic);
159-
}
160-
if (getReplyTemplate() != null) {
161-
messageListener.setReplyTemplate(getReplyTemplate());
162-
}
156+
JavaUtils.INSTANCE
157+
.acceptIfNotNull(getReplyTopic(), replyTopic -> {
158+
Assert.state(getMethod().getReturnType().equals(void.class)
159+
|| getReplyTemplate() != null, "a KafkaTemplate is required to support replies");
160+
messageListener.setReplyTopic(replyTopic);
161+
})
162+
.acceptIfNotNull(getReplyTemplate(), messageListener::setReplyTemplate);
163+
163164
return messageListener;
164165
}
165166

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaOperations.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,8 @@ public interface KafkaOperations<K, V> {
186186

187187
/**
188188
* When running in a transaction, send the consumer offset(s) to the transaction. The
189-
* group id is obtained from {@link ProducerFactoryUtils#getConsumerGroupId()}. It is
189+
* group id is obtained from
190+
* {@link org.springframework.kafka.support.KafkaUtils#getConsumerGroupId()}. It is
190191
* not necessary to call this method if the operations are invoked on a listener
191192
* container thread (and the listener container is configured with a
192193
* {@link org.springframework.kafka.transaction.KafkaAwareTransactionManager}) since

spring-kafka/src/main/java/org/springframework/kafka/core/KafkaTemplate.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.kafka.common.TopicPartition;
3232

3333
import org.springframework.kafka.support.KafkaHeaders;
34+
import org.springframework.kafka.support.KafkaUtils;
3435
import org.springframework.kafka.support.LoggingProducerListener;
3536
import org.springframework.kafka.support.ProducerListener;
3637
import org.springframework.kafka.support.SendResult;
@@ -329,7 +330,7 @@ public void flush() {
329330

330331
@Override
331332
public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets) {
332-
sendOffsetsToTransaction(offsets, ProducerFactoryUtils.getConsumerGroupId());
333+
sendOffsetsToTransaction(offsets, KafkaUtils.getConsumerGroupId());
333334
}
334335

335336
@Override

spring-kafka/src/main/java/org/springframework/kafka/core/ProducerFactoryUtils.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.clients.producer.Producer;
2020

21+
import org.springframework.kafka.support.KafkaUtils;
2122
import org.springframework.lang.Nullable;
2223
import org.springframework.transaction.support.ResourceHolderSynchronization;
2324
import org.springframework.transaction.support.TransactionSynchronization;
@@ -35,8 +36,6 @@
3536
*/
3637
public final class ProducerFactoryUtils {
3738

38-
private static ThreadLocal<String> groupIds = new ThreadLocal<>();
39-
4039
private ProducerFactoryUtils() {
4140
super();
4241
}
@@ -82,27 +81,33 @@ public static <K, V> void releaseResources(@Nullable KafkaResourceHolder<K, V> r
8281
/**
8382
* Set the group id for the consumer bound to this thread.
8483
* @param groupId the group id.
84+
* @deprecated in favor of {@link KafkaUtils#setConsumerGroupId(String)}.
8585
* @since 1.3
8686
*/
87+
@Deprecated
8788
public static void setConsumerGroupId(String groupId) {
88-
groupIds.set(groupId);
89+
KafkaUtils.setConsumerGroupId(groupId);
8990
}
9091

9192
/**
9293
* Get the group id for the consumer bound to this thread.
9394
* @return the group id.
95+
* @deprecated in favor of {@link KafkaUtils#getConsumerGroupId()}.
9496
* @since 1.3
9597
*/
98+
@Deprecated
9699
public static String getConsumerGroupId() {
97-
return groupIds.get();
100+
return KafkaUtils.getConsumerGroupId();
98101
}
99102

100103
/**
101104
* Clear the group id for the consumer bound to this thread.
105+
* @deprecated in favor of {@link KafkaUtils#clearConsumerGroupId()}.
102106
* @since 1.3
103107
*/
108+
@Deprecated
104109
public static void clearConsumerGroupId() {
105-
groupIds.remove();
110+
KafkaUtils.clearConsumerGroupId();
106111
}
107112

108113
private static <K, V> void bindResourceToTransaction(KafkaResourceHolder<K, V> resourceHolder,
@@ -151,8 +156,8 @@ public void afterCompletion(int status) {
151156
}
152157

153158
@Override
154-
protected void releaseResource(KafkaResourceHolder<K, V> resourceHolder, Object resourceKey) {
155-
ProducerFactoryUtils.releaseResources(resourceHolder);
159+
protected void releaseResource(KafkaResourceHolder<K, V> holder, Object resourceKey) {
160+
ProducerFactoryUtils.releaseResources(holder);
156161
}
157162

158163
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@
5656
import org.springframework.kafka.core.ConsumerFactory;
5757
import org.springframework.kafka.core.KafkaResourceHolder;
5858
import org.springframework.kafka.core.ProducerFactory;
59-
import org.springframework.kafka.core.ProducerFactoryUtils;
6059
import org.springframework.kafka.event.ConsumerPausedEvent;
6160
import org.springframework.kafka.event.ConsumerResumedEvent;
6261
import org.springframework.kafka.event.ConsumerStoppedEvent;
@@ -66,6 +65,7 @@
6665
import org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback;
6766
import org.springframework.kafka.listener.ContainerProperties.AckMode;
6867
import org.springframework.kafka.support.Acknowledgment;
68+
import org.springframework.kafka.support.KafkaUtils;
6969
import org.springframework.kafka.support.LogIfLevelEnabled;
7070
import org.springframework.kafka.support.TopicPartitionInitialOffset;
7171
import org.springframework.kafka.support.TopicPartitionInitialOffset.SeekPosition;
@@ -743,9 +743,7 @@ public void run() {
743743
if (this.genericListener instanceof ConsumerSeekAware) {
744744
((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
745745
}
746-
if (this.transactionManager != null) {
747-
ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);
748-
}
746+
KafkaUtils.setConsumerGroupId(this.consumerGroupId);
749747
this.count = 0;
750748
this.last = System.currentTimeMillis();
751749
initAssignedPartitions();
@@ -864,7 +862,7 @@ private void checkIdle() {
864862
}
865863

866864
public void wrapUp() {
867-
ProducerFactoryUtils.clearConsumerGroupId();
865+
KafkaUtils.clearConsumerGroupId();
868866
publishConsumerStoppingEvent(this.consumer);
869867
if (!this.fatalError) {
870868
if (this.kafkaTxManager == null) {

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.springframework.messaging.MessageHeaders;
5858
import org.springframework.messaging.MessagingException;
5959
import org.springframework.messaging.converter.MessageConversionException;
60+
import org.springframework.messaging.handler.annotation.Header;
6061
import org.springframework.messaging.handler.annotation.Payload;
6162
import org.springframework.messaging.support.MessageBuilder;
6263
import org.springframework.util.Assert;
@@ -475,7 +476,7 @@ protected Type determineInferredType(Method method) { // NOSONAR complexity
475476
}
476477

477478
Type genericParameterType = null;
478-
boolean hasConsumerParameter = false;
479+
int allowedBatchParameters = 1;
479480

480481
for (int i = 0; i < method.getParameterCount(); i++) {
481482
MethodParameter methodParameter = new MethodParameter(method, i);
@@ -529,23 +530,33 @@ else if (parameterizedType.getRawType().equals(List.class)
529530
}
530531
else if (methodParameter.getGenericParameterType().equals(Acknowledgment.class)) {
531532
this.hasAckParameter = true;
533+
allowedBatchParameters++;
534+
}
535+
else if (methodParameter.hasParameterAnnotation(Header.class)) {
536+
Header header = methodParameter.getParameterAnnotation(Header.class);
537+
if (header.value().equals(KafkaHeaders.GROUP_ID)) {
538+
allowedBatchParameters++;
539+
}
532540
}
533541
else {
534542
if (methodParameter.getGenericParameterType().equals(Consumer.class)) {
535-
hasConsumerParameter = true;
543+
allowedBatchParameters++;
536544
}
537545
else {
538546
Type parameterType = methodParameter.getGenericParameterType();
539-
hasConsumerParameter = parameterType instanceof ParameterizedType
540-
&& ((ParameterizedType) parameterType).getRawType().equals(Consumer.class);
547+
if (parameterType instanceof ParameterizedType
548+
&& ((ParameterizedType) parameterType).getRawType().equals(Consumer.class)) {
549+
allowedBatchParameters++;
550+
}
541551
}
542552
}
543553
}
544-
boolean validParametersForBatch = validParametersForBatch(method.getGenericParameterTypes().length,
545-
this.hasAckParameter, hasConsumerParameter);
554+
boolean validParametersForBatch = method.getGenericParameterTypes().length <= allowedBatchParameters;
555+
546556
if (!validParametersForBatch) {
547557
String stateMessage = "A parameter of type '%s' must be the only parameter "
548-
+ "(except for an optional 'Acknowledgment' and/or 'Consumer')";
558+
+ "(except for an optional 'Acknowledgment' and/or 'Consumer' "
559+
+ "and/or '@Header(KafkaHeaders.GROUP_ID) String groupId'";
549560
Assert.state(!this.isConsumerRecords,
550561
() -> String.format(stateMessage, "ConsumerRecords"));
551562
Assert.state(!this.isConsumerRecordList,
@@ -557,18 +568,6 @@ else if (methodParameter.getGenericParameterType().equals(Acknowledgment.class))
557568
return genericParameterType;
558569
}
559570

560-
private boolean validParametersForBatch(int parameterCount, boolean hasAck, boolean hasConsumer) {
561-
if (hasAck) {
562-
return parameterCount == 2 || (hasConsumer && parameterCount == 3); // NOSONAR magic #
563-
}
564-
else if (hasConsumer) {
565-
return parameterCount == 2; // NOSONAR magic #
566-
}
567-
else {
568-
return parameterCount == 1; // NOSONAR magic #
569-
}
570-
}
571-
572571
/*
573572
* Don't consider parameter types that are available after conversion.
574573
* Acknowledgment, ConsumerRecord, Consumer, ConsumerRecord<...>, Consumer<...>, and Message<?>.

0 commit comments

Comments
 (0)