Skip to content

Commit f62e73d

Browse files
NiteshKantstevegury
authored andcommitted
Frame instance was referenced longer than required. (#88)
Problem: `handleXXX` methods in `Responder` were closing over the passed `requestFrame` and using it later in the lifecycle of request processing. `Frame` objects and the underlying buffers are not designed to be retained after the scope of the parent method as these objects are threadlocal and reused. This causes issues when the frame object is referenced later in the request processing (eg: `cleanup()`) Solution: The only reason frame object was retained was to get the stream Id. This change pre-fetches the `streamId` and uses that from within the processing closure. Result: No more issue with frame access.
1 parent 61a45c0 commit f62e73d

File tree

1 file changed

+15
-17
lines changed

1 file changed

+15
-17
lines changed

src/main/java/io/reactivesocket/internal/Responder.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ public void onNext(Frame requestFrame) {
259259
} else if (requestFrame.getType() == FrameType.CANCEL) {
260260
Subscription s;
261261
synchronized (Responder.this) {
262-
s = cancellationSubscriptions.get(requestFrame.getStreamId());
262+
s = cancellationSubscriptions.get(streamId);
263263
}
264264
if (s != null) {
265265
s.cancel();
@@ -268,7 +268,7 @@ public void onNext(Frame requestFrame) {
268268
} else if (requestFrame.getType() == FrameType.REQUEST_N) {
269269
SubscriptionArbiter inFlightSubscription;
270270
synchronized (Responder.this) {
271-
inFlightSubscription = inFlight.get(requestFrame.getStreamId());
271+
inFlightSubscription = inFlight.get(streamId);
272272
}
273273
if (inFlightSubscription != null) {
274274
long requestN = Frame.RequestN.requestN(requestFrame);
@@ -399,6 +399,7 @@ private Publisher<Frame> handleRequestResponse(
399399
final RequestHandler requestHandler,
400400
final Int2ObjectHashMap<Subscription> cancellationSubscriptions) {
401401

402+
final int streamId = requestFrame.getStreamId();
402403
return child -> {
403404
Subscription s = new Subscription() {
404405

@@ -408,8 +409,6 @@ private Publisher<Frame> handleRequestResponse(
408409
@Override
409410
public void request(long n) {
410411
if (n > 0 && started.compareAndSet(false, true)) {
411-
final int streamId = requestFrame.getStreamId();
412-
413412
try {
414413
Publisher<Payload> responsePublisher =
415414
requestHandler.handleRequestResponse(requestFrame);
@@ -477,13 +476,13 @@ public void cancel() {
477476

478477
private void cleanup() {
479478
synchronized(Responder.this) {
480-
cancellationSubscriptions.remove(requestFrame.getStreamId());
479+
cancellationSubscriptions.remove(streamId);
481480
}
482481
}
483482

484483
};
485484
synchronized(Responder.this) {
486-
cancellationSubscriptions.put(requestFrame.getStreamId(), s);
485+
cancellationSubscriptions.put(streamId, s);
487486
}
488487
child.onSubscribe(s);
489488
};
@@ -541,7 +540,7 @@ private Publisher<Frame> _handleRequestStream(
541540
final Int2ObjectHashMap<Subscription> cancellationSubscriptions,
542541
final Int2ObjectHashMap<SubscriptionArbiter> inFlight,
543542
final boolean allowCompletion) {
544-
543+
final int streamId = requestFrame.getStreamId();
545544
return child -> {
546545
Subscription s = new Subscription() {
547546

@@ -556,7 +555,6 @@ public void request(long n) {
556555
}
557556
if (started.compareAndSet(false, true)) {
558557
arbiter.addTransportRequest(n);
559-
final int streamId = requestFrame.getStreamId();
560558

561559
try {
562560
Publisher<Payload> responses =
@@ -630,14 +628,14 @@ public void cancel() {
630628

631629
private void cleanup() {
632630
synchronized(Responder.this) {
633-
inFlight.remove(requestFrame.getStreamId());
634-
cancellationSubscriptions.remove(requestFrame.getStreamId());
631+
inFlight.remove(streamId);
632+
cancellationSubscriptions.remove(streamId);
635633
}
636634
}
637635

638636
};
639637
synchronized(Responder.this) {
640-
cancellationSubscriptions.put(requestFrame.getStreamId(), s);
638+
cancellationSubscriptions.put(streamId, s);
641639
}
642640
child.onSubscribe(s);
643641

@@ -704,8 +702,9 @@ private Publisher<Frame> handleRequestChannel(Frame requestFrame,
704702
Int2ObjectHashMap<SubscriptionArbiter> inFlight) {
705703

706704
UnicastSubject<Payload> channelSubject;
705+
final int streamId = requestFrame.getStreamId();
707706
synchronized(Responder.this) {
708-
channelSubject = channels.get(requestFrame.getStreamId());
707+
channelSubject = channels.get(streamId);
709708
}
710709
if (channelSubject == null) {
711710
return child -> {
@@ -722,7 +721,6 @@ public void request(long n) {
722721
}
723722
if (started.compareAndSet(false, true)) {
724723
arbiter.addTransportRequest(n);
725-
final int streamId = requestFrame.getStreamId();
726724

727725
// first request on this channel
728726
UnicastSubject<Payload> channelRequests =
@@ -816,14 +814,14 @@ public void cancel() {
816814

817815
private void cleanup() {
818816
synchronized(Responder.this) {
819-
inFlight.remove(requestFrame.getStreamId());
820-
cancellationSubscriptions.remove(requestFrame.getStreamId());
817+
inFlight.remove(streamId);
818+
cancellationSubscriptions.remove(streamId);
821819
}
822820
}
823821

824822
};
825823
synchronized(Responder.this) {
826-
cancellationSubscriptions.put(requestFrame.getStreamId(), s);
824+
cancellationSubscriptions.put(streamId, s);
827825
}
828826
child.onSubscribe(s);
829827

@@ -848,7 +846,7 @@ private void cleanup() {
848846
// handle time-gap issues like this?
849847
// TODO validate with unit tests.
850848
return PublisherUtils.errorFrame(
851-
requestFrame.getStreamId(), new RuntimeException("Channel unavailable"));
849+
streamId, new RuntimeException("Channel unavailable"));
852850
}
853851
}
854852
}

0 commit comments

Comments
 (0)