Skip to content

Commit 6d7a5cd

Browse files
authored
Ensure HttpServerMetricsRecorder#recordServerConnectionInactive/Close is invoked for websockets (#3229)
Fixes #3222
1 parent 5b0f94b commit 6d7a5cd

File tree

2 files changed

+127
-2
lines changed

2 files changed

+127
-2
lines changed

reactor-netty-http/src/main/java/reactor/netty/http/server/WebsocketServerOperations.java

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,11 @@ final class WebsocketServerOperations extends HttpServerOperations
9090
else {
9191
removeHandler(NettyPipeline.HttpTrafficHandler);
9292
removeHandler(NettyPipeline.AccessLogHandler);
93-
removeHandler(NettyPipeline.HttpMetricsHandler);
93+
ChannelHandler handler = channel.pipeline().get(NettyPipeline.HttpMetricsHandler);
94+
if (handler != null) {
95+
replaceHandler(NettyPipeline.HttpMetricsHandler,
96+
new WebsocketHttpServerMetricsHandler((AbstractHttpServerMetricsHandler) handler));
97+
}
9498

9599
handshakerResult = channel.newPromise();
96100
HttpRequest request = new DefaultFullHttpRequest(replaced.version(),
@@ -295,4 +299,70 @@ public String selectedSubprotocol() {
295299
static final AtomicIntegerFieldUpdater<WebsocketServerOperations> CLOSE_SENT =
296300
AtomicIntegerFieldUpdater.newUpdater(WebsocketServerOperations.class,
297301
"closeSent");
302+
303+
static final class WebsocketHttpServerMetricsHandler extends AbstractHttpServerMetricsHandler {
304+
305+
final HttpServerMetricsRecorder recorder;
306+
307+
WebsocketHttpServerMetricsHandler(AbstractHttpServerMetricsHandler copy) {
308+
super(copy);
309+
this.recorder = copy.recorder();
310+
}
311+
312+
@Override
313+
public void channelActive(ChannelHandlerContext ctx) {
314+
ctx.fireChannelActive();
315+
}
316+
317+
@Override
318+
public void channelInactive(ChannelHandlerContext ctx) {
319+
try {
320+
if (channelOpened && recorder instanceof MicrometerHttpServerMetricsRecorder) {
321+
// For custom user recorders, we don't propagate the channelInactive event, because this will be done
322+
// by the ChannelMetricsHandler itself. ChannelMetricsHandler is only present when the recorder is
323+
// not our MicrometerHttpServerMetricsRecorder. See HttpServerConfig class.
324+
channelOpened = false;
325+
// Always use the real connection local address without any proxy information
326+
recorder.recordServerConnectionClosed(ctx.channel().localAddress());
327+
}
328+
329+
if (channelActivated) {
330+
channelActivated = false;
331+
// Always use the real connection local address without any proxy information
332+
recorder.recordServerConnectionInactive(ctx.channel().localAddress());
333+
}
334+
}
335+
catch (RuntimeException e) {
336+
// Allow request-response exchange to continue, unaffected by metrics problem
337+
if (log.isWarnEnabled()) {
338+
log.warn(format(ctx.channel(), "Exception caught while recording metrics."), e);
339+
}
340+
}
341+
finally {
342+
ctx.fireChannelInactive();
343+
}
344+
}
345+
346+
@Override
347+
public void channelRead(ChannelHandlerContext ctx, Object msg) {
348+
ctx.fireChannelRead(msg);
349+
}
350+
351+
@Override
352+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
353+
ctx.fireExceptionCaught(cause);
354+
}
355+
356+
@Override
357+
@SuppressWarnings("FutureReturnValueIgnored")
358+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
359+
//"FutureReturnValueIgnored" this is deliberate
360+
ctx.write(msg, promise);
361+
}
362+
363+
@Override
364+
public HttpServerMetricsRecorder recorder() {
365+
return recorder;
366+
}
367+
}
298368
}

