Skip to content

Commit 691e044

Browse files
steveguryNiteshKant
authored andcommitted
Refactor ReactiveSocketServerHandler to be not shareable. (#94)
** Problem ** There's a memory leak in `ReactiveSocketServerHandler`, it keep adding entries in the `duplexConnections` map but never remove them. ** Solution ** Instead of having only one `ReactiveSocketServerHandler`, and manage resources manually, we let Netty allocate one instance per Channel. Then no resource management is necessary. ** Modifications ** I refactored all the uage of `ReactiveSocketServerHandler` whithout changing the logic. I also got rid of the method ``` public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } ``` since we only use `writeAndFlush` in the DuplexConnection (we're sure there's nothing to flush).
1 parent 54e1318 commit 691e044

File tree

8 files changed

+222
-320
lines changed

8 files changed

+222
-320
lines changed

reactivesocket-transport-tcp/src/examples/java/io/reactivesocket/transport/tcp/EchoServerHandler.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,10 @@
1616
public class EchoServerHandler extends ByteToMessageDecoder {
1717
private static SimpleChannelInboundHandler<FullHttpRequest> httpHandler = new HttpServerHandler();
1818

19-
private static ReactiveSocketServerHandler reactiveSocketHandler = ReactiveSocketServerHandler.create((setupPayload, rs) ->
20-
new RequestHandler.Builder().withRequestResponse(payload -> s -> {
21-
s.onNext(payload);
22-
s.onComplete();
23-
}).build());
19+
private static RequestHandler requestHandler = new RequestHandler.Builder().withRequestResponse(payload -> s -> {
20+
s.onNext(payload);
21+
s.onComplete();
22+
}).build();
2423

2524
@Override
2625
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
@@ -61,6 +60,8 @@ private void switchToHttp(ChannelHandlerContext ctx) {
6160

6261
private void switchToReactiveSocket(ChannelHandlerContext ctx) {
6362
ChannelPipeline p = ctx.pipeline();
63+
ReactiveSocketServerHandler reactiveSocketHandler =
64+
ReactiveSocketServerHandler.create((setupPayload, rs) -> requestHandler);
6465
p.addLast(reactiveSocketHandler);
6566
p.remove(this);
6667
}

reactivesocket-transport-tcp/src/main/java/io/reactivesocket/transport/tcp/server/ReactiveSocketServerHandler.java

Lines changed: 20 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@
1616
package io.reactivesocket.transport.tcp.server;
1717

1818
import io.netty.buffer.ByteBuf;
19-
import io.netty.channel.ChannelHandler;
2019
import io.netty.channel.ChannelHandlerContext;
21-
import io.netty.channel.ChannelId;
2220
import io.netty.channel.ChannelInboundHandlerAdapter;
2321
import io.netty.channel.ChannelPipeline;
2422
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
@@ -28,51 +26,51 @@
2826
import io.reactivesocket.LeaseGovernor;
2927
import io.reactivesocket.ReactiveSocket;
3028
import io.reactivesocket.transport.tcp.MutableDirectByteBuf;
31-
import org.agrona.BitUtil;
3229
import org.slf4j.Logger;
3330
import org.slf4j.LoggerFactory;
3431

35-
import java.util.concurrent.ConcurrentHashMap;
32+
import static org.agrona.BitUtil.SIZE_OF_INT;
3633

37-
@ChannelHandler.Sharable
3834
public class ReactiveSocketServerHandler extends ChannelInboundHandlerAdapter {
39-
private Logger logger = LoggerFactory.getLogger(ReactiveSocketServerHandler.class);
40-
41-
private ConcurrentHashMap<ChannelId, ServerTcpDuplexConnection> duplexConnections = new ConcurrentHashMap<>();
35+
private static final Logger logger = LoggerFactory.getLogger(ReactiveSocketServerHandler.class);
36+
private static final int MAX_FRAME_LENGTH = Integer.MAX_VALUE >> 1;
4237

4338
private ConnectionSetupHandler setupHandler;
44-
4539
private LeaseGovernor leaseGovernor;
40+
private ServerTcpDuplexConnection connection;
4641

4742
protected ReactiveSocketServerHandler(ConnectionSetupHandler setupHandler, LeaseGovernor leaseGovernor) {
4843
this.setupHandler = setupHandler;
4944
this.leaseGovernor = leaseGovernor;
45+
this.connection = null;
5046
}
5147

5248
public static ReactiveSocketServerHandler create(ConnectionSetupHandler setupHandler) {
5349
return create(setupHandler, LeaseGovernor.UNLIMITED_LEASE_GOVERNOR);
5450
}
5551

5652
public static ReactiveSocketServerHandler create(ConnectionSetupHandler setupHandler, LeaseGovernor leaseGovernor) {
57-
return new
58-
ReactiveSocketServerHandler(
59-
setupHandler,
60-
leaseGovernor);
61-
53+
return new ReactiveSocketServerHandler(setupHandler, leaseGovernor);
6254
}
6355

6456
@Override
6557
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
6658
ChannelPipeline cp = ctx.pipeline();
6759
if (cp.get(LengthFieldBasedFrameDecoder.class) == null) {
68-
ctx
69-
.pipeline()
70-
.addBefore(
71-
ctx.name(),
72-
LengthFieldBasedFrameDecoder.class.getName(),
73-
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE >> 1, 0, BitUtil.SIZE_OF_INT, -1 * BitUtil.SIZE_OF_INT, 0));
60+
LengthFieldBasedFrameDecoder frameDecoder =
61+
new LengthFieldBasedFrameDecoder(MAX_FRAME_LENGTH, 0, SIZE_OF_INT, -1 * SIZE_OF_INT, 0);
62+
ctx.pipeline()
63+
.addBefore(ctx.name(), LengthFieldBasedFrameDecoder.class.getName(), frameDecoder);
7464
}
65+
}
7566

67+
@Override
68+
public void channelActive(ChannelHandlerContext ctx) throws Exception {
69+
connection = new ServerTcpDuplexConnection(ctx);
70+
ReactiveSocket reactiveSocket =
71+
DefaultReactiveSocket.fromServerConnection(connection, setupHandler, leaseGovernor, Throwable::printStackTrace);
72+
// Note: No blocking code here (still it should be refactored)
73+
reactiveSocket.startAndWait();
7674
}
7775

7876
@Override
@@ -81,29 +79,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
8179
try {
8280
MutableDirectByteBuf mutableDirectByteBuf = new MutableDirectByteBuf(content);
8381
Frame from = Frame.from(mutableDirectByteBuf, 0, mutableDirectByteBuf.capacity());
84-
channelRegistered(ctx);
85-
ServerTcpDuplexConnection connection = duplexConnections.computeIfAbsent(ctx.channel().id(), i -> {
86-
logger.info("No connection found for channel id: " + i + " from host " + ctx.channel().remoteAddress().toString());
87-
ServerTcpDuplexConnection c = new ServerTcpDuplexConnection(ctx);
88-
ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromServerConnection(c, setupHandler, leaseGovernor, throwable -> throwable.printStackTrace());
89-
reactiveSocket.startAndWait();
90-
return c;
91-
});
82+
9283
if (connection != null) {
93-
connection
94-
.getSubscribers()
95-
.forEach(o -> o.onNext(from));
84+
connection.getSubscribers().forEach(o -> o.onNext(from));
9685
}
9786
} finally {
9887
content.release();
9988
}
10089
}
10190

102-
@Override
103-
public void channelReadComplete(ChannelHandlerContext ctx) {
104-
ctx.flush();
105-
}
106-
10791
@Override
10892
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
10993
super.exceptionCaught(ctx, cause);

reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/ClientServerTest.java

Lines changed: 44 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -52,55 +52,52 @@ public class ClientServerTest {
5252
static EventLoopGroup bossGroup = new NioEventLoopGroup(1);
5353
static EventLoopGroup workerGroup = new NioEventLoopGroup(4);
5454

55-
static ReactiveSocketServerHandler serverHandler = ReactiveSocketServerHandler.create((setupPayload, rs) ->
56-
new RequestHandler() {
57-
@Override
58-
public Publisher<Payload> handleRequestResponse(Payload payload) {
59-
return s -> {
60-
//System.out.println("Handling request/response payload => " + s.toString());
61-
Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata");
62-
s.onNext(response);
63-
s.onComplete();
64-
};
65-
}
66-
67-
@Override
68-
public Publisher<Payload> handleRequestStream(Payload payload) {
55+
static RequestHandler requestHandler = new RequestHandler() {
56+
@Override
57+
public Publisher<Payload> handleRequestResponse(Payload payload) {
58+
return s -> {
6959
Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata");
60+
s.onNext(response);
61+
s.onComplete();
62+
};
63+
}
7064

71-
return RxReactiveStreams
72-
.toPublisher(Observable
73-
.range(1, 10)
74-
.map(i -> response));
75-
}
65+
@Override
66+
public Publisher<Payload> handleRequestStream(Payload payload) {
67+
Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata");
7668

77-
@Override
78-
public Publisher<Payload> handleSubscription(Payload payload) {
79-
Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata");
69+
return RxReactiveStreams
70+
.toPublisher(Observable
71+
.range(1, 10)
72+
.map(i -> response));
73+
}
74+
75+
@Override
76+
public Publisher<Payload> handleSubscription(Payload payload) {
77+
Payload response = TestUtil.utf8EncodedPayload("hello world", "metadata");
78+
79+
return RxReactiveStreams
80+
.toPublisher(Observable
81+
.range(1, 10)
82+
.map(i -> response)
83+
.repeat());
84+
}
85+
86+
@Override
87+
public Publisher<Void> handleFireAndForget(Payload payload) {
88+
return Subscriber::onComplete;
89+
}
90+
91+
@Override
92+
public Publisher<Payload> handleChannel(Payload initialPayload, Publisher<Payload> inputs) {
93+
return null;
94+
}
8095

81-
return RxReactiveStreams
82-
.toPublisher(Observable
83-
.range(1, 10)
84-
.map(i -> response)
85-
.repeat());
86-
}
87-
88-
@Override
89-
public Publisher<Void> handleFireAndForget(Payload payload) {
90-
return Subscriber::onComplete;
91-
}
92-
93-
@Override
94-
public Publisher<Payload> handleChannel(Payload initialPayload, Publisher<Payload> inputs) {
95-
return null;
96-
}
97-
98-
@Override
99-
public Publisher<Void> handleMetadataPush(Payload payload) {
100-
return null;
101-
}
96+
@Override
97+
public Publisher<Void> handleMetadataPush(Payload payload) {
98+
return null;
10299
}
103-
);
100+
};
104101

105102
@BeforeClass
106103
public static void setup() throws Exception {
@@ -112,6 +109,8 @@ public static void setup() throws Exception {
112109
@Override
113110
protected void initChannel(Channel ch) throws Exception {
114111
ChannelPipeline pipeline = ch.pipeline();
112+
ReactiveSocketServerHandler serverHandler =
113+
ReactiveSocketServerHandler.create((setupPayload, rs) -> requestHandler);
115114
pipeline.addLast(serverHandler);
116115
}
117116
});
@@ -123,7 +122,7 @@ protected void initChannel(Channel ch) throws Exception {
123122
).toBlocking().single();
124123

125124
client = DefaultReactiveSocket
126-
.fromClientConnection(duplexConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8"), t -> t.printStackTrace());
125+
.fromClientConnection(duplexConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8"), Throwable::printStackTrace);
127126

128127
client.startAndWait();
129128
}

reactivesocket-transport-tcp/src/test/java/io/reactivesocket/transport/tcp/Ping.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,15 @@
3535

3636
public class Ping {
3737
public static void main(String... args) throws Exception {
38+
NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
39+
3840
Publisher<ClientTcpDuplexConnection> publisher = ClientTcpDuplexConnection
39-
.create(InetSocketAddress.createUnresolved("localhost", 7878), new NioEventLoopGroup(1));
41+
.create(InetSocketAddress.createUnresolved("localhost", 7878), eventLoopGroup);
4042

4143
ClientTcpDuplexConnection duplexConnection = RxReactiveStreams.toObservable(publisher).toBlocking().last();
42-
ReactiveSocket reactiveSocket = DefaultReactiveSocket.fromClientConnection(duplexConnection, ConnectionSetupPayload.create("UTF-8", "UTF-8"), t -> t.printStackTrace());
44+
ConnectionSetupPayload setupPayload = ConnectionSetupPayload.create("UTF-8", "UTF-8");
45+
ReactiveSocket reactiveSocket =
46+
DefaultReactiveSocket.fromClientConnection(duplexConnection, setupPayload, Throwable::printStackTrace);
4347

4448
reactiveSocket.startAndWait();
4549

@@ -80,13 +84,13 @@ public ByteBuffer getMetadata() {
8084
.toObservable(
8185
reactiveSocket
8286
.requestResponse(keyPayload))
83-
.doOnError(t -> t.printStackTrace())
87+
.doOnError(Throwable::printStackTrace)
8488
.doOnNext(s -> {
8589
long diff = System.nanoTime() - start;
8690
histogram.recordValue(diff);
8791
});
8892
}, 16)
89-
.doOnError(t -> t.printStackTrace())
93+
.doOnError(Throwable::printStackTrace)
9094
.subscribe(new Subscriber<Payload>() {
9195
@Override
9296
public void onCompleted() {

0 commit comments

Comments
 (0)