Skip to content

Commit 0875b58

Browse files
authored
Close and release channel upon validation failure (#3855)
When content length validation fails when PublisherAdapter#onComplete is invoked, behave as on PublisherAdapter#onError and also close and release the channel; otherwise the channel would be left in the leased state forever.
1 parent 33f6011 commit 0875b58

File tree

3 files changed

+58
-0
lines changed

3 files changed

+58
-0
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "Netty NIO HTTP Client",
4+
"contributor": "",
5+
"description": "Fix a bug where, if validation of of the amount of expected data to be received (HTTP `Content-Length`) fails, the connection would be left dangling, consuming a connection from the pool until the client is shut down."
6+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -376,6 +376,8 @@ public void onComplete() {
376376
}
377377
} catch (IOException e) {
378378
notifyError(e);
379+
runAndLogError(channelContext.channel(), () -> "Could not release channel back to the pool",
380+
() -> closeAndRelease(channelContext));
379381
}
380382
}
381383

http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/PublisherAdapterTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,56 @@ public void onComplete() {
235235
}
236236
}
237237

238+
@Test
239+
public void contentLengthValidationFails_closesAndReleasesConnection() {
240+
channel.attr(ChannelAttributeKey.RESPONSE_CONTENT_LENGTH).set(1L);
241+
channel.attr(ChannelAttributeKey.RESPONSE_DATA_READ).set(0L);
242+
243+
Publisher<HttpContent> publisher = subscriber -> subscriber.onSubscribe(new Subscription() {
244+
@Override
245+
public void request(long l) {
246+
subscriber.onComplete();
247+
}
248+
249+
@Override
250+
public void cancel() {
251+
}
252+
});
253+
254+
DefaultStreamedHttpResponse streamedResponse = new DefaultStreamedHttpResponse(HttpVersion.HTTP_1_1,
255+
HttpResponseStatus.OK, publisher);
256+
257+
Subscriber<ByteBuffer> subscriber = new Subscriber<ByteBuffer>() {
258+
private Subscription subscription;
259+
260+
@Override
261+
public void onSubscribe(Subscription subscription) {
262+
this.subscription = subscription;
263+
subscription.request(Long.MAX_VALUE);
264+
}
265+
266+
@Override
267+
public void onNext(ByteBuffer byteBuffer) {
268+
}
269+
270+
@Override
271+
public void onError(Throwable throwable) {
272+
}
273+
274+
@Override
275+
public void onComplete() {
276+
}
277+
};
278+
279+
ResponseHandler.PublisherAdapter publisherAdapter = new ResponseHandler.PublisherAdapter(streamedResponse, ctx,
280+
requestContext, executeFuture);
281+
282+
publisherAdapter.subscribe(subscriber);
283+
284+
verify(ctx).close();
285+
verify(channelPool).release(channel);
286+
}
287+
238288
static final class TestSubscriber implements Subscriber<ByteBuffer> {
239289

240290
private Subscription subscription;

0 commit comments

Comments
 (0)