Skip to content

Add Kotlin Coroutines Support #3902

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
artembilan opened this issue Oct 3, 2022 Discussed in #3894 · 4 comments · Fixed by #3905
Closed

Add Kotlin Coroutines Support #3902

artembilan opened this issue Oct 3, 2022 Discussed in #3894 · 4 comments · Fixed by #3905

Comments

@artembilan
Copy link
Member

Discussed in #3894

Originally posted by tomvandenberge September 25, 2022
Hi,
I'm using Spring Integration in various Kotlin (previously Java) projects. Some of these projects are using coroutines. So far, I haven't been able to find a way to use coroutines together with Spring Integration. I'd love to be able e.g. to have a suspending function as a service activator. Currently this is not possible.

I realise that this probably requires fundamental changes to Spring Integration, but I wanted to check with you if this is something that is already on your roadmap?

Thanks,
Tom

@artembilan
Copy link
Member Author

@tomvandenberge,

So, I've created an issue from a discussion we had before since this request is really valid.
And those Kotlin Coroutines look for me as another Reactive Streams library.
Turns out that Spring Framework provides for already a CoroutinesUtils with its API around Reactor's Mono which we already support in Spring Integration.
So, suspended functions should not be too hard to implement as far as I see.

Thank you!

@tomvandenberge
Copy link
Contributor

Thanks for looking into this. I would say coroutines is a bit more than just another Reactive Streams library -- it's a feature that's built into the kotlin language itself. Other parts of the Spring framework already provide support for coroutines, e.g. WebFlux, which allows you to use suspending @controller functions. In my view, Spring Integration could provide similar support, meaning that it's possible to use Spring Integration components using suspending functions. I don't see how you can do this very easily though; I think it means making all Spring Integration components work with suspending functions.

@artembilan
Copy link
Member Author

Well, I look at that from the point of view of this Java framework.
It is really out of my scope if that is high-level language feature or some extra library. For me me it is just a set of respective types I can deal with in the framework Java code.
And that's probably not a surprise why Spring Framework asks for the kotlinx-coroutines-reactor dependency...
But that's already this framework responsibility how to deal with those types.

