Skip to content

Commit 27773f4

Browse files
authored
Merge pull request #1557 from dagnir/h2-flush-before-read
Flush parent before reading on H2 connection
2 parents 99dc750 + 219c46f commit 27773f4

File tree

5 files changed

+160
-1
lines changed

5 files changed

+160
-1
lines changed

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HandlerRemovingChannelPool.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.netty.util.concurrent.Future;
2828
import io.netty.util.concurrent.Promise;
2929
import software.amazon.awssdk.annotations.SdkInternalApi;
30+
import software.amazon.awssdk.http.nio.netty.internal.http2.FlushOnReadHandler;
3031

3132
/**
3233
* Removes any per request {@link ChannelHandler} from the pipeline prior to releasing
@@ -79,6 +80,7 @@ private void removePerRequestHandlers(Channel channel) {
7980
removeIfExists(channel.pipeline(),
8081
HttpStreamsClientHandler.class,
8182
LastHttpContentHandler.class,
83+
FlushOnReadHandler.class,
8284
ResponseHandler.class,
8385
ReadTimeoutHandler.class,
8486
WriteTimeoutHandler.class);

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.slf4j.LoggerFactory;
6363
import software.amazon.awssdk.annotations.SdkInternalApi;
6464
import software.amazon.awssdk.http.Protocol;
65+
import software.amazon.awssdk.http.nio.netty.internal.http2.FlushOnReadHandler;
6566
import software.amazon.awssdk.http.nio.netty.internal.http2.Http2ToHttpInboundAdapter;
6667
import software.amazon.awssdk.http.nio.netty.internal.http2.HttpToHttp2OutboundAdapter;
6768
import software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils;
@@ -172,6 +173,9 @@ private boolean tryConfigurePipeline() {
172173
}
173174

174175
pipeline.addLast(LastHttpContentHandler.create());
176+
if (Protocol.HTTP2.equals(protocol)) {
177+
pipeline.addLast(FlushOnReadHandler.getInstance());
178+
}
175179
pipeline.addLast(new HttpStreamsClientHandler());
176180
pipeline.addLast(ResponseHandler.getInstance());
177181

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.nio.netty.internal.http2;
17+
18+
import io.netty.channel.ChannelHandler;
19+
import io.netty.channel.ChannelHandlerContext;
20+
import io.netty.channel.ChannelOutboundHandlerAdapter;
21+
import software.amazon.awssdk.annotations.SdkInternalApi;
22+
23+
/**
24+
* This is an HTTP/2 related workaround for an issue where a WINDOW_UPDATE is
25+
* queued but not written to the socket, causing a read() on the channel to
26+
* hang if the remote endpoint thinks our inbound window is 0.
27+
*/
28+
@SdkInternalApi
29+
@ChannelHandler.Sharable
30+
public final class FlushOnReadHandler extends ChannelOutboundHandlerAdapter {
31+
private static final FlushOnReadHandler INSTANCE = new FlushOnReadHandler();
32+
33+
private FlushOnReadHandler() {
34+
}
35+
36+
@Override
37+
public void read(ChannelHandlerContext ctx) {
38+
//Note: order is important, we need to fire the read() event first
39+
// since it's what triggers the WINDOW_UPDATE frame write
40+
ctx.read();
41+
ctx.channel().parent().flush();
42+
}
43+
44+
public static FlushOnReadHandler getInstance() {
45+
return INSTANCE;
46+
}
47+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.nio.netty.internal.http2;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.mockito.Mockito.when;
20+
import io.netty.channel.Channel;
21+
import io.netty.channel.ChannelHandlerContext;
22+
import org.junit.Test;
23+
import org.junit.runner.RunWith;
24+
import org.mockito.InOrder;
25+
import org.mockito.Mock;
26+
import org.mockito.Mockito;
27+
import org.mockito.runners.MockitoJUnitRunner;
28+
29+
@RunWith(MockitoJUnitRunner.class)
30+
public class FlushOnReadTest {
31+
32+
@Mock
33+
private ChannelHandlerContext mockCtx;
34+
35+
@Mock
36+
private Channel mockChannel;
37+
38+
@Mock
39+
private Channel mockParentChannel;
40+
41+
@Test
42+
public void read_forwardsReadBeforeParentFlush() {
43+
when(mockCtx.channel()).thenReturn(mockChannel);
44+
when(mockChannel.parent()).thenReturn(mockParentChannel);
45+
46+
FlushOnReadHandler handler = FlushOnReadHandler.getInstance();
47+
48+
handler.read(mockCtx);
49+
50+
InOrder inOrder = Mockito.inOrder(mockCtx, mockParentChannel);
51+
52+
inOrder.verify(mockCtx).read();
53+
inOrder.verify(mockParentChannel).flush();
54+
}
55+
56+
@Test
57+
public void getInstance_returnsSingleton() {
58+
assertThat(FlushOnReadHandler.getInstance() == FlushOnReadHandler.getInstance()).isTrue();
59+
}
60+
}

services/kinesis/src/it/java/software/amazon/awssdk/services/kinesis/SubscribeToShardIntegrationTest.java

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
1919

20+
import io.reactivex.Flowable;
2021
import java.time.Duration;
2122
import java.util.ArrayList;
2223
import java.util.List;
@@ -37,6 +38,8 @@
3738
import software.amazon.awssdk.core.SdkBytes;
3839
import software.amazon.awssdk.core.async.SdkPublisher;
3940
import software.amazon.awssdk.http.SdkHttpResponse;
41+
import software.amazon.awssdk.http.nio.netty.Http2Configuration;
42+
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
4043
import software.amazon.awssdk.services.kinesis.model.ConsumerStatus;
4144
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
4245
import software.amazon.awssdk.services.kinesis.model.Record;
@@ -82,6 +85,38 @@ public void tearDown() {
8285
.enforceConsumerDeletion(true)).join();
8386
}
8487

