Skip to content

Enable idle connection reaper for Apache client #754

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changes/next-release/feature-ApacheHTTPClient-fa9710f.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"category": "Apache HTTP Client",
"type": "feature",
"description": "Add support for idle connection reaping."
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import software.amazon.awssdk.http.apache.internal.DefaultConfiguration;
import software.amazon.awssdk.http.apache.internal.SdkProxyRoutePlanner;
import software.amazon.awssdk.http.apache.internal.conn.ClientConnectionManagerFactory;
import software.amazon.awssdk.http.apache.internal.conn.IdleConnectionReaper;
import software.amazon.awssdk.http.apache.internal.conn.SdkConnectionKeepAliveStrategy;
import software.amazon.awssdk.http.apache.internal.conn.SdkTlsSocketFactory;
import software.amazon.awssdk.http.apache.internal.impl.ApacheHttpRequestFactory;
Expand Down Expand Up @@ -129,10 +130,10 @@ private ConnectionManagerAwareHttpClient createClient(ApacheHttpClient.DefaultBu

addProxyConfig(builder, configuration.proxyConfiguration);

// TODO idle connection reaper
// if (.useReaper()) {
// IdleConnectionReaper.registerConnectionManager(cm, settings.getMaxIdleConnectionTime());
// }
if (useIdleConnectionReaper(configuration)) {
IdleConnectionReaper.getInstance().registerConnectionManager(
cm, connectionMaxIdleTime(configuration).toMillis());
}

return new software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient(builder.build(), cm);
}
Expand All @@ -155,12 +156,19 @@ private void addProxyConfig(HttpClientBuilder builder,
}

private ConnectionKeepAliveStrategy buildKeepAliveStrategy(ApacheHttpClient.DefaultBuilder configuration) {
final long maxIdle = Optional.ofNullable(configuration.connectionMaxIdleTime)
.orElse(DefaultConfiguration.MAX_IDLE_CONNECTION_TIME)
.toMillis();
final long maxIdle = connectionMaxIdleTime(configuration).toMillis();
return maxIdle > 0 ? new SdkConnectionKeepAliveStrategy(maxIdle) : null;
}

private Duration connectionMaxIdleTime(DefaultBuilder configuration) {
return Optional.ofNullable(configuration.connectionMaxIdleTime)
.orElse(DefaultConfiguration.MAX_IDLE_CONNECTION_TIME);
}

private boolean useIdleConnectionReaper(DefaultBuilder configuration) {
return Boolean.TRUE.equals(configuration.useIdleConnectionReaper);
}

