Skip to content

Commit ee47b0f

Browse files
committed
cleanup: resume token is released in ClientRSocketSession instead of
RSocketFactory KeepAliveConnection: dispose KeepAliveHandler in onClose() Signed-off-by: Maksym Ostroverkhov <[email protected]>
1 parent 9f26414 commit ee47b0f

File tree

3 files changed

+15
-9
lines changed

3 files changed

+15
-9
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,8 +278,6 @@ public Mono<RSocket> start() {
278278
errorConsumer,
279279
StreamIdSupplier.clientSupplier());
280280

281-
rSocketClient.onClose().doFinally(s -> resumeToken.release()).subscribe();
282-
283281
RSocket wrappedRSocketClient = plugins.applyClient(rSocketClient);
284282

285283
RSocket unwrappedServerSocket = acceptor.get().apply(wrappedRSocketClient);

rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveConnection.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
import java.util.function.Consumer;
3232
import java.util.function.Function;
3333
import org.reactivestreams.Publisher;
34-
import reactor.core.publisher.*;
34+
import reactor.core.publisher.Flux;
35+
import reactor.core.publisher.Mono;
36+
import reactor.core.publisher.MonoProcessor;
3537

3638
public class KeepAliveConnection extends DuplexConnectionProxy
3739
implements ResumePositionsConnection {
@@ -126,12 +128,15 @@ public Flux<ByteBuf> receive() {
126128
}
127129

128130
@Override
129-
public void dispose() {
130-
KeepAliveHandler keepAliveHandler = keepAliveHandlerReady.peek();
131-
if (keepAliveHandler != null) {
132-
keepAliveHandler.dispose();
133-
}
134-
super.dispose();
131+
public Mono<Void> onClose() {
132+
return super.onClose()
133+
.doFinally(
134+
s -> {
135+
KeepAliveHandler keepAliveHandler = keepAliveHandlerReady.peek();
136+
if (keepAliveHandler != null) {
137+
keepAliveHandler.dispose();
138+
}
139+
});
135140
}
136141

137142
@Override

rsocket-core/src/main/java/io/rsocket/resume/ClientRSocketSession.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ public ClientRSocketSession(
4848
new ResumableDuplexConnection(
4949
"client", duplexConnection, config.resumeStore(), config.resumeStreamTimeout());
5050

51+
/*session completed: release token initially retained in resumeToken(ByteBuf)*/
52+
onClose().doFinally(s -> resumeToken.release()).subscribe();
53+
5154
resumableConnection
5255
.connectionErrors()
5356
.flatMap(

0 commit comments

Comments
 (0)