Skip to content

Remove proxy in DefaultKafkaConsumerFactory #2822

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2022 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.kafka.core;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
Expand All @@ -28,19 +29,13 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import org.aopalliance.aop.Advice;
import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.Deserializer;

import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.NameMatchMethodPointcutAdvisor;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
Expand Down Expand Up @@ -445,68 +440,73 @@ private void checkInaccessible(Properties properties, Map<String, Object> modifi
}
}

@SuppressWarnings("resource")
protected Consumer<K, V> createKafkaConsumer(Map<String, Object> configProps) {
checkBootstrap(configProps);
Consumer<K, V> kafkaConsumer = createRawConsumer(configProps);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we could throw an exception here, if listeners are present and the created consumer is not an instance of ExtendedKafkaConsumer?

Copy link
Contributor

@garyrussell garyrussell Sep 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the other hand, Java doesn't support multiple inheritance (the user might already be extending some other class), so a hard failure would not be so good; perhaps log a warning that listeners must be invoked when closing a custom consumer.

Or, simply revert the code and have ExtendedKafkaConsumer wrap the proxy instead so that the consumer is not Serializable.

But, of course, that would mean we'd have to delegate all methods - ugh!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish we had never had this protected Consumer<K, V> createKafkaConsumer() on our default CF: for any custom client you can always implement your own CF.

So, I'm just going to log a WARN that listeners are ignored because the consumer is not an ExtendedKafkaConsumer...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree; makes sense; maybe also deprecate that method?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would let it live for now until we really fail for some peculiar use-case to learn from.
If we deprecate and remove it (I guess there was a reason behind that method), we may never learn the mentioned use-case 😉


if (this.listeners.size() > 0) {
Map<MetricName, ? extends Metric> metrics = kafkaConsumer.metrics();
Iterator<MetricName> metricIterator = metrics.keySet().iterator();
String clientId;
if (metricIterator.hasNext()) {
clientId = metricIterator.next().tags().get("client-id");
}
else {
clientId = "unknown";
}
String id = this.beanName + "." + clientId;
kafkaConsumer = createProxy(kafkaConsumer, id);
for (Listener<K, V> listener : this.listeners) {
listener.consumerAdded(id, kafkaConsumer);
}
if (!this.listeners.isEmpty() && !(kafkaConsumer instanceof ExtendedKafkaConsumer)) {
LOGGER.warn("The 'ConsumerFactory.Listener' configuration is ignored " +
"because the consumer is not an instance of 'ExtendedKafkaConsumer'." +
"Consider extending 'ExtendedKafkaConsumer' or implement your own 'ConsumerFactory'.");
}

for (ConsumerPostProcessor<K, V> pp : this.postProcessors) {
kafkaConsumer = pp.apply(kafkaConsumer);
}
return kafkaConsumer;
}

/**
* Create a Consumer.
* Create a {@link Consumer}.
* By default, this method returns an internal {@link ExtendedKafkaConsumer}
* which is aware of the {@link #listeners} provided to this factory, therefore it is recommended
* to extend that class if {@link #listeners} are still involved for a custom {@link Consumer}.
* @param configProps the configuration properties.
* @return the consumer.
* @since 2.5
*/
protected Consumer<K, V> createRawConsumer(Map<String, Object> configProps) {
return new KafkaConsumer<>(configProps, this.keyDeserializerSupplier.get(),
this.valueDeserializerSupplier.get());
return new ExtendedKafkaConsumer(configProps);
}

@SuppressWarnings("unchecked")
private Consumer<K, V> createProxy(Consumer<K, V> kafkaConsumer, String id) {
ProxyFactory pf = new ProxyFactory(kafkaConsumer);
Advice advice = new MethodInterceptor() {
@Override
public boolean isAutoCommit() {
Object auto = this.configs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
return auto instanceof Boolean
? (Boolean) auto
: !(auto instanceof String) || Boolean.parseBoolean((String) auto);
}

protected class ExtendedKafkaConsumer extends KafkaConsumer<K, V> {

private String idForListeners;

@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
DefaultKafkaConsumerFactory.this.listeners.forEach(listener ->
listener.consumerRemoved(id, kafkaConsumer));
return invocation.proceed();
protected ExtendedKafkaConsumer(Map<String, Object> configProps) {
super(configProps,
DefaultKafkaConsumerFactory.this.keyDeserializerSupplier.get(),
DefaultKafkaConsumerFactory.this.valueDeserializerSupplier.get());

if (!DefaultKafkaConsumerFactory.this.listeners.isEmpty()) {
Iterator<MetricName> metricIterator = metrics().keySet().iterator();
String clientId = "unknown";
if (metricIterator.hasNext()) {
clientId = metricIterator.next().tags().get("client-id");
}
this.idForListeners = DefaultKafkaConsumerFactory.this.beanName + "." + clientId;
for (Listener<K, V> listener : DefaultKafkaConsumerFactory.this.listeners) {
listener.consumerAdded(this.idForListeners, this);
}
}
}

};
NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(advice);
advisor.addMethodName("close");
pf.addAdvisor(advisor);
return (Consumer<K, V>) pf.getProxy();
}
@Override
public void close(Duration timeout) {
super.close(timeout);

for (Listener<K, V> listener : DefaultKafkaConsumerFactory.this.listeners) {
listener.consumerRemoved(this.idForListeners, this);
}
}

@Override
public boolean isAutoCommit() {
Object auto = this.configs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
return auto instanceof Boolean ? (Boolean) auto
: auto instanceof String ? Boolean.valueOf((String) auto) : true;
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2022 the original author or authors.
* Copyright 2017-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,7 +17,6 @@
package org.springframework.kafka.core;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

Expand All @@ -38,14 +37,11 @@
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;

import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory.Listener;
Expand All @@ -63,6 +59,8 @@
/**
* @author Gary Russell
* @author Chris Gilbert
* @author Artem Bilan
*
* @since 1.0.6
*/
@EmbeddedKafka(topics = { "txCache1", "txCache2", "txCacheSendFromListener" },
Expand Down Expand Up @@ -345,7 +343,7 @@ public void testNestedTxProducerIsCached() throws Exception {
ProxyFactory prox = new ProxyFactory();
prox.setTarget(consumer);
@SuppressWarnings("unchecked")
Consumer<Integer, String> proxy = (Consumer<Integer, String>) prox.getProxy();
Consumer<Integer, String> proxy = (Consumer<Integer, String>) prox.getProxy();
wrapped.set(proxy);
return proxy;
});
Expand Down Expand Up @@ -381,25 +379,12 @@ public void testNestedTxProducerIsCached() throws Exception {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
void listener() {
Consumer consumer = mock(Consumer.class);
Map<MetricName, ? extends Metric> metrics = new HashMap<>();
metrics.put(new MetricName("test", "group", "desc", Collections.singletonMap("client-id", "foo-0")), null);
given(consumer.metrics()).willReturn(metrics);
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(Collections.emptyMap()) {

@Override
protected Consumer createRawConsumer(Map configProps) {
return consumer;
}

};
Map<String, Object> consumerConfig = KafkaTestUtils.consumerProps("txCache1Group", "false", this.embeddedKafka);
consumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, "foo-0");
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(consumerConfig);
List<String> adds = new ArrayList<>();
List<String> removals = new ArrayList<>();

Consumer consum = cf.createConsumer();
assertThat(AopUtils.isAopProxy(consum)).isFalse();
assertThat(adds).hasSize(0);

cf.addListener(new Listener() {

@Override
Expand All @@ -415,13 +400,11 @@ public void consumerRemoved(String id, Consumer consumer) {
});
cf.setBeanName("cf");

consum = cf.createConsumer();
assertThat(AopUtils.isAopProxy(consum)).isTrue();
assertThat(AopUtils.isJdkDynamicProxy(consum)).isTrue();
Consumer consumer = cf.createConsumer();
assertThat(adds).hasSize(1);
assertThat(adds.get(0)).isEqualTo("cf.foo-0");
assertThat(removals).hasSize(0);
consum.close();
consumer.close();
assertThat(removals).hasSize(1);
}

Expand Down