private boolean isAuthenticatedProxy(ProxyConfiguration proxyConfiguration) {
return proxyConfiguration.username() != null && proxyConfiguration.password() != null;
}
Expand Down Expand Up @@ -188,7 +196,9 @@ public void abort() {

@Override
public void close() {
httpClient.getHttpClientConnectionManager().shutdown();
HttpClientConnectionManager cm = httpClient.getHttpClientConnectionManager();
IdleConnectionReaper.getInstance().deregisterConnectionManager(cm);
cm.shutdown();
}

private SdkHttpFullResponse execute(HttpRequestBase apacheRequest) throws IOException {
Expand Down Expand Up @@ -307,6 +317,14 @@ public interface Builder extends SdkHttpClient.Builder<ApacheHttpClient.Builder>
* Configure the maximum amount of time that a connection should be allowed to remain open while idle.
*/
Builder connectionMaxIdleTime(Duration maxIdleConnectionTimeout);

/**
* Configure whether the idle connections in the connection pool should be closed asynchronously.
* <p>
* When enabled, connections left idling for longer than {@link #connectionMaxIdleTime(Duration)} will be
* closed. If no value is set, the default value of {@link DefaultConfiguration#MAX_IDLE_CONNECTION_TIME} is used.
*/
Builder useIdleConnectionReaper(Boolean useConnectionReaper);
}

private static final class DefaultBuilder implements Builder {
Expand All @@ -316,6 +334,7 @@ private static final class DefaultBuilder implements Builder {
private Boolean expectContinueEnabled;
private Duration connectionTimeToLive;
private Duration connectionMaxIdleTime;
private Boolean useIdleConnectionReaper;

private DefaultBuilder() {
}
Expand Down Expand Up @@ -416,6 +435,16 @@ public void setConnectionMaxIdleTime(Duration connectionMaxIdleTime) {
connectionMaxIdleTime(connectionMaxIdleTime);
}

@Override
public Builder useIdleConnectionReaper(Boolean useIdleConnectionReaper) {
this.useIdleConnectionReaper = useIdleConnectionReaper;
return this;
}

public void setUseIdleConnectionReaper(Boolean useIdleConnectionReaper) {
useIdleConnectionReaper(useIdleConnectionReaper);
}

@Override
public SdkHttpClient buildWithDefaults(AttributeMap serviceDefaults) {
AttributeMap resolvedOptions = standardOptions.build().merge(serviceDefaults).merge(GLOBAL_HTTP_DEFAULTS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,181 +15,156 @@

package software.amazon.awssdk.http.apache.internal.conn;

import java.util.ArrayList;
import java.util.List;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.http.conn.HttpClientConnectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;

/**
* Daemon thread to periodically check connection pools for idle connections.
* <p>
* Connections sitting around idle in the HTTP connection pool for too long will
* eventually be terminated by the AWS end of the connection, and will go into
* CLOSE_WAIT. If this happens, sockets will sit around in CLOSE_WAIT, still
* using resources on the client side to manage that socket. Many sockets stuck
* in CLOSE_WAIT can prevent the OS from creating new connections.
* <p>
* This class closes idle connections before they can move into the CLOSE_WAIT
* state.
* <p>
* This thread is important because by default, we disable Apache HttpClient's
* stale connection checking, so without this thread running in the background,
* cleaning up old/inactive HTTP connections, we'd see more IO exceptions when
* stale connections (i.e. closed on the AWS side) are left in the connection
* pool, and requests grab one of them to begin executing a request.
* Manages the reaping of idle connections.
*/
@SdkInternalApi
public final class IdleConnectionReaper extends Thread {

/**
* Shared log for any errors during connection reaping.
*/
public final class IdleConnectionReaper {
private static final Logger log = LoggerFactory.getLogger(IdleConnectionReaper.class);
/**
* The period between invocations of the idle connection reaper.
*/
private static final int PERIOD_MILLISECONDS = 1000 * 60;

/**
* Legacy constant used when {@link #registerConnectionManager(HttpClientConnectionManager)} is called. New code paths should
* use {@link #registerConnectionManager(HttpClientConnectionManager, long)} and provide the max idle timeout for that
* particular connection manager.
*/
@Deprecated
private static final int DEFAULT_MAX_IDLE_MILLIS = 1000 * 60;
private static final IdleConnectionReaper INSTANCE = new IdleConnectionReaper();

private static final Map<HttpClientConnectionManager, Long> CONNECTION_MANAGERS = new ConcurrentHashMap<>();
/**
* Singleton instance of the connection reaper.
*/
private static volatile IdleConnectionReaper instance;
/**
* Set to true when shutting down the reaper; Once set to true, this
* flag is never set back to false.
*/
private volatile boolean shuttingDown;
private final Map<HttpClientConnectionManager, Long> connectionManagers;

private final Supplier<ExecutorService> executorServiceSupplier;

private final long sleepPeriod;

private volatile ExecutorService exec;

private volatile ReaperTask reaperTask;

/**
* Private constructor - singleton pattern.
*/
private IdleConnectionReaper() {
super("java-sdk-http-connection-reaper");
setDaemon(true);
this.connectionManagers = new ConcurrentHashMap<>();

this.executorServiceSupplier = () -> {
ExecutorService e = Executors.newSingleThreadExecutor(r -> {
Thread t = new Thread(r, "idle-connection-reaper");
t.setDaemon(true);
return t;
});
return e;
};

this.sleepPeriod = Duration.ofMinutes(1).toMillis();
}

/**
* Registers the given connection manager with this reaper.
*
* @return true if the connection manager has been successfully registered; false otherwise.
* @deprecated By {@link #registerConnectionManager(HttpClientConnectionManager, long)}.
*/
@Deprecated
public static boolean registerConnectionManager(HttpClientConnectionManager connectionManager) {
return registerConnectionManager(connectionManager, DEFAULT_MAX_IDLE_MILLIS);
@SdkTestInternalApi
IdleConnectionReaper(Map<HttpClientConnectionManager, Long> connectionManagers,
Supplier<ExecutorService> executorServiceSupplier,
long sleepPeriod) {

this.connectionManagers = connectionManagers;
this.executorServiceSupplier = executorServiceSupplier;
this.sleepPeriod = sleepPeriod;
}

/**
* Registers the given connection manager with this reaper;
* Register the connection manager with this reaper.
*
* @param connectionManager Connection manager to register
* @param maxIdleInMs Max idle connection timeout in milliseconds for this connection manager.
* @return true if the connection manager has been successfully registered; false otherwise.
* @param manager The connection manager.
* @param maxIdleTime The maximum time connections in the connection manager are to remain idle before being reaped.
* @return {@code true} If the connection manager was not previously registered with this reaper, {@code false}
* otherwise.
*/
public static boolean registerConnectionManager(HttpClientConnectionManager connectionManager, long maxIdleInMs) {
if (instance == null) {
synchronized (IdleConnectionReaper.class) {
if (instance == null) {
IdleConnectionReaper newInstance = new IdleConnectionReaper();
newInstance.start();
instance = newInstance;
}
}
}
return CONNECTION_MANAGERS.put(connectionManager, maxIdleInMs) == null;
public synchronized boolean registerConnectionManager(HttpClientConnectionManager manager, long maxIdleTime) {
boolean notPreviouslyRegistered = connectionManagers.put(manager, maxIdleTime) == null;
setupExecutorIfNecessary();
return notPreviouslyRegistered;
}

/**
* Removes the given connection manager from this reaper,
* and shutting down the reaper if there is zero connection manager left.
* Deregister this connection manager with this reaper.
*
* @return true if the connection manager has been successfully removed, false otherwise.
* @param manager The connection manager.
* @return {@code true} If this connection manager was previously registered with this reaper and it was removed, {@code
* false} otherwise.
*/
public static boolean removeConnectionManager(HttpClientConnectionManager connectionManager) {
boolean wasRemoved = CONNECTION_MANAGERS.remove(connectionManager) != null;
if (CONNECTION_MANAGERS.isEmpty()) {
shutdown();
}
public synchronized boolean deregisterConnectionManager(HttpClientConnectionManager manager) {
boolean wasRemoved = connectionManagers.remove(manager) != null;
cleanupExecutorIfNecessary();
return wasRemoved;
}

public static List<HttpClientConnectionManager> getRegisteredConnectionManagers() {
return new ArrayList<HttpClientConnectionManager>(CONNECTION_MANAGERS.keySet());
}

/**
* Shuts down the thread, allowing the class and instance to be collected.
* <p>
* Since this is a daemon thread, its running will not prevent JVM shutdown.
* It will, however, prevent this class from being unloaded or garbage
* collected, in the context of a long-running application, until it is
* interrupted. This method will stop the thread's execution and clear its
* state. Any use of a service client will cause the thread to be restarted.
*
* @return true if an actual shutdown has been made; false otherwise.
* @return The singleton instance of this class.
*/
public static synchronized boolean shutdown() {
if (instance != null) {
instance.markShuttingDown();
instance.interrupt();
CONNECTION_MANAGERS.clear();
instance = null;
return true;
}
return false;
public static IdleConnectionReaper getInstance() {
return INSTANCE;
}

/**
* For testing purposes.
* Returns the number of connection managers currently monitored by this
* reaper.
*/
static int size() {
return CONNECTION_MANAGERS.size();
private void setupExecutorIfNecessary() {
if (exec != null) {
return;
}

ExecutorService e = executorServiceSupplier.get();

this.reaperTask = new ReaperTask(connectionManagers, sleepPeriod);

e.execute(this.reaperTask);

exec = e;
}

private void markShuttingDown() {
shuttingDown = true;
private void cleanupExecutorIfNecessary() {
if (exec == null || !connectionManagers.isEmpty()) {
return;
}

reaperTask.stop();
reaperTask = null;
exec.shutdownNow();
exec = null;
}

@SuppressWarnings("unchecked")
@Override
public void run() {
while (true) {
if (shuttingDown) {
log.debug("Shutting down reaper thread.");
return;
}
try {
Thread.sleep(PERIOD_MILLISECONDS);

for (Map.Entry<HttpClientConnectionManager, Long> entry : CONNECTION_MANAGERS.entrySet()) {
// When we release connections, the connection manager leaves them
// open so they can be reused. We want to close out any idle
// connections so that they don't sit around in CLOSE_WAIT.
try {
entry.getKey().closeIdleConnections(entry.getValue(), TimeUnit.MILLISECONDS);
} catch (Exception t) {
log.warn("Unable to close idle connections", t);
private static final class ReaperTask implements Runnable {
private final Map<HttpClientConnectionManager, Long> connectionManagers;
private final long sleepPeriod;

private volatile boolean stopping = false;

private ReaperTask(Map<HttpClientConnectionManager, Long> connectionManagers,
long sleepPeriod) {
this.connectionManagers = connectionManagers;
this.sleepPeriod = sleepPeriod;
}

@Override
public void run() {
while (!stopping) {
try {
Thread.sleep(sleepPeriod);

for (Map.Entry<HttpClientConnectionManager, Long> entry : connectionManagers.entrySet()) {
try {
entry.getKey().closeIdleConnections(entry.getValue(), TimeUnit.MILLISECONDS);
} catch (Exception t) {
log.warn("Unable to close idle connections", t);
}
}
} catch (Throwable t) {
log.debug("Reaper thread: ", t);
}
} catch (Throwable t) {
log.debug("Reaper thread: ", t);
}
log.debug("Shutting down reaper thread.");
}

private void stop() {
stopping = true;
}
}
}
Loading