Skip to content

Commit 8ab7391

Browse files
committed
Responder ignore LEASE message rather than returning an Error.
Context: A Responder listen to input from a connection, a client's Responder will then see the LEASE from the Server. It should be ignore.
1 parent 4032fbd commit 8ab7391

File tree

1 file changed

+76
-43
lines changed

1 file changed

+76
-43
lines changed

src/main/java/io/reactivesocket/internal/Responder.java

Lines changed: 76 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@
3939
/**
4040
* Protocol implementation abstracted over a {@link DuplexConnection}.
4141
* <p>
42-
* Concrete implementations of {@link DuplexConnection} over TCP, WebSockets, Aeron, etc can be passed to this class for protocol handling. The request handlers passed in at creation will be invoked
42+
* Concrete implementations of {@link DuplexConnection} over TCP, WebSockets,
43+
* Aeron, etc can be passed to this class for protocol handling. The request
44+
* handlers passed in at creation will be invoked
4345
* for each request over the connection.
4446
*/
4547
public class Responder {
@@ -52,7 +54,15 @@ public class Responder {
5254
private final Consumer<ConnectionSetupPayload> setupCallback;
5355
private final boolean isServer;
5456

55-
private Responder(boolean isServer, DuplexConnection connection, ConnectionSetupHandler connectionHandler, RequestHandler requestHandler, LeaseGovernor leaseGovernor, Consumer<Throwable> errorStream, Consumer<ConnectionSetupPayload> setupCallback) {
57+
private Responder(
58+
boolean isServer,
59+
DuplexConnection connection,
60+
ConnectionSetupHandler connectionHandler,
61+
RequestHandler requestHandler,
62+
LeaseGovernor leaseGovernor,
63+
Consumer<Throwable> errorStream,
64+
Consumer<ConnectionSetupPayload> setupCallback
65+
) {
5666
this.isServer = isServer;
5767
this.connection = connection;
5868
this.connectionHandler = connectionHandler;
@@ -64,42 +74,64 @@ private Responder(boolean isServer, DuplexConnection connection, ConnectionSetup
6474
}
6575

6676
/**
67-
* @param connectionHandler
68-
* Handle connection setup and set up request handling.
69-
* @param errorStream
70-
* A {@link Consumer<Throwable>} which will receive all errors that occurs processing requests.
71-
* <p>
72-
* This include fireAndForget which ONLY emit errors server-side via this mechanism.
77+
* @param connectionHandler Handle connection setup and set up request
78+
* handling.
79+
* @param errorStream A {@link Consumer<Throwable>} which will receive
80+
* all errors that occurs processing requests.
81+
* This include fireAndForget which ONLY emit errors
82+
* server-side via this mechanism.
7383
* @return responder instance
7484
*/
75-
public static <T> Responder createServerResponder(DuplexConnection connection, ConnectionSetupHandler connectionHandler, LeaseGovernor leaseGovernor, Consumer<Throwable> errorStream, Completable responderCompletable, Consumer<ConnectionSetupPayload> setupCallback) {
76-
Responder responder = new Responder(true, connection, connectionHandler, null, leaseGovernor, errorStream, setupCallback);
85+
public static <T> Responder createServerResponder(
86+
DuplexConnection connection,
87+
ConnectionSetupHandler connectionHandler,
88+
LeaseGovernor leaseGovernor,
89+
Consumer<Throwable> errorStream,
90+
Completable responderCompletable,
91+
Consumer<ConnectionSetupPayload> setupCallback
92+
) {
93+
Responder responder = new Responder(true, connection, connectionHandler, null,
94+
leaseGovernor, errorStream, setupCallback);
7795
responder.start(responderCompletable);
7896
return responder;
7997
}
8098

81-
public static <T> Responder createServerResponder(DuplexConnection connection, ConnectionSetupHandler connectionHandler, LeaseGovernor leaseGovernor, Consumer<Throwable> errorStream, Completable responderCompletable) {
82-
return createServerResponder(connection, connectionHandler, leaseGovernor, errorStream, responderCompletable, s -> {});
99+
public static <T> Responder createServerResponder(
100+
DuplexConnection connection,
101+
ConnectionSetupHandler connectionHandler,
102+
LeaseGovernor leaseGovernor,
103+
Consumer<Throwable> errorStream,
104+
Completable responderCompletable
105+
) {
106+
return createServerResponder(connection, connectionHandler, leaseGovernor,
107+
errorStream, responderCompletable, s -> {});
83108
}
84109

85-
public static <T> Responder createClientResponder(DuplexConnection connection, RequestHandler requestHandler, LeaseGovernor leaseGovernor, Consumer<Throwable> errorStream, Completable responderCompletable) {
86-
Responder responder = new Responder(false, connection, null, requestHandler, leaseGovernor, errorStream, s -> {});
110+
public static <T> Responder createClientResponder(
111+
DuplexConnection connection,
112+
RequestHandler requestHandler,
113+
LeaseGovernor leaseGovernor,
114+
Consumer<Throwable> errorStream,
115+
Completable responderCompletable
116+
) {
117+
Responder responder = new Responder(false, connection, null, requestHandler,
118+
leaseGovernor, errorStream, s -> {});
87119
responder.start(responderCompletable);
88120
return responder;
89121
}
90122

91123
/**
92-
* Send a LEASE frame immediately. Only way a LEASE is sent. Handled entirely by application logic.
124+
* Send a LEASE frame immediately. Only way a LEASE is sent. Handled
125+
* entirely by application logic.
93126
*
94127
* @param ttl of lease
95128
* @param numberOfRequests of lease
96129
*/
97-
public void sendLease(final int ttl, final int numberOfRequests)
98-
{
99-
connection.addOutput(PublisherUtils.just(Frame.Lease.from(ttl, numberOfRequests, Frame.NULL_BYTEBUFFER)), new Completable() {
130+
public void sendLease(final int ttl, final int numberOfRequests) {
131+
Frame leaseFrame = Frame.Lease.from(ttl, numberOfRequests, Frame.NULL_BYTEBUFFER);
132+
connection.addOutput(PublisherUtils.just(leaseFrame), new Completable() {
100133
@Override
101-
public void success() {
102-
}
134+
public void success() {}
103135

104136
@Override
105137
public void error(Throwable e) {
@@ -113,8 +145,7 @@ public void error(Throwable e) {
113145
*
114146
* @return time from {@link System#nanoTime()} of last keepalive
115147
*/
116-
public long timeOfLastKeepalive()
117-
{
148+
public long timeOfLastKeepalive() {
118149
return timeOfLastKeepalive;
119150
}
120151

@@ -124,8 +155,10 @@ private void start(final Completable responderCompletable) {
124155
/* streams in flight that can receive REQUEST_N messages */
125156
final Int2ObjectHashMap<SubscriptionArbiter> inFlight = new Int2ObjectHashMap<>();
126157
/* bidirectional channels */
127-
final Int2ObjectHashMap<UnicastSubject<Payload>> channels = new Int2ObjectHashMap<>(); // TODO should/can we make this optional so that it only gets allocated per connection if channels are
128-
// used?
158+
// TODO: should/can we make this optional so that it only gets allocated per connection if
159+
// channels are used?
160+
final Int2ObjectHashMap<UnicastSubject<Payload>> channels = new Int2ObjectHashMap<>();
161+
129162
final AtomicBoolean childTerminated = new AtomicBoolean(false);
130163
final AtomicReference<Disposable> transportSubscription = new AtomicReference<>();
131164

@@ -143,14 +176,16 @@ public void onSubscribe(Disposable d) {
143176
}
144177
}
145178

146-
volatile RequestHandler requestHandler = !isServer ? clientRequestHandler : null; // null until after first Setup frame
179+
// null until after first Setup frame
180+
volatile RequestHandler requestHandler = !isServer ? clientRequestHandler : null;
147181

148182
@Override
149183
public void onNext(Frame requestFrame) {
150184
final int streamId = requestFrame.getStreamId();
151185
if (requestHandler == null) { // this will only happen when isServer==true
152186
if (childTerminated.get()) {
153-
// already terminated, but still receiving latent messages ... ignore them while shutdown occurs
187+
// already terminated, but still receiving latent messages...
188+
// ignore them while shutdown occurs
154189
return;
155190
}
156191
if (requestFrame.getType().equals(FrameType.SETUP)) {
@@ -210,44 +245,42 @@ public void onNext(Frame requestFrame) {
210245
return;
211246
} else if (requestFrame.getType() == FrameType.REQUEST_N) {
212247
SubscriptionArbiter inFlightSubscription = null;
213-
synchronized (Responder.this)
214-
{
248+
synchronized (Responder.this) {
215249
inFlightSubscription = inFlight.get(requestFrame.getStreamId());
216250
}
217-
if (inFlightSubscription != null)
218-
{
219-
inFlightSubscription.addApplicationRequest(Frame.RequestN.requestN(requestFrame));
251+
if (inFlightSubscription != null) {
252+
long requestN = Frame.RequestN.requestN(requestFrame);
253+
inFlightSubscription.addApplicationRequest(requestN);
220254
return;
221255
}
222-
// TODO should we do anything if we don't find the stream? emitting an error is risky as the responder could have terminated and cleaned up already
256+
// TODO should we do anything if we don't find the stream? emitting an
257+
// error is risky as the responder could have terminated and cleaned up already
223258
} else if (requestFrame.getType() == FrameType.KEEPALIVE) {
224259
// this client is alive.
225260
timeOfLastKeepalive = System.nanoTime();
226261
// echo back if flag set
227-
if (Frame.Keepalive.hasRespondFlag(requestFrame))
228-
{
262+
if (Frame.Keepalive.hasRespondFlag(requestFrame)) {
229263
responsePublisher = PublisherUtils.just(Frame.Keepalive.from(requestFrame.getData(), false));
230-
}
231-
else
232-
{
264+
} else {
233265
return;
234266
}
267+
} else if (requestFrame.getType() == FrameType.LEASE) {
268+
// LEASE only concerns the Requester
235269
} else {
236270
responsePublisher = PublisherUtils.errorFrame(streamId, new IllegalStateException("Unexpected prefix: " + requestFrame.getType()));
237271
}
238272
} catch (Throwable e) {
239-
// synchronous try/catch since we execute user functions in the handlers and they could throw
240-
errorStream.accept(new RuntimeException("Error in request handling.", e));
241-
// error message to user
242-
responsePublisher = PublisherUtils.errorFrame(streamId, new RuntimeException("Unhandled error processing request"));
243-
}
273+
// synchronous try/catch since we execute user functions in the handlers and they could throw
274+
errorStream.accept(new RuntimeException("Error in request handling.", e));
275+
// error message to user
276+
responsePublisher = PublisherUtils.errorFrame(streamId, new RuntimeException("Unhandled error processing request"));
277+
}
244278
} else {
245279
final RejectedException exception = new RejectedException("No associated lease");
246280
responsePublisher = PublisherUtils.errorFrame(streamId, exception);
247281
}
248282

249283
connection.addOutput(responsePublisher, new Completable() {
250-
251284
@Override
252285
public void success() {
253286
// TODO Auto-generated method stub

0 commit comments

Comments
 (0)