Skip to content

Setting up the backoff aware message listener adapter should work for any message listener #2313

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Conversation

filiphr
Copy link
Contributor

@filiphr filiphr commented Jun 21, 2022

We had a problem here while trying to implement our programatic retry mechanism using Kafka.

This is one of the places that we had to adapt our MessageListener.

I am not sure whether the cast you require is mandatory or not, but the KafkaBackoffAwareMessageListenerAdapter does support different message listeners.

@tomazfernandes
Copy link
Contributor

Hi @filiphr, this makes sense to me, we don't seem to require an AcknowledgingConsumerAwareMessageListener anywhere down the road.

As far as I can see, the consequence of having a different MessageListener for the RetryableTopics feature would be that we wouldn't be able to apply the TimingAdjustment if we don't have access to the Consumer, which doesn't seem like a deal breaker to me.

Of course, it's up to @garyrussell whether to proceed with this or not.

Thanks

@filiphr
Copy link
Contributor Author

filiphr commented Jun 22, 2022

As far as I can see, the consequence of having a different MessageListener for the RetryableTopics feature would be that we wouldn't be able to apply the TimingAdjustment if we don't have access to the Consumer, which doesn't seem like a deal breaker to me.

Are you referring to

maybeGetBackoffTimestamp(consumerRecord)
.ifPresent(nextExecutionTimestamp -> this.kafkaConsumerBackoffManager
.backOffIfNecessary(createContext(consumerRecord, nextExecutionTimestamp, consumer)));

because the KafkaBackoffAwareMessageListenerAdapter is an AcknowledgingConsumerAwareMessageListener. This change is to relax the requirement of the delegate to also be AcknowledgingConsumerAwareMessageListener. From what I could see ConcurrentMessageListenerContainer will use the newly created ConcurrentMessageListenerContainer as a message listener. Thus the right method will be invoked and you will still have access to the Consumer.

However, my knowledge about this is way more limited than yours and there might be something that I am not seeing 😄

@tomazfernandes
Copy link
Contributor

Yep, I think you're right. It's the delegate that might not have access to the consumer if it's a different interface. So, no concerns from me regarding this. Thanks!

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do this over there even right now:

new KafkaBackoffAwareMessageListenerAdapter<>(listener,

Which leads to this:

public KafkaBackoffAwareMessageListenerAdapter(MessageListener<K, V> adapter,

So, that is really strange to have it forced just to the AcknowledgingConsumerAwareMessageListener...

LGTM, though!

@garyrussell
Copy link
Contributor

Merged as 596f7d9 and cherry-picked to 2.9.x, 2.8.x.

@filiphr filiphr deleted the listener-container-factory-setup-backoff-listener branch June 22, 2022 15:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants