16
16
package software .amazon .awssdk .http .nio .netty .internal ;
17
17
18
18
import static software .amazon .awssdk .http .HttpMetric .CONCURRENCY_ACQUIRE_DURATION ;
19
- import static software .amazon .awssdk .http .nio .netty .internal .ChannelAttributeKey .CHANNEL_DIAGNOSTICS ;
20
19
import static software .amazon .awssdk .http .nio .netty .internal .ChannelAttributeKey .EXECUTE_FUTURE_KEY ;
21
20
import static software .amazon .awssdk .http .nio .netty .internal .ChannelAttributeKey .EXECUTION_ID_KEY ;
22
21
import static software .amazon .awssdk .http .nio .netty .internal .ChannelAttributeKey .IN_USE ;
23
22
import static software .amazon .awssdk .http .nio .netty .internal .ChannelAttributeKey .KEEP_ALIVE ;
24
23
import static software .amazon .awssdk .http .nio .netty .internal .ChannelAttributeKey .LAST_HTTP_CONTENT_RECEIVED_KEY ;
25
24
import static software .amazon .awssdk .http .nio .netty .internal .ChannelAttributeKey .REQUEST_CONTEXT_KEY ;
26
25
import static software .amazon .awssdk .http .nio .netty .internal .ChannelAttributeKey .RESPONSE_COMPLETE_KEY ;
27
- import static software .amazon .awssdk .http .nio .netty .internal .ChannelAttributeKey .RESPONSE_CONTENT_LENGTH ;
28
- import static software .amazon .awssdk .http .nio .netty .internal .ChannelAttributeKey .RESPONSE_DATA_READ ;
29
26
import static software .amazon .awssdk .http .nio .netty .internal .NettyRequestMetrics .measureTimeTaken ;
27
+ import static software .amazon .awssdk .http .nio .netty .internal .utils .NettyUtils .CLOSED_CHANNEL_MESSAGE ;
30
28
31
29
import io .netty .buffer .ByteBuf ;
32
30
import io .netty .buffer .Unpooled ;
40
38
import io .netty .handler .codec .http .HttpMethod ;
41
39
import io .netty .handler .codec .http .HttpRequest ;
42
40
import io .netty .handler .codec .http .HttpVersion ;
41
+ import io .netty .handler .timeout .ReadTimeoutException ;
43
42
import io .netty .handler .timeout .ReadTimeoutHandler ;
43
+ import io .netty .handler .timeout .WriteTimeoutException ;
44
44
import io .netty .handler .timeout .WriteTimeoutHandler ;
45
45
import io .netty .util .concurrent .Future ;
46
46
import io .netty .util .concurrent .GenericFutureListener ;
47
47
import io .netty .util .concurrent .Promise ;
48
48
import java .io .IOException ;
49
49
import java .net .URI ;
50
50
import java .nio .ByteBuffer ;
51
+ import java .nio .channels .ClosedChannelException ;
51
52
import java .time .Duration ;
52
53
import java .util .Optional ;
53
54
import java .util .concurrent .CompletableFuture ;
54
55
import java .util .concurrent .TimeUnit ;
56
+ import java .util .concurrent .TimeoutException ;
55
57
import java .util .concurrent .atomic .AtomicLong ;
56
58
import java .util .function .Supplier ;
57
59
import org .reactivestreams .Publisher ;
@@ -174,13 +176,9 @@ private void makeRequestListener(Future<Channel> channelFuture) {
174
176
if (channelFuture .isSuccess ()) {
175
177
channel = channelFuture .getNow ();
176
178
NettyUtils .doInEventLoop (channel .eventLoop (), () -> {
177
- try {
178
- configureChannel ();
179
- configurePipeline ();
179
+ configureChannel ();
180
+ if (tryConfigurePipeline ()) {
180
181
makeRequest ();
181
- } catch (Throwable t ) {
182
- closeAndRelease (channel );
183
- handleFailure (channel , () -> "Failed to initiate request to " + endpoint (), t );
184
182
}
185
183
});
186
184
} else {
@@ -194,13 +192,10 @@ private void configureChannel() {
194
192
channel .attr (REQUEST_CONTEXT_KEY ).set (context );
195
193
channel .attr (RESPONSE_COMPLETE_KEY ).set (false );
196
194
channel .attr (LAST_HTTP_CONTENT_RECEIVED_KEY ).set (false );
197
- channel .attr (RESPONSE_CONTENT_LENGTH ).set (null );
198
- channel .attr (RESPONSE_DATA_READ ).set (null );
199
- channel .attr (CHANNEL_DIAGNOSTICS ).get ().incrementRequestCount ();
200
195
channel .config ().setOption (ChannelOption .AUTO_READ , false );
201
196
}
202
197
203
- private void configurePipeline () throws IOException {
198
+ private boolean tryConfigurePipeline () {
204
199
Protocol protocol = ChannelAttributeKey .getProtocolNow (channel );
205
200
ChannelPipeline pipeline = channel .pipeline ();
206
201
@@ -215,7 +210,10 @@ private void configurePipeline() throws IOException {
215
210
requestAdapter = REQUEST_ADAPTER_HTTP1_1 ;
216
211
break ;
217
212
default :
218
- throw new IOException ("Unknown protocol: " + protocol );
213
+ String errorMsg = "Unknown protocol: " + protocol ;
214
+ closeAndRelease (channel );
215
+ handleFailure (channel , () -> errorMsg , new RuntimeException (errorMsg ));
216
+ return false ;
219
217
}
220
218
221
219
pipeline .addLast (LastHttpContentHandler .create ());
@@ -229,8 +227,13 @@ private void configurePipeline() throws IOException {
229
227
// handler (which will monitor for it going inactive from now on).
230
228
// Make sure it's active here, or the request will never complete: https://github.com/aws/aws-sdk-java-v2/issues/1207
231
229
if (!channel .isActive ()) {
232
- throw new IOException (NettyUtils .closedChannelMessage (channel ));
230
+ String errorMessage = "Channel was closed before it could be written to." ;
231
+ closeAndRelease (channel );
232
+ handleFailure (channel , () -> errorMessage , new IOException (errorMessage ));
233
+ return false ;
233
234
}
235
+
236
+ return true ;
234
237
}
235
238
236
239
private void makeRequest () {
@@ -305,11 +308,80 @@ private URI endpoint() {
305
308
306
309
private void handleFailure (Channel channel , Supplier <String > msgSupplier , Throwable cause ) {
307
310
log .debug (channel , msgSupplier , cause );
308
- cause = NettyUtils . decorateException (channel , cause );
311
+ cause = decorateException (cause );
309
312
context .handler ().onError (cause );
310
313
executeFuture .completeExceptionally (cause );
311
314
}
312
315
316
+ private Throwable decorateException (Throwable originalCause ) {
317
+ if (isAcquireTimeoutException (originalCause )) {
318
+ return new Throwable (getMessageForAcquireTimeoutException (), originalCause );
319
+ } else if (isTooManyPendingAcquiresException (originalCause )) {
320
+ return new Throwable (getMessageForTooManyAcquireOperationsError (), originalCause );
321
+ } else if (originalCause instanceof ReadTimeoutException ) {
322
+ return new IOException ("Read timed out" , originalCause );
323
+ } else if (originalCause instanceof WriteTimeoutException ) {
324
+ return new IOException ("Write timed out" , originalCause );
325
+ } else if (originalCause instanceof ClosedChannelException ) {
326
+ return new IOException (CLOSED_CHANNEL_MESSAGE , originalCause );
327
+ }
328
+
329
+ return originalCause ;
330
+ }
331
+
332
+ private boolean isAcquireTimeoutException (Throwable originalCause ) {
333
+ String message = originalCause .getMessage ();
334
+ return originalCause instanceof TimeoutException &&
335
+ message != null &&
336
+ message .contains ("Acquire operation took longer" );
337
+ }
338
+
339
+ private boolean isTooManyPendingAcquiresException (Throwable originalCause ) {
340
+ String message = originalCause .getMessage ();
341
+ return originalCause instanceof IllegalStateException &&
342
+ message != null &&
343
+ originalCause .getMessage ().contains ("Too many outstanding acquire operations" );
344
+ }
345
+
346
+ private String getMessageForAcquireTimeoutException () {
347
+ return "Acquire operation took longer than the configured maximum time. This indicates that a request cannot get a "
348
+ + "connection from the pool within the specified maximum time. This can be due to high request rate.\n "
349
+
350
+ + "Consider taking any of the following actions to mitigate the issue: increase max connections, "
351
+ + "increase acquire timeout, or slowing the request rate.\n "
352
+
353
+ + "Increasing the max connections can increase client throughput (unless the network interface is already "
354
+ + "fully utilized), but can eventually start to hit operation system limitations on the number of file "
355
+ + "descriptors used by the process. If you already are fully utilizing your network interface or cannot "
356
+ + "further increase your connection count, increasing the acquire timeout gives extra time for requests to "
357
+ + "acquire a connection before timing out. If the connections doesn't free up, the subsequent requests "
358
+ + "will still timeout.\n "
359
+
360
+ + "If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large "
361
+ + "traffic bursts cannot overload the client, being more efficient with the number of times you need to "
362
+ + "call AWS, or by increasing the number of hosts sending requests." ;
363
+ }
364
+
365
+ private String getMessageForTooManyAcquireOperationsError () {
366
+ return "Maximum pending connection acquisitions exceeded. The request rate is too high for the client to keep up.\n "
367
+
368
+ + "Consider taking any of the following actions to mitigate the issue: increase max connections, "
369
+ + "increase max pending acquire count, decrease pool lease timeout, or slowing the request rate.\n "
370
+
371
+ + "Increasing the max connections can increase client throughput (unless the network interface is already "
372
+ + "fully utilized), but can eventually start to hit operation system limitations on the number of file "
373
+ + "descriptors used by the process. If you already are fully utilizing your network interface or cannot "
374
+ + "further increase your connection count, increasing the pending acquire count allows extra requests to be "
375
+ + "buffered by the client, but can cause additional request latency and higher memory usage. If your request"
376
+ + " latency or memory usage is already too high, decreasing the lease timeout will allow requests to fail "
377
+ + "more quickly, reducing the number of pending connection acquisitions, but likely won't decrease the total "
378
+ + "number of failed requests.\n "
379
+
380
+ + "If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large "
381
+ + "traffic bursts cannot overload the client, being more efficient with the number of times you need to call "
382
+ + "AWS, or by increasing the number of hosts sending requests." ;
383
+ }
384
+
313
385
/**
314
386
* Close and release the channel back to the pool.
315
387
*
0 commit comments