Skip to content

Commit f3b401a

Browse files
committed
Delegate dispose/isDisposed to underlying channel in NettyDuplexConnection
1 parent 93c0137 commit f3b401a

File tree

3 files changed

+8
-32
lines changed

3 files changed

+8
-32
lines changed

gradle.properties

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,6 @@
1414
# limitations under the License.
1515
#
1616

17-
mavenversion=0.9-SNAPSHOT
17+
mavenversion=0.10-SNAPSHOT
1818
release.scope=patch
19-
release.version=0.9-SNAPSHOT
19+
release.version=0.10-SNAPSHOT

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/NettyDuplexConnection.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import org.reactivestreams.Publisher;
2121
import reactor.core.publisher.Flux;
2222
import reactor.core.publisher.Mono;
23-
import reactor.core.publisher.MonoProcessor;
2423
import reactor.ipc.netty.NettyContext;
2524
import reactor.ipc.netty.NettyInbound;
2625
import reactor.ipc.netty.NettyOutbound;
@@ -29,22 +28,11 @@ public class NettyDuplexConnection implements DuplexConnection {
2928
private final NettyInbound in;
3029
private final NettyOutbound out;
3130
private final NettyContext context;
32-
private final MonoProcessor<Void> onClose;
3331

3432
public NettyDuplexConnection(NettyInbound in, NettyOutbound out, NettyContext context) {
3533
this.in = in;
3634
this.out = out;
3735
this.context = context;
38-
this.onClose = MonoProcessor.create();
39-
40-
context.onClose(onClose::onComplete);
41-
this.onClose
42-
.doFinally(
43-
s -> {
44-
this.context.dispose();
45-
this.context.channel().close();
46-
})
47-
.subscribe();
4836
}
4937

5038
@Override
@@ -64,16 +52,16 @@ public Flux<Frame> receive() {
6452

6553
@Override
6654
public void dispose() {
67-
onClose.onComplete();
55+
context.dispose();
6856
}
6957

7058
@Override
7159
public boolean isDisposed() {
72-
return onClose.isDisposed();
60+
return context.isDisposed();
7361
}
7462

7563
@Override
7664
public Mono<Void> onClose() {
77-
return onClose;
65+
return context.onClose();
7866
}
7967
}

rsocket-transport-netty/src/main/java/io/rsocket/transport/netty/WebsocketDuplexConnection.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.reactivestreams.Publisher;
2828
import reactor.core.publisher.Flux;
2929
import reactor.core.publisher.Mono;
30-
import reactor.core.publisher.MonoProcessor;
3130
import reactor.ipc.netty.NettyContext;
3231
import reactor.ipc.netty.NettyInbound;
3332
import reactor.ipc.netty.NettyOutbound;
@@ -43,22 +42,11 @@ public class WebsocketDuplexConnection implements DuplexConnection {
4342
private final NettyInbound in;
4443
private final NettyOutbound out;
4544
private final NettyContext context;
46-
private final MonoProcessor<Void> onClose;
4745

4846
public WebsocketDuplexConnection(NettyInbound in, NettyOutbound out, NettyContext context) {
4947
this.in = in;
5048
this.out = out;
5149
this.context = context;
52-
this.onClose = MonoProcessor.create();
53-
54-
context.onClose(onClose::onComplete);
55-
this.onClose
56-
.doFinally(
57-
s -> {
58-
this.context.dispose();
59-
this.context.channel().close();
60-
})
61-
.subscribe();
6250
}
6351

6452
@Override
@@ -87,16 +75,16 @@ public Flux<Frame> receive() {
8775

8876
@Override
8977
public void dispose() {
90-
onClose.onComplete();
78+
context.dispose();
9179
}
9280

9381
@Override
9482
public boolean isDisposed() {
95-
return onClose.isDisposed();
83+
return context.isDisposed();
9684
}
9785

9886
@Override
9987
public Mono<Void> onClose() {
100-
return onClose;
88+
return context.onClose();
10189
}
10290
}

0 commit comments

Comments
 (0)