-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Add Kotlin Coroutines Support #3902
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
So, I've created an issue from a discussion we had before since this request is really valid. Thank you! |
Thanks for looking into this. I would say coroutines is a bit more than just another Reactive Streams library -- it's a feature that's built into the kotlin language itself. Other parts of the Spring framework already provide support for coroutines, e.g. WebFlux, which allows you to use suspending @controller functions. In my view, Spring Integration could provide similar support, meaning that it's possible to use Spring Integration components using suspending functions. I don't see how you can do this very easily though; I think it means making all Spring Integration components work with suspending functions. |
Well, I look at that from the point of view of this Java framework. From end-user perspective it indeed would look similar to that
The change (I believe) is going to happen only in one place - Another part of the puzzle indeed going to be a Since it is still a Reactive Streams library to make the whole integration flow fully suspendable we have to use a |
Fixes spring-projects#3902 * Add `isAsync()` propagation from the `MessagingMethodInvokerHelper` to the `AbstractMessageProducingHandler` to set into its `async` property. The logic is based on a `CompletableFuture`, `Publisher` or Kotlin `suspend` return types of the POJO method * Introduce `IntegrationMessageHandlerMethodFactory` and `IntegrationInvocableHandlerMethod` to extend the logic to newly introduced `ContinuationHandlerMethodArgumentResolver` and call for Kotlin suspend functions. * Remove `MessageHandlerMethodFactoryCreatingFactoryBean` since its logic now is covered with the `IntegrationMessageHandlerMethodFactory` * Kotlin suspend functions are essentially reactive, so use `CoroutinesUtils.invokeSuspendingFunction()` and existing logic in the `AbstractMessageProducingHandler` to deal with `Publisher` reply
here you are PR: #3905 ! |
Fixes spring-projects#3902 * Add `isAsync()` propagation from the `MessagingMethodInvokerHelper` to the `AbstractMessageProducingHandler` to set into its `async` property. The logic is based on a `CompletableFuture`, `Publisher` or Kotlin `suspend` return types of the POJO method * Introduce `IntegrationMessageHandlerMethodFactory` and `IntegrationInvocableHandlerMethod` to extend the logic to newly introduced `ContinuationHandlerMethodArgumentResolver` and call for Kotlin suspend functions. * Remove `MessageHandlerMethodFactoryCreatingFactoryBean` since its logic now is covered with the `IntegrationMessageHandlerMethodFactory` * Kotlin suspend functions are essentially reactive, so use `CoroutinesUtils.invokeSuspendingFunction()` and existing logic in the `AbstractMessageProducingHandler` to deal with `Publisher` reply
Fixes spring-projects#3902 * Add `isAsync()` propagation from the `MessagingMethodInvokerHelper` to the `AbstractMessageProducingHandler` to set into its `async` property. The logic is based on a `CompletableFuture`, `Publisher` or Kotlin `suspend` return types of the POJO method * Introduce `IntegrationMessageHandlerMethodFactory` and `IntegrationInvocableHandlerMethod` to extend the logic to newly introduced `ContinuationHandlerMethodArgumentResolver` and call for Kotlin suspend functions. * Remove `MessageHandlerMethodFactoryCreatingFactoryBean` since its logic now is covered with the `IntegrationMessageHandlerMethodFactory` * Kotlin suspend functions are essentially reactive, so use `CoroutinesUtils.invokeSuspendingFunction()` and existing logic in the `AbstractMessageProducingHandler` to deal with `Publisher` reply * Fix `GroovySplitterTests` for the current code base * Add `kotlinx.coroutines.flow.Flow` support The `Flow` is essentially a multi-value reactive `Publisher`, so use `ReactiveAdapterRegistry` to convert any custom reactive streams result to `Flux` and `Mono` which we already support as reply types * Add docs for `Kotlin Coroutines` Rearrange the doc a bit extracting Kotlin support to individual `kotlin-functions.adoc` file * Fix missed link to `reactive-streams.adoc` from the `index-single.adoc` * Fix unintended Javadocs formatting in the `AbstractMessageProducingHandler` * Add suspend functions support for Messaging Gateway * Add convenient `CoroutinesUtils` for Coroutines types and `Continuation` argument fulfilling via `Mono` * Treat `suspend fun` in the `GatewayProxyFactoryBean` as a `Mono` return * Convert `Mono` to the `Continuation` resuming in the end of gateway call * Document `suspend fun` for `@MessagingGateway`
Fixes spring-projects#3902 * Add `isAsync()` propagation from the `MessagingMethodInvokerHelper` to the `AbstractMessageProducingHandler` to set into its `async` property. The logic is based on a `CompletableFuture`, `Publisher` or Kotlin `suspend` return types of the POJO method * Introduce `IntegrationMessageHandlerMethodFactory` and `IntegrationInvocableHandlerMethod` to extend the logic to newly introduced `ContinuationHandlerMethodArgumentResolver` and call for Kotlin suspend functions. * Remove `MessageHandlerMethodFactoryCreatingFactoryBean` since its logic now is covered with the `IntegrationMessageHandlerMethodFactory` * Kotlin suspend functions are essentially reactive, so use `CoroutinesUtils.invokeSuspendingFunction()` and existing logic in the `AbstractMessageProducingHandler` to deal with `Publisher` reply * Fix `GroovySplitterTests` for the current code base * Add `kotlinx.coroutines.flow.Flow` support The `Flow` is essentially a multi-value reactive `Publisher`, so use `ReactiveAdapterRegistry` to convert any custom reactive streams result to `Flux` and `Mono` which we already support as reply types * Add docs for `Kotlin Coroutines` Rearrange the doc a bit extracting Kotlin support to individual `kotlin-functions.adoc` file * Fix missed link to `reactive-streams.adoc` from the `index-single.adoc` * Fix unintended Javadocs formatting in the `AbstractMessageProducingHandler` * Add suspend functions support for Messaging Gateway * Add convenient `CoroutinesUtils` for Coroutines types and `Continuation` argument fulfilling via `Mono` * Treat `suspend fun` in the `GatewayProxyFactoryBean` as a `Mono` return * Convert `Mono` to the `Continuation` resuming in the end of gateway call * Document `suspend fun` for `@MessagingGateway`
Fixes spring-projects#3902 * Add `isAsync()` propagation from the `MessagingMethodInvokerHelper` to the `AbstractMessageProducingHandler` to set into its `async` property. The logic is based on a `CompletableFuture`, `Publisher` or Kotlin `suspend` return types of the POJO method * Introduce `IntegrationMessageHandlerMethodFactory` and `IntegrationInvocableHandlerMethod` to extend the logic to newly introduced `ContinuationHandlerMethodArgumentResolver` and call for Kotlin suspend functions. * Remove `MessageHandlerMethodFactoryCreatingFactoryBean` since its logic now is covered with the `IntegrationMessageHandlerMethodFactory` * Kotlin suspend functions are essentially reactive, so use `CoroutinesUtils.invokeSuspendingFunction()` and existing logic in the `AbstractMessageProducingHandler` to deal with `Publisher` reply * Fix `GroovySplitterTests` for the current code base * Add `kotlinx.coroutines.flow.Flow` support The `Flow` is essentially a multi-value reactive `Publisher`, so use `ReactiveAdapterRegistry` to convert any custom reactive streams result to `Flux` and `Mono` which we already support as reply types * Add docs for `Kotlin Coroutines` Rearrange the doc a bit extracting Kotlin support to individual `kotlin-functions.adoc` file * Fix missed link to `reactive-streams.adoc` from the `index-single.adoc` * Fix unintended Javadocs formatting in the `AbstractMessageProducingHandler` * Add suspend functions support for Messaging Gateway * Add convenient `CoroutinesUtils` for Coroutines types and `Continuation` argument fulfilling via `Mono` * Treat `suspend fun` in the `GatewayProxyFactoryBean` as a `Mono` return * Convert `Mono` to the `Continuation` resuming in the end of gateway call * Document `suspend fun` for `@MessagingGateway`
* GH-3902: Add Kotlin Coroutines Support Fixes #3902 * Add `isAsync()` propagation from the `MessagingMethodInvokerHelper` to the `AbstractMessageProducingHandler` to set into its `async` property. The logic is based on a `CompletableFuture`, `Publisher` or Kotlin `suspend` return types of the POJO method * Introduce `IntegrationMessageHandlerMethodFactory` and `IntegrationInvocableHandlerMethod` to extend the logic to newly introduced `ContinuationHandlerMethodArgumentResolver` and call for Kotlin suspend functions. * Remove `MessageHandlerMethodFactoryCreatingFactoryBean` since its logic now is covered with the `IntegrationMessageHandlerMethodFactory` * Kotlin suspend functions are essentially reactive, so use `CoroutinesUtils.invokeSuspendingFunction()` and existing logic in the `AbstractMessageProducingHandler` to deal with `Publisher` reply * Fix `GroovySplitterTests` for the current code base * Add `kotlinx.coroutines.flow.Flow` support The `Flow` is essentially a multi-value reactive `Publisher`, so use `ReactiveAdapterRegistry` to convert any custom reactive streams result to `Flux` and `Mono` which we already support as reply types * Add docs for `Kotlin Coroutines` Rearrange the doc a bit extracting Kotlin support to individual `kotlin-functions.adoc` file * Fix missed link to `reactive-streams.adoc` from the `index-single.adoc` * Fix unintended Javadocs formatting in the `AbstractMessageProducingHandler` * Add suspend functions support for Messaging Gateway * Add convenient `CoroutinesUtils` for Coroutines types and `Continuation` argument fulfilling via `Mono` * Treat `suspend fun` in the `GatewayProxyFactoryBean` as a `Mono` return * Convert `Mono` to the `Continuation` resuming in the end of gateway call * Document `suspend fun` for `@MessagingGateway` * * Make `async` implicitly only for `suspend fun` * * Remove unused imports * * Verify sync and async `Flow` processing * Mention default sync behavior in the docs * * Improve reflection in the `CoroutinesUtils` * Fix language in docs Co-authored-by: Gary Russell <[email protected]> * * Rebase and revert blank lines around `include` in docs Co-authored-by: Gary Russell <[email protected]>
Discussed in #3894
Originally posted by tomvandenberge September 25, 2022
Hi,
I'm using Spring Integration in various Kotlin (previously Java) projects. Some of these projects are using coroutines. So far, I haven't been able to find a way to use coroutines together with Spring Integration. I'd love to be able e.g. to have a suspending function as a service activator. Currently this is not possible.
I realise that this probably requires fundamental changes to Spring Integration, but I wanted to check with you if this is something that is already on your roadmap?
Thanks,
Tom
The text was updated successfully, but these errors were encountered: