Skip to content

Commit ae94de0

Browse files
committed
Ensure Subscriber is removed from sendingSubscriptions
Closes gh-961 Signed-off-by: Rossen Stoyanchev <[email protected]>
1 parent d047c59 commit ae94de0

File tree

1 file changed

+6
-10
lines changed

1 file changed

+6
-10
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -406,22 +406,20 @@ private void handleRequestResponse(int streamId, Mono<Payload> response) {
406406

407407
@Override
408408
protected void hookOnNext(Payload payload) {
409-
if (isEmpty) {
410-
isEmpty = false;
411-
}
409+
isEmpty = false;
412410

413411
if (!PayloadValidationUtils.isValid(mtu, payload, maxFrameLength)) {
414412
payload.release();
415413
cancel();
416-
final IllegalArgumentException t =
417-
new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE);
418-
handleError(streamId, t);
414+
sendingSubscriptions.remove(streamId, this);
415+
handleError(streamId, new IllegalArgumentException(INVALID_PAYLOAD_ERROR_MESSAGE));
419416
return;
420417
}
421418

422419
ByteBuf byteBuf =
423420
PayloadFrameCodec.encodeNextCompleteReleasingPayload(allocator, streamId, payload);
424421
sendProcessor.onNext(byteBuf);
422+
sendingSubscriptions.remove(streamId, this);
425423
}
426424

427425
@Override
@@ -433,10 +431,8 @@ protected void hookOnError(Throwable throwable) {
433431

434432
@Override
435433
protected void hookOnComplete() {
436-
if (isEmpty) {
437-
if (sendingSubscriptions.remove(streamId, this)) {
438-
sendProcessor.onNext(PayloadFrameCodec.encodeComplete(allocator, streamId));
439-
}
434+
if (isEmpty && sendingSubscriptions.remove(streamId, this)) {
435+
sendProcessor.onNext(PayloadFrameCodec.encodeComplete(allocator, streamId));
440436
}
441437
}
442438
};

0 commit comments

Comments
 (0)