File tree Expand file tree Collapse file tree 2 files changed +5
-7
lines changed
rsocket-core/src/main/java/io/rsocket Expand file tree Collapse file tree 2 files changed +5
-7
lines changed Original file line number Diff line number Diff line change @@ -31,16 +31,19 @@ public abstract class AbstractRSocket implements RSocket {
31
31
32
32
@ Override
33
33
public Mono <Void > fireAndForget (Payload payload ) {
34
+ payload .release ();
34
35
return Mono .error (new UnsupportedOperationException ("Fire and forget not implemented." ));
35
36
}
36
37
37
38
@ Override
38
39
public Mono <Payload > requestResponse (Payload payload ) {
40
+ payload .release ();
39
41
return Mono .error (new UnsupportedOperationException ("Request-Response not implemented." ));
40
42
}
41
43
42
44
@ Override
43
45
public Flux <Payload > requestStream (Payload payload ) {
46
+ payload .release ();
44
47
return Flux .error (new UnsupportedOperationException ("Request-Stream not implemented." ));
45
48
}
46
49
@@ -51,6 +54,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
51
54
52
55
@ Override
53
56
public Mono <Void > metadataPush (Payload payload ) {
57
+ payload .release ();
54
58
return Mono .error (new UnsupportedOperationException ("Metadata-Push not implemented." ));
55
59
}
56
60
Original file line number Diff line number Diff line change @@ -253,7 +253,7 @@ public Mono<RSocket> start() {
253
253
}
254
254
255
255
public static class ServerRSocketFactory {
256
- private Supplier < SocketAcceptor > acceptor ;
256
+ private SocketAcceptor acceptor ;
257
257
private Function <Frame , ? extends Payload > frameDecoder = DefaultPayload ::create ;
258
258
private Consumer <Throwable > errorConsumer = Throwable ::printStackTrace ;
259
259
private int mtu = 0 ;
@@ -277,11 +277,6 @@ public ServerRSocketFactory addServerPlugin(RSocketInterceptor interceptor) {
277
277
}
278
278
279
279
public ServerTransportAcceptor acceptor (SocketAcceptor acceptor ) {
280
- this .acceptor = () -> acceptor ;
281
- return ServerStart ::new ;
282
- }
283
-
284
- public ServerTransportAcceptor acceptor (Supplier <SocketAcceptor > acceptor ) {
285
280
this .acceptor = acceptor ;
286
281
return ServerStart ::new ;
287
282
}
@@ -357,7 +352,6 @@ private Mono<Void> processSetupFrame(
357
352
RSocket wrappedRSocketClient = plugins .applyClient (rSocketClient );
358
353
359
354
return acceptor
360
- .get ()
361
355
.accept (setupPayload , wrappedRSocketClient )
362
356
.doOnNext (
363
357
unwrappedServerSocket -> {
You can’t perform that action at this time.
0 commit comments