Skip to content

Commit ca60db5

Browse files
artembilangaryrussell
authored andcommitted
SO-62761903: Inject BF into Gateway's correlator
Related to https://stackoverflow.com/questions/62761903/spring-integration-reactive-streams-support-exception-in-creating-a-reactive The `MessagingGatewaySupport` creates an internal endpoint for consuming messages from the provided `replyChannel`. The endpoint type depends on the channel type. The `ReactiveStreamsConsumer` was missing a `BeanFactory` injection causing an error when `ReactiveStreamsConsumer` tries to extract a default `ErrorHandler` from `BeanFactory` * Refactor `MessagingGatewaySupport` to inject a `BeanFactory` to all the possible correlator endpoints. Also always call `afterPropertiesSet()` for all of them **Cherry-pick to 5.3.x & 5.2.x**
1 parent a8c63e2 commit ca60db5

File tree

2 files changed

+24
-8
lines changed

2 files changed

+24
-8
lines changed

spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -794,23 +794,21 @@ protected void registerReplyMessageCorrelatorIfNecessary() {
794794
}
795795
else if (replyChan instanceof PollableChannel) {
796796
PollingConsumer endpoint = new PollingConsumer((PollableChannel) replyChan, handler);
797-
if (beanFactory != null) {
798-
endpoint.setBeanFactory(beanFactory);
799-
}
800797
endpoint.setReceiveTimeout(this.replyTimeout);
801-
endpoint.afterPropertiesSet();
802798
correlator = endpoint;
803799
}
804800
else if (replyChan instanceof ReactiveStreamsSubscribableChannel) {
805-
ReactiveStreamsConsumer endpoint =
806-
new ReactiveStreamsConsumer(replyChan, (Subscriber<Message<?>>) handler);
807-
endpoint.afterPropertiesSet();
808-
correlator = endpoint;
801+
correlator = new ReactiveStreamsConsumer(replyChan, (Subscriber<Message<?>>) handler);
809802
}
810803
else {
811804
throw new MessagingException("Unsupported 'replyChannel' type [" + replyChan.getClass() + "]."
812805
+ "SubscribableChannel or PollableChannel type are supported.");
813806
}
807+
808+
if (beanFactory != null) {
809+
correlator.setBeanFactory(beanFactory);
810+
}
811+
correlator.afterPropertiesSet();
814812
this.replyMessageCorrelator = correlator;
815813
}
816814
if (isRunning()) {

spring-integration-core/src/test/java/org/springframework/integration/gateway/GatewayProxyFactoryBeanTests.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.springframework.integration.annotation.Gateway;
4848
import org.springframework.integration.annotation.GatewayHeader;
4949
import org.springframework.integration.channel.DirectChannel;
50+
import org.springframework.integration.channel.FluxMessageChannel;
5051
import org.springframework.integration.channel.QueueChannel;
5152
import org.springframework.integration.endpoint.EventDrivenConsumer;
5253
import org.springframework.integration.support.utils.IntegrationUtils;
@@ -165,6 +166,23 @@ public void testReceiveMessage() {
165166
.isEqualTo(IntegrationPatternType.outbound_channel_adapter);
166167
}
167168

169+
@Test
170+
public void testReactiveReplyChannel() {
171+
QueueChannel requestChannel = new QueueChannel();
172+
startResponder(requestChannel);
173+
FluxMessageChannel replyChannel = new FluxMessageChannel();
174+
GatewayProxyFactoryBean proxyFactory = new GatewayProxyFactoryBean(TestService.class);
175+
proxyFactory.setDefaultRequestChannel(requestChannel);
176+
proxyFactory.setDefaultReplyChannel(replyChannel);
177+
178+
proxyFactory.setBeanFactory(mock(BeanFactory.class));
179+
proxyFactory.afterPropertiesSet();
180+
TestService service = (TestService) proxyFactory.getObject();
181+
182+
String result = service.requestReply("test");
183+
assertThat(result).isEqualTo("testbar");
184+
}
185+
168186
@Test
169187
public void testRequestReplyWithTypeConversion() {
170188
final QueueChannel requestChannel = new QueueChannel();

0 commit comments

Comments
 (0)