Listener receives multiple messages when using @RetryableTopic with topicPartitions to reset offset #2220
Comments
Thanks again for reporting @ashishqcs, this should be fixed soon and released in an upcoming version.
@garyrussell, please help me out here if you can - I'm not that familiar with using manual assignments. I'm thinking we can replicate the main endpoint's partition assignments for the retry topics. There's also an issue with the retry topics when more than one listener container consumes from the same topic. Another take on this would be ignoring manual assignments for the retry topics, and also filtering out any duplicated NewTopic registrations. In the future we might also consider having ... WDYT? Thanks
It's not clear to me why @ashishqcs sets an initial position for the manually assigned topic, as well as using retryable topic; that logic is usually used with compacted topics (always read the full log). Perhaps he can explain the use case. In any case, I agree, there should be no repositioning of the retry topics because if the main listener fails after a rewind we'd get a new failed record in the retry topic; we don't want to see already processed failed records there. I am not sure I understand your concerns about multiple listeners; if a listener only assigns partitions 0 and 2, then he should only get retry listeners for those same partitions and the resolver would need to always resolve to the same partition. If there is another listener consuming from 1 and 3, he'd get his own retry containers for those partitions too. I don't think it needs to be any more complicated than that.
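To illustrate the scenario described above, a rough sketch of two listeners manually assigned to different partitions of the same topic (the class name, listener ids, and topic name are assumptions, and a configured container factory is assumed to be available):

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class SplitAssignmentListeners {

    // Listener A is manually assigned partitions 0 and 2 of the main topic;
    // per the discussion, its retry containers should cover only those partitions.
    @RetryableTopic
    @KafkaListener(id = "listenerA", topicPartitions =
            @TopicPartition(topic = "products", partitions = { "0", "2" }))
    public void listenA(String message) {
        // processing that may fail and send the record to the retry topic
    }

    // Listener B is assigned partitions 1 and 3 and gets its own retry containers.
    @RetryableTopic
    @KafkaListener(id = "listenerB", topicPartitions =
            @TopicPartition(topic = "products", partitions = { "1", "3" }))
    public void listenB(String message) {
        // processing that may fail and send the record to the retry topic
    }
}
```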
Sure. I just tested having two listeners with manual assignments for the same topic, so that seems to be alright - the feature will work as you described, with each retry topic container being assigned to the same partitions as the main topic one, only without the offset. If the retry topic has fewer partitions than the partition specified for the main topic, we can just default to partition 0, for example. I'll open a PR and we can see if there's anything else left. Thanks a lot @garyrussell!
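A rough sketch of the approach agreed on above, using the TopicPartitionOffset API: copy the main endpoint's partitions to the retry topic and drop the offset so the retry consumer is never repositioned. The helper class and method names are hypothetical; this is not the actual framework change.

```java
import java.util.List;
import java.util.stream.Collectors;

import org.springframework.kafka.support.TopicPartitionOffset;

public final class RetryTopicAssignmentSketch {

    // Derive the retry topic's assignments from the main endpoint's:
    // same partitions, but no offset, so no seek happens on startup.
    static List<TopicPartitionOffset> forRetryTopic(String retryTopic,
            List<TopicPartitionOffset> mainAssignments) {
        return mainAssignments.stream()
                .map(tpo -> new TopicPartitionOffset(retryTopic, tpo.getPartition()))
                .collect(Collectors.toList());
    }
}
```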
Resolves spring-projects#2220

* Register NewTopics for Retry Topics only once per topic.
* Add docs explaining configuration behavior for retry topics when more than one listener container is used for the same topic.
GH-2220: Fix TopicPartitionOffset for Retry Topics

Resolves #2220

* Register NewTopics for Retry Topics only once per topic.
* Add docs explaining configuration behavior for retry topics when more than one listener container is used for the same topic.
* Move default partition to constant field.
* Address review comments: make EndpointCustomizerFactory ctor public.
* Remove main endpoint customization.
* Remove failing @SuppressWarnings.
* Revert TPO hashCode changes; adjust tests; improve TPO and topics handling.
* Enable customizing main endpoint TPOs.
In what version(s) of Spring for Apache Kafka are you seeing this issue?
Between 2.7.0 and 2.8.2
Description
I am trying to use @RetryableTopic for non-blocking retries and topicPartitions in order to read messages from the beginning.
Below are my main and DLT listeners:
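A minimal sketch of listeners in this shape (the class name, method bodies, group id, and retry settings are assumptions; the @TopicPartition settings are those quoted further below):

```java
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class ProductListener {

    // Main listener: non-blocking retries plus a manual assignment that
    // rewinds partition 0 of "products" to offset 0 on startup.
    @RetryableTopic
    @KafkaListener(groupId = "product-group", topicPartitions =
            @TopicPartition(topic = "products",
                    partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")))
    public void listen(String message) {
        throw new RuntimeException("simulated failure for: " + message);
    }

    // DLT listener: receives the record once the retries are exhausted.
    @DltHandler
    public void handleDlt(String message) {
        System.out.println("DLT received: " + message);
    }
}
```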
With this, the main topic listener as well as the DLT listener receive the same event multiple times (since there is one retry topic, the main listener receives the same event 2 times and the DLT listener receives it once as well - 3 copies of the same event in total).
logs:
If I use the above code without topicPartitions, by removing the line below, the listener works as expected.
@TopicPartition(topic = "products", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))
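For comparison, a sketch of the variant that works as expected (the class name and group id are assumptions): the listener subscribes by topic name instead of using a manual partition assignment with an initial offset.

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;

public class ProductListenerWithoutManualAssignment {

    // Subscribing by topic name: no repositioning on startup, so each record
    // is delivered once to the main listener and, on failure, flows through
    // the retry topic(s) and DLT as intended.
    @RetryableTopic
    @KafkaListener(topics = "products", groupId = "product-group")
    public void listen(String message) {
        // same processing as before
    }
}
```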