reactor-netty-http/src/test/java/reactor/netty/http/HttpMetricsHandlerTests.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ static void createSelfSignedCertificate() throws CertificateException {
171171
* <li> /5 is used by testServerConnectionsRecorder test</li>
172172
* <li> /6 is used by testServerConnectionsMicrometerConnectionClose test</li>
173173
* <li> /7 is used by testServerConnectionsRecorderConnectionClose test</li>
174+
* <li> /8 is used by testServerConnectionsWebsocketMicrometer test</li>
175+
* <li> /9 is used by testServerConnectionsWebsocketRecorder test</li>
174176
* </ul>
175177
*/
176178
@BeforeEach
@@ -200,7 +202,12 @@ void setUp() {
200202
.get("/7", (req, res) -> {
201203
checkServerConnectionsRecorder(req);
202204
return Mono.delay(Duration.ofMillis(200)).then(res.send());
203-
}));
205+
})
206+
.get("/8", (req, res) -> res.sendWebsocket((in, out) ->
207+
out.sendString(Mono.just("Hello World!").doOnNext(b -> checkServerConnectionsMicrometer(req)))))
208+
.get("/9", (req, res) -> res.sendWebsocket((in, out) ->
209+
out.sendString(Mono.just("Hello World!").doOnNext(b -> checkServerConnectionsRecorder(req)))))
210+
);
204211

205212
provider = ConnectionProvider.create("HttpMetricsHandlerTests", 1);
206213
httpClient = createClient(provider, () -> disposableServer.address())
@@ -746,6 +753,28 @@ void testServerConnectionsMicrometerConnectionClose(HttpProtocol[] serverProtoco
746753
}
747754
}
748755

756+
@Test
757+
void testServerConnectionsWebsocketMicrometer() throws Exception {
758+
disposableServer = httpServer
759+
.doOnConnection(cnx -> ServerCloseHandler.INSTANCE.register(cnx.channel()))
760+
.bindNow();
761+
762+
String address = formatSocketAddress(disposableServer.address());
763+
764+
httpClient.websocket()
765+
.uri("/8")
766+
.handle((in, out) -> in.receive().aggregate().asString())
767+
.as(StepVerifier::create)
768+
.expectNext("Hello World!")
769+
.expectComplete()
770+
.verify(Duration.ofSeconds(30));
771+
772+
// make sure the client socket is closed on the server side before checking server metrics
773+
assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();
774+
assertGauge(registry, SERVER_CONNECTIONS_TOTAL, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(0);
775+
assertGauge(registry, SERVER_CONNECTIONS_ACTIVE, URI, HTTP, LOCAL_ADDRESS, address).hasValueEqualTo(0);
776+
}
777+
749778
@ParameterizedTest
750779
@MethodSource("httpCompatibleProtocols")
751780
void testServerConnectionsRecorder(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@@ -849,6 +878,32 @@ void testServerConnectionsRecorderConnectionClose(HttpProtocol[] serverProtocols
849878
}
850879
}
851880

881+
@Test
882+
void testServerConnectionsWebsocketRecorder() throws Exception {
883+
ServerRecorder.INSTANCE.reset();
884+
disposableServer = httpServer.metrics(true, ServerRecorder.supplier(), Function.identity())
885+
.doOnConnection(cnx -> ServerCloseHandler.INSTANCE.register(cnx.channel()))
886+
.bindNow();
887+
888+
String address = formatSocketAddress(disposableServer.address());
889+
890+
httpClient.websocket()
891+
.uri("/9")
892+
.handle((in, out) -> in.receive().aggregate().asString())
893+
.as(StepVerifier::create)
894+
.expectNext("Hello World!")
895+
.expectComplete()
896+
.verify(Duration.ofSeconds(30));
897+
898+
// make sure the client socket is closed on the server side before checking server metrics
899+
assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();
900+
assertThat(ServerRecorder.INSTANCE.error.get()).isNull();
901+
assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(0);
902+
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsAmount.get()).isEqualTo(0);
903+
assertThat(ServerRecorder.INSTANCE.onActiveConnectionsLocalAddr.get()).isEqualTo(address);
904+
assertThat(ServerRecorder.INSTANCE.onInactiveConnectionsLocalAddr.get()).isEqualTo(address);
905+
}
906+
852907
@Test
853908
void testIssue896() throws Exception {
854909
disposableServer = httpServer.noSSL()

0 commit comments

Comments
 (0)