Skip to content

Commit ca0bd35

Browse files
committed
enables set of tests and ensure no leaks on racing
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 3bf4c98 commit ca0bd35

File tree

4 files changed

+97
-95
lines changed

4 files changed

+97
-95
lines changed

rsocket-core/src/test/java/io/rsocket/buffer/LeaksTrackingByteBufAllocator.java

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import io.netty.buffer.ByteBuf;
44
import io.netty.buffer.ByteBufAllocator;
55
import io.netty.buffer.CompositeByteBuf;
6-
import java.util.List;
76
import java.util.concurrent.ConcurrentLinkedQueue;
87
import org.assertj.core.api.Assertions;
98

@@ -35,22 +34,9 @@ public LeaksTrackingByteBufAllocator assertHasNoLeaks() {
3534
try {
3635
Assertions.assertThat(tracker)
3736
.allSatisfy(
38-
buf -> {
39-
if (buf instanceof CompositeByteBuf) {
40-
if (buf.refCnt() > 0) {
41-
List<ByteBuf> decomposed =
42-
((CompositeByteBuf) buf).decompose(0, buf.readableBytes());
43-
for (int i = 0; i < decomposed.size(); i++) {
44-
Assertions.assertThat(decomposed.get(i))
45-
.matches(bb -> bb.refCnt() == 0, "Got unreleased CompositeByteBuf");
46-
}
47-
}
48-
49-
} else {
37+
buf ->
5038
Assertions.assertThat(buf)
51-
.matches(bb -> bb.refCnt() == 0, "buffer should be released");
52-
}
53-
});
39+
.matches(bb -> bb.refCnt() == 0, "buffer should be released"));
5440
} finally {
5541
tracker.clear();
5642
}

rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java