From end-user perspective it indeed would look similar to that @RequestMapping for suspend fun:

    @ServiceActivator(inputChannel = "input")
    suspend fun render(payload: String): String {

The change (I believe) is going to happen only in one place - MessagingMethodInvokerHelper. So, all other messaging annotations would be able to be used with suspend functions as well.

Another part of the puzzle indeed going to be a @MessagingGateway for those suspend method definitions on the interface.

Since it is still a Reactive Streams library to make the whole integration flow fully suspendable we have to use a FluxMessageChannel in between endpoints as we recommend for Flux and Mono: https://docs.spring.io/spring-integration/docs/current/reference/html/reactive-streams.html#reactive-streams.

artembilan added a commit to artembilan/spring-integration that referenced this issue Oct 4, 2022
Fixes spring-projects#3902

* Add `isAsync()` propagation from the `MessagingMethodInvokerHelper`
to the `AbstractMessageProducingHandler` to set into its `async` property.
The logic is based on a `CompletableFuture`, `Publisher` or Kotlin `suspend`
return types of the POJO method
* Introduce `IntegrationMessageHandlerMethodFactory` and `IntegrationInvocableHandlerMethod`
to extend the logic to newly introduced `ContinuationHandlerMethodArgumentResolver`
and call for Kotlin suspend functions.
* Remove `MessageHandlerMethodFactoryCreatingFactoryBean` since its logic now is covered with the
`IntegrationMessageHandlerMethodFactory`
* Kotlin suspend functions are essentially reactive, so use `CoroutinesUtils.invokeSuspendingFunction()`
and existing logic in the `AbstractMessageProducingHandler` to deal with `Publisher` reply
@artembilan
Copy link
Member Author

@tomvandenberge ,

here you are PR: #3905 !

artembilan added a commit to artembilan/spring-integration that referenced this issue Oct 5, 2022
Fixes spring-projects#3902

* Add `isAsync()` propagation from the `MessagingMethodInvokerHelper`
to the `AbstractMessageProducingHandler` to set into its `async` property.
The logic is based on a `CompletableFuture`, `Publisher` or Kotlin `suspend`
return types of the POJO method
* Introduce `IntegrationMessageHandlerMethodFactory` and `IntegrationInvocableHandlerMethod`
to extend the logic to newly introduced `ContinuationHandlerMethodArgumentResolver`
and call for Kotlin suspend functions.
* Remove `MessageHandlerMethodFactoryCreatingFactoryBean` since its logic now is covered with the
`IntegrationMessageHandlerMethodFactory`
* Kotlin suspend functions are essentially reactive, so use `CoroutinesUtils.invokeSuspendingFunction()`
and existing logic in the `AbstractMessageProducingHandler` to deal with `Publisher` reply
artembilan added a commit to artembilan/spring-integration that referenced this issue Oct 6, 2022
Fixes spring-projects#3902

* Add `isAsync()` propagation from the `MessagingMethodInvokerHelper`
to the `AbstractMessageProducingHandler` to set into its `async` property.
The logic is based on a `CompletableFuture`, `Publisher` or Kotlin `suspend`
return types of the POJO method
* Introduce `IntegrationMessageHandlerMethodFactory` and `IntegrationInvocableHandlerMethod`
to extend the logic to newly introduced `ContinuationHandlerMethodArgumentResolver`
and call for Kotlin suspend functions.
* Remove `MessageHandlerMethodFactoryCreatingFactoryBean` since its logic now is covered with the
`IntegrationMessageHandlerMethodFactory`
* Kotlin suspend functions are essentially reactive, so use `CoroutinesUtils.invokeSuspendingFunction()`
and existing logic in the `AbstractMessageProducingHandler` to deal with `Publisher` reply

* Fix `GroovySplitterTests` for the current code base

* Add `kotlinx.coroutines.flow.Flow` support
The `Flow` is essentially a multi-value reactive `Publisher`,
so use `ReactiveAdapterRegistry` to convert any custom reactive streams result to `Flux` and `Mono`
which we already support as reply types

* Add docs for `Kotlin Coroutines`
Rearrange the doc a bit extracting Kotlin support to individual `kotlin-functions.adoc` file

* Fix missed link to `reactive-streams.adoc` from the `index-single.adoc`
* Fix unintended Javadocs formatting in the `AbstractMessageProducingHandler`

* Add suspend functions support for Messaging Gateway
* Add convenient `CoroutinesUtils` for Coroutines types and `Continuation` argument fulfilling via `Mono`
* Treat `suspend fun` in the `GatewayProxyFactoryBean` as a `Mono` return
* Convert `Mono` to the `Continuation` resuming in the end of gateway call

* Document `suspend fun` for `@MessagingGateway`
artembilan added a commit to artembilan/spring-integration that referenced this issue Oct 17, 2022
Fixes spring-projects#3902

* Add `isAsync()` propagation from the `MessagingMethodInvokerHelper`
to the `AbstractMessageProducingHandler` to set into its `async` property.
The logic is based on a `CompletableFuture`, `Publisher` or Kotlin `suspend`
return types of the POJO method
* Introduce `IntegrationMessageHandlerMethodFactory` and `IntegrationInvocableHandlerMethod`
to extend the logic to newly introduced `ContinuationHandlerMethodArgumentResolver`
and call for Kotlin suspend functions.
* Remove `MessageHandlerMethodFactoryCreatingFactoryBean` since its logic now is covered with the
`IntegrationMessageHandlerMethodFactory`
* Kotlin suspend functions are essentially reactive, so use `CoroutinesUtils.invokeSuspendingFunction()`
and existing logic in the `AbstractMessageProducingHandler` to deal with `Publisher` reply

* Fix `GroovySplitterTests` for the current code base

* Add `kotlinx.coroutines.flow.Flow` support
The `Flow` is essentially a multi-value reactive `Publisher`,
so use `ReactiveAdapterRegistry` to convert any custom reactive streams result to `Flux` and `Mono`
which we already support as reply types

* Add docs for `Kotlin Coroutines`
Rearrange the doc a bit extracting Kotlin support to individual `kotlin-functions.adoc` file

* Fix missed link to `reactive-streams.adoc` from the `index-single.adoc`
* Fix unintended Javadocs formatting in the `AbstractMessageProducingHandler`

* Add suspend functions support for Messaging Gateway
* Add convenient `CoroutinesUtils` for Coroutines types and `Continuation` argument fulfilling via `Mono`
* Treat `suspend fun` in the `GatewayProxyFactoryBean` as a `Mono` return
* Convert `Mono` to the `Continuation` resuming in the end of gateway call

* Document `suspend fun` for `@MessagingGateway`
artembilan added a commit to artembilan/spring-integration that referenced this issue Oct 17, 2022
Fixes spring-projects#3902

* Add `isAsync()` propagation from the `MessagingMethodInvokerHelper`
to the `AbstractMessageProducingHandler` to set into its `async` property.
The logic is based on a `CompletableFuture`, `Publisher` or Kotlin `suspend`
return types of the POJO method
* Introduce `IntegrationMessageHandlerMethodFactory` and `IntegrationInvocableHandlerMethod`
to extend the logic to newly introduced `ContinuationHandlerMethodArgumentResolver`
and call for Kotlin suspend functions.
* Remove `MessageHandlerMethodFactoryCreatingFactoryBean` since its logic now is covered with the
`IntegrationMessageHandlerMethodFactory`
* Kotlin suspend functions are essentially reactive, so use `CoroutinesUtils.invokeSuspendingFunction()`
and existing logic in the `AbstractMessageProducingHandler` to deal with `Publisher` reply

* Fix `GroovySplitterTests` for the current code base

* Add `kotlinx.coroutines.flow.Flow` support
The `Flow` is essentially a multi-value reactive `Publisher`,
so use `ReactiveAdapterRegistry` to convert any custom reactive streams result to `Flux` and `Mono`
which we already support as reply types

* Add docs for `Kotlin Coroutines`
Rearrange the doc a bit extracting Kotlin support to individual `kotlin-functions.adoc` file

* Fix missed link to `reactive-streams.adoc` from the `index-single.adoc`
* Fix unintended Javadocs formatting in the `AbstractMessageProducingHandler`

* Add suspend functions support for Messaging Gateway
* Add convenient `CoroutinesUtils` for Coroutines types and `Continuation` argument fulfilling via `Mono`
* Treat `suspend fun` in the `GatewayProxyFactoryBean` as a `Mono` return
* Convert `Mono` to the `Continuation` resuming in the end of gateway call

* Document `suspend fun` for `@MessagingGateway`
garyrussell added a commit that referenced this issue Oct 17, 2022
* GH-3902: Add Kotlin Coroutines Support

Fixes #3902

* Add `isAsync()` propagation from the `MessagingMethodInvokerHelper`
to the `AbstractMessageProducingHandler` to set into its `async` property.
The logic is based on a `CompletableFuture`, `Publisher` or Kotlin `suspend`
return types of the POJO method
* Introduce `IntegrationMessageHandlerMethodFactory` and `IntegrationInvocableHandlerMethod`
to extend the logic to newly introduced `ContinuationHandlerMethodArgumentResolver`
and call for Kotlin suspend functions.
* Remove `MessageHandlerMethodFactoryCreatingFactoryBean` since its logic now is covered with the
`IntegrationMessageHandlerMethodFactory`
* Kotlin suspend functions are essentially reactive, so use `CoroutinesUtils.invokeSuspendingFunction()`
and existing logic in the `AbstractMessageProducingHandler` to deal with `Publisher` reply

* Fix `GroovySplitterTests` for the current code base

* Add `kotlinx.coroutines.flow.Flow` support
The `Flow` is essentially a multi-value reactive `Publisher`,
so use `ReactiveAdapterRegistry` to convert any custom reactive streams result to `Flux` and `Mono`
which we already support as reply types

* Add docs for `Kotlin Coroutines`
Rearrange the doc a bit extracting Kotlin support to individual `kotlin-functions.adoc` file

* Fix missed link to `reactive-streams.adoc` from the `index-single.adoc`
* Fix unintended Javadocs formatting in the `AbstractMessageProducingHandler`

* Add suspend functions support for Messaging Gateway
* Add convenient `CoroutinesUtils` for Coroutines types and `Continuation` argument fulfilling via `Mono`
* Treat `suspend fun` in the `GatewayProxyFactoryBean` as a `Mono` return
* Convert `Mono` to the `Continuation` resuming in the end of gateway call

* Document `suspend fun` for `@MessagingGateway`

* * Make `async` implicitly only for `suspend fun`

* * Remove unused imports

* * Verify sync and async `Flow` processing
* Mention default sync behavior in the docs

* * Improve reflection in the `CoroutinesUtils`

* Fix language in docs

Co-authored-by: Gary Russell <[email protected]>

* * Rebase and revert blank lines around `include` in docs

Co-authored-by: Gary Russell <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants