|
34 | 34 | import io.netty.util.concurrent.Future;
|
35 | 35 | import java.net.URI;
|
36 | 36 | import java.nio.ByteBuffer;
|
| 37 | +import java.util.concurrent.TimeoutException; |
37 | 38 | import java.util.function.Supplier;
|
38 | 39 | import org.reactivestreams.Publisher;
|
39 | 40 | import org.reactivestreams.Subscriber;
|
@@ -111,13 +112,81 @@ private URI endpoint() {
|
111 | 112 | private void handleFailure(Supplier<String> msg, Throwable cause) {
|
112 | 113 | log.error(msg.get(), cause);
|
113 | 114 | runAndLogError("Exception thrown from AsyncResponseHandler",
|
114 |
| - () -> context.handler().exceptionOccurred(cause)); |
| 115 | + () -> context.handler().exceptionOccurred(modifyHighBurstTrafficException(cause))); |
115 | 116 | if (channel != null) {
|
116 |
| - runAndLogError("Unable to release channel back to the pool.", |
117 |
| - () -> closeAndRelease(channel)); |
| 117 | + runAndLogError("Unable to release channel back to the pool.", () -> closeAndRelease(channel)); |
118 | 118 | }
|
119 | 119 | }
|
120 | 120 |
|
| 121 | + private Throwable modifyHighBurstTrafficException(Throwable originalCause) { |
| 122 | + String originalMessage = originalCause.getMessage(); |
| 123 | + String newMessage = null; |
| 124 | + |
| 125 | + if (originalCause instanceof TimeoutException && |
| 126 | + originalMessage.contains("Acquire operation took longer")) { |
| 127 | + newMessage = getMessageForAcquireTimeoutException(); |
| 128 | + |
| 129 | + } else if (originalCause instanceof IllegalStateException && |
| 130 | + originalMessage.contains("Too many outstanding acquire operations")) { |
| 131 | + newMessage = getMessageForTooManyAcquireOperationsError(); |
| 132 | + |
| 133 | + } else { |
| 134 | + return originalCause; |
| 135 | + } |
| 136 | + |
| 137 | + return new Throwable(newMessage, originalCause); |
| 138 | + } |
| 139 | + |
| 140 | + |
| 141 | + private String getMessageForAcquireTimeoutException() { |
| 142 | + StringBuilder stringBuilder = new StringBuilder(); |
| 143 | + |
| 144 | + stringBuilder |
| 145 | + .append("Acquire operation took longer than the configured maximum time. This indicates that a request cannot get a " |
| 146 | + + "connection from the pool within the specified maximum time. This can be due to high request rate.\n") |
| 147 | + |
| 148 | + .append("Consider taking any of the following actions to mitigate the issue: increase max connections, " |
| 149 | + + "increase acquire timeout, or slowing the request rate.\n") |
| 150 | + |
| 151 | + .append("Increasing the max connections can increase client throughput (unless the network interface is already " |
| 152 | + + "fully utilized), but can eventually start to hit operation system limitations on the number of file " |
| 153 | + + "descriptors used by the process. If you already are fully utilizing your network interface or cannot " |
| 154 | + + "further increase your connection count, increasing the acquire timeout gives extra time for requests to " |
| 155 | + + "acquire a connection before timing out. If the connections doesn't free up, the subsequent requests " |
| 156 | + + "will still timeout.\n") |
| 157 | + |
| 158 | + .append("If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large " |
| 159 | + + "traffic bursts cannot overload the client, being more efficient with the number of times you need to " |
| 160 | + + "call AWS, or by increasing the number of hosts sending requests."); |
| 161 | + |
| 162 | + return stringBuilder.toString(); |
| 163 | + } |
| 164 | + |
| 165 | + private String getMessageForTooManyAcquireOperationsError() { |
| 166 | + StringBuilder stringBuilder = new StringBuilder(); |
| 167 | + |
| 168 | + stringBuilder |
| 169 | + .append("Maximum pending connection acquisitions exceeded. The request rate is too high for the client to keep up.\n") |
| 170 | + |
| 171 | + .append("Consider taking any of the following actions to mitigate the issue: increase max connections, " |
| 172 | + + "increase max pending acquire count, decrease pool lease timeout, or slowing the request rate.\n") |
| 173 | + |
| 174 | + .append("Increasing the max connections can increase client throughput (unless the network interface is already " |
| 175 | + + "fully utilized), but can eventually start to hit operation system limitations on the number of file " |
| 176 | + + "descriptors used by the process. If you already are fully utilizing your network interface or cannot " |
| 177 | + + "further increase your connection count, increasing the pending acquire count allows extra requests to be " |
| 178 | + + "buffered by the client, but can cause additional request latency and higher memory usage. If your request" |
| 179 | + + " latency or memory usage is already too high, decreasing the lease timeout will allow requests to fail " |
| 180 | + + "more quickly, reducing the number of pending connection acquisitions, but likely won't decrease the total " |
| 181 | + + "number of failed requests.\n") |
| 182 | + |
| 183 | + .append("If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large " |
| 184 | + + "traffic bursts cannot overload the client, being more efficient with the number of times you need to call " |
| 185 | + + "AWS, or by increasing the number of hosts sending requests."); |
| 186 | + |
| 187 | + return stringBuilder.toString(); |
| 188 | + } |
| 189 | + |
121 | 190 | private static void closeAndRelease(Channel channel) {
|
122 | 191 | RequestContext requestCtx = channel.attr(REQUEST_CONTEXT_KEY).get();
|
123 | 192 | channel.close().addListener(ignored -> requestCtx.channelPool().release(channel));
|
|
0 commit comments