Skip to content

Commit 94e70e6

Browse files
committed
Properly handling cancellation and protecting channel pool from multiple releases
1 parent 6825899 commit 94e70e6

File tree

5 files changed

+154
-65
lines changed

5 files changed

+154
-65
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/MakeAsyncHttpRequestStage.java

Lines changed: 1 addition & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,12 @@
1919
import java.util.concurrent.CompletableFuture;
2020
import java.util.concurrent.Executor;
2121
import java.util.concurrent.RejectedExecutionException;
22-
import java.util.concurrent.atomic.AtomicBoolean;
2322
import org.reactivestreams.Publisher;
24-
import org.reactivestreams.Subscriber;
25-
import org.reactivestreams.Subscription;
2623
import org.slf4j.Logger;
2724
import org.slf4j.LoggerFactory;
2825
import software.amazon.awssdk.annotations.SdkInternalApi;
2926
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
3027
import software.amazon.awssdk.core.client.config.SdkClientOption;
31-
import software.amazon.awssdk.core.exception.SdkClientException;
3228
import software.amazon.awssdk.core.exception.SdkException;
3329
import software.amazon.awssdk.core.http.HttpResponse;
3430
import software.amazon.awssdk.core.internal.Response;
@@ -47,8 +43,6 @@
4743
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
4844
import software.amazon.awssdk.http.async.SdkHttpRequestProvider;
4945
import software.amazon.awssdk.http.async.SdkHttpResponseHandler;
50-
import software.amazon.awssdk.utils.async.DelegatingSubscriber;
51-
import software.amazon.awssdk.utils.async.DelegatingSubscription;
5246

5347
/**
5448
* Delegate to the HTTP implementation to make an HTTP request and receive the response.
@@ -128,7 +122,6 @@ private boolean shouldSetContentLength(SdkHttpFullRequest request, SdkHttpReques
128122
private class ResponseHandler implements SdkHttpResponseHandler<Response<OutputT>> {
129123
private final SdkHttpFullRequest request;
130124
private final Completable completable;
131-
private final AtomicBoolean isCancelled = new AtomicBoolean(false);
132125

133126
private volatile SdkHttpResponse response;
134127
private volatile boolean isSuccess = false;
@@ -157,36 +150,21 @@ public void headersReceived(SdkHttpResponse response) {
157150
public void onStream(Publisher<ByteBuffer> publisher) {
158151
if (isSuccess) {
159152
// TODO handle exception as non retryable
160-
responseHandler.onStream(subscriber ->
161-
publisher.subscribe(new OnCancelSubscriber(subscriber, this::onCancel)));
153+
responseHandler.onStream(publisher);
162154
} else {
163155
errorResponseHandler.onStream(publisher);
164156
}
165157
}
166158

167-
/**
168-
* If the subscriber cancels the subscription we treat it as an error and notify the future accordingly.
169-
*/
170-
private void onCancel() {
171-
this.isCancelled.set(true);
172-
completable.completeExceptionally(SdkClientException.create("Subscriber cancelled before all events were published"));
173-
}
174-
175159
@Override
176160
public void exceptionOccurred(Throwable throwable) {
177-
if (isCancelled.get()) {
178-
return;
179-
}
180161
// Note that we don't notify the response handler here, we do that in AsyncRetryableStage where we
181162
// have more context of what's going on and can deliver exceptions more reliably.
182163
completable.completeExceptionally(throwable);
183164
}
184165

