Skip to content

Commit 53ba7d9

Browse files
authored
Revert "Revert "[netty-nio-client] Ensure initial channel used for protocol detection is released before re-acquiring (#2882)" (#2895)"
This reverts commit d3cc7b8.
1 parent d3cc7b8 commit 53ba7d9

File tree

5 files changed

+205
-4
lines changed

5 files changed

+205
-4
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "Netty NIO HTTP Client",
3+
"contributor": "",
4+
"type": "bugfix",
5+
"description": "Ensure initial channel used for protocol detection is released before re-acquiring"
6+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/http2/HttpOrHttp2ChannelPool.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.PROTOCOL_FUTURE;
1919
import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.doInEventLoop;
20+
import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.runOrPropagate;
2021

2122
import io.netty.channel.Channel;
2223
import io.netty.channel.EventLoop;
@@ -140,7 +141,7 @@ private void completeProtocolConfiguration(Channel newChannel, Protocol protocol
140141
closeAndRelease(newChannel, new IllegalStateException("Pool closed"));
141142
} else {
142143
try {
143-
protocolImplPromise.setSuccess(configureProtocol(newChannel, protocol));
144+
configureProtocol(newChannel, protocol);
144145
} catch (Throwable e) {
145146
closeAndRelease(newChannel, e);
146147
}
@@ -154,7 +155,7 @@ private void closeAndRelease(Channel newChannel, Throwable e) {
154155
protocolImplPromise.setFailure(e);
155156
}
156157

157-
private ChannelPool configureProtocol(Channel newChannel, Protocol protocol) {
158+
private void configureProtocol(Channel newChannel, Protocol protocol) {
158159
if (Protocol.HTTP1_1 == protocol) {
159160
// For HTTP/1.1 we use a traditional channel pool without multiplexing
160161
SdkChannelPool idleConnectionMetricChannelPool = new IdleConnectionCountingChannelPool(eventLoop, delegatePool);
@@ -180,8 +181,10 @@ private ChannelPool configureProtocol(Channel newChannel, Protocol protocol) {
180181
.build();
181182
}
182183
// Give the channel back so it can be acquired again by protocolImpl
183-
delegatePool.release(newChannel);
184-
return protocolImpl;
184+
// Await the release completion to ensure we do not unnecessarily acquire a second channel
185+
delegatePool.release(newChannel).addListener(runOrPropagate(protocolImplPromise, () -> {
186+
protocolImplPromise.trySuccess(protocolImpl);
187+
}));
185188
}
186189

187190
@Override

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/utils/NettyUtils.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.CompletableFuture;
3030
import java.util.concurrent.TimeUnit;
3131
import java.util.function.BiConsumer;
32+
import java.util.function.Consumer;
3233
import java.util.function.Function;
3334
import javax.net.ssl.SSLEngine;
3435
import javax.net.ssl.SSLParameters;
@@ -221,4 +222,48 @@ private static void configureSslEngine(SSLEngine sslEngine) {
221222
sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
222223
sslEngine.setSSLParameters(sslParameters);
223224
}
225+
226+
/**
227+
* Create a {@link GenericFutureListener} that will propagate any failures or cancellations to the provided {@link Promise},
228+
* or invoke the provided {@link Consumer} with the result of a successful operation completion. This is useful for chaining
229+
* together multiple futures that may depend upon each other but that may not have the same return type.
230+
* <p>
231+
* Note that if you do not need the value returned by a successful completion (or if it returns {@link Void}) you may use
232+
* {@link #runOrPropagate(Promise, Runnable)} instead.
233+
*
234+
* @param destination the Promise to notify upon failure or cancellation
235+
* @param onSuccess the Consumer to invoke upon success
236+
*/
237+
public static <T> GenericFutureListener<Future<T>> consumeOrPropagate(Promise<?> destination, Consumer<T> onSuccess) {
238+
return f -> {
239+
if (f.isSuccess()) {
240+
T result = f.getNow();
241+
onSuccess.accept(result);
242+
} else if (f.isCancelled()) {
243+
destination.cancel(false);
244+
} else {
245+
destination.tryFailure(f.cause());
246+
}
247+
};
248+
}
249+
250+
/**
251+
* Create a {@link GenericFutureListener} that will propagate any failures or cancellations to the provided {@link Promise},
252+
* or invoke the provided {@link Runnable} upon successful operation completion. This is useful for chaining together multiple
253+
* futures that may depend upon each other but that may not have the same return type.
254+
*
255+
* @param destination the Promise to notify upon failure or cancellation
256+
* @param onSuccess the Runnable to invoke upon success
257+
*/
258+
public static <T> GenericFutureListener<Future<T>> runOrPropagate(Promise<?> destination, Runnable onSuccess) {
259+
return f -> {
260+
if (f.isSuccess()) {
261+
onSuccess.run();
262+
} else if (f.isCancelled()) {
263+
destination.cancel(false);
264+
} else {
265+
destination.tryFailure(f.cause());
266+
}
267+
};
268+
}
224269
}

http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/http2/HttpOrHttp2ChannelPoolTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package software.amazon.awssdk.http.nio.netty.internal.http2;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.mockito.Matchers.any;
1920
import static org.mockito.Matchers.eq;
2021
import static org.mockito.Mockito.verify;
2122
import static org.mockito.Mockito.when;
@@ -241,7 +242,9 @@ public void incompleteProtocolFutureDelaysMetricsDelegationAndForwardsSuccessFor
241242

242243
public void incompleteProtocolFutureDelaysMetricsDelegationAndForwardsSuccessForProtocol(Protocol protocol) throws Exception {
243244
Promise<Channel> acquirePromise = eventLoopGroup.next().newPromise();
245+
Promise<Void> releasePromise = eventLoopGroup.next().newPromise();
244246
when(mockDelegatePool.acquire()).thenReturn(acquirePromise);
247+
when(mockDelegatePool.release(any(Channel.class))).thenReturn(releasePromise);
245248

246249
// startConnection
247250
httpOrHttp2ChannelPool.acquire();
@@ -258,6 +261,7 @@ public void incompleteProtocolFutureDelaysMetricsDelegationAndForwardsSuccessFor
258261
eventLoopGroup.register(channel);
259262
channel.attr(PROTOCOL_FUTURE).set(CompletableFuture.completedFuture(protocol));
260263
acquirePromise.setSuccess(channel);
264+
releasePromise.setSuccess(null);
261265

262266
metricsFuture.join();
263267
MetricCollection metrics = metricCollector.collect();
@@ -267,4 +271,31 @@ public void incompleteProtocolFutureDelaysMetricsDelegationAndForwardsSuccessFor
267271
assertThat(metrics.metricValues(HttpMetric.AVAILABLE_CONCURRENCY).get(0)).isBetween(0, 1);
268272
assertThat(metrics.metricValues(HttpMetric.LEASED_CONCURRENCY).get(0)).isBetween(0, 1);
269273
}
274+
275+
@Test(timeout = 5_000)
276+
public void protocolFutureAwaitsReleaseFuture() throws Exception {
277+
Promise<Channel> delegateAcquirePromise = eventLoopGroup.next().newPromise();
278+
Promise<Void> releasePromise = eventLoopGroup.next().newPromise();
279+
when(mockDelegatePool.acquire()).thenReturn(delegateAcquirePromise);
280+
when(mockDelegatePool.release(any(Channel.class))).thenReturn(releasePromise);
281+
282+
MockChannel channel = new MockChannel();
283+
eventLoopGroup.register(channel);
284+
channel.attr(PROTOCOL_FUTURE).set(CompletableFuture.completedFuture(Protocol.HTTP1_1));
285+
286+
// Acquire a new connection and save the returned future
287+
Future<Channel> acquireFuture = httpOrHttp2ChannelPool.acquire();
288+
289+
// Return a successful connection from the delegate pool
290+
delegateAcquirePromise.setSuccess(channel);
291+
292+
// The returned future should not complete until the release completes
293+
assertThat(acquireFuture.isDone()).isFalse();
294+
295+
// Complete the release
296+
releasePromise.setSuccess(null);
297+
298+
// Assert the returned future completes (within the test timeout)
299+
acquireFuture.await();
300+
}
270301
}

http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/utils/NettyUtilsTest.java

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,39 @@
2323
import static org.mockito.Mockito.when;
2424

2525
import io.netty.channel.Channel;
26+
import io.netty.channel.EventLoopGroup;
27+
import io.netty.channel.nio.NioEventLoopGroup;
2628
import io.netty.handler.ssl.SslContext;
2729
import io.netty.handler.ssl.SslContextBuilder;
2830
import io.netty.handler.ssl.SslHandler;
2931
import io.netty.util.AttributeKey;
3032
import io.netty.util.concurrent.EventExecutor;
33+
import io.netty.util.concurrent.Future;
34+
import io.netty.util.concurrent.GenericFutureListener;
35+
import io.netty.util.concurrent.Promise;
3136
import java.time.Duration;
37+
import java.util.concurrent.atomic.AtomicBoolean;
38+
import java.util.concurrent.atomic.AtomicReference;
3239
import javax.net.ssl.SSLEngine;
40+
import org.junit.AfterClass;
41+
import org.junit.BeforeClass;
3342
import org.junit.Test;
3443
import software.amazon.awssdk.http.nio.netty.internal.MockChannel;
3544

3645
public class NettyUtilsTest {
46+
47+
private static EventLoopGroup eventLoopGroup;
48+
49+
@BeforeClass
50+
public static void setup() {
51+
eventLoopGroup = new NioEventLoopGroup(1);
52+
}
53+
54+
@AfterClass
55+
public static void teardown() throws InterruptedException {
56+
eventLoopGroup.shutdownGracefully().await();
57+
}
58+
3759
@Test
3860
public void testGetOrCreateAttributeKey_calledTwiceWithSameName_returnsSameInstance() {
3961
String attr = "NettyUtilsTest.Foo";
@@ -77,4 +99,98 @@ public void doInEventLoop_notInEventLoop_submits() {
7799
NettyUtils.doInEventLoop(mockExecutor, () -> {});
78100
verify(mockExecutor).submit(any(Runnable.class));
79101
}
102+
103+
@Test
104+
public void runOrPropagate_success_runs() throws Exception {
105+
Promise<String> destination = eventLoopGroup.next().newPromise();
106+
AtomicBoolean reference = new AtomicBoolean();
107+
108+
GenericFutureListener<Future<Void>> listener =
109+
NettyUtils.runOrPropagate(destination, () -> reference.set(true));
110+
111+
Promise<Void> source = eventLoopGroup.next().newPromise();
112+
source.setSuccess(null);
113+
listener.operationComplete(source);
114+
115+
assertThat(reference.get()).isTrue();
116+
}
117+
118+
@Test
119+
public void runOrPropagate_exception_propagates() throws Exception {
120+
Promise<String> destination = eventLoopGroup.next().newPromise();
121+
122+
GenericFutureListener<Future<Void>> listener =
123+
NettyUtils.runOrPropagate(destination, () -> {
124+
});
125+
126+
Promise<Void> source = eventLoopGroup.next().newPromise();
127+
source.setFailure(new RuntimeException("Intentional exception for testing purposes"));
128+
listener.operationComplete(source);
129+
130+
assertThat(destination.cause())
131+
.isInstanceOf(RuntimeException.class)
132+
.hasMessage("Intentional exception for testing purposes");
133+
}
134+
135+
@Test
136+
public void runOrPropagate_cancel_propagates() throws Exception {
137+
Promise<String> destination = eventLoopGroup.next().newPromise();
138+
139+
GenericFutureListener<Future<Void>> listener =
140+
NettyUtils.runOrPropagate(destination, () -> {
141+
});
142+
143+
Promise<Void> source = eventLoopGroup.next().newPromise();
144+
source.cancel(false);
145+
listener.operationComplete(source);
146+
147+
assertThat(destination.isCancelled()).isTrue();
148+
}
149+
150+
@Test
151+
public void consumeOrPropagate_success_consumes() throws Exception {
152+
Promise<String> destination = eventLoopGroup.next().newPromise();
153+
AtomicReference<String> reference = new AtomicReference<>();
154+
155+
GenericFutureListener<Future<String>> listener =
156+
NettyUtils.consumeOrPropagate(destination, reference::set);
157+
158+
Promise<String> source = eventLoopGroup.next().newPromise();
159+
source.setSuccess("test");
160+
listener.operationComplete(source);
161+
162+
assertThat(reference.get()).isEqualTo("test");
163+
}
164+
165+
@Test
166+
public void consumeOrPropagate_exception_propagates() throws Exception {
167+
Promise<String> destination = eventLoopGroup.next().newPromise();
168+
169+
GenericFutureListener<Future<String>> listener =
170+
NettyUtils.consumeOrPropagate(destination, s -> {
171+
});
172+
173+
Promise<String> source = eventLoopGroup.next().newPromise();
174+
source.setFailure(new RuntimeException("Intentional exception for testing purposes"));
175+
listener.operationComplete(source);
176+
177+
assertThat(destination.cause())
178+
.isInstanceOf(RuntimeException.class)
179+
.hasMessage("Intentional exception for testing purposes");
180+
}
181+
182+
@Test
183+
public void consumeOrPropagate_cancel_propagates() throws Exception {
184+
Promise<String> destination = eventLoopGroup.next().newPromise();
185+
186+
GenericFutureListener<Future<String>> listener =
187+
NettyUtils.consumeOrPropagate(destination, s -> {
188+
});
189+
190+
Promise<String> source = eventLoopGroup.next().newPromise();
191+
source.cancel(false);
192+
listener.operationComplete(source);
193+
194+
assertThat(destination.isCancelled()).isTrue();
195+
}
80196
}

0 commit comments

Comments
 (0)