Skip to content

Commit 7808202

Browse files
authored
Deprecate ResponderRSocket (#802)
1 parent 922aa20 commit 7808202

File tree

2 files changed

+15
-6
lines changed

2 files changed

+15
-6
lines changed

rsocket-core/src/main/java/io/rsocket/ResponderRSocket.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
package io.rsocket;
22

3+
import java.util.function.BiFunction;
34
import org.reactivestreams.Publisher;
45
import reactor.core.publisher.Flux;
56

67
/**
78
* Extends the {@link RSocket} that allows an implementer to peek at the first request payload of a
89
* channel.
10+
*
11+
* @deprecated as of 1.0 RC7 in favor of using {@link RSocket#requestChannel(Publisher)} with {@link
12+
* Flux#switchOnFirst(BiFunction)}
913
*/
14+
@Deprecated
1015
public interface ResponderRSocket extends RSocket {
1116
/**
1217
* Implement this method to peak at the first payload of the incoming request stream without

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import io.rsocket.DuplexConnection;
2828
import io.rsocket.Payload;
2929
import io.rsocket.RSocket;
30-
import io.rsocket.ResponderRSocket;
3130
import io.rsocket.exceptions.ApplicationErrorException;
3231
import io.rsocket.frame.*;
3332
import io.rsocket.frame.decoder.PayloadDecoder;
@@ -51,7 +50,7 @@
5150
import reactor.util.concurrent.Queues;
5251

5352
/** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */
54-
class RSocketResponder implements ResponderRSocket {
53+
class RSocketResponder implements RSocket {
5554
private static final Consumer<ReferenceCounted> DROPPED_ELEMENTS_CONSUMER =
5655
referenceCounted -> {
5756
if (referenceCounted.refCnt() > 0) {
@@ -66,7 +65,10 @@ class RSocketResponder implements ResponderRSocket {
6665

6766
private final DuplexConnection connection;
6867
private final RSocket requestHandler;
69-
private final ResponderRSocket responderRSocket;
68+
69+
@SuppressWarnings("deprecation")
70+
private final io.rsocket.ResponderRSocket responderRSocket;
71+
7072
private final PayloadDecoder payloadDecoder;
7173
private final Consumer<Throwable> errorConsumer;
7274
private final ResponderLeaseHandler leaseHandler;
@@ -86,6 +88,7 @@ class RSocketResponder implements ResponderRSocket {
8688
private final UnboundedProcessor<ByteBuf> sendProcessor;
8789
private final ByteBufAllocator allocator;
8890

91+
@SuppressWarnings("deprecation")
8992
RSocketResponder(
9093
DuplexConnection connection,
9194
RSocket requestHandler,
@@ -99,7 +102,9 @@ class RSocketResponder implements ResponderRSocket {
99102

100103
this.requestHandler = requestHandler;
101104
this.responderRSocket =
102-
(requestHandler instanceof ResponderRSocket) ? (ResponderRSocket) requestHandler : null;
105+
(requestHandler instanceof io.rsocket.ResponderRSocket)
106+
? (io.rsocket.ResponderRSocket) requestHandler
107+
: null;
103108

104109
this.payloadDecoder = payloadDecoder;
105110
this.errorConsumer = errorConsumer;
@@ -219,8 +224,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
219224
}
220225
}
221226

222-
@Override
223-
public Flux<Payload> requestChannel(Payload payload, Publisher<Payload> payloads) {
227+
private Flux<Payload> requestChannel(Payload payload, Publisher<Payload> payloads) {
224228
try {
225229
if (leaseHandler.useLease()) {
226230
return responderRSocket.requestChannel(payload, payloads);

0 commit comments

Comments
 (0)