Skip to content

Commit e382f67

Browse files
garyrussellartembilan
authored andcommitted
GH-1289: Confirms and Returns with Routing CF
Resolves #1289 `RoutingConnectionFactory` did not support correlated confirms or returns. Target factories (and default) must have the same settings. **cherry-pick to 2.2.x, 2.1.x**
1 parent 072e8ac commit e382f67

File tree

3 files changed

+99
-11
lines changed

3 files changed

+99
-11
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractRoutingConnectionFactory.java

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.ConcurrentHashMap;
2323

2424
import org.springframework.amqp.AmqpException;
25+
import org.springframework.beans.factory.InitializingBean;
2526
import org.springframework.lang.Nullable;
2627
import org.springframework.util.Assert;
2728

@@ -35,7 +36,8 @@
3536
* @author Gary Russell
3637
* @since 1.3
3738
*/
38-
public abstract class AbstractRoutingConnectionFactory implements ConnectionFactory, RoutingConnectionFactory {
39+
public abstract class AbstractRoutingConnectionFactory implements ConnectionFactory, RoutingConnectionFactory,
40+
InitializingBean {
3941

4042
private final Map<Object, ConnectionFactory> targetConnectionFactories =
4143
new ConcurrentHashMap<Object, ConnectionFactory>();
@@ -46,6 +48,10 @@ public abstract class AbstractRoutingConnectionFactory implements ConnectionFact
4648

4749
private boolean lenientFallback = true;
4850

51+
private Boolean confirms;
52+
53+
private Boolean returns;
54+
4955
/**
5056
* Specify the map of target ConnectionFactories, with the lookup key as key.
5157
* <p>The key can be of arbitrary type; this class implements the
@@ -58,6 +64,7 @@ public void setTargetConnectionFactories(Map<Object, ConnectionFactory> targetCo
5864
Assert.noNullElements(targetConnectionFactories.values().toArray(),
5965
"'targetConnectionFactories' cannot have null values.");
6066
this.targetConnectionFactories.putAll(targetConnectionFactories);
67+
targetConnectionFactories.values().stream().forEach(cf -> checkConfirmsAndReturns(cf));
6168
}
6269

6370
/**
@@ -69,6 +76,7 @@ public void setTargetConnectionFactories(Map<Object, ConnectionFactory> targetCo
6976
*/
7077
public void setDefaultTargetConnectionFactory(ConnectionFactory defaultTargetConnectionFactory) {
7178
this.defaultTargetConnectionFactory = defaultTargetConnectionFactory;
79+
checkConfirmsAndReturns(defaultTargetConnectionFactory);
7280
}
7381

7482
/**
@@ -93,9 +101,37 @@ public boolean isLenientFallback() {
93101
return this.lenientFallback;
94102
}
95103

104+
@Override
105+
public boolean isPublisherConfirms() {
106+
return this.confirms;
107+
}
108+
109+
@Override
110+
public boolean isPublisherReturns() {
111+
return this.returns;
112+
}
113+
114+
@Override
115+
public void afterPropertiesSet() throws Exception {
116+
Assert.notNull(this.confirms, "At least one target factory (or default) is required");
117+
}
118+
119+
private void checkConfirmsAndReturns(ConnectionFactory cf) {
120+
if (this.confirms == null) {
121+
this.confirms = cf.isPublisherConfirms();
122+
}
123+
if (this.returns == null) {
124+
this.returns = cf.isPublisherReturns();
125+
}
126+
Assert.isTrue(this.confirms.booleanValue() == cf.isPublisherConfirms(),
127+
"Target connection factories must have the same setting for publisher confirms");
128+
Assert.isTrue(this.returns.booleanValue() == cf.isPublisherReturns(),
129+
"Target connection factories must have the same setting for publisher returns");
130+
}
131+
96132
@Override
97133
public Connection createConnection() throws AmqpException {
98-
return this.determineTargetConnectionFactory().createConnection();
134+
return determineTargetConnectionFactory().createConnection();
99135
}
100136

101137
/**

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitTemplatePublisherCallbacksIntegrationTests2.java

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -27,6 +27,8 @@
2727

2828
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
2929
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType;
30+
import org.springframework.amqp.rabbit.connection.CorrelationData;
31+
import org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory;
3032
import org.springframework.amqp.rabbit.junit.BrokerTestUtils;
3133
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
3234

@@ -39,36 +41,38 @@
3941
* @since 1.6
4042
*
4143
*/
42-
@RabbitAvailable(queues = RabbitTemplatePublisherCallbacksIntegrationTests2.ROUTE)
44+
@RabbitAvailable(queues = { RabbitTemplatePublisherCallbacksIntegrationTests2.ROUTE,
45+
RabbitTemplatePublisherCallbacksIntegrationTests2.ROUTE2 })
4346
public class RabbitTemplatePublisherCallbacksIntegrationTests2 {
4447

4548
public static final String ROUTE = "test.queue.RabbitTemplatePublisherCallbacksIntegrationTests2";
4649

50+
public static final String ROUTE2 = "test.queue.RabbitTemplatePublisherCallbacksIntegrationTests2.route";
51+
4752
private CachingConnectionFactory connectionFactoryWithConfirmsEnabled;
4853

4954
private RabbitTemplate templateWithConfirmsEnabled;
5055

5156
@BeforeEach
52-
public void create() {
57+
void create() {
5358
connectionFactoryWithConfirmsEnabled = new CachingConnectionFactory();
5459
connectionFactoryWithConfirmsEnabled.setHost("localhost");
55-
// When using publisher confirms, the cache size needs to be large enough
56-
// otherwise channels can be closed before confirms are received.
5760
connectionFactoryWithConfirmsEnabled.setChannelCacheSize(100);
5861
connectionFactoryWithConfirmsEnabled.setPort(BrokerTestUtils.getPort());
5962
connectionFactoryWithConfirmsEnabled.setPublisherConfirmType(ConfirmType.CORRELATED);
63+
connectionFactoryWithConfirmsEnabled.setPublisherReturns(true);
6064
templateWithConfirmsEnabled = new RabbitTemplate(connectionFactoryWithConfirmsEnabled);
6165
}
6266

6367
@AfterEach
64-
public void cleanUp() {
68+
void cleanUp() {
6569
if (connectionFactoryWithConfirmsEnabled != null) {
6670
connectionFactoryWithConfirmsEnabled.destroy();
6771
}
6872
}
6973

7074
@Test
71-
public void test36Methods() throws Exception {
75+
void test36Methods() throws Exception {
7276
this.templateWithConfirmsEnabled.convertAndSend(ROUTE, "foo");
7377
this.templateWithConfirmsEnabled.convertAndSend(ROUTE, "foo");
7478
assertMessageCountEquals(2L);
@@ -91,6 +95,51 @@ public void handleDelivery(String ag, Envelope envelope, BasicProperties propert
9195
assertMessageCountEquals(0L);
9296
}
9397

98+
@Test
99+
void routingWithConfirmsNoListener() throws Exception {
100+
routingWithConfirms(false);
101+
}
102+
103+
@Test
104+
void routingWithConfirmsListener() throws Exception {
105+
routingWithConfirms(true);
106+
}
107+
108+
private void routingWithConfirms(boolean listener) throws Exception {
109+
CountDownLatch latch = new CountDownLatch(1);
110+
SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
111+
rcf.setDefaultTargetConnectionFactory(this.connectionFactoryWithConfirmsEnabled);
112+
this.templateWithConfirmsEnabled.setConnectionFactory(rcf);
113+
if (listener) {
114+
this.templateWithConfirmsEnabled.setConfirmCallback((correlationData, ack, cause) -> {
115+
latch.countDown();
116+
});
117+
}
118+
this.templateWithConfirmsEnabled.setMandatory(true);
119+
CorrelationData corr = new CorrelationData();
120+
this.templateWithConfirmsEnabled.convertAndSend("", ROUTE2, "foo", corr);
121+
assertThat(corr.getFuture().get(10, TimeUnit.SECONDS).isAck()).isTrue();
122+
if (listener) {
123+
assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
124+
}
125+
corr = new CorrelationData();
126+
this.templateWithConfirmsEnabled.convertAndSend("", "bad route", "foo", corr);
127+
assertThat(corr.getFuture().get(10, TimeUnit.SECONDS).isAck()).isTrue();
128+
assertThat(corr.getReturnedMessage()).isNotNull();
129+
}
130+
131+
@Test
132+
void routingWithSimpleConfirms() throws Exception {
133+
SimpleRoutingConnectionFactory rcf = new SimpleRoutingConnectionFactory();
134+
rcf.setDefaultTargetConnectionFactory(this.connectionFactoryWithConfirmsEnabled);
135+
this.templateWithConfirmsEnabled.setConnectionFactory(rcf);
136+
assertThat(this.templateWithConfirmsEnabled.<Boolean>invoke(template -> {
137+
template.convertAndSend("", ROUTE2, "foo");
138+
template.waitForConfirmsOrDie(10_000);
139+
return true;
140+
})).isTrue();
141+
}
142+
94143
private void assertMessageCountEquals(long wanted) throws InterruptedException {
95144
long messageCount = determineMessageCount();
96145
int n = 0;

src/reference/asciidoc/amqp.adoc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,9 @@ Doing so enables, for example, listening to queues with the same name but in a d
691691

692692
For example, with lookup key qualifier `thing1` and a container listening to queue `thing2`, the lookup key you could register the target connection factory with could be `thing1[thing2]`.
693693

694+
IMPORTANT: The target (and default, if provided) connection factories must have the same settings for publisher confirms and returns.
695+
See <<cf-pub-conf-ret>>.
696+
694697
[[queue-affinity]]
695698
===== Queue Affinity and the `LocalizedQueueConnectionFactory`
696699

@@ -1196,7 +1199,7 @@ The reason is not populated for broker-generated `nack` instances.
11961199
It is populated for `nack` instances generated by the framework (for example, closing the connection while `ack` instances are outstanding).
11971200

11981201
In addition, when both confirms and returns are enabled, the `CorrelationData` is populated with the returned message, as long as the `CorrelationData` has a unique `id`; this is always the case, by default, starting with version 2.3.
1199-
It is guaranteed that the return message is set before the future is set with the `ack`.
1202+
It is guaranteed that the returned message is set before the future is set with the `ack`.
12001203

12011204
See also <<scoped-operations>> for a simpler mechanism for waiting for publisher confirms.
12021205

0 commit comments

Comments
 (0)