185166
@Override
186167
public Response<OutputT> complete() {
187-
if (isCancelled.get()) {
188-
return null;
189-
}
190168
try {
191169
SdkHttpFullResponse httpFullResponse = (SdkHttpFullResponse) this.response;
192170
final HttpResponse httpResponse = SdkHttpResponseAdapter.adapt(false, request, httpFullResponse);
@@ -210,39 +188,6 @@ private Response<OutputT> handleResponse(HttpResponse httpResponse) {
210188

211189
}
212190

213-
/**
214-
* Decorator around a {@link Subscriber} to notify if a cancellation occurs.
215-
*/
216-
private class OnCancelSubscriber extends DelegatingSubscriber<ByteBuffer, ByteBuffer> {
217-
218-
private final Runnable onCancel;
219-
220-
/**
221-
* @param subscriber Subscriber to delegate to.
222-
* @param onCancel Runnable to execute if a cancellation occurs.
223-
*/
224-
private OnCancelSubscriber(Subscriber<? super ByteBuffer> subscriber, Runnable onCancel) {
225-
super(subscriber);
226-
this.onCancel = onCancel;
227-
}
228-
229-
@Override
230-
public void onSubscribe(Subscription subscription) {
231-
super.onSubscribe(new DelegatingSubscription(subscription) {
232-
@Override
233-
public void cancel() {
234-
onCancel.run();
235-
super.cancel();
236-
}
237-
});
238-
}
239-
240-
@Override
241-
public void onNext(ByteBuffer byteBuffer) {
242-
subscriber.onNext(byteBuffer);
243-
}
244-
}
245-
246191
/**
247192
* An interface similar to {@link CompletableFuture} that may or may not dispatch completion of the future to an executor
248193
* service, depending on the client's configuration.

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import software.amazon.awssdk.http.nio.netty.internal.HandlerRemovingChannelPool;
5454
import software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration;
5555
import software.amazon.awssdk.http.nio.netty.internal.NonManagedEventLoopGroup;
56+
import software.amazon.awssdk.http.nio.netty.internal.ReleaseOnceChannelPool;
5657
import software.amazon.awssdk.http.nio.netty.internal.RequestAdapter;
5758
import software.amazon.awssdk.http.nio.netty.internal.RequestContext;
5859
import software.amazon.awssdk.http.nio.netty.internal.RunnableRequest;
@@ -147,11 +148,13 @@ protected ChannelPool newPool(URI key) {
147148
.option(ChannelOption.TCP_NODELAY, true)
148149
.remoteAddress(key.getHost(), key.getPort());
149150
AtomicReference<ChannelPool> channelPoolRef = new AtomicReference<>();
150-
channelPoolRef.set(new HandlerRemovingChannelPool(
151-
new HttpOrHttp2ChannelPool(bootstrap,
152-
new ChannelPipelineInitializer(protocol, sslContext, maxStreams, channelPoolRef),
153-
configuration.maxConnections(),
154-
configuration)));
151+
channelPoolRef.set(
152+
new ReleaseOnceChannelPool(
153+
new HandlerRemovingChannelPool(
154+
new HttpOrHttp2ChannelPool(bootstrap,
155+
new ChannelPipelineInitializer(protocol, sslContext, maxStreams, channelPoolRef),
156+
configuration.maxConnections(),
157+
configuration))));
155158
return channelPoolRef.get();
156159
}
157160
};
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.nio.netty.internal;
17+
18+
import io.netty.channel.Channel;
19+
import io.netty.channel.pool.ChannelPool;
20+
import io.netty.channel.pool.FixedChannelPool;
21+
import io.netty.util.AttributeKey;
22+
import io.netty.util.concurrent.Future;
23+
import io.netty.util.concurrent.GenericFutureListener;
24+
import io.netty.util.concurrent.Promise;
25+
import io.netty.util.concurrent.SucceededFuture;
26+
import java.util.concurrent.atomic.AtomicBoolean;
27+
import software.amazon.awssdk.http.nio.netty.internal.http2.Http2MultiplexedChannelPool;
28+
29+
/**
30+
* Wrapper around a {@link ChannelPool} to protect it from having the same channel released twice. This can
31+
* cause issues in {@link FixedChannelPool} and {@link Http2MultiplexedChannelPool} which has a simple
32+
* mechanism to track leased connections.
33+
*/
34+
public class ReleaseOnceChannelPool implements ChannelPool {
35+
36+
private static final AttributeKey<AtomicBoolean> IS_RELEASED = AttributeKey.newInstance("isReleased");
37+
38+
private final ChannelPool delegate;
39+
40+
public ReleaseOnceChannelPool(ChannelPool delegate) {
41+
this.delegate = delegate;
42+
}
43+
44+
@Override
45+
public Future<Channel> acquire() {
46+
return delegate.acquire().addListener(onAcquire());
47+
}
48+
49+
@Override
50+
public Future<Channel> acquire(Promise<Channel> promise) {
51+
return delegate.acquire(promise).addListener(onAcquire());
52+
}
53+
54+
private GenericFutureListener<Future<Channel>> onAcquire() {
55+
return future -> {
56+
if(future.isSuccess()) {
57+
future.getNow().attr(IS_RELEASED).set(new AtomicBoolean(false));
58+
}
59+
};
60+
}
61+
62+
@Override
63+
public Future<Void> release(Channel channel) {
64+
if (shouldRelease(channel)) {
65+
return delegate.release(channel);
66+
} else {
67+
return new SucceededFuture<>(channel.eventLoop(), null);
68+
}
69+
}
70+
71+
@Override
72+
public Future<Void> release(Channel channel, Promise<Void> promise) {
73+
if (shouldRelease(channel)) {
74+
return delegate.release(channel, promise);
75+
} else {
76+
return promise.setSuccess(null);
77+
}
78+
}
79+
80+
private boolean shouldRelease(Channel channel) {
81+
return channel.attr(IS_RELEASED).get() == null
82+
|| channel.attr(IS_RELEASED).get().compareAndSet(false, true);
83+
}
84+
85+
@Override
86+
public void close() {
87+
delegate.close();
88+
}
89+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.nio.ByteBuffer;
3939
import java.util.List;
4040
import java.util.Map;
41+
import java.util.concurrent.atomic.AtomicBoolean;
4142
import java.util.stream.Collectors;
4243
import org.reactivestreams.Publisher;
4344
import org.reactivestreams.Subscriber;
@@ -50,6 +51,8 @@
5051
import software.amazon.awssdk.http.SdkHttpResponse;
5152
import software.amazon.awssdk.http.nio.netty.internal.http2.Http2ResetSendingSubscription;
5253
import software.amazon.awssdk.utils.FunctionalUtils.UnsafeRunnable;
54+
import software.amazon.awssdk.utils.async.DelegatingSubscriber;
55+
import software.amazon.awssdk.utils.async.DelegatingSubscription;
5356

5457
@Sharable
5558
@SdkInternalApi
@@ -176,6 +179,7 @@ private static class PublisherAdapter implements Publisher<ByteBuffer> {
176179
private final StreamedHttpResponse response;
177180
private final ChannelHandlerContext channelContext;
178181
private final RequestContext requestContext;
182+
private final AtomicBoolean isCancelled = new AtomicBoolean(false);
179183

180184
private PublisherAdapter(StreamedHttpResponse response, ChannelHandlerContext channelContext,
181185
RequestContext requestContext) {
@@ -189,13 +193,30 @@ public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
189193
response.subscribe(new Subscriber<HttpContent>() {
190194
@Override
191195
public void onSubscribe(Subscription subscription) {
196+
subscriber.onSubscribe(new OnCancelSubscription(resolveSubscription(subscription),
197+
this::onCancel));
198+
}
199+
200+
private Subscription resolveSubscription(Subscription subscription) {
192201
// For HTTP2 we send a RST_STREAM frame on cancel to stop the service from sending more data
193202
if (Protocol.HTTP2.equals(ChannelAttributeKey.getProtocolNow(channelContext.channel()))) {
194-
subscriber.onSubscribe(new Http2ResetSendingSubscription(channelContext, subscription));
203+
return new Http2ResetSendingSubscription(channelContext, subscription);
195204
} else {
196205
// TODO I believe the behavior for H1 is to finish reading the data. Do we want to do this
197206
// or abort the connection?
198-
subscriber.onSubscribe(subscription);
207+
return subscription;
208+
}
209+
}
210+
211+
private void onCancel() {
212+
try {
213+
isCancelled.set(true);
214+
requestContext.handler().exceptionOccurred(new RuntimeException("Cancelled subscription"));
215+
} finally {
216+
runAndLogError("Could not release channel back to the pool",
217+
() -> closeAndRelease(channelContext));
218+
runAndLogError("Could not release channel back to the pool",
219+
() -> closeAndRelease(channelContext));
199220
}
200221
}
201222

@@ -210,6 +231,9 @@ public void onNext(HttpContent httpContent) {
210231

211232
@Override
212233
public void onError(Throwable t) {
234+
if (isCancelled.get()) {
235+
return;
236+
}
213237
try {
214238
runAndLogError(String.format("Subscriber %s threw an exception in onError.", subscriber.toString()),
215239
() -> subscriber.onError(t));
@@ -222,6 +246,11 @@ public void onError(Throwable t) {
222246

223247
@Override
224248
public void onComplete() {
249+
// For HTTP/2 it's possible to get an onComplete after we cancel due to the channel becoming
250+
// inactive. We guard against that here and just ignore the signal (see HandlerPublisher)
251+
if (isCancelled.get()) {
252+
return;
253+
}
225254
try {
226255
runAndLogError(String.format("Subscriber %s threw an exception in onComplete.", subscriber.toString()),
227256
subscriber::onComplete);
@@ -234,6 +263,25 @@ public void onComplete() {
234263
}
235264
}
236265

266+
/**
267+
* Decorator around a {@link Subscription} to notify if a cancellation occurs.
268+
*/
269+
private static class OnCancelSubscription extends DelegatingSubscription {
270+
271+
private final Runnable onCancel;
272+
273+
private OnCancelSubscription(Subscription subscription, Runnable onCancel) {
274+
super(subscription);
275+
this.onCancel = onCancel;
276+
}
277+
278+
@Override
279+
public void cancel() {
280+
onCancel.run();
281+
super.cancel();
282+
}
283+
}
284+
237285
static class FullResponseContentPublisher implements Publisher<ByteBuffer> {
238286
private final ChannelHandlerContext channelContext;
239287
private final ByteBuffer fullContent;
@@ -277,7 +325,6 @@ public void cancel() {
277325
}
278326
}
279327

280-
281328
private static class LastHttpContentSwallower extends SimpleChannelInboundHandler<HttpObject> {
282329

283330
@Override

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/http2/Http2MultiplexedChannelPool.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,12 @@ public Future<Void> release(Channel channel, Promise<Void> promise) {
126126
private void release0(Channel channel, Promise<Void> promise) {
127127
if (channel.parent() == null) {
128128
// This is the socket channel, close and release from underlying connection pool
129-
releaseParentChannel(channel);
129+
try {
130+
releaseParentChannel(channel);
131+
} finally {
132+
// This channel doesn't technically belong to this pool as it was never acquired directly
133+
promise.setFailure(new IllegalArgumentException("Channel does not belong to this pool"));
134+
}
130135
} else {
131136
Channel parentChannel = channel.parent();
132137
MultiplexedChannelRecord channelRecord = parentChannel.attr(CHANNEL_POOL_RECORD).get();

0 commit comments

Comments
 (0)