Skip to content

Commit 3495632

Browse files
authored
fixes extra Payload release on racing complete and cancel (#894)
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent c3b4d15 commit 3495632

File tree

3 files changed

+43
-4
lines changed

3 files changed

+43
-4
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketRequester.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,9 @@ void hookOnCancel() {
315315
if (receivers.remove(streamId, receiver)) {
316316
sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId));
317317
} else {
318-
payload.release();
318+
if (this.firstRequest) {
319+
payload.release();
320+
}
319321
}
320322
}
321323

@@ -405,7 +407,9 @@ void hookOnCancel() {
405407
if (receivers.remove(streamId, receiver)) {
406408
sendProcessor.onNext(CancelFrameCodec.encode(allocator, streamId));
407409
} else {
408-
payload.release();
410+
if (this.firstRequest) {
411+
payload.release();
412+
}
409413
}
410414
}
411415

rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,12 @@ public void request(long n) {
7676
}
7777
return;
7878
}
79-
this.firstRequest = false;
8079

8180
if (WIP.getAndIncrement(this) != 0) {
8281
return;
8382
}
83+
84+
this.firstRequest = false;
8485
int missed = 1;
8586

8687
boolean firstLoop = true;

rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -616,7 +616,15 @@ private static Stream<Arguments> racingCases() {
616616
}),
617617
Arguments.of(
618618
(Function<ClientSocketRule, Publisher<Payload>>)
619-
(rule) -> rule.socket.requestResponse(EmptyPayload.INSTANCE),
619+
(rule) -> {
620+
ByteBuf data = rule.allocator.buffer();
621+
data.writeCharSequence("testData", CharsetUtil.UTF_8);
622+
623+
ByteBuf metadata = rule.allocator.buffer();
624+
metadata.writeCharSequence("testMetadata", CharsetUtil.UTF_8);
625+
Payload requestPayload = ByteBufPayload.create(data, metadata);
626+
return rule.socket.requestResponse(requestPayload);
627+
},
620628
(BiConsumer<AssertSubscriber<Payload>, ClientSocketRule>)
621629
(as, rule) -> {
622630
ByteBufAllocator allocator = rule.alloc();
@@ -630,6 +638,32 @@ private static Stream<Arguments> racingCases() {
630638
PayloadFrameCodec.encode(
631639
allocator, streamId, false, false, true, metadata, data);
632640

641+
RaceTestUtils.race(as::cancel, () -> rule.connection.addToReceivedBuffer(frame));
642+
}),
643+
Arguments.of(
644+
(Function<ClientSocketRule, Publisher<Payload>>)
645+
(rule) -> {
646+
ByteBuf data = rule.allocator.buffer();
647+
data.writeCharSequence("testData", CharsetUtil.UTF_8);
648+
649+
ByteBuf metadata = rule.allocator.buffer();
650+
metadata.writeCharSequence("testMetadata", CharsetUtil.UTF_8);
651+
Payload requestPayload = ByteBufPayload.create(data, metadata);
652+
return rule.socket.requestStream(requestPayload);
653+
},
654+
(BiConsumer<AssertSubscriber<Payload>, ClientSocketRule>)
655+
(as, rule) -> {
656+
ByteBufAllocator allocator = rule.alloc();
657+
ByteBuf metadata = allocator.buffer();
658+
metadata.writeCharSequence("abc", CharsetUtil.UTF_8);
659+
ByteBuf data = allocator.buffer();
660+
data.writeCharSequence("def", CharsetUtil.UTF_8);
661+
as.request(Long.MAX_VALUE);
662+
int streamId = rule.getStreamIdForRequestType(REQUEST_STREAM);
663+
ByteBuf frame =
664+
PayloadFrameCodec.encode(
665+
allocator, streamId, false, true, true, metadata, data);
666+
633667
RaceTestUtils.race(as::cancel, () -> rule.connection.addToReceivedBuffer(frame));
634668
}));
635669
}

0 commit comments

Comments
 (0)