Skip to content

Commit 6081647

Browse files
[netty-nio-client] Ensure initial channel used for protocol detection is released before re-acquiring (#2882)
* [netty-nio-client] Ensure initial channel used for protocol detection is released before re-acquiring HttpOrHttp2ChannelPool establishes an initial connection to determine if H1 or H2 should be used for the ChannelPool. It then releases the Channel to the Pool to allow it to be re-acquired and reused. However, in some (perhaps rare) circumstances, it's possible for a subsequent acquire to race against the release, causing the second acquire to establish a new channel if the release had not yet completed. This may add unnecessary delay and incur unnecessary channel overhead. Delay because the release is likely to complete in sub-millisecond time while a new connection handshake may take many milliseconds to complete. Overhead because a second channel may linger in the pool even if the user only ever uses one channel at a time.
1 parent b549c13 commit 6081647

File tree

5 files changed

+205
-4
lines changed

5 files changed

+205
-4
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "Netty NIO HTTP Client",
3+
"contributor": "",
4+
"type": "bugfix",
5+
"description": "Ensure initial channel used for protocol detection is released before re-acquiring"
6+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/http2/HttpOrHttp2ChannelPool.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.PROTOCOL_FUTURE;
1919
import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.doInEventLoop;
20+
import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.runOrPropagate;
2021

2122
import io.netty.channel.Channel;
2223
import io.netty.channel.EventLoop;
@@ -140,7 +141,7 @@ private void completeProtocolConfiguration(Channel newChannel, Protocol protocol
140141
closeAndRelease(newChannel, new IllegalStateException("Pool closed"));
141142
} else {
142143
try {
143-
protocolImplPromise.setSuccess(configureProtocol(newChannel, protocol));
144+
configureProtocol(newChannel, protocol);
144145
} catch (Throwable e) {
145146
closeAndRelease(newChannel, e);
146147
}
@@ -154,7 +155,7 @@ private void closeAndRelease(Channel newChannel, Throwable e) {
154155
protocolImplPromise.setFailure(e);
155156
}
156157

157-
private ChannelPool configureProtocol(Channel newChannel, Protocol protocol) {
158+
private void configureProtocol(Channel newChannel, Protocol protocol) {
158159
if (Protocol.HTTP1_1 == protocol) {
159160
// For HTTP/1.1 we use a traditional channel pool without multiplexing
160161
SdkChannelPool idleConnectionMetricChannelPool = new IdleConnectionCountingChannelPool(eventLoop, delegatePool);
@@ -180,8 +181,10 @@ private ChannelPool configureProtocol(Channel newChannel, Protocol protocol) {
180181
.build();
181182
}
182183
// Give the channel back so it can be acquired again by protocolImpl
183-
delegatePool.release(newChannel);
184-
return protocolImpl;
184+
// Await the release completion to ensure we do not unnecessarily acquire a second channel
185+
delegatePool.release(newChannel).addListener(runOrPropagate(protocolImplPromise, () -> {
186+
protocolImplPromise.trySuccess(protocolImpl);
187+
}));
185188
}
186189

187190
@Override

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/utils/NettyUtils.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.CompletableFuture;
3030
import java.util.concurrent.TimeUnit;
3131
import java.util.function.BiConsumer;
32+
import java.util.function.Consumer;
3233
import java.util.function.Function;
3334
import javax.net.ssl.SSLEngine;
3435
import javax.net.ssl.SSLParameters;
@@ -221,4 +222,48 @@ private static void configureSslEngine(SSLEngine sslEngine) {
221222
sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
222223
sslEngine.setSSLParameters(sslParameters);
223224
}
225+
226+
/**
227+
* Create a {@link GenericFutureListener} that will propagate any failures or cancellations to the provided {@link Promise},
228+
* or invoke the provided {@link Consumer} with the result of a successful operation completion. This is useful for chaining
229+
* together multiple futures that may depend upon each other but that may not have the same return type.
230+
* <p>
231+
* Note that if you do not need the value returned by a successful completion (or if it returns {@link Void}) you may use
232+
* {@link #runOrPropagate(Promise, Runnable)} instead.
233+
*
234+
* @param destination the Promise to notify upon failure or cancellation
235+
* @param onSuccess the Consumer to invoke upon success
236+
*/
237+
public static <T> GenericFutureListener<Future<T>> consumeOrPropagate(Promise<?> destination, Consumer<T> onSuccess) {
238+
return f -> {
239+
if (f.isSuccess()) {
240+
T result = f.getNow();
241+
onSuccess.accept(result);
242+
} else if (f.isCancelled()) {
243+
destination.cancel(false);
244+
} else {
245+
destination.tryFailure(f.cause());
246+
}
247+
};
248+
}
249+
250+
/**
251+
* Create a {@link GenericFutureListener} that will propagate any failures or cancellations to the provided {@link Promise},
252+
* or invoke the provided {@link Runnable} upon successful operation completion. This is useful for chaining together multiple
253+
* futures that may depend upon each other but that may not have the same return type.
254+
*
255+
* @param destination the Promise to notify upon failure or cancellation
256+
* @param onSuccess the Runnable to invoke upon success
257+
*/
258+
public static <T> GenericFutureListener<Future<T>> runOrPropagate(Promise<?> destination, Runnable onSuccess) {
259+
return f -> {
260+
if (f.isSuccess()) {
261+
onSuccess.run();
262+
} else if (f.isCancelled()) {
263+
destination.cancel(false);
264+
} else {
265+
destination.tryFailure(f.cause());
266+
}
267+
};
268+
}
224269
}

http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/http2/HttpOrHttp2ChannelPoolTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package software.amazon.awssdk.http.nio.netty.internal.http2;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.mockito.Matchers.any;
1920
import static org.mockito.Matchers.eq;
2021
import static org.mockito.Mockito.verify;
2122
import static org.mockito.Mockito.when;
@@ -241,7 +242,9 @@ public void incompleteProtocolFutureDelaysMetricsDelegationAndForwardsSuccessFor
241242

242243
public void incompleteProtocolFutureDelaysMetricsDelegationAndForwardsSuccessForProtocol(Protocol protocol) throws Exception {
243244
Promise<Channel> acquirePromise = eventLoopGroup.next().newPromise();
245+
Promise<Void> releasePromise = eventLoopGroup.next().newPromise();
244246
when(mockDelegatePool.acquire()).thenReturn(acquirePromise);
247+
when(mockDelegatePool.release(any(Channel.class))).thenReturn(releasePromise);
245248

246249
// startConnection
247250
httpOrHttp2ChannelPool.acquire();
@@ -258,6 +261,7 @@ public void incompleteProtocolFutureDelaysMetricsDelegationAndForwardsSuccessFor
258261
eventLoopGroup.register(channel);
259262
channel.attr(PROTOCOL_FUTURE).set(CompletableFuture.completedFuture(protocol));
260263
acquirePromise.setSuccess(channel);
264+
releasePromise.setSuccess(null);
261265

262266
metricsFuture.join();
263267
MetricCollection metrics = metricCollector.collect();
@@ -267,4 +271,31 @@ public void incompleteProtocolFutureDelaysMetricsDelegationAndForwardsSuccessFor
267271
assertThat(metrics.metricValues(HttpMetric.AVAILABLE_CONCURRENCY).get(0)).isBetween(0, 1);
268272
assertThat(metrics.metricValues(HttpMetric.LEASED_CONCURRENCY).get(0)).isBetween(0, 1);
269273
}
274+
275+
@Test(timeout = 5_000)
276+
public void protocolFutureAwaitsReleaseFuture() throws Exception {
277+
Promise<Channel> delegateAcquirePromise = eventLoopGroup.next().newPromise();
278+
Promise<Void> releasePromise = eventLoopGroup.next().newPromise();
279+
when(mockDelegatePool.acquire()).thenReturn(delegateAcquirePromise);
280+
when(mockDelegatePool.release(any(Channel.class))).thenReturn(releasePromise);
281+
282+
MockChannel channel = new MockChannel();
283+
eventLoopGroup.register(channel);
284+
channel.attr(PROTOCOL_FUTURE).set(CompletableFuture.completedFuture(Protocol.HTTP1_1));
285+
286+
// Acquire a new connection and save the returned future
287+
Future<Channel> acquireFuture = httpOrHttp2ChannelPool.acquire();
288+
289+
// Return a successful connection from the delegate pool
290+
delegateAcquirePromise.setSuccess(channel);
291+
292+
// The returned future should not complete until the release completes
293+
assertThat(acquireFuture.isDone()).isFalse();
294+
295+
// Complete the release
296+
releasePromise.setSuccess(null);
297+
298+
// Assert the returned future completes (within the test timeout)
299+
acquireFuture.await();
300+
}
270301
}

http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/utils/NettyUtilsTest.java

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,39 @@
2323
import static org.mockito.Mockito.when;
2424

2525
import io.netty.channel.Channel;
26+
import io.netty.channel.EventLoopGroup;
27+
import io.netty.channel.nio.NioEventLoopGroup;
2628
import io.netty.handler.ssl.SslContext;
2729
import io.netty.handler.ssl.SslContextBuilder;
2830
import io.netty.handler.ssl.SslHandler;
2931
import io.netty.util.AttributeKey;
3032
import io.netty.util.concurrent.EventExecutor;
33+
import io.netty.util.concurrent.Future;
34+
import io.netty.util.concurrent.GenericFutureListener;
35+
import io.netty.util.concurrent.Promise;
3136
import java.time.Duration;
37+
import java.util.concurrent.atomic.AtomicBoolean;
38+
import java.util.concurrent.atomic.AtomicReference;
3239
import javax.net.ssl.SSLEngine;
40+
import org.junit.AfterClass;
41+
import org.junit.BeforeClass;
3342
import org.junit.Test;
3443
import software.amazon.awssdk.http.nio.netty.internal.MockChannel;
3544

3645
public class NettyUtilsTest {
46+
47+
private static EventLoopGroup eventLoopGroup;
48+
49+
@BeforeClass
50+
public static void setup() {
51+
eventLoopGroup = new NioEventLoopGroup(1);
52+
}
53+
54+
@AfterClass
55+
public static void teardown() throws InterruptedException {
56+
eventLoopGroup.shutdownGracefully().await();
57+
}
58+
3759
@Test
3860
public void testGetOrCreateAttributeKey_calledTwiceWithSameName_returnsSameInstance() {
3961
String attr = "NettyUtilsTest.Foo";
@@ -77,4 +99,98 @@ public void doInEventLoop_notInEventLoop_submits() {
7799
NettyUtils.doInEventLoop(mockExecutor, () -> {});
78100
verify(mockExecutor).submit(any(Runnable.class));
79101
}
102+
103+
@Test
104+
public void runOrPropagate_success_runs() throws Exception {
105+
Promise<String> destination = eventLoopGroup.next().newPromise();
106+
AtomicBoolean reference = new AtomicBoolean();
107+
108+
GenericFutureListener<Future<Void>> listener =
109+
NettyUtils.runOrPropagate(destination, () -> reference.set(true));
110+
111+
Promise<Void> source = eventLoopGroup.next().newPromise();
112+
source.setSuccess(null);
113+
listener.operationComplete(source);
114+
115+
assertThat(reference.get()).isTrue();
116+
}
117+
118+
@Test
119+
public void runOrPropagate_exception_propagates() throws Exception {
120+
Promise<String> destination = eventLoopGroup.next().newPromise();
121+
122+
GenericFutureListener<Future<Void>> listener =
123+
NettyUtils.runOrPropagate(destination, () -> {
124+
});
125+
126+
Promise<Void> source = eventLoopGroup.next().newPromise();
127+
source.setFailure(new RuntimeException("Intentional exception for testing purposes"));
128+
listener.operationComplete(source);
129+
130+
assertThat(destination.cause())
131+
.isInstanceOf(RuntimeException.class)
132+
.hasMessage("Intentional exception for testing purposes");
133+
}
134+
135+
@Test
136+
public void runOrPropagate_cancel_propagates() throws Exception {
137+
Promise<String> destination = eventLoopGroup.next().newPromise();
138+
139+
GenericFutureListener<Future<Void>> listener =
140+
NettyUtils.runOrPropagate(destination, () -> {
141+
});
142+
143+
Promise<Void> source = eventLoopGroup.next().newPromise();
144+
source.cancel(false);
145+
listener.operationComplete(source);
146+
147+
assertThat(destination.isCancelled()).isTrue();
148+
}
149+
150+
@Test
151+
public void consumeOrPropagate_success_consumes() throws Exception {
152+
Promise<String> destination = eventLoopGroup.next().newPromise();
153+
AtomicReference<String> reference = new AtomicReference<>();
154+
155+
GenericFutureListener<Future<String>> listener =
156+
NettyUtils.consumeOrPropagate(destination, reference::set);
157+
158+
Promise<String> source = eventLoopGroup.next().newPromise();
159+
source.setSuccess("test");
160+
listener.operationComplete(source);
161+
162+
assertThat(reference.get()).isEqualTo("test");
163+
}
164+
165+
@Test
166+
public void consumeOrPropagate_exception_propagates() throws Exception {
167+
Promise<String> destination = eventLoopGroup.next().newPromise();
168+
169+
GenericFutureListener<Future<String>> listener =
170+
NettyUtils.consumeOrPropagate(destination, s -> {
171+
});
172+
173+
Promise<String> source = eventLoopGroup.next().newPromise();
174+
source.setFailure(new RuntimeException("Intentional exception for testing purposes"));
175+
listener.operationComplete(source);
176+
177+
assertThat(destination.cause())
178+
.isInstanceOf(RuntimeException.class)
179+
.hasMessage("Intentional exception for testing purposes");
180+
}
181+
182+
@Test
183+
public void consumeOrPropagate_cancel_propagates() throws Exception {
184+
Promise<String> destination = eventLoopGroup.next().newPromise();
185+
186+
GenericFutureListener<Future<String>> listener =
187+
NettyUtils.consumeOrPropagate(destination, s -> {
188+
});
189+
190+
Promise<String> source = eventLoopGroup.next().newPromise();
191+
source.cancel(false);
192+
listener.operationComplete(source);
193+
194+
assertThat(destination.isCancelled()).isTrue();
195+
}
80196
}

0 commit comments

Comments
 (0)