|
15 | 15 |
|
16 | 16 | package software.amazon.awssdk.http.apache.internal.conn;
|
17 | 17 |
|
18 |
| -import java.util.ArrayList; |
19 |
| -import java.util.List; |
| 18 | +import java.time.Duration; |
20 | 19 | import java.util.Map;
|
21 | 20 | import java.util.concurrent.ConcurrentHashMap;
|
| 21 | +import java.util.concurrent.ExecutorService; |
| 22 | +import java.util.concurrent.Executors; |
22 | 23 | import java.util.concurrent.TimeUnit;
|
| 24 | +import java.util.function.Supplier; |
23 | 25 | import org.apache.http.conn.HttpClientConnectionManager;
|
24 | 26 | import org.slf4j.Logger;
|
25 | 27 | import org.slf4j.LoggerFactory;
|
26 | 28 | import software.amazon.awssdk.annotations.SdkInternalApi;
|
| 29 | +import software.amazon.awssdk.annotations.SdkTestInternalApi; |
27 | 30 |
|
28 | 31 | /**
|
29 |
| - * Daemon thread to periodically check connection pools for idle connections. |
30 |
| - * <p> |
31 |
| - * Connections sitting around idle in the HTTP connection pool for too long will |
32 |
| - * eventually be terminated by the AWS end of the connection, and will go into |
33 |
| - * CLOSE_WAIT. If this happens, sockets will sit around in CLOSE_WAIT, still |
34 |
| - * using resources on the client side to manage that socket. Many sockets stuck |
35 |
| - * in CLOSE_WAIT can prevent the OS from creating new connections. |
36 |
| - * <p> |
37 |
| - * This class closes idle connections before they can move into the CLOSE_WAIT |
38 |
| - * state. |
39 |
| - * <p> |
40 |
| - * This thread is important because by default, we disable Apache HttpClient's |
41 |
| - * stale connection checking, so without this thread running in the background, |
42 |
| - * cleaning up old/inactive HTTP connections, we'd see more IO exceptions when |
43 |
| - * stale connections (i.e. closed on the AWS side) are left in the connection |
44 |
| - * pool, and requests grab one of them to begin executing a request. |
| 32 | + * Manages the reaping of idle connections. |
45 | 33 | */
|
46 | 34 | @SdkInternalApi
|
47 |
| -public final class IdleConnectionReaper extends Thread { |
48 |
| - |
49 |
| - /** |
50 |
| - * Shared log for any errors during connection reaping. |
51 |
| - */ |
| 35 | +public final class IdleConnectionReaper { |
52 | 36 | private static final Logger log = LoggerFactory.getLogger(IdleConnectionReaper.class);
|
53 |
| - /** |
54 |
| - * The period between invocations of the idle connection reaper. |
55 |
| - */ |
56 |
| - private static final int PERIOD_MILLISECONDS = 1000 * 60; |
57 | 37 |
|
58 |
| - /** |
59 |
| - * Legacy constant used when {@link #registerConnectionManager(HttpClientConnectionManager)} is called. New code paths should |
60 |
| - * use {@link #registerConnectionManager(HttpClientConnectionManager, long)} and provide the max idle timeout for that |
61 |
| - * particular connection manager. |
62 |
| - */ |
63 |
| - @Deprecated |
64 |
| - private static final int DEFAULT_MAX_IDLE_MILLIS = 1000 * 60; |
| 38 | + private static final IdleConnectionReaper INSTANCE = new IdleConnectionReaper(); |
65 | 39 |
|
66 |
| - private static final Map<HttpClientConnectionManager, Long> CONNECTION_MANAGERS = new ConcurrentHashMap<>(); |
67 |
| - /** |
68 |
| - * Singleton instance of the connection reaper. |
69 |
| - */ |
70 |
| - private static volatile IdleConnectionReaper instance; |
71 |
| - /** |
72 |
| - * Set to true when shutting down the reaper; Once set to true, this |
73 |
| - * flag is never set back to false. |
74 |
| - */ |
75 |
| - private volatile boolean shuttingDown; |
| 40 | + private final Map<HttpClientConnectionManager, Long> connectionManagers; |
| 41 | + |
| 42 | + private final Supplier<ExecutorService> executorServiceSupplier; |
| 43 | + |
| 44 | + private final long sleepPeriod; |
| 45 | + |
| 46 | + private volatile ExecutorService exec; |
| 47 | + |
| 48 | + private volatile ReaperTask reaperTask; |
76 | 49 |
|
77 |
| - /** |
78 |
| - * Private constructor - singleton pattern. |
79 |
| - */ |
80 | 50 | private IdleConnectionReaper() {
|
81 |
| - super("java-sdk-http-connection-reaper"); |
82 |
| - setDaemon(true); |
| 51 | + this.connectionManagers = new ConcurrentHashMap<>(); |
| 52 | + |
| 53 | + this.executorServiceSupplier = () -> { |
| 54 | + ExecutorService e = Executors.newSingleThreadExecutor(r -> { |
| 55 | + Thread t = new Thread(r, "idle-connection-reaper"); |
| 56 | + t.setDaemon(true); |
| 57 | + return t; |
| 58 | + }); |
| 59 | + return e; |
| 60 | + }; |
| 61 | + |
| 62 | + this.sleepPeriod = Duration.ofMinutes(1).toMillis(); |
83 | 63 | }
|
84 | 64 |
|
85 |
| - /** |
86 |
| - * Registers the given connection manager with this reaper. |
87 |
| - * |
88 |
| - * @return true if the connection manager has been successfully registered; false otherwise. |
89 |
| - * @deprecated By {@link #registerConnectionManager(HttpClientConnectionManager, long)}. |
90 |
| - */ |
91 |
| - @Deprecated |
92 |
| - public static boolean registerConnectionManager(HttpClientConnectionManager connectionManager) { |
93 |
| - return registerConnectionManager(connectionManager, DEFAULT_MAX_IDLE_MILLIS); |
| 65 | + @SdkTestInternalApi |
| 66 | + IdleConnectionReaper(Map<HttpClientConnectionManager, Long> connectionManagers, |
| 67 | + Supplier<ExecutorService> executorServiceSupplier, |
| 68 | + long sleepPeriod) { |
| 69 | + |
| 70 | + this.connectionManagers = connectionManagers; |
| 71 | + this.executorServiceSupplier = executorServiceSupplier; |
| 72 | + this.sleepPeriod = sleepPeriod; |
94 | 73 | }
|
95 | 74 |
|
96 | 75 | /**
|
97 |
| - * Registers the given connection manager with this reaper; |
| 76 | + * Register the connection manager with this reaper. |
98 | 77 | *
|
99 |
| - * @param connectionManager Connection manager to register |
100 |
| - * @param maxIdleInMs Max idle connection timeout in milliseconds for this connection manager. |
101 |
| - * @return true if the connection manager has been successfully registered; false otherwise. |
| 78 | + * @param manager The connection manager. |
| 79 | + * @param maxIdleTime The maximum time connections in the connection manager are to remain idle before being reaped. |
| 80 | + * @return {@code true} If the connection manager was not previously registered with this reaper, {@code false} |
| 81 | + * otherwise. |
102 | 82 | */
|
103 |
| - public static boolean registerConnectionManager(HttpClientConnectionManager connectionManager, long maxIdleInMs) { |
104 |
| - if (instance == null) { |
105 |
| - synchronized (IdleConnectionReaper.class) { |
106 |
| - if (instance == null) { |
107 |
| - IdleConnectionReaper newInstance = new IdleConnectionReaper(); |
108 |
| - newInstance.start(); |
109 |
| - instance = newInstance; |
110 |
| - } |
111 |
| - } |
112 |
| - } |
113 |
| - return CONNECTION_MANAGERS.put(connectionManager, maxIdleInMs) == null; |
| 83 | + public synchronized boolean registerConnectionManager(HttpClientConnectionManager manager, long maxIdleTime) { |
| 84 | + boolean notPreviouslyRegistered = connectionManagers.put(manager, maxIdleTime) == null; |
| 85 | + setupExecutorIfNecessary(); |
| 86 | + return notPreviouslyRegistered; |
114 | 87 | }
|
115 | 88 |
|
116 | 89 | /**
|
117 |
| - * Removes the given connection manager from this reaper, |
118 |
| - * and shutting down the reaper if there is zero connection manager left. |
| 90 | + * Deregister this connection manager with this reaper. |
119 | 91 | *
|
120 |
| - * @return true if the connection manager has been successfully removed, false otherwise. |
| 92 | + * @param manager The connection manager. |
| 93 | + * @return {@code true} If this connection manager was previously registered with this reaper and it was removed, {@code |
| 94 | + * false} otherwise. |
121 | 95 | */
|
122 |
| - public static boolean removeConnectionManager(HttpClientConnectionManager connectionManager) { |
123 |
| - boolean wasRemoved = CONNECTION_MANAGERS.remove(connectionManager) != null; |
124 |
| - if (CONNECTION_MANAGERS.isEmpty()) { |
125 |
| - shutdown(); |
126 |
| - } |
| 96 | + public synchronized boolean deregisterConnectionManager(HttpClientConnectionManager manager) { |
| 97 | + boolean wasRemoved = connectionManagers.remove(manager) != null; |
| 98 | + cleanupExecutorIfNecessary(); |
127 | 99 | return wasRemoved;
|
128 | 100 | }
|
129 | 101 |
|
130 |
| - public static List<HttpClientConnectionManager> getRegisteredConnectionManagers() { |
131 |
| - return new ArrayList<HttpClientConnectionManager>(CONNECTION_MANAGERS.keySet()); |
132 |
| - } |
133 |
| - |
134 | 102 | /**
|
135 |
| - * Shuts down the thread, allowing the class and instance to be collected. |
136 |
| - * <p> |
137 |
| - * Since this is a daemon thread, its running will not prevent JVM shutdown. |
138 |
| - * It will, however, prevent this class from being unloaded or garbage |
139 |
| - * collected, in the context of a long-running application, until it is |
140 |
| - * interrupted. This method will stop the thread's execution and clear its |
141 |
| - * state. Any use of a service client will cause the thread to be restarted. |
142 |
| - * |
143 |
| - * @return true if an actual shutdown has been made; false otherwise. |
| 103 | + * @return The singleton instance of this class. |
144 | 104 | */
|
145 |
| - public static synchronized boolean shutdown() { |
146 |
| - if (instance != null) { |
147 |
| - instance.markShuttingDown(); |
148 |
| - instance.interrupt(); |
149 |
| - CONNECTION_MANAGERS.clear(); |
150 |
| - instance = null; |
151 |
| - return true; |
152 |
| - } |
153 |
| - return false; |
| 105 | + public static IdleConnectionReaper getInstance() { |
| 106 | + return INSTANCE; |
154 | 107 | }
|
155 | 108 |
|
156 |
| - /** |
157 |
| - * For testing purposes. |
158 |
| - * Returns the number of connection managers currently monitored by this |
159 |
| - * reaper. |
160 |
| - */ |
161 |
| - static int size() { |
162 |
| - return CONNECTION_MANAGERS.size(); |
| 109 | + private void setupExecutorIfNecessary() { |
| 110 | + if (exec != null) { |
| 111 | + return; |
| 112 | + } |
| 113 | + |
| 114 | + ExecutorService e = executorServiceSupplier.get(); |
| 115 | + |
| 116 | + this.reaperTask = new ReaperTask(connectionManagers, sleepPeriod); |
| 117 | + |
| 118 | + e.execute(this.reaperTask); |
| 119 | + |
| 120 | + exec = e; |
163 | 121 | }
|
164 | 122 |
|
165 |
| - private void markShuttingDown() { |
166 |
| - shuttingDown = true; |
| 123 | + private void cleanupExecutorIfNecessary() { |
| 124 | + if (exec == null || !connectionManagers.isEmpty()) { |
| 125 | + return; |
| 126 | + } |
| 127 | + |
| 128 | + reaperTask.stop(); |
| 129 | + reaperTask = null; |
| 130 | + exec.shutdownNow(); |
| 131 | + exec = null; |
167 | 132 | }
|
168 | 133 |
|
169 |
| - @SuppressWarnings("unchecked") |
170 |
| - @Override |
171 |
| - public void run() { |
172 |
| - while (true) { |
173 |
| - if (shuttingDown) { |
174 |
| - log.debug("Shutting down reaper thread."); |
175 |
| - return; |
176 |
| - } |
177 |
| - try { |
178 |
| - Thread.sleep(PERIOD_MILLISECONDS); |
179 |
| - |
180 |
| - for (Map.Entry<HttpClientConnectionManager, Long> entry : CONNECTION_MANAGERS.entrySet()) { |
181 |
| - // When we release connections, the connection manager leaves them |
182 |
| - // open so they can be reused. We want to close out any idle |
183 |
| - // connections so that they don't sit around in CLOSE_WAIT. |
184 |
| - try { |
185 |
| - entry.getKey().closeIdleConnections(entry.getValue(), TimeUnit.MILLISECONDS); |
186 |
| - } catch (Exception t) { |
187 |
| - log.warn("Unable to close idle connections", t); |
| 134 | + private static final class ReaperTask implements Runnable { |
| 135 | + private final Map<HttpClientConnectionManager, Long> connectionManagers; |
| 136 | + private final long sleepPeriod; |
| 137 | + |
| 138 | + private volatile boolean stopping = false; |
| 139 | + |
| 140 | + private ReaperTask(Map<HttpClientConnectionManager, Long> connectionManagers, |
| 141 | + long sleepPeriod) { |
| 142 | + this.connectionManagers = connectionManagers; |
| 143 | + this.sleepPeriod = sleepPeriod; |
| 144 | + } |
| 145 | + |
| 146 | + @Override |
| 147 | + public void run() { |
| 148 | + while (!stopping) { |
| 149 | + try { |
| 150 | + Thread.sleep(sleepPeriod); |
| 151 | + |
| 152 | + for (Map.Entry<HttpClientConnectionManager, Long> entry : connectionManagers.entrySet()) { |
| 153 | + try { |
| 154 | + entry.getKey().closeIdleConnections(entry.getValue(), TimeUnit.MILLISECONDS); |
| 155 | + } catch (Exception t) { |
| 156 | + log.warn("Unable to close idle connections", t); |
| 157 | + } |
188 | 158 | }
|
| 159 | + } catch (Throwable t) { |
| 160 | + log.debug("Reaper thread: ", t); |
189 | 161 | }
|
190 |
| - } catch (Throwable t) { |
191 |
| - log.debug("Reaper thread: ", t); |
192 | 162 | }
|
| 163 | + log.debug("Shutting down reaper thread."); |
| 164 | + } |
| 165 | + |
| 166 | + private void stop() { |
| 167 | + stopping = true; |
193 | 168 | }
|
194 | 169 | }
|
195 | 170 | }
|
0 commit comments