
Memory leak on a Kafka Observation due to the metric "spring.kafka.listener.active" #3690


Closed
vivi2701 opened this issue Dec 20, 2024 · 1 comment · Fixed by #3694

@vivi2701

In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.3.0 and 3.3.1

Describe the bug

We have an application using Spring Boot v3.3.6 and Spring Kafka v3.3.0, and we have seen that the metric spring.kafka.listener.active leads to an ever-increasing number of active task entries in Micrometer's DefaultLongTaskTimer, which are never garbage collected.
The screenshot below shows the memory and CPU usage of the process, which follows a classic memory-leak trend:
[screenshot: process memory and CPU usage over time]
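
To make the mechanism concrete, here is a minimal, self-contained sketch using Micrometer directly (not the Spring Kafka code): an observation backed by a long task timer registers an active sample when it starts, and that sample is only released when stop() is called, so unstopped observations accumulate forever.

    import io.micrometer.core.instrument.LongTaskTimer;
    import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

    public class ActiveTaskLeakSketch {

        public static void main(String[] args) {
            SimpleMeterRegistry registry = new SimpleMeterRegistry();
            LongTaskTimer timer = LongTaskTimer.builder("spring.kafka.listener.active").register(registry);

            // Every start() adds a sample to the timer's active set; the sample is
            // only removed when sample.stop() is called. Never stopping mimics the
            // unstopped observation described above.
            for (int i = 0; i < 1_000; i++) {
                timer.start();
            }

            System.out.println("active tasks: " + timer.activeTasks()); // prints 1000
        }
    }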

The issue does not appear in another application of ours that uses Spring Kafka v3.2.2.

The symptoms are similar to those in spring-projects/spring-security#14030, where the stop method is not called on the observation.
By debugging the doInvokeRecordListener method of KafkaMessageListenerContainer, we can see that the observation.stop() call in the finally block is skipped because the listener is an instance of RecordMessagingMessageListenerAdapter.

When we disable the spring.cloud.stream.kafka.binder.enableObservation property, system resource consumption seems to return to normal.
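
For reference, a minimal illustration of that workaround in application.properties (assuming observation had been enabled explicitly; this only hides the symptom, it does not fix the leak):

    spring.cloud.stream.kafka.binder.enableObservation=false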

Further compounding the issue, Prometheus regularly scrapes this metric, which uses even more CPU and leads to timeouts or broken pipes on the scraping endpoint (/actuator/prometheus).

See the thread dump associated with the scraping workload: threaddump-1734718231951.zip

To Reproduce

We have been able to create a minimal sample project that reproduces the issue.
It is a simple Kafka producer / consumer using the latest Spring Boot v3.4.1 and Spring Kafka v3.3.1.
We changed the producer rate to one message every millisecond (around 1000 messages per second) to speed up the phenomenon.

After about one hour of running the test, we see millions of instances (and growing) for the active tasks:
[screenshot: growing instance counts for the active tasks]

Sample

sample
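
For readers who do not open the attachment, here is a rough, hypothetical sketch of what such a reproducer can look like using the Spring Cloud Stream functional style; the class and function names are invented, and the 1 ms rate corresponds to a poller setting such as spring.cloud.stream.poller.fixed-delay=1 combined with spring.cloud.stream.kafka.binder.enableObservation=true.

    import java.util.function.Consumer;
    import java.util.function.Supplier;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;

    // Hypothetical reproducer: the supplier emits roughly one message per millisecond,
    // and on the affected versions each consumed record starts a
    // spring.kafka.listener.active observation that is never stopped.
    @SpringBootApplication
    public class ObservationLeakReproducer {

        public static void main(String[] args) {
            SpringApplication.run(ObservationLeakReproducer.class, args);
        }

        @Bean
        public Supplier<String> produce() {
            return () -> "ping";
        }

        @Bean
        public Consumer<String> consume() {
            return payload -> {
                // no-op: the leak comes from the observation lifecycle, not from the work done here
            };
        }
    }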

@artembilan
Member

I see where the problem is.
The KafkaMessageListenerContainer has logic like this:

    if (!(this.listener instanceof RecordMessagingMessageListenerAdapter<K, V>)) {
        observation.stop();
    }

where we assume that the invoke() of the super class of that RecordMessagingMessageListenerAdapter is invoked.
And that one has logic like this:

    Observation currentObservation = getCurrentObservation();
    ...
    finally {
        if (listenerError != null || result == null) {
            currentObservation.stop();
        }
    }

However, it turns out that the Spring Cloud Stream Kafka Binder uses the KafkaMessageDrivenChannelAdapter from Spring Integration, and that adapter has its own private class IntegrationRecordMessageListener extends RecordMessagingMessageListenerAdapter<K, V>, which does not call the mentioned invoke().

So, yeah, confirmed as a bug.
Not sure yet how to fix.

Thank you for such a simple reproducible sample!

artembilan added this to the 3.3.2 milestone on Dec 20, 2024
artembilan added a commit to artembilan/spring-kafka that referenced this issue Dec 23, 2024
…enerContainer`

Fixes: spring-projects#3690

When `this.listener` is an instance of `RecordMessagingMessageListenerAdapter`,
we rely on its logic to call `invoke()` from the super class to handle the observation
lifecycle one way or another.
However, Spring Integration's `KafkaMessageDrivenChannelAdapter` uses its own
`IntegrationRecordMessageListener` extension of the `RecordMessagingMessageListenerAdapter`
without calling the super `invoke()`.
The problem is apparent with the Spring Cloud Stream Kafka Binder, where observation is enabled.

* Fix `KafkaMessageListenerContainer` to check for the exact type of `this.listener`
before deciding whether to close the observation there, or propagate it down to the `RecordMessagingMessageListenerAdapter`
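
As a rough, self-contained illustration of the difference between the original subtype check and the exact-type check described in this commit (stand-in classes, not the real Spring Kafka code):

    public class ExactTypeCheckSketch {

        static class Adapter { }                   // stands in for RecordMessagingMessageListenerAdapter
        static class Subclass extends Adapter { }  // stands in for IntegrationRecordMessageListener

        public static void main(String[] args) {
            Object listener = new Subclass();

            // Subtype check: also true for the subclass, so the container would skip
            // stopping the observation even though the subclass never stops it.
            System.out.println(listener instanceof Adapter);               // true

            // Exact-type check: false for the subclass, so the container would stop
            // the observation itself, as the commit message above describes.
            System.out.println(listener.getClass().equals(Adapter.class)); // false
        }
    }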