Skip to content

Commit 404d53b

Browse files
garyrussellartembilan
authored andcommitted
GH-1194: Fix cache limit with Pub Confirms channel
Resolves #1194 Deferred channel closes were unconditionally returned to the cache. Move the cache size check etc to `doReturnToCache`. **cherry-pick to 2.2.x, 2.1.x** # Conflicts: # spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java
1 parent 6f517a3 commit 404d53b

File tree

2 files changed

+97
-11
lines changed

2 files changed

+97
-11
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2018 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -1087,13 +1087,9 @@ else if (methodName.equals("toString")) {
10871087
else if (methodName.equals("close")) {
10881088
// Handle close method: don't pass the call on.
10891089
if (CachingConnectionFactory.this.active) {
1090-
synchronized (this.channelList) {
1091-
if (CachingConnectionFactory.this.active && !RabbitUtils.isPhysicalCloseRequired() &&
1092-
(this.channelList.size() < getChannelCacheSize()
1093-
|| this.channelList.contains(proxy))) {
1094-
logicalClose((ChannelProxy) proxy);
1095-
return null;
1096-
}
1090+
if (!RabbitUtils.isPhysicalCloseRequired()) {
1091+
logicalClose((ChannelProxy) proxy);
1092+
return null;
10971093
}
10981094
}
10991095

@@ -1241,7 +1237,18 @@ private void doReturnToCache(Channel proxy) {
12411237
synchronized (this.channelList) {
12421238
// Allow for multiple close calls...
12431239
if (CachingConnectionFactory.this.active) {
1244-
if (!this.channelList.contains(proxy)) {
1240+
boolean alreadyCached = this.channelList.contains(proxy);
1241+
if (this.channelList.size() >= getChannelCacheSize() && !alreadyCached) {
1242+
if (logger.isTraceEnabled()) {
1243+
logger.trace("Cache limit reached: " + this.target);
1244+
}
1245+
try {
1246+
physicalClose(proxy);
1247+
}
1248+
catch (@SuppressWarnings(UNUSED) Exception e) {
1249+
}
1250+
}
1251+
else if (!alreadyCached) {
12451252
if (logger.isTraceEnabled()) {
12461253
logger.trace("Returning cached Channel: " + this.target);
12471254
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/PublisherCallbackChannelTests.java

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,12 +22,22 @@
2222
import static org.mockito.Mockito.mock;
2323

2424
import java.io.IOException;
25+
import java.util.Properties;
26+
import java.util.concurrent.CountDownLatch;
2527
import java.util.concurrent.ExecutorService;
28+
import java.util.concurrent.TimeUnit;
2629
import java.util.concurrent.TimeoutException;
2730
import java.util.concurrent.atomic.AtomicBoolean;
31+
import java.util.stream.IntStream;
2832

2933
import org.junit.jupiter.api.Test;
3034

35+
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType;
36+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
37+
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
38+
import org.springframework.amqp.rabbit.junit.RabbitAvailableCondition;
39+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
40+
3141
import com.rabbitmq.client.Channel;
3242
import com.rabbitmq.client.Method;
3343
import com.rabbitmq.client.ShutdownListener;
@@ -38,10 +48,11 @@
3848
* @since 2.1.5
3949
*
4050
*/
51+
@RabbitAvailable
4152
public class PublisherCallbackChannelTests {
4253

4354
@Test
44-
public void shutdownWhileCreate() throws IOException, TimeoutException {
55+
void shutdownWhileCreate() throws IOException, TimeoutException {
4556
Channel delegate = mock(Channel.class);
4657
AtomicBoolean npe = new AtomicBoolean();
4758
willAnswer(inv -> {
@@ -59,4 +70,72 @@ public void shutdownWhileCreate() throws IOException, TimeoutException {
5970
channel.close();
6071
}
6172

73+
@Test
74+
void testNotCached() throws Exception {
75+
CachingConnectionFactory cf = new CachingConnectionFactory(
76+
RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
77+
cf.setPublisherConfirmType(ConfirmType.CORRELATED);
78+
cf.setChannelCacheSize(2);
79+
cf.afterPropertiesSet();
80+
RabbitTemplate template = new RabbitTemplate(cf);
81+
CountDownLatch confirmLatch = new CountDownLatch(2);
82+
template.setConfirmCallback((correlationData, ack, cause) -> {
83+
try {
84+
Thread.sleep(50);
85+
confirmLatch.countDown();
86+
}
87+
catch (InterruptedException e) {
88+
Thread.currentThread().interrupt();
89+
}
90+
});
91+
CountDownLatch openedLatch = new CountDownLatch(2);
92+
CountDownLatch closeLatch = new CountDownLatch(1);
93+
CountDownLatch closedLatch = new CountDownLatch(2);
94+
CountDownLatch waitForOtherLatch = new CountDownLatch(1);
95+
SimpleAsyncTaskExecutor exec = new SimpleAsyncTaskExecutor();
96+
IntStream.range(0, 2).forEach(i -> {
97+
// this will open 3 or 4 channels
98+
exec.execute(() -> {
99+
template.execute(chann -> {
100+
openedLatch.countDown();
101+
template.convertAndSend("", "foo", "msg", msg -> {
102+
if (i == 0) {
103+
try {
104+
waitForOtherLatch.await();
105+
}
106+
catch (InterruptedException e) {
107+
Thread.currentThread().interrupt();
108+
}
109+
}
110+
else {
111+
waitForOtherLatch.countDown();
112+
}
113+
return msg;
114+
}, new CorrelationData("" + i));
115+
closeLatch.await();
116+
return null;
117+
});
118+
closedLatch.countDown();
119+
});
120+
});
121+
assertThat(openedLatch.await(10, TimeUnit.SECONDS)).isTrue();
122+
Connection conn = cf.createConnection();
123+
Channel chan1 = conn.createChannel(false);
124+
Channel chan2 = conn.createChannel(false);
125+
chan1.close();
126+
chan2.close();
127+
Properties cacheProperties = cf.getCacheProperties();
128+
assertThat(cacheProperties.getProperty("idleChannelsNotTx")).isEqualTo("2");
129+
closeLatch.countDown();
130+
assertThat(confirmLatch.await(10, TimeUnit.SECONDS)).isTrue();
131+
assertThat(closedLatch.await(10, TimeUnit.SECONDS)).isTrue();
132+
cacheProperties = cf.getCacheProperties();
133+
int n = 0;
134+
while (n++ < 100 && Integer.parseInt(cacheProperties.getProperty("idleChannelsNotTx")) < 2) {
135+
Thread.sleep(100);
136+
cacheProperties = cf.getCacheProperties();
137+
}
138+
assertThat(cacheProperties.getProperty("idleChannelsNotTx")).isEqualTo("2");
139+
}
140+
62141
}

0 commit comments

Comments
 (0)