Skip to content

Commit 3072d2e

Browse files
OlegDokukarobertroeser
authored andcommitted
fixes onClose issue (#608)
Signed-off-by: Robert Roeser <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 148e6ea commit 3072d2e

File tree

3 files changed

+56
-62
lines changed

3 files changed

+56
-62
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package io.rsocket.internal;
2+
3+
import io.rsocket.DuplexConnection;
4+
import reactor.core.publisher.Mono;
5+
import reactor.core.publisher.MonoProcessor;
6+
7+
public abstract class BaseDuplexConnection implements DuplexConnection {
8+
private MonoProcessor<Void> onClose = MonoProcessor.create();
9+
10+
public BaseDuplexConnection() {
11+
onClose.doFinally(s -> doOnClose()).subscribe();
12+
}
13+
14+
protected abstract void doOnClose();
15+
16+
@Override
17+
public final Mono<Void> onClose() {
18+
return onClose;
19+
}
20+
21+
@Override
22+
public final void dispose() {
23+
onClose.onComplete();
24+
}
25+
26+
@Override
27+
public final boolean isDisposed() {
28+
return onClose.isDisposed();
29+
}
30+
}

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/TcpDuplexConnection.java

Lines changed: 13 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,18 @@
2020
import io.netty.buffer.ByteBufAllocator;
2121
import io.rsocket.DuplexConnection;
2222
import io.rsocket.frame.FrameLengthFlyweight;
23+
import io.rsocket.internal.BaseDuplexConnection;
2324
import java.util.Objects;
2425
import org.reactivestreams.Publisher;
25-
import reactor.core.Disposable;
2626
import reactor.core.Fuseable;
2727
import reactor.core.publisher.Flux;
2828
import reactor.core.publisher.Mono;
2929
import reactor.netty.Connection;
30-
import reactor.netty.FutureMono;
3130

3231
/** An implementation of {@link DuplexConnection} that connects via TCP. */
33-
public final class TcpDuplexConnection implements DuplexConnection {
32+
public final class TcpDuplexConnection extends BaseDuplexConnection {
3433

3534
private final Connection connection;
36-
private final Disposable channelClosed;
3735
private final ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
3836
private final boolean encodeLength;
3937

@@ -55,37 +53,21 @@ public TcpDuplexConnection(Connection connection) {
5553
public TcpDuplexConnection(Connection connection, boolean encodeLength) {
5654
this.encodeLength = encodeLength;
5755
this.connection = Objects.requireNonNull(connection, "connection must not be null");
58-
this.channelClosed =
59-
FutureMono.from(connection.channel().closeFuture())
60-
.doFinally(
61-
s -> {
62-
if (!isDisposed()) {
63-
dispose();
64-
}
65-
})
66-
.subscribe();
67-
}
6856

69-
@Override
70-
public void dispose() {
71-
connection.dispose();
72-
}
73-
74-
@Override
75-
public boolean isDisposed() {
76-
return connection.isDisposed();
57+
connection
58+
.channel()
59+
.closeFuture()
60+
.addListener(
61+
future -> {
62+
if (!isDisposed()) dispose();
63+
});
7764
}
7865

7966
@Override
80-
public Mono<Void> onClose() {
81-
return connection
82-
.onDispose()
83-
.doFinally(
84-
s -> {
85-
if (!channelClosed.isDisposed()) {
86-
channelClosed.dispose();
87-
}
88-
});
67+
protected void doOnClose() {
68+
if (!connection.isDisposed()) {
69+
connection.dispose();
70+
}
8971
}
9072

9173
@Override

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java

Lines changed: 13 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,13 @@
1818
import io.netty.buffer.ByteBuf;
1919
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
2020
import io.rsocket.DuplexConnection;
21+
import io.rsocket.internal.BaseDuplexConnection;
2122
import java.util.Objects;
2223
import org.reactivestreams.Publisher;
23-
import reactor.core.Disposable;
2424
import reactor.core.Fuseable;
2525
import reactor.core.publisher.Flux;
2626
import reactor.core.publisher.Mono;
2727
import reactor.netty.Connection;
28-
import reactor.netty.FutureMono;
2928
import reactor.util.concurrent.Queues;
3029

3130
/**
@@ -35,10 +34,9 @@
3534
* for message oriented transports so this must be specifically dropped from Frames sent and
3635
* stitched back on for frames received.
3736
*/
38-
public final class WebsocketDuplexConnection implements DuplexConnection {
37+
public final class WebsocketDuplexConnection extends BaseDuplexConnection {
3938

4039
private final Connection connection;
41-
private final Disposable channelClosed;
4240

4341
/**
4442
* Creates a new instance
@@ -47,37 +45,21 @@ public final class WebsocketDuplexConnection implements DuplexConnection {
4745
*/
4846
public WebsocketDuplexConnection(Connection connection) {
4947
this.connection = Objects.requireNonNull(connection, "connection must not be null");
50-
this.channelClosed =
51-
FutureMono.from(connection.channel().closeFuture())
52-
.doFinally(
53-
s -> {
54-
if (!isDisposed()) {
55-
dispose();
56-
}
57-
})
58-
.subscribe();
59-
}
60-
61-
@Override
62-
public void dispose() {
63-
connection.dispose();
64-
}
6548

66-
@Override
67-
public boolean isDisposed() {
68-
return connection.isDisposed();
49+
connection
50+
.channel()
51+
.closeFuture()
52+
.addListener(
53+
future -> {
54+
if (!isDisposed()) dispose();
55+
});
6956
}
7057

7158
@Override
72-
public Mono<Void> onClose() {
73-
return connection
74-
.onDispose()
75-
.doFinally(
76-
s -> {
77-
if (!channelClosed.isDisposed()) {
78-
channelClosed.dispose();
79-
}
80-
});
59+
protected void doOnClose() {
60+
if (!connection.isDisposed()) {
61+
connection.dispose();
62+
}
8163
}
8264

8365
@Override

0 commit comments

Comments
 (0)