Skip to content

Commit d4874c3

Browse files
committed
Release payloads by default in AbstractRSocket
1 parent 5d518fa commit d4874c3

File tree

2 files changed

+5
-7
lines changed

2 files changed

+5
-7
lines changed

rsocket-core/src/main/java/io/rsocket/AbstractRSocket.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,19 @@ public abstract class AbstractRSocket implements RSocket {
3131

3232
@Override
3333
public Mono<Void> fireAndForget(Payload payload) {
34+
payload.release();
3435
return Mono.error(new UnsupportedOperationException("Fire and forget not implemented."));
3536
}
3637

3738
@Override
3839
public Mono<Payload> requestResponse(Payload payload) {
40+
payload.release();
3941
return Mono.error(new UnsupportedOperationException("Request-Response not implemented."));
4042
}
4143

4244
@Override
4345
public Flux<Payload> requestStream(Payload payload) {
46+
payload.release();
4447
return Flux.error(new UnsupportedOperationException("Request-Stream not implemented."));
4548
}
4649

@@ -51,6 +54,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
5154

5255
@Override
5356
public Mono<Void> metadataPush(Payload payload) {
57+
payload.release();
5458
return Mono.error(new UnsupportedOperationException("Metadata-Push not implemented."));
5559
}
5660

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ public Mono<RSocket> start() {
248248
}
249249

250250
public static class ServerRSocketFactory {
251-
private Supplier<SocketAcceptor> acceptor;
251+
private SocketAcceptor acceptor;
252252
private Function<Frame, ? extends Payload> frameDecoder = DefaultPayload::create;
253253
private Consumer<Throwable> errorConsumer = Throwable::printStackTrace;
254254
private int mtu = 0;
@@ -272,11 +272,6 @@ public ServerRSocketFactory addServerPlugin(RSocketInterceptor interceptor) {
272272
}
273273

274274
public ServerTransportAcceptor acceptor(SocketAcceptor acceptor) {
275-
this.acceptor = () -> acceptor;
276-
return ServerStart::new;
277-
}
278-
279-
public ServerTransportAcceptor acceptor(Supplier<SocketAcceptor> acceptor) {
280275
this.acceptor = acceptor;
281276
return ServerStart::new;
282277
}
@@ -350,7 +345,6 @@ private Mono<Void> processSetupFrame(
350345
RSocket wrappedRSocketClient = plugins.applyClient(rSocketClient);
351346

352347
return acceptor
353-
.get()
354348
.accept(setupPayload, wrappedRSocketClient)
355349
.doOnNext(
356350
unwrappedServerSocket -> {

0 commit comments

Comments
 (0)