Skip to content

Commit a8c63e2

Browse files
garyrussellartembilan
authored andcommitted
Fix CachedSessionFactory Race
Close the pool so that any sessions returned after the factory is `destroy()`ed are closed. * Call `removeAllIdleItems()` in `close()`. * Close sessions in `SftpStreamingMessageSourceTests`. **cherry-pick to all supported branches**
1 parent 3857157 commit a8c63e2

File tree

6 files changed

+53
-7
lines changed

6 files changed

+53
-7
lines changed

spring-integration-core/src/main/java/org/springframework/integration/util/Pool.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,4 +72,11 @@ public interface Pool<T> {
7272
*/
7373
int getAllocatedCount();
7474

75+
/**
76+
* Close the pool; returned items will be destroyed.
77+
* @since 4.3.23
78+
*/
79+
default void close() {
80+
}
81+
7582
}

spring-integration-core/src/main/java/org/springframework/integration/util/SimplePool.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ public class SimplePool<T> implements Pool<T> {
6262

6363
private final PoolItemCallback<T> callback;
6464

65+
private volatile boolean closed;
66+
6567
/**
6668
* Creates a SimplePool with a specific limit.
6769
* @param poolSize The maximum number of items the pool supports.
@@ -161,6 +163,7 @@ public void setWaitTimeout(long waitTimeout) {
161163
*/
162164
@Override
163165
public T getItem() {
166+
Assert.state(!this.closed, "Pool has been closed");
164167
boolean permitted = false;
165168
try {
166169
try {
@@ -218,7 +221,7 @@ public synchronized void releaseItem(T item) {
218221
Assert.isTrue(this.allocated.contains(item),
219222
"You can only release items that were obtained from the pool");
220223
if (this.inUse.contains(item)) {
221-
if (this.poolSize.get() > this.targetPoolSize.get()) {
224+
if (this.poolSize.get() > this.targetPoolSize.get() || this.closed) {
222225
this.poolSize.decrementAndGet();
223226
doRemoveItem(item);
224227
}
@@ -255,6 +258,12 @@ private void doRemoveItem(T item) {
255258
this.callback.removedFromPool(item);
256259
}
257260

261+
@Override
262+
public synchronized void close() {
263+
this.closed = true;
264+
removeAllIdleItems();
265+
}
266+
258267
/**
259268
* User of the pool provide an implementation of this interface; called during
260269
* various pool operations.

spring-integration-core/src/test/java/org/springframework/integration/util/SimplePoolTests.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,8 @@
1818

1919
import static org.assertj.core.api.Assertions.assertThat;
2020
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
21+
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
22+
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
2123
import static org.assertj.core.api.Assertions.fail;
2224

2325
import java.util.ArrayList;
@@ -27,7 +29,7 @@
2729
import java.util.concurrent.Semaphore;
2830
import java.util.concurrent.atomic.AtomicBoolean;
2931

30-
import org.junit.Test;
32+
import org.junit.jupiter.api.Test;
3133

3234
import org.springframework.integration.test.util.TestUtils;
3335

@@ -127,13 +129,13 @@ public void testOverCommitAndResize() {
127129
assertThat(pool.getAllocatedCount()).isEqualTo(2);
128130
}
129131

130-
@Test(expected = IllegalArgumentException.class)
132+
@Test
131133
public void testForeignObject() {
132134
final Set<String> strings = new HashSet<String>();
133135
final AtomicBoolean stale = new AtomicBoolean();
134136
SimplePool<String> pool = stringPool(2, strings, stale);
135137
pool.getItem();
136-
pool.releaseItem("Hello, world!");
138+
assertThatIllegalArgumentException().isThrownBy(() -> pool.releaseItem("Hello, world!"));
137139
}
138140

139141
@Test
@@ -251,27 +253,45 @@ public void testSizeUpdateIfFullyAllocated() {
251253
assertThat(pool.getActiveCount()).isEqualTo(0);
252254
}
253255

256+
@Test
257+
void testClose() {
258+
SimplePool<String> pool = stringPool(10, new HashSet<>(), new AtomicBoolean());
259+
String item1 = pool.getItem();
260+
String item2 = pool.getItem();
261+
pool.releaseItem(item2);
262+
assertThat(pool.getAllocatedCount()).isEqualTo(2);
263+
pool.close();
264+
pool.releaseItem(item1);
265+
assertThat(pool.getAllocatedCount()).isEqualTo(0);
266+
assertThatIllegalStateException().isThrownBy(pool::getItem);
267+
}
268+
254269
private SimplePool<String> stringPool(int size, final Set<String> strings,
255270
final AtomicBoolean stale) {
271+
256272
SimplePool<String> pool = new SimplePool<String>(size, new SimplePool.PoolItemCallback<String>() {
257273
private int i;
274+
258275
@Override
259276
public String createForPool() {
260277
String string = "String" + i++;
261278
strings.add(string);
262279
return string;
263280
}
281+
264282
@Override
265283
public boolean isStale(String item) {
266284
if (stale.get()) {
267285
strings.remove(item);
268286
}
269287
return stale.get();
270288
}
289+
271290
@Override
272291
public void removedFromPool(String item) {
273292
strings.remove(item);
274293
}
294+
275295
});
276296
return pool;
277297
}

spring-integration-file/src/main/java/org/springframework/integration/file/remote/session/CachingSessionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public Session<F> getSession() {
137137
*/
138138
@Override
139139
public void destroy() {
140-
this.pool.removeAllIdleItems();
140+
this.pool.close();
141141
}
142142

143143
/**

spring-integration-ip/src/main/java/org/springframework/integration/ip/tcp/connection/CachingClientConnectionFactory.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.concurrent.Executor;
2222
import java.util.concurrent.atomic.AtomicBoolean;
2323

24+
import org.springframework.beans.factory.DisposableBean;
2425
import org.springframework.core.serializer.Deserializer;
2526
import org.springframework.core.serializer.Serializer;
2627
import org.springframework.integration.ip.IpHeaders;
@@ -41,7 +42,7 @@
4142
* @since 2.2
4243
*
4344
*/
44-
public class CachingClientConnectionFactory extends AbstractClientConnectionFactory {
45+
public class CachingClientConnectionFactory extends AbstractClientConnectionFactory implements DisposableBean {
4546

4647
private final AbstractClientConnectionFactory targetConnectionFactory;
4748

@@ -389,6 +390,11 @@ public synchronized void stop() {
389390
this.pool.removeAllIdleItems();
390391
}
391392

393+
@Override
394+
public void destroy() throws Exception {
395+
this.pool.close();
396+
}
397+
392398
private final class CachedConnection extends TcpConnectionInterceptorSupport {
393399

394400
private final AtomicBoolean released = new AtomicBoolean();

spring-integration-sftp/src/test/java/org/springframework/integration/sftp/inbound/SftpStreamingMessageSourceTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.springframework.context.ApplicationContext;
3131
import org.springframework.context.annotation.Bean;
3232
import org.springframework.context.annotation.Configuration;
33+
import org.springframework.integration.StaticMessageHeaderAccessor;
3334
import org.springframework.integration.annotation.InboundChannelAdapter;
3435
import org.springframework.integration.annotation.Transformer;
3536
import org.springframework.integration.channel.QueueChannel;
@@ -134,6 +135,7 @@ public void testMaxFetch() throws Exception {
134135
.isIn(" sftpSource1.txt", "sftpSource2.txt");
135136

136137
received.getPayload().close();
138+
StaticMessageHeaderAccessor.getCloseableResource(received).close();
137139
}
138140

139141
@Test
@@ -148,6 +150,7 @@ public void testMaxFetchNoFilter() throws Exception {
148150
.isIn(" sftpSource1.txt", "sftpSource2.txt");
149151

150152
received.getPayload().close();
153+
StaticMessageHeaderAccessor.getCloseableResource(received).close();
151154
}
152155

153156
@Test
@@ -162,6 +165,7 @@ public void testMaxFetchLambdaFilter() throws Exception {
162165
.isIn(" sftpSource1.txt", "sftpSource2.txt");
163166

164167
received.getPayload().close();
168+
StaticMessageHeaderAccessor.getCloseableResource(received).close();
165169
}
166170

167171
private SftpStreamingMessageSource buildSource() {

0 commit comments

Comments
 (0)