16
16
package software .amazon .awssdk .http .nio .netty .internal ;
17
17
18
18
import static software .amazon .awssdk .http .HttpMetric .CONCURRENCY_ACQUIRE_DURATION ;
19
+ import static software .amazon .awssdk .http .nio .netty .internal .ChannelAttributeKey .CHANNEL_DIAGNOSTICS ;
19
20
import static software .amazon .awssdk .http .nio .netty .internal .ChannelAttributeKey .EXECUTE_FUTURE_KEY ;
20
21
import static software .amazon .awssdk .http .nio .netty .internal .ChannelAttributeKey .EXECUTION_ID_KEY ;
21
22
import static software .amazon .awssdk .http .nio .netty .internal .ChannelAttributeKey .IN_USE ;
22
23
import static software .amazon .awssdk .http .nio .netty .internal .ChannelAttributeKey .KEEP_ALIVE ;
23
24
import static software .amazon .awssdk .http .nio .netty .internal .ChannelAttributeKey .LAST_HTTP_CONTENT_RECEIVED_KEY ;
24
25
import static software .amazon .awssdk .http .nio .netty .internal .ChannelAttributeKey .REQUEST_CONTEXT_KEY ;
25
26
import static software .amazon .awssdk .http .nio .netty .internal .ChannelAttributeKey .RESPONSE_COMPLETE_KEY ;
27
+ import static software .amazon .awssdk .http .nio .netty .internal .ChannelAttributeKey .RESPONSE_CONTENT_LENGTH ;
28
+ import static software .amazon .awssdk .http .nio .netty .internal .ChannelAttributeKey .RESPONSE_DATA_READ ;
26
29
import static software .amazon .awssdk .http .nio .netty .internal .NettyRequestMetrics .measureTimeTaken ;
27
- import static software .amazon .awssdk .http .nio .netty .internal .utils .NettyUtils .CLOSED_CHANNEL_MESSAGE ;
28
30
29
31
import io .netty .buffer .ByteBuf ;
30
32
import io .netty .buffer .Unpooled ;
38
40
import io .netty .handler .codec .http .HttpMethod ;
39
41
import io .netty .handler .codec .http .HttpRequest ;
40
42
import io .netty .handler .codec .http .HttpVersion ;
41
- import io .netty .handler .timeout .ReadTimeoutException ;
42
43
import io .netty .handler .timeout .ReadTimeoutHandler ;
43
- import io .netty .handler .timeout .WriteTimeoutException ;
44
44
import io .netty .handler .timeout .WriteTimeoutHandler ;
45
45
import io .netty .util .concurrent .Future ;
46
46
import io .netty .util .concurrent .GenericFutureListener ;
47
47
import io .netty .util .concurrent .Promise ;
48
48
import java .io .IOException ;
49
49
import java .net .URI ;
50
50
import java .nio .ByteBuffer ;
51
- import java .nio .channels .ClosedChannelException ;
52
51
import java .time .Duration ;
53
52
import java .util .Optional ;
54
53
import java .util .concurrent .CompletableFuture ;
55
54
import java .util .concurrent .TimeUnit ;
56
- import java .util .concurrent .TimeoutException ;
57
55
import java .util .concurrent .atomic .AtomicLong ;
58
56
import java .util .function .Supplier ;
59
57
import org .reactivestreams .Publisher ;
@@ -176,9 +174,13 @@ private void makeRequestListener(Future<Channel> channelFuture) {
176
174
if (channelFuture .isSuccess ()) {
177
175
channel = channelFuture .getNow ();
178
176
NettyUtils .doInEventLoop (channel .eventLoop (), () -> {
179
- configureChannel ();
180
- if (tryConfigurePipeline ()) {
177
+ try {
178
+ configureChannel ();
179
+ configurePipeline ();
181
180
makeRequest ();
181
+ } catch (Throwable t ) {
182
+ closeAndRelease (channel );
183
+ handleFailure (channel , () -> "Failed to initiate request to " + endpoint (), t );
182
184
}
183
185
});
184
186
} else {
@@ -192,10 +194,13 @@ private void configureChannel() {
192
194
channel .attr (REQUEST_CONTEXT_KEY ).set (context );
193
195
channel .attr (RESPONSE_COMPLETE_KEY ).set (false );
194
196
channel .attr (LAST_HTTP_CONTENT_RECEIVED_KEY ).set (false );
197
+ channel .attr (RESPONSE_CONTENT_LENGTH ).set (null );
198
+ channel .attr (RESPONSE_DATA_READ ).set (null );
199
+ channel .attr (CHANNEL_DIAGNOSTICS ).get ().incrementRequestCount ();
195
200
channel .config ().setOption (ChannelOption .AUTO_READ , false );
196
201
}
197
202
198
- private boolean tryConfigurePipeline () {
203
+ private void configurePipeline () throws IOException {
199
204
Protocol protocol = ChannelAttributeKey .getProtocolNow (channel );
200
205
ChannelPipeline pipeline = channel .pipeline ();
201
206
@@ -210,10 +215,7 @@ private boolean tryConfigurePipeline() {
210
215
requestAdapter = REQUEST_ADAPTER_HTTP1_1 ;
211
216
break ;
212
217
default :
213
- String errorMsg = "Unknown protocol: " + protocol ;
214
- closeAndRelease (channel );
215
- handleFailure (channel , () -> errorMsg , new RuntimeException (errorMsg ));
216
- return false ;
218
+ throw new IOException ("Unknown protocol: " + protocol );
217
219
}
218
220
219
221
pipeline .addLast (LastHttpContentHandler .create ());
@@ -227,13 +229,8 @@ private boolean tryConfigurePipeline() {
227
229
// handler (which will monitor for it going inactive from now on).
228
230
// Make sure it's active here, or the request will never complete: https://github.com/aws/aws-sdk-java-v2/issues/1207
229
231
if (!channel .isActive ()) {
230
- String errorMessage = "Channel was closed before it could be written to." ;
231
- closeAndRelease (channel );
232
- handleFailure (channel , () -> errorMessage , new IOException (errorMessage ));
233
- return false ;
232
+ throw new IOException (NettyUtils .closedChannelMessage (channel ));
234
233
}
235
-
236
- return true ;
237
234
}
238
235
239
236
private void makeRequest () {
@@ -308,80 +305,11 @@ private URI endpoint() {
308
305
309
306
private void handleFailure (Channel channel , Supplier <String > msgSupplier , Throwable cause ) {
310
307
log .debug (channel , msgSupplier , cause );
311
- cause = decorateException (cause );
308
+ cause = NettyUtils . decorateException (channel , cause );
312
309
context .handler ().onError (cause );
313
310
executeFuture .completeExceptionally (cause );
314
311
}
315
312
316
- private Throwable decorateException (Throwable originalCause ) {
317
- if (isAcquireTimeoutException (originalCause )) {
318
- return new Throwable (getMessageForAcquireTimeoutException (), originalCause );
319
- } else if (isTooManyPendingAcquiresException (originalCause )) {
320
- return new Throwable (getMessageForTooManyAcquireOperationsError (), originalCause );
321
- } else if (originalCause instanceof ReadTimeoutException ) {
322
- return new IOException ("Read timed out" , originalCause );
323
- } else if (originalCause instanceof WriteTimeoutException ) {
324
- return new IOException ("Write timed out" , originalCause );
325
- } else if (originalCause instanceof ClosedChannelException ) {
326
- return new IOException (CLOSED_CHANNEL_MESSAGE , originalCause );
327
- }
328
-
329
- return originalCause ;
330
- }
331
-
332
- private boolean isAcquireTimeoutException (Throwable originalCause ) {
333
- String message = originalCause .getMessage ();
334
- return originalCause instanceof TimeoutException &&
335
- message != null &&
336
- message .contains ("Acquire operation took longer" );
337
- }
338
-
339
- private boolean isTooManyPendingAcquiresException (Throwable originalCause ) {
340
- String message = originalCause .getMessage ();
341
- return originalCause instanceof IllegalStateException &&
342
- message != null &&
343
- originalCause .getMessage ().contains ("Too many outstanding acquire operations" );
344
- }
345
-
346
- private String getMessageForAcquireTimeoutException () {
347
- return "Acquire operation took longer than the configured maximum time. This indicates that a request cannot get a "
348
- + "connection from the pool within the specified maximum time. This can be due to high request rate.\n "
349
-
350
- + "Consider taking any of the following actions to mitigate the issue: increase max connections, "
351
- + "increase acquire timeout, or slowing the request rate.\n "
352
-
353
- + "Increasing the max connections can increase client throughput (unless the network interface is already "
354
- + "fully utilized), but can eventually start to hit operation system limitations on the number of file "
355
- + "descriptors used by the process. If you already are fully utilizing your network interface or cannot "
356
- + "further increase your connection count, increasing the acquire timeout gives extra time for requests to "
357
- + "acquire a connection before timing out. If the connections doesn't free up, the subsequent requests "
358
- + "will still timeout.\n "
359
-
360
- + "If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large "
361
- + "traffic bursts cannot overload the client, being more efficient with the number of times you need to "
362
- + "call AWS, or by increasing the number of hosts sending requests." ;
363
- }
364
-
365
- private String getMessageForTooManyAcquireOperationsError () {
366
- return "Maximum pending connection acquisitions exceeded. The request rate is too high for the client to keep up.\n "
367
-
368
- + "Consider taking any of the following actions to mitigate the issue: increase max connections, "
369
- + "increase max pending acquire count, decrease pool lease timeout, or slowing the request rate.\n "
370
-
371
- + "Increasing the max connections can increase client throughput (unless the network interface is already "
372
- + "fully utilized), but can eventually start to hit operation system limitations on the number of file "
373
- + "descriptors used by the process. If you already are fully utilizing your network interface or cannot "
374
- + "further increase your connection count, increasing the pending acquire count allows extra requests to be "
375
- + "buffered by the client, but can cause additional request latency and higher memory usage. If your request"
376
- + " latency or memory usage is already too high, decreasing the lease timeout will allow requests to fail "
377
- + "more quickly, reducing the number of pending connection acquisitions, but likely won't decrease the total "
378
- + "number of failed requests.\n "
379
-
380
- + "If the above mechanisms are not able to fix the issue, try smoothing out your requests so that large "
381
- + "traffic bursts cannot overload the client, being more efficient with the number of times you need to call "
382
- + "AWS, or by increasing the number of hosts sending requests." ;
383
- }
384
-
385
313
/**
386
314
* Close and release the channel back to the pool.
387
315
*
0 commit comments