Skip to content

Commit 1546625

Browse files
authored
Ensure ByteBuf#release is invoked for already sent HTTP/2 response (#3236)
Fix the observed exception: java.lang.IllegalStateException: Stream 3 in unexpected state HALF_CLOSED_LOCAL at io.netty.handler.codec.http2.DefaultHttp2ConnectionEncoder.writeData(DefaultHttp2ConnectionEncoder.java:135) at io.netty.handler.codec.http2.DecoratingHttp2FrameWriter.writeData(DecoratingHttp2FrameWriter.java:39) at io.netty.handler.codec.http2.Http2FrameCodec.write(Http2FrameCodec.java:310) at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:895) at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868) at io.netty.handler.codec.http2.AbstractHttp2StreamChannel.write0(AbstractHttp2StreamChannel.java:1192) at io.netty.handler.codec.http2.AbstractHttp2StreamChannel$Http2ChannelUnsafe.writeHttp2StreamFrame(AbstractHttp2StreamChannel.java:1045) at io.netty.handler.codec.http2.AbstractHttp2StreamChannel$Http2ChannelUnsafe.write(AbstractHttp2StreamChannel.java:1003) at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367) at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:889) at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868) at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:113) at io.netty.handler.codec.MessageToMessageCodec.write(MessageToMessageCodec.java:116) at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:891) at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984) at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868) at reactor.netty.http.server.Http2StreamBridgeServerHandler.write(Http2StreamBridgeServerHandler.java:181)
1 parent 066a645 commit 1546625

File tree

2 files changed

+29
-9
lines changed

2 files changed

+29
-9
lines changed

reactor-netty-http/src/main/java/reactor/netty/http/server/Http2StreamBridgeServerHandler.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2018-2023 VMware, Inc. or its affiliates, All Rights Reserved.
2+
* Copyright (c) 2018-2024 VMware, Inc. or its affiliates, All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -167,6 +167,16 @@ else if (!pendingResponse) {
167167
@SuppressWarnings("FutureReturnValueIgnored")
168168
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
169169
if (msg instanceof ByteBuf) {
170+
if (!pendingResponse) {
171+
if (HttpServerOperations.log.isDebugEnabled()) {
172+
HttpServerOperations.log.debug(
173+
format(ctx.channel(), "Dropped HTTP content, since response has been sent already: {}"), msg);
174+
}
175+
((ByteBuf) msg).release();
176+
promise.setSuccess();
177+
return;
178+
}
179+
170180
//"FutureReturnValueIgnored" this is deliberate
171181
ctx.write(new DefaultHttpContent((ByteBuf) msg), promise);
172182
}

reactor-netty-http/src/test/java/reactor/netty/http/server/HttpServerTests.java

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@
111111
import org.junit.jupiter.api.Test;
112112
import org.junit.jupiter.api.Timeout;
113113
import org.junit.jupiter.params.ParameterizedTest;
114+
import org.junit.jupiter.params.provider.EnumSource;
114115
import org.junit.jupiter.params.provider.MethodSource;
115116
import org.junit.jupiter.params.provider.ValueSource;
116117
import org.reactivestreams.Publisher;
@@ -1047,7 +1048,8 @@ void testDropPublisherConnectionClose() throws Exception {
10471048
(req, out) -> {
10481049
req.addHeader("Connection", "close");
10491050
return out;
1050-
});
1051+
},
1052+
HttpProtocol.HTTP11);
10511053
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
10521054
assertThat(ReferenceCountUtil.refCnt(data)).isEqualTo(0);
10531055
}
@@ -1062,12 +1064,14 @@ void testDropMessageConnectionClose() throws Exception {
10621064
(req, out) -> {
10631065
req.addHeader("Connection", "close");
10641066
return out;
1065-
});
1067+
},
1068+
HttpProtocol.HTTP11);
10661069
assertThat(ReferenceCountUtil.refCnt(data)).isEqualTo(0);
10671070
}
10681071

1069-
@Test
1070-
void testDropPublisher_1() throws Exception {
1072+
@ParameterizedTest
1073+
@EnumSource(value = HttpProtocol.class, names = {"HTTP11", "H2C"})
1074+
void testDropPublisher_1(HttpProtocol protocol) throws Exception {
10711075
CountDownLatch latch = new CountDownLatch(1);
10721076
ByteBuf data = ByteBufAllocator.DEFAULT.buffer();
10731077
data.writeCharSequence("test", Charset.defaultCharset());
@@ -1076,7 +1080,8 @@ void testDropPublisher_1() throws Exception {
10761080
.send(Flux.defer(() -> Flux.just(data, data.retain(), data.retain()))
10771081
.doFinally(s -> latch.countDown()))
10781082
.then(),
1079-
(req, out) -> out);
1083+
(req, out) -> out,
1084+
protocol);
10801085
assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();
10811086
assertThat(ReferenceCountUtil.refCnt(data)).isEqualTo(0);
10821087
}
@@ -1089,7 +1094,8 @@ void testDropPublisher_2() throws Exception {
10891094
(req, res) -> res.header("Content-Length", "0")
10901095
.send(Mono.just(data))
10911096
.then(),
1092-
(req, out) -> out);
1097+
(req, out) -> out,
1098+
HttpProtocol.HTTP11);
10931099
assertThat(ReferenceCountUtil.refCnt(data)).isEqualTo(0);
10941100
}
10951101

@@ -1100,23 +1106,27 @@ void testDropMessage() throws Exception {
11001106
doTestDropData(
11011107
(req, res) -> res.header("Content-Length", "0")
11021108
.sendObject(data),
1103-
(req, out) -> out);
1109+
(req, out) -> out,
1110+
HttpProtocol.HTTP11);
11041111
assertThat(ReferenceCountUtil.refCnt(data)).isEqualTo(0);
11051112
}
11061113

11071114
private void doTestDropData(
11081115
BiFunction<? super HttpServerRequest, ? super
11091116
HttpServerResponse, ? extends Publisher<Void>> serverFn,
1110-
BiFunction<? super HttpClientRequest, ? super NettyOutbound, ? extends Publisher<Void>> clientFn)
1117+
BiFunction<? super HttpClientRequest, ? super NettyOutbound, ? extends Publisher<Void>> clientFn,
1118+
HttpProtocol protocol)
11111119
throws Exception {
11121120
disposableServer =
11131121
createServer()
1122+
.protocol(protocol)
11141123
.handle(serverFn)
11151124
.bindNow(Duration.ofSeconds(30));
11161125

11171126
CountDownLatch latch = new CountDownLatch(1);
11181127
String response =
11191128
createClient(disposableServer.port())
1129+
.protocol(protocol)
11201130
.doOnRequest((req, conn) -> conn.onTerminate()
11211131
.subscribe(null, null, latch::countDown))
11221132
.request(HttpMethod.GET)

0 commit comments

Comments
 (0)