Skip to content

Commit d3cc7b8

Browse files
authored
Revert "[netty-nio-client] Ensure initial channel used for protocol detection is released before re-acquiring (#2882)" (#2895)
This reverts commit 6081647.
1 parent 931ea4d commit d3cc7b8

File tree

5 files changed

+4
-205
lines changed

5 files changed

+4
-205
lines changed

.changes/next-release/bugfix-nettynioclient-0fb07b2.json

Lines changed: 0 additions & 6 deletions
This file was deleted.

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/http2/HttpOrHttp2ChannelPool.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.PROTOCOL_FUTURE;
1919
import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.doInEventLoop;
20-
import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.runOrPropagate;
2120

2221
import io.netty.channel.Channel;
2322
import io.netty.channel.EventLoop;
@@ -141,7 +140,7 @@ private void completeProtocolConfiguration(Channel newChannel, Protocol protocol
141140
closeAndRelease(newChannel, new IllegalStateException("Pool closed"));
142141
} else {
143142
try {
144-
configureProtocol(newChannel, protocol);
143+
protocolImplPromise.setSuccess(configureProtocol(newChannel, protocol));
145144
} catch (Throwable e) {
146145
closeAndRelease(newChannel, e);
147146
}
@@ -155,7 +154,7 @@ private void closeAndRelease(Channel newChannel, Throwable e) {
155154
protocolImplPromise.setFailure(e);
156155
}
157156

158-
private void configureProtocol(Channel newChannel, Protocol protocol) {
157+
private ChannelPool configureProtocol(Channel newChannel, Protocol protocol) {
159158
if (Protocol.HTTP1_1 == protocol) {
160159
// For HTTP/1.1 we use a traditional channel pool without multiplexing
161160
SdkChannelPool idleConnectionMetricChannelPool = new IdleConnectionCountingChannelPool(eventLoop, delegatePool);
@@ -181,10 +180,8 @@ private void configureProtocol(Channel newChannel, Protocol protocol) {
181180
.build();
182181
}
183182
// Give the channel back so it can be acquired again by protocolImpl
184-
// Await the release completion to ensure we do not unnecessarily acquire a second channel
185-
delegatePool.release(newChannel).addListener(runOrPropagate(protocolImplPromise, () -> {
186-
protocolImplPromise.trySuccess(protocolImpl);
187-
}));
183+
delegatePool.release(newChannel);
184+
return protocolImpl;
188185
}
189186

190187
@Override

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/utils/NettyUtils.java

Lines changed: 0 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.concurrent.CompletableFuture;
3030
import java.util.concurrent.TimeUnit;
3131
import java.util.function.BiConsumer;
32-
import java.util.function.Consumer;
3332
import java.util.function.Function;
3433
import javax.net.ssl.SSLEngine;
3534
import javax.net.ssl.SSLParameters;
@@ -222,48 +221,4 @@ private static void configureSslEngine(SSLEngine sslEngine) {
222221
sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
223222
sslEngine.setSSLParameters(sslParameters);
224223
}
225-
226-
/**
227-
* Create a {@link GenericFutureListener} that will propagate any failures or cancellations to the provided {@link Promise},
228-
* or invoke the provided {@link Consumer} with the result of a successful operation completion. This is useful for chaining
229-
* together multiple futures that may depend upon each other but that may not have the same return type.
230-
* <p>
231-
* Note that if you do not need the value returned by a successful completion (or if it returns {@link Void}) you may use
232-
* {@link #runOrPropagate(Promise, Runnable)} instead.
233-
*
234-
* @param destination the Promise to notify upon failure or cancellation
235-
* @param onSuccess the Consumer to invoke upon success
236-
*/
237-
public static <T> GenericFutureListener<Future<T>> consumeOrPropagate(Promise<?> destination, Consumer<T> onSuccess) {
238-
return f -> {
239-
if (f.isSuccess()) {
240-
T result = f.getNow();
241-
onSuccess.accept(result);
242-
} else if (f.isCancelled()) {
243-
destination.cancel(false);
244-
} else {
245-
destination.tryFailure(f.cause());
246-
}
247-
};
248-
}
249-
250-
/**
251-
* Create a {@link GenericFutureListener} that will propagate any failures or cancellations to the provided {@link Promise},
252-
* or invoke the provided {@link Runnable} upon successful operation completion. This is useful for chaining together multiple
253-
* futures that may depend upon each other but that may not have the same return type.
254-
*
255-
* @param destination the Promise to notify upon failure or cancellation
256-
* @param onSuccess the Runnable to invoke upon success
257-
*/
258-
public static <T> GenericFutureListener<Future<T>> runOrPropagate(Promise<?> destination, Runnable onSuccess) {
259-
return f -> {
260-
if (f.isSuccess()) {
261-
onSuccess.run();
262-
} else if (f.isCancelled()) {
263-
destination.cancel(false);
264-
} else {
265-
destination.tryFailure(f.cause());
266-
}
267-
};
268-
}
269224
}

http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/http2/HttpOrHttp2ChannelPoolTest.java

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
package software.amazon.awssdk.http.nio.netty.internal.http2;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
19-
import static org.mockito.Matchers.any;
2019
import static org.mockito.Matchers.eq;
2120
import static org.mockito.Mockito.verify;
2221
import static org.mockito.Mockito.when;
@@ -242,9 +241,7 @@ public void incompleteProtocolFutureDelaysMetricsDelegationAndForwardsSuccessFor
242241

243242
public void incompleteProtocolFutureDelaysMetricsDelegationAndForwardsSuccessForProtocol(Protocol protocol) throws Exception {
244243
Promise<Channel> acquirePromise = eventLoopGroup.next().newPromise();
245-
Promise<Void> releasePromise = eventLoopGroup.next().newPromise();
246244
when(mockDelegatePool.acquire()).thenReturn(acquirePromise);
247-
when(mockDelegatePool.release(any(Channel.class))).thenReturn(releasePromise);
248245

249246
// startConnection
250247
httpOrHttp2ChannelPool.acquire();
@@ -261,7 +258,6 @@ public void incompleteProtocolFutureDelaysMetricsDelegationAndForwardsSuccessFor
261258
eventLoopGroup.register(channel);
262259
channel.attr(PROTOCOL_FUTURE).set(CompletableFuture.completedFuture(protocol));
263260
acquirePromise.setSuccess(channel);
264-
releasePromise.setSuccess(null);
265261

266262
metricsFuture.join();
267263
MetricCollection metrics = metricCollector.collect();
@@ -271,31 +267,4 @@ public void incompleteProtocolFutureDelaysMetricsDelegationAndForwardsSuccessFor
271267
assertThat(metrics.metricValues(HttpMetric.AVAILABLE_CONCURRENCY).get(0)).isBetween(0, 1);
272268
assertThat(metrics.metricValues(HttpMetric.LEASED_CONCURRENCY).get(0)).isBetween(0, 1);
273269
}
274-
275-
@Test(timeout = 5_000)
276-
public void protocolFutureAwaitsReleaseFuture() throws Exception {
277-
Promise<Channel> delegateAcquirePromise = eventLoopGroup.next().newPromise();
278-
Promise<Void> releasePromise = eventLoopGroup.next().newPromise();
279-
when(mockDelegatePool.acquire()).thenReturn(delegateAcquirePromise);
280-
when(mockDelegatePool.release(any(Channel.class))).thenReturn(releasePromise);
281-
282-
MockChannel channel = new MockChannel();
283-
eventLoopGroup.register(channel);
284-
channel.attr(PROTOCOL_FUTURE).set(CompletableFuture.completedFuture(Protocol.HTTP1_1));
285-
286-
// Acquire a new connection and save the returned future
287-
Future<Channel> acquireFuture = httpOrHttp2ChannelPool.acquire();
288-
289-
// Return a successful connection from the delegate pool
290-
delegateAcquirePromise.setSuccess(channel);
291-
292-
// The returned future should not complete until the release completes
293-
assertThat(acquireFuture.isDone()).isFalse();
294-
295-
// Complete the release
296-
releasePromise.setSuccess(null);
297-
298-
// Assert the returned future completes (within the test timeout)
299-
acquireFuture.await();
300-
}
301270
}

http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/utils/NettyUtilsTest.java

Lines changed: 0 additions & 116 deletions
Original file line numberDiff line numberDiff line change
@@ -23,39 +23,17 @@
2323
import static org.mockito.Mockito.when;
2424

2525
import io.netty.channel.Channel;
26-
import io.netty.channel.EventLoopGroup;
27-
import io.netty.channel.nio.NioEventLoopGroup;
2826
import io.netty.handler.ssl.SslContext;
2927
import io.netty.handler.ssl.SslContextBuilder;
3028
import io.netty.handler.ssl.SslHandler;
3129
import io.netty.util.AttributeKey;
3230
import io.netty.util.concurrent.EventExecutor;
33-
import io.netty.util.concurrent.Future;
34-
import io.netty.util.concurrent.GenericFutureListener;
35-
import io.netty.util.concurrent.Promise;
3631
import java.time.Duration;
37-
import java.util.concurrent.atomic.AtomicBoolean;
38-
import java.util.concurrent.atomic.AtomicReference;
3932
import javax.net.ssl.SSLEngine;
40-
import org.junit.AfterClass;
41-
import org.junit.BeforeClass;
4233
import org.junit.Test;
4334
import software.amazon.awssdk.http.nio.netty.internal.MockChannel;
4435

4536
public class NettyUtilsTest {
46-
47-
private static EventLoopGroup eventLoopGroup;
48-
49-
@BeforeClass
50-
public static void setup() {
51-
eventLoopGroup = new NioEventLoopGroup(1);
52-
}
53-
54-
@AfterClass
55-
public static void teardown() throws InterruptedException {
56-
eventLoopGroup.shutdownGracefully().await();
57-
}
58-
5937
@Test
6038
public void testGetOrCreateAttributeKey_calledTwiceWithSameName_returnsSameInstance() {
6139
String attr = "NettyUtilsTest.Foo";
@@ -99,98 +77,4 @@ public void doInEventLoop_notInEventLoop_submits() {
9977
NettyUtils.doInEventLoop(mockExecutor, () -> {});
10078
verify(mockExecutor).submit(any(Runnable.class));
10179
}
102-
103-
@Test
104-
public void runOrPropagate_success_runs() throws Exception {
105-
Promise<String> destination = eventLoopGroup.next().newPromise();
106-
AtomicBoolean reference = new AtomicBoolean();
107-
108-
GenericFutureListener<Future<Void>> listener =
109-
NettyUtils.runOrPropagate(destination, () -> reference.set(true));
110-
111-
Promise<Void> source = eventLoopGroup.next().newPromise();
112-
source.setSuccess(null);
113-
listener.operationComplete(source);
114-
115-
assertThat(reference.get()).isTrue();
116-
}
117-
118-
@Test
119-
public void runOrPropagate_exception_propagates() throws Exception {
120-
Promise<String> destination = eventLoopGroup.next().newPromise();
121-
122-
GenericFutureListener<Future<Void>> listener =
123-
NettyUtils.runOrPropagate(destination, () -> {
124-
});
125-
126-
Promise<Void> source = eventLoopGroup.next().newPromise();
127-
source.setFailure(new RuntimeException("Intentional exception for testing purposes"));
128-
listener.operationComplete(source);
129-
130-
assertThat(destination.cause())
131-
.isInstanceOf(RuntimeException.class)
132-
.hasMessage("Intentional exception for testing purposes");
133-
}
134-
135-
@Test
136-
public void runOrPropagate_cancel_propagates() throws Exception {
137-
Promise<String> destination = eventLoopGroup.next().newPromise();
138-
139-
GenericFutureListener<Future<Void>> listener =
140-
NettyUtils.runOrPropagate(destination, () -> {
141-
});
142-
143-
Promise<Void> source = eventLoopGroup.next().newPromise();
144-
source.cancel(false);
145-
listener.operationComplete(source);
146-
147-
assertThat(destination.isCancelled()).isTrue();
148-
}
149-
150-
@Test
151-
public void consumeOrPropagate_success_consumes() throws Exception {
152-
Promise<String> destination = eventLoopGroup.next().newPromise();
153-
AtomicReference<String> reference = new AtomicReference<>();
154-
155-
GenericFutureListener<Future<String>> listener =
156-
NettyUtils.consumeOrPropagate(destination, reference::set);
157-
158-
Promise<String> source = eventLoopGroup.next().newPromise();
159-
source.setSuccess("test");
160-
listener.operationComplete(source);
161-
162-
assertThat(reference.get()).isEqualTo("test");
163-
}
164-
165-
@Test
166-
public void consumeOrPropagate_exception_propagates() throws Exception {
167-
Promise<String> destination = eventLoopGroup.next().newPromise();
168-
169-
GenericFutureListener<Future<String>> listener =
170-
NettyUtils.consumeOrPropagate(destination, s -> {
171-
});
172-
173-
Promise<String> source = eventLoopGroup.next().newPromise();
174-
source.setFailure(new RuntimeException("Intentional exception for testing purposes"));
175-
listener.operationComplete(source);
176-
177-
assertThat(destination.cause())
178-
.isInstanceOf(RuntimeException.class)
179-
.hasMessage("Intentional exception for testing purposes");
180-
}
181-
182-
@Test
183-
public void consumeOrPropagate_cancel_propagates() throws Exception {
184-
Promise<String> destination = eventLoopGroup.next().newPromise();
185-
186-
GenericFutureListener<Future<String>> listener =
187-
NettyUtils.consumeOrPropagate(destination, s -> {
188-
});
189-
190-
Promise<String> source = eventLoopGroup.next().newPromise();
191-
source.cancel(false);
192-
listener.operationComplete(source);
193-
194-
assertThat(destination.isCancelled()).isTrue();
195-
}
19680
}

0 commit comments

Comments
 (0)