88+
@Test
89+
public void subscribeToShard_smallWindow_doesNotTimeOutReads() {
90+
// We want sufficiently large records (relative to the initial window
91+
// size we're choosing) so the client has to send multiple
92+
// WINDOW_UPDATEs to receive them
93+
for (int i = 0; i < 16; ++i) {
94+
putRecord(64 * 1024);
95+
}
96+
97+
KinesisAsyncClient smallWindowAsyncClient = KinesisAsyncClient.builder()
98+
.credentialsProvider(CREDENTIALS_PROVIDER_CHAIN)
99+
.httpClientBuilder(NettyNioAsyncHttpClient.builder()
100+
.http2Configuration(Http2Configuration.builder()
101+
.initialWindowSize(16384)
102+
.build()))
103+
.build();
104+
105+
try {
106+
smallWindowAsyncClient.subscribeToShard(r -> r.consumerARN(consumerArn)
107+
.shardId(shardId)
108+
.startingPosition(s -> s.type(ShardIteratorType.TRIM_HORIZON)),
109+
SubscribeToShardResponseHandler.builder()
110+
.onEventStream(es -> Flowable.fromPublisher(es).forEach(e -> {}))
111+
.onResponse(this::verifyHttpMetadata)
112+
.build())
113+
.join();
114+
115+
} finally {
116+
smallWindowAsyncClient.close();
117+
}
118+
}
119+
85120
@Test
86121
public void subscribeToShard_ReceivesAllData() {
87122
List<SdkBytes> producedData = new ArrayList<>();
@@ -178,14 +213,25 @@ private void waitForStreamToBeActive() {
178213
.orFailAfter(Duration.ofMinutes(5));
179214
}
180215

216+
181217
/**
182218
* Puts a random record to the stream.
183219
*
184220
* @return Record data that was put.
185221
*/
186222
private Optional<SdkBytes> putRecord() {
223+
return putRecord(50);
224+
}
225+
226+
/**
227+
* Puts a random record to the stream.
228+
*
229+
* @param len The number of bytes to generate for the record.
230+
* @return Record data that was put.
231+
*/
232+
private Optional<SdkBytes> putRecord(int len) {
187233
try {
188-
SdkBytes data = SdkBytes.fromByteArray(RandomUtils.nextBytes(50));
234+
SdkBytes data = SdkBytes.fromByteArray(RandomUtils.nextBytes(len));
189235
asyncClient.putRecord(PutRecordRequest.builder()
190236
.streamName(streamName)
191237
.data(data)

0 commit comments

Comments
 (0)