Skip to content

Commit ccde433

Browse files
committed
ReactiveSocket frame logger (tcp)
#### Problem Tracking ReactiveSocket frames is not easy with wire logging as it logs the bytes written/read. #### Modification Added a netty handler to be added as the first handler in the pipeline and logs frame objects as-is written and read on the channel. Following netty's logging handler design, this handler can be configured to log at a particular log level, which can be changed by the user at runtime. #### Result Better tracking of ReactiveSocket frames on the channel.
1 parent 5bb6cea commit ccde433

File tree

3 files changed

+111
-0
lines changed

3 files changed

+111
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2016 Netflix, Inc.
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
* <p>
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
* <p>
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivesocket.transport.tcp;
15+
16+
import io.netty.channel.ChannelDuplexHandler;
17+
import io.netty.channel.ChannelHandlerContext;
18+
import io.netty.channel.ChannelPromise;
19+
import io.reactivesocket.Frame;
20+
import org.slf4j.Logger;
21+
import org.slf4j.LoggerFactory;
22+
import org.slf4j.event.Level;
23+
24+
public class ReactiveSocketFrameLogger extends ChannelDuplexHandler {
25+
26+
private final Logger logger;
27+
private final Level logLevel;
28+
29+
public ReactiveSocketFrameLogger(String name, Level logLevel) {
30+
this.logLevel = logLevel;
31+
logger = LoggerFactory.getLogger(name);
32+
}
33+
34+
@Override
35+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
36+
logFrameIfEnabled(ctx, msg, " Writing frame: ");
37+
super.write(ctx, msg, promise);
38+
}
39+
40+
@Override
41+
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
42+
logFrameIfEnabled(ctx, msg, " Read frame: ");
43+
super.channelRead(ctx, msg);
44+
}
45+
46+
private void logFrameIfEnabled(ChannelHandlerContext ctx, Object msg, String logMsgPrefix) {
47+
if (msg instanceof Frame) {
48+
Frame f = (Frame) msg;
49+
switch (logLevel) {
50+
case ERROR:
51+
if (logger.isErrorEnabled()) {
52+
logger.error(ctx.channel() + logMsgPrefix + f);
53+
}
54+
break;
55+
case WARN:
56+
if (logger.isWarnEnabled()) {
57+
logger.warn(ctx.channel() + logMsgPrefix + f);
58+
}
59+
break;
60+
case INFO:
61+
if (logger.isInfoEnabled()) {
62+
logger.info(ctx.channel() + logMsgPrefix + f);
63+
}
64+
break;
65+
case DEBUG:
66+
if (logger.isDebugEnabled()) {
67+
logger.debug(ctx.channel() + logMsgPrefix + f);
68+
}
69+
break;
70+
case TRACE:
71+
if (logger.isTraceEnabled()) {
72+
logger.trace(ctx.channel() + logMsgPrefix + f);
73+
}
74+
break;
75+
}
76+
}
77+
}
78+
}

reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/client/TcpTransportClient.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
import io.reactivesocket.Frame;
2222
import io.reactivesocket.transport.TransportClient;
2323
import io.reactivesocket.transport.tcp.ReactiveSocketFrameCodec;
24+
import io.reactivesocket.transport.tcp.ReactiveSocketFrameLogger;
2425
import io.reactivesocket.transport.tcp.ReactiveSocketLengthCodec;
2526
import io.reactivesocket.transport.tcp.TcpDuplexConnection;
2627
import io.reactivex.netty.protocol.tcp.client.TcpClient;
2728
import org.reactivestreams.Publisher;
29+
import org.slf4j.event.Level;
2830

2931
import java.net.SocketAddress;
3032
import java.util.function.Function;
@@ -56,6 +58,20 @@ public TcpTransportClient configureClient(Function<TcpClient<Frame, Frame>, TcpC
5658
return new TcpTransportClient(configurator.apply(rxNettyClient));
5759
}
5860

61+
/**
62+
* Enable logging of every frame read and written on every connection created by this client.
63+
*
64+
* @param name Name of the logger.
65+
* @param logLevel Level at which the messages will be logged.
66+
*
67+
* @return A new {@link TcpTransportClient}
68+
*/
69+
public TcpTransportClient logReactiveSocketFrames(String name, Level logLevel) {
70+
return configureClient(c ->
71+
c.addChannelHandlerLast("reactive-socket-frame-codec", () -> new ReactiveSocketFrameLogger(name, logLevel))
72+
);
73+
}
74+
5975
public static TcpTransportClient create(SocketAddress serverAddress) {
6076
return new TcpTransportClient(_configureClient(TcpClient.newClient(serverAddress)));
6177
}

reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/TcpTransportServer.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@
2020
import io.reactivesocket.Frame;
2121
import io.reactivesocket.transport.TransportServer;
2222
import io.reactivesocket.transport.tcp.ReactiveSocketFrameCodec;
23+
import io.reactivesocket.transport.tcp.ReactiveSocketFrameLogger;
2324
import io.reactivesocket.transport.tcp.ReactiveSocketLengthCodec;
2425
import io.reactivesocket.transport.tcp.TcpDuplexConnection;
26+
import io.reactivesocket.transport.tcp.client.TcpTransportClient;
2527
import io.reactivex.netty.channel.Connection;
2628
import io.reactivex.netty.protocol.tcp.server.ConnectionHandler;
2729
import io.reactivex.netty.protocol.tcp.server.TcpServer;
30+
import org.slf4j.event.Level;
2831
import rx.Observable;
2932

3033
import java.net.SocketAddress;
@@ -64,6 +67,20 @@ public TcpTransportServer configureServer(Function<TcpServer<Frame, Frame>, TcpS
6467
return new TcpTransportServer(configurator.apply(rxNettyServer));
6568
}
6669

70+
/**
71+
* Enable logging of every frame read and written on every connection accepted by this server.
72+
*
73+
* @param name Name of the logger.
74+
* @param logLevel Level at which the messages will be logged.
75+
*
76+
* @return A new {@link TcpTransportServer}
77+
*/
78+
public TcpTransportServer logReactiveSocketFrames(String name, Level logLevel) {
79+
return configureServer(c -> c.addChannelHandlerLast("reactive-socket-frame-codec",
80+
() -> new ReactiveSocketFrameLogger(name, logLevel))
81+
);
82+
}
83+
6784
public static TcpTransportServer create() {
6885
return create(TcpServer.newServer());
6986
}

0 commit comments

Comments
 (0)