Skip to content

Commit 8a3103e

Browse files
committed
fixes test
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 5492447 commit 8a3103e

File tree

4 files changed

+13
-7
lines changed

4 files changed

+13
-7
lines changed

rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,7 @@ public void accept(long l) {
514514
sendProcessor.onNext(RequestNFrameFlyweight.encode(allocator, streamId, n));
515515
}
516516
})
517+
.doFinally(signalType -> channelProcessors.remove(streamId))
517518
.doOnDiscard(ReferenceCounted.class, DROPPED_ELEMENTS_CONSUMER);
518519

519520
// not chained, as the payload should be enqueued in the Unicast processor before this method

rsocket-core/src/main/java/io/rsocket/frame/decoder/ZeroCopyPayloadDecoder.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
* for releasing the payload to free memory when they no long need it.
1212
*/
1313
public class ZeroCopyPayloadDecoder implements PayloadDecoder {
14-
1514
@Override
1615
public Payload apply(ByteBuf byteBuf) {
1716
ByteBuf m;

rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -357,8 +357,7 @@ public void shouldThrownExceptionIfGivenPayloadIsExitsSizeAllowanceWithNoFragmen
357357
.hasMessage(INVALID_PAYLOAD_ERROR_MESSAGE))
358358
.verify();
359359
// FIXME: should be removed
360-
Assertions.assertThat(rule.connection.getSent())
361-
.allMatch(bb -> bb.release());
360+
Assertions.assertThat(rule.connection.getSent()).allMatch(bb -> bb.release());
362361
rule.assertHasNoLeaks();
363362
});
364363
}

rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,6 @@ public Mono<Payload> requestResponse(Payload payload) {
175175
@Test
176176
@Timeout(2_000)
177177
public void shouldThrownExceptionIfGivenPayloadIsExitsSizeAllowanceWithNoFragmentation() {
178-
ByteBufAllocator allocator = rule.alloc();
179178
final int streamId = 4;
180179
final AtomicBoolean cancelled = new AtomicBoolean();
181180
byte[] metadata = new byte[FrameLengthFlyweight.FRAME_LENGTH_MASK];
@@ -226,18 +225,26 @@ protected void hookOnSubscribe(Subscription subscription) {
226225
.isInstanceOf(IllegalArgumentException.class)
227226
.hasToString("java.lang.IllegalArgumentException: " + INVALID_PAYLOAD_ERROR_MESSAGE);
228227
Assertions.assertThat(rule.connection.getSent())
228+
.filteredOn(bb -> FrameHeaderFlyweight.frameType(bb) == FrameType.ERROR)
229229
.hasSize(1)
230230
.first()
231-
.matches(bb -> FrameHeaderFlyweight.frameType(bb) == FrameType.ERROR)
232231
.matches(bb -> ErrorFrameFlyweight.dataUtf8(bb).contains(INVALID_PAYLOAD_ERROR_MESSAGE))
233232
.matches(ReferenceCounted::release);
234233

235234
assertThat("Subscription not cancelled.", cancelled.get(), is(true));
235+
236+
// FIXME: needs to be removed. May have cancel frame as well
237+
Assertions.assertThat(rule.connection.getSent())
238+
.filteredOn(bb -> bb.refCnt() != 0)
239+
.allMatch(ReferenceCounted::release);
236240
rule.init();
237241
rule.setAcceptingSocket(acceptingSocket);
238242
}
239-
// FIXME: needs to be removed
240-
Assertions.assertThat(rule.connection.getSent()).allMatch(ReferenceCounted::release);
243+
244+
// FIXME: needs to be removed. May have cancel frame as well
245+
Assertions.assertThat(rule.connection.getSent())
246+
.filteredOn(bb -> bb.refCnt() != 0)
247+
.allMatch(ReferenceCounted::release);
241248
rule.assertHasNoLeaks();
242249
}
243250

0 commit comments

Comments
 (0)