
Listener receives multiple messages when using @RetryableTopic with topicPartitions to reset offset #2220


Closed
ashishqcs opened this issue Apr 7, 2022 · 4 comments · Fixed by #2223

Comments

@ashishqcs

In what version(s) of Spring for Apache Kafka are you seeing this issue?

Between 2.7.0 and 2.8.2

Description

I am trying to use @RetryableTopic for non-blocking retries, together with topicPartitions, in order to read messages from the beginning.

Below are my main and DLT listeners:

@RetryableTopic(
        attempts = "4",
        backoff = @Backoff(delay = 1000),
        fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
// manual assignment: partition 0 of "products", always starting from offset 0
@KafkaListener(topicPartitions = {@TopicPartition(topic = "products",
        partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))})
public void listen(ConsumerRecord<String, String> message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

    log.info("message consumed - \nkey: {} , \nvalue: {}, \ntopic: {}, \nat: {}",
            message.key(),
            message.value(),
            message.topic(),
            LocalDateTime.now());
}

@DltHandler
public void dltListener(ConsumerRecord<String, String> message, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    log.info("message consumed at DLT - \nkey: {} , \nvalue: {}, \ntopic: {}, \nat: {}",
            message.key(),
            message.value(),
            message.topic(),
            LocalDateTime.now());
}

With this, the main topic listener as well as the DLT listener receive the same event (since there is one retry topic, the main listener receives the same event twice and the DLT listener receives it once - three copies of the same event in total).

Logs:

15:10:50.950 [org.springframework.kafka.KafkaListenerEndpointContainer#8-0-C-1] INFO  c.m.s.c.n.SingleTopicRetryConsumer 
- message consumed - 
key: product1 , 
value: This is Product1, 
topic: products, 
at: 2022-04-07T15:10:50.950810          

15:10:50.950 [org.springframework.kafka.KafkaListenerEndpointContainer#9-retry-0-C-1] INFO  c.m.s.c.n.SingleTopicRetryConsumer - 
message consumed - 
key: product1 , 
value: This is Product1, 
topic: products, 
at: 2022-04-07T15:10:50.950810      

15:10:50.950 [org.springframework.kafka.KafkaListenerEndpointContainer#10-dlt-0-C-1] INFO  c.m.s.c.n.SingleTopicRetryConsumer - 
message consumed at DLT - 
key: product1 , 
value: This is Product1, 
topic: products, 
at: 2022-04-07T15:10:50.950810  

If I use the above code without topicPartitions, by removing the line below, the listener works as expected.
@TopicPartition(topic = "products", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))

@tomazfernandes
Contributor

Thanks again for reporting, @ashishqcs - this should be fixed soon and released in the upcoming 2.8.5 version, which is due out on April 18th.

@tomazfernandes
Contributor

@garyrussell, please help me out here if you can - I'm not that familiar with using manual assignments.

I'm thinking we can replicate the TopicPartition and leave the offset out, since the offset is almost certainly different between the main and retry topics. We'd also check that the partition is less than the number of partitions specified in the RetryTopicConfiguration, to make sure we don't try to assign a nonexistent partition.
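
Something along these lines, roughly - just a sketch of the idea, not actual framework code; the method name and its inputs are made up for illustration:

import java.util.Arrays;
import org.springframework.kafka.support.TopicPartitionOffset;

// Sketch: copy the main endpoint's partitions to the retry endpoint, dropping the offsets
// and guarding against a partition that doesn't exist on the retry topic.
private TopicPartitionOffset[] partitionsWithoutOffsets(TopicPartitionOffset[] mainTopicPartitions,
        String retryTopicName, int retryTopicPartitionCount) {
    return Arrays.stream(mainTopicPartitions)
            // the main topic offset is almost certainly wrong for the retry topic, so no offset here;
            // fall back to partition 0 if the retry topic has fewer partitions than the main topic
            .map(tpo -> new TopicPartitionOffset(retryTopicName,
                    tpo.getPartition() < retryTopicPartitionCount ? tpo.getPartition() : 0))
            .toArray(TopicPartitionOffset[]::new);
}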

There's also an issue with the retry topics when more than one @KafkaListener is used for the same topic, perhaps with different partition assignments - is that a common use for manual assignments? Maybe we can document that this doesn't work with retry topics for now.

Another take on this would be to ignore manual assignments for the retry topics, and also to filter out any duplicated @KafkaListener for the same topics in the retry topic logic - then we'd have a single coordinated consumer group per retry topic, regardless of how many @KafkaListeners with manual assignments the user declares. Not sure if we should go there, though.

In the future we might also consider supporting @TopicPartition within the @RetryableTopic annotation, and enabling more than one container per topic, but that doesn't seem like something we should do at the moment.

WDYT?

Thanks

@garyrussell
Contributor

It's not clear to me why @ashishqcs sets an initial position for the manually assigned topic as well as using a retryable topic; that logic is usually used with compacted topics (always read the full log). Perhaps he can explain the use case.

In any case, I agree, there should be no repositioning of the retry topics because if the main listener fails after a rewind we'd get a new failed record in the retry topic; we don't want to see already processed failed records there.

I am not sure I understand your concerns about multiple listeners; if a listener only assigns partitions 0 and 2, then he should only get retry listeners for those same partitions and the resolver would need to always resolve to the same partition.

If there is another listener consuming from 1 and 3, he'd get his own retry containers for those partitions too.

I don't think it needs to be any more complicated than that.
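
A hypothetical example of that layout (the listener ids and the four-partition "products" topic are made up for illustration):

// Listener A: manually assigned partitions 0 and 2; its retry containers cover the same partitions.
@RetryableTopic(attempts = "4", backoff = @Backoff(delay = 1000))
@KafkaListener(id = "productsEven", topicPartitions = @TopicPartition(topic = "products",
        partitions = { "0", "2" }))
public void listenEven(ConsumerRecord<String, String> message) {
    // process records from partitions 0 and 2
}

// Listener B: manually assigned partitions 1 and 3; it gets its own retry containers for those partitions.
@RetryableTopic(attempts = "4", backoff = @Backoff(delay = 1000))
@KafkaListener(id = "productsOdd", topicPartitions = @TopicPartition(topic = "products",
        partitions = { "1", "3" }))
public void listenOdd(ConsumerRecord<String, String> message) {
    // process records from partitions 1 and 3
}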

@tomazfernandes
Contributor

> I am not sure I understand your concerns about multiple listeners; if a listener only assigns partitions 0 and 2, then he should only get retry listeners for those same partitions and the resolver would need to always resolve to the same partition.
>
> If there is another listener consuming from 1 and 3, he'd get his own retry containers for those partitions too.
>
> I don't think it needs to be any more complicated than that.

Sure. I just tested having two @KafkaListeners with @RetryableTopics for the same topic and I must say I'm a bit surprised that it just works - I had never tested that before. The only issue is that we have to set one of the retryable topics to autocreate=false. But that's something I can easily fix as part of this PR.
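
For reference, a sketch of that workaround - presumably the annotation's autoCreateTopics attribute set on one of the two declarations (the listener below is hypothetical):

// Only one of the two @RetryableTopic declarations for the same topic should auto-create the retry/DLT topics.
@RetryableTopic(attempts = "4", backoff = @Backoff(delay = 1000), autoCreateTopics = "false")
@KafkaListener(topicPartitions = @TopicPartition(topic = "products", partitions = { "1", "3" }))
public void listenOther(ConsumerRecord<String, String> message) {
    // second listener on the same topic; the retry topics are created by the first listener's configuration
}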

So that seems to be alright - the feature will work as you described, with each retry topic container being assigned to the same partitions as the main topic one, only without the offset. If a main topic partition is greater than the number of partitions configured for the retry topic, we can just fall back to partition 0, for example.

I'll open a PR and we can see if there's anything else left.

Thanks a lot @garyrussell!

tomazfernandes added a commit to tomazfernandes/spring-kafka that referenced this issue Apr 8, 2022
Resolves spring-projects#2220

* Register NewTopics for Retry Topics only once per topic.

* Add docs explaining configuration behavior for retry topics when more than one listener container is used for the same topic.
garyrussell pushed a commit that referenced this issue Apr 14, 2022
* GH-2220: Fix TopicPartitionOffset for Retry Topics

Resolves #2220

* Register NewTopics for Retry Topics only once per topic.

* Add docs explaining configuration behavior for retry topics when more than one listener container is used for the same topic.

* Move default partition to constant field.

* Address review comments

Make EndpointCustomizerFactory ctor public

* Remove main endpoint customization

* Remove failing @SuppressWarnings

* Revert TPO hashCode changes
Adjust tests
Improve TPO and topics handling

* Enable customizing main endpoint TPOs
garyrussell added a commit that referenced this issue Apr 14, 2022