Skip to content

Commit 9521a9d

Browse files
committed
removes deprecated errorConsumer
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent cec7a78 commit 9521a9d

15 files changed

+78
-251
lines changed

rsocket-core/src/main/java/io/rsocket/RSocketFactory.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public static class ClientRSocketFactory implements ClientTransportAcceptor {
111111
private Resume resume;
112112

113113
public ClientRSocketFactory() {
114-
this(RSocketConnector.create().errorConsumer(Throwable::printStackTrace));
114+
this(RSocketConnector.create());
115115
}
116116

117117
public ClientRSocketFactory(RSocketConnector connector) {
@@ -395,7 +395,6 @@ public ClientRSocketFactory fragment(int mtu) {
395395

396396
/** @deprecated this is deprecated with no replacement. */
397397
public ClientRSocketFactory errorConsumer(Consumer<Throwable> errorConsumer) {
398-
connector.errorConsumer(errorConsumer);
399398
return this;
400399
}
401400

@@ -417,7 +416,7 @@ public static class ServerRSocketFactory implements ServerTransportAcceptor {
417416
private Resume resume;
418417

419418
public ServerRSocketFactory() {
420-
this(RSocketServer.create().errorConsumer(Throwable::printStackTrace));
419+
this(RSocketServer.create());
421420
}
422421

423422
public ServerRSocketFactory(RSocketServer server) {
@@ -499,7 +498,6 @@ public ServerRSocketFactory fragment(int mtu) {
499498

500499
/** @deprecated this is deprecated with no replacement. */
501500
public ServerRSocketFactory errorConsumer(Consumer<Throwable> errorConsumer) {
502-
server.errorConsumer(errorConsumer);
503501
return this;
504502
}
505503

rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,6 @@ public class RSocketConnector {
9191
private int mtu = 0;
9292
private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;
9393

94-
private Consumer<Throwable> errorConsumer = ex -> {};
95-
9694
private RSocketConnector() {}
9795

9896
/**
@@ -436,17 +434,6 @@ public RSocketConnector payloadDecoder(PayloadDecoder decoder) {
436434
return this;
437435
}
438436

439-
/**
440-
* @deprecated this is deprecated with no replacement and will be removed after {@link
441-
* io.rsocket.RSocketFactory} is removed.
442-
*/
443-
@Deprecated
444-
public RSocketConnector errorConsumer(Consumer<Throwable> errorConsumer) {
445-
Objects.requireNonNull(errorConsumer);
446-
this.errorConsumer = errorConsumer;
447-
return this;
448-
}
449-
450437
/**
451438
* The final step to connect with the transport to use as input and the resulting {@code
452439
* Mono<RSocket>} as output. Each subscriber to the returned {@code Mono} starts a new connection
@@ -524,7 +511,6 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
524511
new RSocketRequester(
525512
multiplexer.asClientConnection(),
526513
payloadDecoder,
527-
errorConsumer,
528514
StreamIdSupplier.clientSupplier(),
529515
mtu,
530516
(int) keepAliveInterval.toMillis(),
@@ -564,7 +550,6 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
564550
CLIENT_TAG,
565551
wrappedConnection.alloc(),
566552
leases.sender(),
567-
errorConsumer,
568553
leases.stats())
569554
: ResponderLeaseHandler.None;
570555

@@ -573,7 +558,6 @@ public Mono<RSocket> connect(Supplier<ClientTransport> transportSupplier) {
573558
multiplexer.asServerConnection(),
574559
wrappedRSocketHandler,
575560
payloadDecoder,
576-
errorConsumer,
577561
responderLeaseHandler,
578562
mtu);
579563

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@
5959
import org.reactivestreams.Publisher;
6060
import org.reactivestreams.Subscriber;
6161
import org.reactivestreams.Subscription;
62+
import org.slf4j.Logger;
63+
import org.slf4j.LoggerFactory;
6264
import reactor.core.publisher.BaseSubscriber;
6365
import reactor.core.publisher.Flux;
6466
import reactor.core.publisher.Mono;
@@ -74,9 +76,8 @@
7476
* Requester Side of a RSocket socket. Sends {@link ByteBuf}s to a {@link RSocketResponder} of peer
7577
*/
7678
class RSocketRequester implements RSocket {
77-
private static final AtomicReferenceFieldUpdater<RSocketRequester, Throwable> TERMINATION_ERROR =
78-
AtomicReferenceFieldUpdater.newUpdater(
79-
RSocketRequester.class, Throwable.class, "terminationError");
79+
private static final Logger LOGGER = LoggerFactory.getLogger(RSocketRequester.class);
80+
8081
private static final Exception CLOSED_CHANNEL_EXCEPTION = new ClosedChannelException();
8182
private static final Consumer<ReferenceCounted> DROPPED_ELEMENTS_CONSUMER =
8283
referenceCounted -> {
@@ -93,9 +94,14 @@ class RSocketRequester implements RSocket {
9394
CLOSED_CHANNEL_EXCEPTION.setStackTrace(new StackTraceElement[0]);
9495
}
9596

97+
private volatile Throwable terminationError;
98+
99+
private static final AtomicReferenceFieldUpdater<RSocketRequester, Throwable> TERMINATION_ERROR =
100+
AtomicReferenceFieldUpdater.newUpdater(
101+
RSocketRequester.class, Throwable.class, "terminationError");
102+
96103
private final DuplexConnection connection;
97104
private final PayloadDecoder payloadDecoder;
98-
private final Consumer<Throwable> errorConsumer;
99105
private final StreamIdSupplier streamIdSupplier;
100106
private final IntObjectMap<Subscription> senders;
101107
private final IntObjectMap<Processor<Payload, Payload>> receivers;
@@ -104,14 +110,12 @@ class RSocketRequester implements RSocket {
104110
private final RequesterLeaseHandler leaseHandler;
105111
private final ByteBufAllocator allocator;
106112
private final KeepAliveFramesAcceptor keepAliveFramesAcceptor;
107-
private volatile Throwable terminationError;
108113
private final MonoProcessor<Void> onClose;
109114
private final Scheduler serialScheduler;
110115

111116
RSocketRequester(
112117
DuplexConnection connection,
113118
PayloadDecoder payloadDecoder,
114-
Consumer<Throwable> errorConsumer,
115119
StreamIdSupplier streamIdSupplier,
116120
int mtu,
117121
int keepAliveTickPeriod,
@@ -122,7 +126,6 @@ class RSocketRequester implements RSocket {
122126
this.connection = connection;
123127
this.allocator = connection.alloc();
124128
this.payloadDecoder = payloadDecoder;
125-
this.errorConsumer = errorConsumer;
126129
this.streamIdSupplier = streamIdSupplier;
127130
this.mtu = mtu;
128131
this.leaseHandler = leaseHandler;
@@ -140,7 +143,7 @@ class RSocketRequester implements RSocket {
140143
.subscribe(null, this::tryTerminateOnConnectionError, this::tryTerminateOnConnectionClose);
141144
connection.send(sendProcessor).subscribe(null, this::handleSendProcessorError);
142145

143-
connection.receive().subscribe(this::handleIncomingFrames, errorConsumer);
146+
connection.receive().subscribe(this::handleIncomingFrames, e -> {});
144147

145148
if (keepAliveTickPeriod != 0 && keepAliveHandler != null) {
146149
KeepAliveSupport keepAliveSupport =
@@ -396,7 +399,6 @@ private Flux<Payload> handleChannel(Flux<Payload> request) {
396399
payload.release();
397400
final IllegalArgumentException t =
398401
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
399-
errorConsumer.accept(t);
400402
return Mono.error(t);
401403
}
402404
return handleChannel(payload, flux);
@@ -446,7 +448,6 @@ protected void hookOnNext(Payload payload) {
446448
cancel();
447449
final IllegalArgumentException t =
448450
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
449-
errorConsumer.accept(t);
450451
// no need to send any errors.
451452
sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId));
452453
receiver.onError(t);
@@ -610,9 +611,9 @@ private void handleStreamZero(FrameType type, ByteBuf frame) {
610611
break;
611612
default:
612613
// Ignore unknown frames. Throwing an error will close the socket.
613-
errorConsumer.accept(
614-
new IllegalStateException(
615-
"Client received supported frame on stream 0: " + frame.toString()));
614+
if (LOGGER.isDebugEnabled()) {
615+
LOGGER.debug("Requester received unsupported frame on stream 0: " + frame.toString());
616+
}
616617
}
617618
}
618619

@@ -669,7 +670,7 @@ private void handleFrame(int streamId, FrameType type, ByteBuf frame) {
669670
}
670671
default:
671672
throw new IllegalStateException(
672-
"Client received supported frame on stream " + streamId + ": " + frame.toString());
673+
"Requester received unsupported frame on stream " + streamId + ": " + frame.toString());
673674
}
674675
}
675676

@@ -737,7 +738,9 @@ private void terminate(Throwable e) {
737738
try {
738739
receiver.onError(e);
739740
} catch (Throwable t) {
740-
errorConsumer.accept(t);
741+
if (LOGGER.isDebugEnabled()) {
742+
LOGGER.debug("Dropped exception", t);
743+
}
741744
}
742745
});
743746
}
@@ -749,14 +752,15 @@ private void terminate(Throwable e) {
749752
try {
750753
sender.cancel();
751754
} catch (Throwable t) {
752-
errorConsumer.accept(t);
755+
if (LOGGER.isDebugEnabled()) {
756+
LOGGER.debug("Dropped exception", t);
757+
}
753758
}
754759
});
755760
}
756761
senders.clear();
757762
receivers.clear();
758763
sendProcessor.dispose();
759-
errorConsumer.accept(e);
760764
onClose.onError(e);
761765
}
762766

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@
4343
import org.reactivestreams.Publisher;
4444
import org.reactivestreams.Subscriber;
4545
import org.reactivestreams.Subscription;
46+
import org.slf4j.Logger;
47+
import org.slf4j.LoggerFactory;
4648
import reactor.core.Disposable;
4749
import reactor.core.Exceptions;
4850
import reactor.core.publisher.*;
@@ -51,6 +53,8 @@
5153

5254
/** Responder side of RSocket. Receives {@link ByteBuf}s from a peer's {@link RSocketRequester} */
5355
class RSocketResponder implements RSocket {
56+
private static final Logger LOGGER = LoggerFactory.getLogger(RSocketResponder.class);
57+
5458
private static final Consumer<ReferenceCounted> DROPPED_ELEMENTS_CONSUMER =
5559
referenceCounted -> {
5660
if (referenceCounted.refCnt() > 0) {
@@ -70,7 +74,6 @@ class RSocketResponder implements RSocket {
7074
private final io.rsocket.ResponderRSocket responderRSocket;
7175

7276
private final PayloadDecoder payloadDecoder;
73-
private final Consumer<Throwable> errorConsumer;
7477
private final ResponderLeaseHandler leaseHandler;
7578
private final Disposable leaseHandlerDisposable;
7679
private final MonoProcessor<Void> onClose;
@@ -88,12 +91,10 @@ class RSocketResponder implements RSocket {
8891
private final UnboundedProcessor<ByteBuf> sendProcessor;
8992
private final ByteBufAllocator allocator;
9093

91-
@SuppressWarnings("deprecation")
9294
RSocketResponder(
9395
DuplexConnection connection,
9496
RSocket requestHandler,
9597
PayloadDecoder payloadDecoder,
96-
Consumer<Throwable> errorConsumer,
9798
ResponderLeaseHandler leaseHandler,
9899
int mtu) {
99100
this.connection = connection;
@@ -107,7 +108,6 @@ class RSocketResponder implements RSocket {
107108
: null;
108109

109110
this.payloadDecoder = payloadDecoder;
110-
this.errorConsumer = errorConsumer;
111111
this.leaseHandler = leaseHandler;
112112
this.sendingSubscriptions = new SynchronizedIntObjectHashMap<>();
113113
this.channelProcessors = new SynchronizedIntObjectHashMap<>();
@@ -119,7 +119,7 @@ class RSocketResponder implements RSocket {
119119

120120
connection.send(sendProcessor).subscribe(null, this::handleSendProcessorError);
121121

122-
connection.receive().subscribe(this::handleFrame, errorConsumer);
122+
connection.receive().subscribe(this::handleFrame, e -> {});
123123
leaseHandlerDisposable = leaseHandler.send(sendProcessor::onNextPrioritized);
124124

125125
this.connection
@@ -136,7 +136,9 @@ private void handleSendProcessorError(Throwable t) {
136136
try {
137137
subscription.cancel();
138138
} catch (Throwable e) {
139-
errorConsumer.accept(e);
139+
if (LOGGER.isDebugEnabled()) {
140+
LOGGER.debug("Dropped exception", t);
141+
}
140142
}
141143
});
142144

@@ -147,7 +149,9 @@ private void handleSendProcessorError(Throwable t) {
147149
try {
148150
subscription.onError(t);
149151
} catch (Throwable e) {
150-
errorConsumer.accept(e);
152+
if (LOGGER.isDebugEnabled()) {
153+
LOGGER.debug("Dropped exception", t);
154+
}
151155
}
152156
});
153157
}
@@ -376,9 +380,7 @@ protected void hookOnSubscribe(Subscription subscription) {
376380
}
377381

378382
@Override
379-
protected void hookOnError(Throwable throwable) {
380-
errorConsumer.accept(throwable);
381-
}
383+
protected void hookOnError(Throwable throwable) {}
382384

383385
@Override
384386
protected void hookFinally(SignalType type) {
@@ -587,9 +589,7 @@ protected void hookOnSubscribe(Subscription subscription) {
587589
}
588590

589591
@Override
590-
protected void hookOnError(Throwable throwable) {
591-
errorConsumer.accept(throwable);
592-
}
592+
protected void hookOnError(Throwable throwable) {}
593593
});
594594
}
595595

@@ -603,7 +603,6 @@ private void handleCancelFrame(int streamId) {
603603
}
604604

605605
private void handleError(int streamId, Throwable t) {
606-
errorConsumer.accept(t);
607606
sendProcessor.onNext(ErrorFrameCodec.encode(allocator, streamId, t));
608607
}
609608

rsocket-core/src/main/java/io/rsocket/core/RSocketServer.java

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,6 @@ public final class RSocketServer {
6767
private int mtu = 0;
6868
private PayloadDecoder payloadDecoder = PayloadDecoder.DEFAULT;
6969

70-
private Consumer<Throwable> errorConsumer = ex -> {};
71-
7270
private RSocketServer() {}
7371

7472
/** Static factory method to create an {@code RSocketServer}. */
@@ -247,16 +245,6 @@ public RSocketServer payloadDecoder(PayloadDecoder decoder) {
247245
return this;
248246
}
249247

250-
/**
251-
* @deprecated this is deprecated with no replacement and will be removed after {@link
252-
* io.rsocket.RSocketFactory} is removed.
253-
*/
254-
@Deprecated
255-
public RSocketServer errorConsumer(Consumer<Throwable> errorConsumer) {
256-
this.errorConsumer = errorConsumer;
257-
return this;
258-
}
259-
260248
/**
261249
* Start the server on the given transport.
262250
*
@@ -387,7 +375,6 @@ private Mono<Void> acceptSetup(
387375
new RSocketRequester(
388376
wrappedMultiplexer.asServerConnection(),
389377
payloadDecoder,
390-
errorConsumer,
391378
StreamIdSupplier.serverSupplier(),
392379
mtu,
393380
setupPayload.keepAliveInterval(),
@@ -414,19 +401,14 @@ private Mono<Void> acceptSetup(
414401
ResponderLeaseHandler responderLeaseHandler =
415402
leaseEnabled
416403
? new ResponderLeaseHandler.Impl<>(
417-
SERVER_TAG,
418-
connection.alloc(),
419-
leases.sender(),
420-
errorConsumer,
421-
leases.stats())
404+
SERVER_TAG, connection.alloc(), leases.sender(), leases.stats())
422405
: ResponderLeaseHandler.None;
423406

424407
RSocket rSocketResponder =
425408
new RSocketResponder(
426409
connection,
427410
wrappedRSocketHandler,
428411
payloadDecoder,
429-
errorConsumer,
430412
responderLeaseHandler,
431413
mtu);
432414
})

rsocket-core/src/main/java/io/rsocket/internal/ClientServerInputMultiplexer.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,7 @@ public ClientServerInputMultiplexer(
119119
break;
120120
}
121121
},
122-
t -> {
123-
LOGGER.error("Error receiving frame:", t);
124-
dispose();
125-
});
122+
t -> {});
126123
}
127124

128125
public DuplexConnection asClientServerConnection() {

0 commit comments

Comments
 (0)