Skip to content

Remove SslBundles from KafkaProperties as it is no longer used #45727

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException
@Bean
@ConditionalOnMissingBean
KafkaAdmin kafkaAdmin(KafkaConnectionDetails connectionDetails) {
Map<String, Object> properties = this.properties.buildAdminProperties(null);
Map<String, Object> properties = this.properties.buildAdminProperties();
applyKafkaConnectionDetailsForAdmin(properties, connectionDetails);
KafkaAdmin kafkaAdmin = new KafkaAdmin(properties);
KafkaProperties.Admin admin = this.properties.getAdmin();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.springframework.boot.context.properties.PropertyMapper;
import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException;
import org.springframework.boot.convert.DurationUnit;
import org.springframework.boot.ssl.SslBundles;
import org.springframework.core.io.Resource;
import org.springframework.kafka.listener.ContainerProperties.AckMode;
import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
Expand Down Expand Up @@ -162,15 +161,15 @@ public Retry getRetry() {
return this.retry;
}

private Map<String, Object> buildCommonProperties(SslBundles sslBundles) {
private Map<String, Object> buildCommonProperties() {
Map<String, Object> properties = new HashMap<>();
if (this.bootstrapServers != null) {
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
}
if (this.clientId != null) {
properties.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
}
properties.putAll(this.ssl.buildProperties(sslBundles));
properties.putAll(this.ssl.buildProperties());
properties.putAll(this.security.buildProperties());
if (!CollectionUtils.isEmpty(this.properties)) {
properties.putAll(this.properties);
Expand All @@ -187,21 +186,8 @@ private Map<String, Object> buildCommonProperties(SslBundles sslBundles) {
* instance
*/
public Map<String, Object> buildConsumerProperties() {
return buildConsumerProperties(null);
}

/**
* Create an initial map of consumer properties from the state of this instance.
* <p>
* This allows you to add additional properties, if necessary, and override the
* default {@code kafkaConsumerFactory} bean.
* @param sslBundles bundles providing SSL trust material
* @return the consumer properties initialized with the customizations defined on this
* instance
*/
public Map<String, Object> buildConsumerProperties(SslBundles sslBundles) {
Map<String, Object> properties = buildCommonProperties(sslBundles);
properties.putAll(this.consumer.buildProperties(sslBundles));
Map<String, Object> properties = buildCommonProperties();
properties.putAll(this.consumer.buildProperties());
return properties;
}

Expand All @@ -214,21 +200,8 @@ public Map<String, Object> buildConsumerProperties(SslBundles sslBundles) {
* instance
*/
public Map<String, Object> buildProducerProperties() {
return buildProducerProperties(null);
}

/**
* Create an initial map of producer properties from the state of this instance.
* <p>
* This allows you to add additional properties, if necessary, and override the
* default {@code kafkaProducerFactory} bean.
* @param sslBundles bundles providing SSL trust material
* @return the producer properties initialized with the customizations defined on this
* instance
*/
public Map<String, Object> buildProducerProperties(SslBundles sslBundles) {
Map<String, Object> properties = buildCommonProperties(sslBundles);
properties.putAll(this.producer.buildProperties(sslBundles));
Map<String, Object> properties = buildCommonProperties();
properties.putAll(this.producer.buildProperties());
return properties;
}

Expand All @@ -237,27 +210,25 @@ public Map<String, Object> buildProducerProperties(SslBundles sslBundles) {
* <p>
* This allows you to add additional properties, if necessary, and override the
* default {@code kafkaAdmin} bean.
* @param sslBundles bundles providing SSL trust material
* @return the admin properties initialized with the customizations defined on this
* instance
*/
public Map<String, Object> buildAdminProperties(SslBundles sslBundles) {
Map<String, Object> properties = buildCommonProperties(sslBundles);
properties.putAll(this.admin.buildProperties(sslBundles));
public Map<String, Object> buildAdminProperties() {
Map<String, Object> properties = buildCommonProperties();
properties.putAll(this.admin.buildProperties());
return properties;
}

/**
* Create an initial map of streams properties from the state of this instance.
* <p>
* This allows you to add additional properties, if necessary.
* @param sslBundles bundles providing SSL trust material
* @return the streams properties initialized with the customizations defined on this
* instance
*/
public Map<String, Object> buildStreamsProperties(SslBundles sslBundles) {
Map<String, Object> properties = buildCommonProperties(sslBundles);
properties.putAll(this.streams.buildProperties(sslBundles));
public Map<String, Object> buildStreamsProperties() {
Map<String, Object> properties = buildCommonProperties();
properties.putAll(this.streams.buildProperties());
return properties;
}

Expand Down Expand Up @@ -473,7 +444,7 @@ public Map<String, String> getProperties() {
return this.properties;
}

public Map<String, Object> buildProperties(SslBundles sslBundles) {
public Map<String, Object> buildProperties() {
Properties properties = new Properties();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this::getAutoCommitInterval)
Expand Down Expand Up @@ -501,7 +472,7 @@ public Map<String, Object> buildProperties(SslBundles sslBundles) {
map.from(this::getMaxPollInterval)
.asInt(Duration::toMillis)
.to(properties.in(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG));
return properties.with(this.ssl, this.security, this.properties, sslBundles);
return properties.with(this.ssl, this.security, this.properties);
}

}
Expand Down Expand Up @@ -663,7 +634,7 @@ public Map<String, String> getProperties() {
return this.properties;
}

public Map<String, Object> buildProperties(SslBundles sslBundles) {
public Map<String, Object> buildProperties() {
Properties properties = new Properties();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this::getAcks).to(properties.in(ProducerConfig.ACKS_CONFIG));
Expand All @@ -677,7 +648,7 @@ public Map<String, Object> buildProperties(SslBundles sslBundles) {
map.from(this::getKeySerializer).to(properties.in(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
map.from(this::getRetries).to(properties.in(ProducerConfig.RETRIES_CONFIG));
map.from(this::getValueSerializer).to(properties.in(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
return properties.with(this.ssl, this.security, this.properties, sslBundles);
return properties.with(this.ssl, this.security, this.properties);
}

}
Expand Down Expand Up @@ -784,11 +755,11 @@ public Map<String, String> getProperties() {
return this.properties;
}

public Map<String, Object> buildProperties(SslBundles sslBundles) {
public Map<String, Object> buildProperties() {
Properties properties = new Properties();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this::getClientId).to(properties.in(ProducerConfig.CLIENT_ID_CONFIG));
return properties.with(this.ssl, this.security, this.properties, sslBundles);
return properties.with(this.ssl, this.security, this.properties);
}

}
Expand Down Expand Up @@ -918,7 +889,7 @@ public Map<String, String> getProperties() {
return this.properties;
}

public Map<String, Object> buildProperties(SslBundles sslBundles) {
public Map<String, Object> buildProperties() {
Properties properties = new Properties();
PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull();
map.from(this::getApplicationId).to(properties.in("application.id"));
Expand All @@ -929,7 +900,7 @@ public Map<String, Object> buildProperties(SslBundles sslBundles) {
map.from(this::getClientId).to(properties.in(CommonClientConfigs.CLIENT_ID_CONFIG));
map.from(this::getReplicationFactor).to(properties.in("replication.factor"));
map.from(this::getStateDir).to(properties.in("state.dir"));
return properties.with(this.ssl, this.security, this.properties, sslBundles);
return properties.with(this.ssl, this.security, this.properties);
}

}
Expand Down Expand Up @@ -1423,12 +1394,7 @@ public void setProtocol(String protocol) {
this.protocol = protocol;
}

@Deprecated(since = "3.2.0", forRemoval = true)
public Map<String, Object> buildProperties() {
return buildProperties(null);
}

public Map<String, Object> buildProperties(SslBundles sslBundles) {
validate();
String bundleName = getBundle();
Properties properties = new Properties();
Expand Down Expand Up @@ -1794,8 +1760,8 @@ <V> java.util.function.Consumer<V> in(String key) {
return (value) -> put(key, value);
}

Properties with(Ssl ssl, Security security, Map<String, String> properties, SslBundles sslBundles) {
putAll(ssl.buildProperties(sslBundles));
Properties with(Ssl ssl, Security security, Map<String, String> properties) {
putAll(ssl.buildProperties());
putAll(security.buildProperties());
putAll(properties);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class KafkaStreamsAnnotationDrivenConfiguration {
@Bean(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
KafkaStreamsConfiguration defaultKafkaStreamsConfig(Environment environment,
KafkaConnectionDetails connectionDetails) {
Map<String, Object> properties = this.properties.buildStreamsProperties(null);
Map<String, Object> properties = this.properties.buildStreamsProperties();
applyKafkaConnectionDetailsForStreams(properties, connectionDetails);
if (this.properties.getStreams().getApplicationId() == null) {
String applicationName = environment.getProperty("spring.application.name");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,13 @@
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.IsolationLevel;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Listener;
import org.springframework.boot.context.properties.source.MutuallyExclusiveConfigurationPropertiesException;
import org.springframework.boot.ssl.DefaultSslBundleRegistry;
import org.springframework.boot.ssl.SslBundle;
import org.springframework.core.io.ClassPathResource;
import org.springframework.kafka.core.CleanupConfig;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.listener.ContainerProperties;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.Mockito.mock;

/**
* Tests for {@link KafkaProperties}.
Expand All @@ -47,8 +44,6 @@
*/
class KafkaPropertiesTests {

private final SslBundle sslBundle = mock(SslBundle.class);

@Test
void isolationLevelEnumConsistentWithKafkaVersion() {
org.apache.kafka.common.IsolationLevel[] original = org.apache.kafka.common.IsolationLevel.values();
Expand Down Expand Up @@ -101,15 +96,6 @@ void sslPemConfigurationWithEmptyBundle() {
"-----BEGINchain");
}

@Test
void sslBundleConfiguration() {
KafkaProperties properties = new KafkaProperties();
properties.getSsl().setBundle("myBundle");
Map<String, Object> consumerProperties = properties
.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle));
assertThat(consumerProperties).doesNotContainKey(SslConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG);
}

@Test
void sslPropertiesWhenKeyStoreLocationAndKeySetShouldThrowException() {
KafkaProperties properties = new KafkaProperties();
Expand All @@ -133,35 +119,35 @@ void sslPropertiesWhenKeyStoreLocationAndBundleSetShouldThrowException() {
KafkaProperties properties = new KafkaProperties();
properties.getSsl().setBundle("myBundle");
properties.getSsl().setKeyStoreLocation(new ClassPathResource("ksLoc"));
assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class).isThrownBy(
() -> properties.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle)));
assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class)
.isThrownBy(properties::buildConsumerProperties);
}

@Test
void sslPropertiesWhenKeyStoreKeyAndBundleSetShouldThrowException() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test should be kept but should call .isThrownBy(properties::buildConsumerProperties);.

KafkaProperties properties = new KafkaProperties();
properties.getSsl().setBundle("myBundle");
properties.getSsl().setKeyStoreKey("-----BEGIN");
assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class).isThrownBy(
() -> properties.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle)));
assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class)
.isThrownBy(properties::buildConsumerProperties);
}

@Test
void sslPropertiesWhenTrustStoreLocationAndBundleSetShouldThrowException() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test should be kept but should call .isThrownBy(properties::buildConsumerProperties);.

KafkaProperties properties = new KafkaProperties();
properties.getSsl().setBundle("myBundle");
properties.getSsl().setTrustStoreLocation(new ClassPathResource("tsLoc"));
assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class).isThrownBy(
() -> properties.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle)));
assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class)
.isThrownBy(properties::buildConsumerProperties);
}

@Test
void sslPropertiesWhenTrustStoreCertificatesAndBundleSetShouldThrowException() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test should be kept but should call .isThrownBy(properties::buildConsumerProperties);.

KafkaProperties properties = new KafkaProperties();
properties.getSsl().setBundle("myBundle");
properties.getSsl().setTrustStoreCertificates("-----BEGIN");
assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class).isThrownBy(
() -> properties.buildConsumerProperties(new DefaultSslBundleRegistry("myBundle", this.sslBundle)));
assertThatExceptionOfType(MutuallyExclusiveConfigurationPropertiesException.class)
.isThrownBy(properties::buildConsumerProperties);
}

@Test
Expand Down