Lines changed: 55 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import io.netty.buffer.ByteBufAllocator;
3939
import io.netty.buffer.Unpooled;
4040
import io.netty.util.CharsetUtil;
41+
import io.netty.util.ReferenceCountUtil;
4142
import io.netty.util.ReferenceCounted;
4243
import io.rsocket.Payload;
4344
import io.rsocket.RSocket;
@@ -71,6 +72,7 @@
7172
import java.util.function.Function;
7273
import java.util.stream.Stream;
7374
import org.assertj.core.api.Assertions;
75+
import org.junit.jupiter.api.AfterEach;
7476
import org.junit.jupiter.api.BeforeEach;
7577
import org.junit.jupiter.api.Disabled;
7678
import org.junit.jupiter.api.Test;
@@ -84,6 +86,7 @@
8486
import org.reactivestreams.Subscription;
8587
import reactor.core.publisher.BaseSubscriber;
8688
import reactor.core.publisher.Flux;
89+
import reactor.core.publisher.Hooks;
8790
import reactor.core.publisher.Mono;
8891
import reactor.core.publisher.MonoProcessor;
8992
import reactor.core.publisher.UnicastProcessor;
@@ -97,6 +100,8 @@ public class RSocketRequesterTest {
97100

98101
@BeforeEach
99102
public void setUp() throws Throwable {
103+
Hooks.onNextDropped(ReferenceCountUtil::safeRelease);
104+
Hooks.onErrorDropped((t) -> {});
100105
rule = new ClientSocketRule();
101106
rule.apply(
102107
new Statement() {
@@ -107,6 +112,12 @@ public void evaluate() {}
107112
.evaluate();
108113
}
109114

115+
@AfterEach
116+
public void tearDown() {
117+
Hooks.resetOnErrorDropped();
118+
Hooks.resetOnNextDropped();
119+
}
120+
110121
@Test
111122
@Timeout(2_000)
112123
public void testInvalidFrameOnStream0() {
@@ -403,21 +414,8 @@ static Stream<BiFunction<RSocket, Payload, Publisher<?>>> prepareCalls() {
403414
rule.assertHasNoLeaks();
404415
}
405416

406-
@Test
407-
@Disabled("Due to https://github.com/reactor/reactor-core/pull/2114")
408-
@SuppressWarnings("unchecked")
409-
public void checkNoLeaksOnRacingTest() {
410-
411-
racingCases()
412-
.forEach(
413-
a -> {
414-
((Runnable) a.get()[0]).run();
415-
checkNoLeaksOnRacing(
416-
(Function<ClientSocketRule, Publisher<Payload>>) a.get()[1],
417-
(BiConsumer<AssertSubscriber<Payload>, ClientSocketRule>) a.get()[2]);
418-
});
419-
}
420-
417+
@ParameterizedTest
418+
@MethodSource("racingCases")
421419
public void checkNoLeaksOnRacing(
422420
Function<ClientSocketRule, Publisher<Payload>> initiator,
423421
BiConsumer<AssertSubscriber<Payload>, ClientSocketRule> runner) {
@@ -437,7 +435,7 @@ public void evaluate() {}
437435
}
438436

439437
Publisher<Payload> payloadP = initiator.apply(clientSocketRule);
440-
AssertSubscriber<Payload> assertSubscriber = AssertSubscriber.create();
438+
AssertSubscriber<Payload> assertSubscriber = AssertSubscriber.create(0);
441439

442440
if (payloadP instanceof Flux) {
443441
((Flux<Payload>) payloadP).doOnNext(Payload::release).subscribe(assertSubscriber);
@@ -450,14 +448,13 @@ public void evaluate() {}
450448
Assertions.assertThat(clientSocketRule.connection.getSent())
451449
.allMatch(ReferenceCounted::release);
452450

453-
rule.assertHasNoLeaks();
451+
clientSocketRule.assertHasNoLeaks();
454452
}
455453
}
456454

457455
private static Stream<Arguments> racingCases() {
458456
return Stream.of(
459457
Arguments.of(
460-
(Runnable) () -> System.out.println("RequestStream downstream cancellation case"),
461458
(Function<ClientSocketRule, Publisher<Payload>>)
462459
(rule) -> rule.socket.requestStream(EmptyPayload.INSTANCE),
463460
(BiConsumer<AssertSubscriber<Payload>, ClientSocketRule>)
@@ -467,6 +464,7 @@ private static Stream<Arguments> racingCases() {
467464
metadata.writeCharSequence("abc", CharsetUtil.UTF_8);
468465
ByteBuf data = allocator.buffer();
469466
data.writeCharSequence("def", CharsetUtil.UTF_8);
467+
as.request(1);
470468
int streamId = rule.getStreamIdForRequestType(REQUEST_STREAM);
471469
ByteBuf frame =
472470
PayloadFrameFlyweight.encode(
@@ -475,7 +473,6 @@ private static Stream<Arguments> racingCases() {
475473
RaceTestUtils.race(as::cancel, () -> rule.connection.addToReceivedBuffer(frame));
476474
}),
477475
Arguments.of(
478-
(Runnable) () -> System.out.println("RequestChannel downstream cancellation case"),
479476
(Function<ClientSocketRule, Publisher<Payload>>)
480477
(rule) -> rule.socket.requestChannel(Flux.just(EmptyPayload.INSTANCE)),
481478
(BiConsumer<AssertSubscriber<Payload>, ClientSocketRule>)
@@ -485,6 +482,7 @@ private static Stream<Arguments> racingCases() {
485482
metadata.writeCharSequence("abc", CharsetUtil.UTF_8);
486483
ByteBuf data = allocator.buffer();
487484
data.writeCharSequence("def", CharsetUtil.UTF_8);
485+
as.request(1);
488486
int streamId = rule.getStreamIdForRequestType(REQUEST_CHANNEL);
489487
ByteBuf frame =
490488
PayloadFrameFlyweight.encode(
@@ -493,79 +491,93 @@ private static Stream<Arguments> racingCases() {
493491
RaceTestUtils.race(as::cancel, () -> rule.connection.addToReceivedBuffer(frame));
494492
}),
495493
Arguments.of(
496-
(Runnable) () -> System.out.println("RequestChannel upstream cancellation 1"),
497494
(Function<ClientSocketRule, Publisher<Payload>>)
498495
(rule) -> {
499496
ByteBufAllocator allocator = rule.alloc();
500497
ByteBuf metadata = allocator.buffer();
501-
metadata.writeCharSequence("abc", CharsetUtil.UTF_8);
498+
metadata.writeCharSequence("metadata", CharsetUtil.UTF_8);
502499
ByteBuf data = allocator.buffer();
503-
data.writeCharSequence("def", CharsetUtil.UTF_8);
504-
return rule.socket.requestChannel(
505-
Flux.just(ByteBufPayload.create(data, metadata)));
500+
data.writeCharSequence("data", CharsetUtil.UTF_8);
501+
final Payload payload = ByteBufPayload.create(data, metadata);
502+
503+
return rule.socket.requestStream(payload);
506504
},
507505
(BiConsumer<AssertSubscriber<Payload>, ClientSocketRule>)
508-
(as, rule) -> {
506+
(as, rule) -> RaceTestUtils.race(() -> as.request(1), as::cancel)),
507+
Arguments.of(
508+
(Function<ClientSocketRule, Publisher<Payload>>)
509+
(rule) -> {
509510
ByteBufAllocator allocator = rule.alloc();
510-
int streamId = rule.getStreamIdForRequestType(REQUEST_CHANNEL);
511-
ByteBuf frame = CancelFrameFlyweight.encode(allocator, streamId);
512-
513-
RaceTestUtils.race(
514-
() -> as.request(1), () -> rule.connection.addToReceivedBuffer(frame));
515-
}),
511+
return rule.socket.requestChannel(
512+
Flux.generate(
513+
() -> 1L,
514+
(index, sink) -> {
515+
ByteBuf metadata = allocator.buffer();
516+
metadata.writeCharSequence("metadata", CharsetUtil.UTF_8);
517+
ByteBuf data = allocator.buffer();
518+
data.writeCharSequence("data", CharsetUtil.UTF_8);
519+
final Payload payload = ByteBufPayload.create(data, metadata);
520+
sink.next(payload);
521+
sink.complete();
522+
return ++index;
523+
}));
524+
},
525+
(BiConsumer<AssertSubscriber<Payload>, ClientSocketRule>)
526+
(as, rule) -> RaceTestUtils.race(() -> as.request(1), as::cancel)),
516527
Arguments.of(
517-
(Runnable) () -> System.out.println("RequestChannel upstream cancellation 2"),
518528
(Function<ClientSocketRule, Publisher<Payload>>)
519529
(rule) ->
520530
rule.socket.requestChannel(
521531
Flux.generate(
522532
() -> 1L,
523533
(index, sink) -> {
524-
final Payload payload =
525-
ByteBufPayload.create("d" + index, "m" + index);
534+
ByteBuf data = rule.alloc().buffer();
535+
data.writeCharSequence("d" + index, CharsetUtil.UTF_8);
536+
ByteBuf metadata = rule.alloc().buffer();
537+
metadata.writeCharSequence("m" + index, CharsetUtil.UTF_8);
538+
final Payload payload = ByteBufPayload.create(data, metadata);
526539
sink.next(payload);
527540
return ++index;
528541
})),
529542
(BiConsumer<AssertSubscriber<Payload>, ClientSocketRule>)
530543
(as, rule) -> {
531544
ByteBufAllocator allocator = rule.alloc();
545+
as.request(1);
532546
int streamId = rule.getStreamIdForRequestType(REQUEST_CHANNEL);
533547
ByteBuf frame = CancelFrameFlyweight.encode(allocator, streamId);
534548

535-
as.request(1);
536-
537549
RaceTestUtils.race(
538550
() -> as.request(Long.MAX_VALUE),
539551
() -> rule.connection.addToReceivedBuffer(frame));
540552
}),
541553
Arguments.of(
542-
(Runnable) () -> System.out.println("RequestChannel remote error"),
543554
(Function<ClientSocketRule, Publisher<Payload>>)
544555
(rule) ->
545556
rule.socket.requestChannel(
546557
Flux.generate(
547558
() -> 1L,
548559
(index, sink) -> {
549-
final Payload payload =
550-
ByteBufPayload.create("d" + index, "m" + index);
560+
ByteBuf data = rule.alloc().buffer();
561+
data.writeCharSequence("d" + index, CharsetUtil.UTF_8);
562+
ByteBuf metadata = rule.alloc().buffer();
563+
metadata.writeCharSequence("m" + index, CharsetUtil.UTF_8);
564+
final Payload payload = ByteBufPayload.create(data, metadata);
551565
sink.next(payload);
552566
return ++index;
553567
})),
554568
(BiConsumer<AssertSubscriber<Payload>, ClientSocketRule>)
555569
(as, rule) -> {
556570
ByteBufAllocator allocator = rule.alloc();
571+
as.request(1);
557572
int streamId = rule.getStreamIdForRequestType(REQUEST_CHANNEL);
558573
ByteBuf frame =
559574
ErrorFrameFlyweight.encode(allocator, streamId, new RuntimeException("test"));
560575

561-
as.request(1);
562-
563576
RaceTestUtils.race(
564577
() -> as.request(Long.MAX_VALUE),
565578
() -> rule.connection.addToReceivedBuffer(frame));
566579
}),
567580
Arguments.of(
568-
(Runnable) () -> System.out.println("RequestResponse downstream cancellation"),
569581
(Function<ClientSocketRule, Publisher<Payload>>)
570582
(rule) -> rule.socket.requestResponse(EmptyPayload.INSTANCE),
571583
(BiConsumer<AssertSubscriber<Payload>, ClientSocketRule>)
@@ -575,6 +587,7 @@ private static Stream<Arguments> racingCases() {
575587
metadata.writeCharSequence("abc", CharsetUtil.UTF_8);
576588
ByteBuf data = allocator.buffer();
577589
data.writeCharSequence("def", CharsetUtil.UTF_8);
590+
as.request(Long.MAX_VALUE);
578591
int streamId = rule.getStreamIdForRequestType(REQUEST_RESPONSE);
579592
ByteBuf frame =
580593
PayloadFrameFlyweight.encode(

0 commit comments

Comments
 (0)