Add a Lightweight MessageListener Adapter Supporting Message Conversion #2068

Closed
garyrussell opened this issue Jan 13, 2022 · 11 comments
Labels
ideal-for-user-contribution An issue that would ideal for a user to get started with contributing. type: enhancement

@garyrussell
Contributor

See https://stackoverflow.com/questions/70684560/concurrentkafkalistenercontainerfactory-message-converter-is-ignored-when-config/70685111#70685111

Similar to RabbitMQ, we could provide a thin adapter that performs message conversion.
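A minimal sketch of what such a thin adapter could look like. It is written against hypothetical stand-in types (`Record`, `RecordListener`, `ConvertingAdapter`) rather than the real spring-kafka classes, so that it compiles on its own; none of these names come from the library. The adapter implements the existing record-level listener contract, converts the raw value, and hands the result to a delegate:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-ins for illustration only; NOT the spring-kafka API.
class ConvertingAdapterSketch {

    /** Stand-in for a consumed record: just a key and a raw value. */
    static final class Record<K, V> {
        final K key;
        final V value;

        Record(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Stand-in for the existing record-level listener contract. */
    interface RecordListener<K, V> {
        void onMessage(Record<K, V> record);
    }

    /** Thin adapter: converts the raw value, then calls the delegate. */
    static final class ConvertingAdapter<K, V, T> implements RecordListener<K, V> {
        private final Function<V, T> converter;
        private final Consumer<T> delegate;

        ConvertingAdapter(Function<V, T> converter, Consumer<T> delegate) {
            this.converter = converter;
            this.delegate = delegate;
        }

        @Override
        public void onMessage(Record<K, V> record) {
            // The delegate never sees the raw record, only the converted value.
            delegate.accept(converter.apply(record.value));
        }
    }

    public static void main(String[] args) {
        List<Integer> received = new ArrayList<>();
        RecordListener<String, String> listener =
                new ConvertingAdapter<String, String, Integer>(Integer::parseInt, received::add);
        listener.onMessage(new Record<>("key", "42"));
        System.out.println(received); // prints [42]
    }
}
```

Because the adapter is just another implementation of the same listener contract, a container that accepts a record listener would not need to know that conversion is happening.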

garyrussell added the type: enhancement and ideal-for-user-contribution labels Jan 13, 2022
garyrussell added this to the Backlog milestone Jan 13, 2022
@breader124
Contributor

Hi, I'll try to start working on this issue

@breader124
Contributor

Hi once again. It seems that after the update introduced in 4e2f9fd the project no longer compiles, because the Micrometer-related imports fail to resolve. What's more, one test still fails even after I resolved the import problem. I guess you'll be fixing it all soon (the commit I mentioned is the latest one on the main branch and was pushed last Friday), so I'll just wait and start working once it's ready; from my perspective it doesn't make sense to duplicate the work.

@garyrussell
Contributor Author

garyrussell commented Feb 20, 2022

@breader124 My sincere apologies - I was off on Friday and we have a holiday tomorrow, so I was trying to get a jump on Tuesday's releases (they were delayed from tomorrow because of the holiday). I should have checked the CI builds (I actually did, but on my 'phone and failed to notice the failure). Fixing now...

@garyrussell
Contributor Author

Fixed. @breader124 once again sorry about that.

@breader124
Contributor

@garyrussell, it's absolutely not a problem, thanks so much for the quick reaction. Now I'm able to jump in.

@garyrussell
Contributor Author

@breader124 More apologies - I have had to revert the Micrometer upgrade for compatibility with other projects at this time.

Please rebase your branch before pushing a PR.

Thanks.

@breader124
Contributor

Hi @garyrussell, no problem, I'll do it. I'd also like to ask for a little guidance and validation of my initial thoughts. As I understand this task, I need to provide a concrete implementation of the MessageListener interface, right? To do that, I'd propose creating another interface, ConvertingMessageListener, that extends MessageListener. In this new interface, I'd provide a default implementation of onMessage(ConsumerRecord<K, V> data) that simply throws UnsupportedOperationException, and add an abstract method void onMessage(T data, Consumer consumer). I could then provide a concrete implementation of ConvertingMessageListener named, let's say, SimpleConvertingMessageListenerImpl. On creation, this class could accept the desired type and a Consumer for the already converted message. Example usage could look similar to

kafkaMessageListenerContainer.setupMessageListener(new ConvertingMessageListenerAdapter<>(Type desiredType, Consumer consumer));

What's your opinion about this solution? Please bear with me if I'm proposing something totally incorrect; this is my first time working with this codebase :)

I'd also like to ask about the difference between ConsumerRecord and Message. To convert a ConsumerRecord to a Message I'd need to use MessagingMessageConverter, but I don't know if that's the way to go, and I probably need more information about the real purpose of those classes.
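To make the question concrete, here is a rough sketch of the distinction as I picture it: a record is the transport-level unit (topic, partition, offset, raw value), while a message is a converted payload plus a map of headers. The `Record` and `Message` types and the header names below are made up for illustration and are not the spring-kafka or spring-messaging classes:

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration only; NOT the real library types.
class RecordToMessageSketch {

    /** Transport-level view: where the data came from, plus the raw bytes. */
    static final class Record {
        final String topic;
        final int partition;
        final long offset;
        final byte[] value;

        Record(String topic, int partition, long offset, byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    /** Application-level view: a typed payload with metadata as headers. */
    static final class Message<T> {
        final T payload;
        final Map<String, Object> headers;

        Message(T payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** Copies record metadata into headers and decodes the value as UTF-8 text. */
    static Message<String> toMessage(Record record) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("topic", record.topic);         // header names are assumptions
        headers.put("partition", record.partition);
        headers.put("offset", record.offset);
        String payload = new String(record.value, StandardCharsets.UTF_8);
        return new Message<>(payload, headers);
    }

    public static void main(String[] args) {
        Record record = new Record("orders", 0, 7L, "hello".getBytes(StandardCharsets.UTF_8));
        Message<String> message = toMessage(record);
        System.out.println(message.payload); // prints hello
        System.out.println(message.headers); // prints {topic=orders, partition=0, offset=7}
    }
}
```

In this picture, a converting adapter would perform a step like `toMessage` (followed by payload conversion to the desired type) before invoking the delegate.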

Thanks in advance! I'm looking forward to your answer.

@garyrussell
Contributor Author

Sorry; I am tied up on another task at the moment; I will try to get back to this soon, but we definitely don't want another flavor of the MessageListener interface.

You might want to look at the adapters in spring-rabbit; the legacy adapter is close to what this lightweight adapter might be.

https://github.com/spring-projects/spring-amqp/blob/main/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessageListenerAdapter.java

It might be easier to look at it on an old branch before it was refactored.

https://github.com/spring-projects/spring-amqp/blob/1.3.x/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/adapter/MessageListenerAdapter.java

@breader124
Contributor

Okay, thanks for the advice. After looking at the classes you linked, I think I now have a better understanding of what needs to be done. Unfortunately I didn't have enough time last week to code it; I'll do my best to do it as soon as possible.

@breader124
Contributor

Hi @garyrussell, I just opened a PR (#2165). Looking forward to your review!

breader124 pushed a commit to breader124/spring-kafka that referenced this issue Mar 15, 2022
breader124 pushed a commit to breader124/spring-kafka that referenced this issue Mar 15, 2022
breader124 pushed a commit to breader124/spring-kafka that referenced this issue Mar 18, 2022
breader124 added a commit to breader124/spring-kafka that referenced this issue Apr 2, 2022
…ata and fix doubled invokation of message listener
breader124 added a commit to breader124/spring-kafka that referenced this issue Apr 5, 2022
garyrussell pushed a commit that referenced this issue Jun 1, 2022
* GH-2068: Add lightweight converting adapter

* GH-2068: Introduce amendments after code review

* GH-2068: Introduce further amendments after code review

* GH-2068: Extend content of copied record by more metadata and fix doubled invokation of message listener

* GH-2068: Add possibility to specify header mapper and cover it with tests

Co-authored-by: Adrian Chlebosz <[email protected]>
@garyrussell
Contributor Author

Closed with #2165

garyrussell modified the milestones: Backlog, 3.0.0-M5 Oct 20, 2022