44
44
import io .rsocket .util .EmptyPayload ;
45
45
import io .rsocket .util .MultiSubscriberRSocket ;
46
46
import org .assertj .core .api .Assertions ;
47
+ import org .junit .Ignore ;
47
48
import org .junit .Rule ;
48
49
import org .junit .Test ;
49
50
import org .junit .jupiter .params .provider .Arguments ;
@@ -363,59 +364,75 @@ private static Stream<Arguments> racingCases() {
363
364
364
365
RaceTestUtils .race (as ::cancel , () -> rule .connection .addToReceivedBuffer (frame ));
365
366
}
366
- )/*,*/
367
- // Arguments.of(
368
- // (Runnable) () -> System.out.println("RequestChannel upstream cancellation 1"),
369
- // (Function<ClientSocketRule, Publisher<Payload>>) (rule) -> {
370
- // LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator.instrumentDefault();
371
- // ByteBuf metadata = allocator.buffer();
372
- // metadata.writeCharSequence("abc", CharsetUtil.UTF_8);
373
- // ByteBuf data = allocator.buffer();
374
- // data.writeCharSequence("def", CharsetUtil.UTF_8);
375
- // return rule.socket.requestChannel(Flux.just(ByteBufPayload.create(data, metadata)));
376
- // },
377
- // (BiConsumer<AssertSubscriber<Payload>, ClientSocketRule>) (as, rule) -> {
378
- // LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator.instrumentDefault();
379
- // int streamId = rule.getStreamIdForRequestType(REQUEST_CHANNEL);
380
- // ByteBuf frame = CancelFrameFlyweight.encode(allocator, streamId);
381
- //
382
- // RaceTestUtils.race(() -> as.request(1), () -> rule.connection.addToReceivedBuffer(frame));
383
- // }
384
- // ),
385
- // Arguments.of(
386
- // (Runnable) () -> System.out.println("RequestChannel upstream cancellation 2"),
387
- // (Function<ClientSocketRule, Publisher<Payload>>) (rule) -> {
388
- // return rule.socket.requestChannel(Flux.just(ByteBufPayload.create("a", "b"), ByteBufPayload.create("c", "d"), ByteBufPayload.create("e", "f")));
389
- // },
390
- // (BiConsumer<AssertSubscriber<Payload>, ClientSocketRule>) (as, rule) -> {
391
- // LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator.instrumentDefault();
392
- // int streamId = rule.getStreamIdForRequestType(REQUEST_CHANNEL);
393
- // ByteBuf frame = CancelFrameFlyweight.encode(allocator, streamId);
394
- //
395
- // as.request(1);
396
- //
397
- // RaceTestUtils.race(() -> as.request(Long.MAX_VALUE), () -> rule.connection.addToReceivedBuffer(frame));
398
- // }
399
- // ),
400
- // Arguments.of(
401
- // (Runnable) () -> System.out.println("RequestResponse downstream cancellation"),
402
- // (Function<ClientSocketRule, Publisher<Payload>>) (rule) -> rule.socket.requestResponse(EmptyPayload.INSTANCE),
403
- // (BiConsumer<AssertSubscriber<Payload>, ClientSocketRule>) (as, rule) -> {
404
- // LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator.instrumentDefault();
405
- // ByteBuf metadata = allocator.buffer();
406
- // metadata.writeCharSequence("abc", CharsetUtil.UTF_8);
407
- // ByteBuf data = allocator.buffer();
408
- // data.writeCharSequence("def", CharsetUtil.UTF_8);
409
- // int streamId = rule.getStreamIdForRequestType(REQUEST_RESPONSE);
410
- // ByteBuf frame = PayloadFrameFlyweight.encode(allocator, streamId, false, false, true, metadata, data);
411
- //
412
- // RaceTestUtils.race(as::cancel, () -> rule.connection.addToReceivedBuffer(frame));
413
- // }
414
- // )
367
+ ),
368
+ Arguments .of (
369
+ (Runnable ) () -> System .out .println ("RequestChannel upstream cancellation 1" ),
370
+ (Function <ClientSocketRule , Publisher <Payload >>) (rule ) -> {
371
+ LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator .instrumentDefault ();
372
+ ByteBuf metadata = allocator .buffer ();
373
+ metadata .writeCharSequence ("abc" , CharsetUtil .UTF_8 );
374
+ ByteBuf data = allocator .buffer ();
375
+ data .writeCharSequence ("def" , CharsetUtil .UTF_8 );
376
+ return rule .socket .requestChannel (Flux .just (ByteBufPayload .create (data , metadata )));
377
+ },
378
+ (BiConsumer <AssertSubscriber <Payload >, ClientSocketRule >) (as , rule ) -> {
379
+ LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator .instrumentDefault ();
380
+ int streamId = rule .getStreamIdForRequestType (REQUEST_CHANNEL );
381
+ ByteBuf frame = CancelFrameFlyweight .encode (allocator , streamId );
382
+
383
+ RaceTestUtils .race (() -> as .request (1 ), () -> rule .connection .addToReceivedBuffer (frame ));
384
+ }
385
+ ),
386
+ Arguments .of (
387
+ (Runnable ) () -> System .out .println ("RequestChannel upstream cancellation 2" ),
388
+ (Function <ClientSocketRule , Publisher <Payload >>) (rule ) -> {
389
+ return rule .socket .requestChannel (Flux .just (ByteBufPayload .create ("a" , "b" ), ByteBufPayload .create ("c" , "d" ), ByteBufPayload .create ("e" , "f" )));
390
+ },
391
+ (BiConsumer <AssertSubscriber <Payload >, ClientSocketRule >) (as , rule ) -> {
392
+ LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator .instrumentDefault ();
393
+ int streamId = rule .getStreamIdForRequestType (REQUEST_CHANNEL );
394
+ ByteBuf frame = CancelFrameFlyweight .encode (allocator , streamId );
395
+
396
+ as .request (1 );
397
+
398
+ RaceTestUtils .race (() -> as .request (Long .MAX_VALUE ), () -> rule .connection .addToReceivedBuffer (frame ));
399
+ }
400
+ ),
401
+ Arguments .of (
402
+ (Runnable ) () -> System .out .println ("RequestChannel remote error" ),
403
+ (Function <ClientSocketRule , Publisher <Payload >>) (rule ) -> {
404
+ return rule .socket .requestChannel (Flux .just (ByteBufPayload .create ("a" , "b" ), ByteBufPayload .create ("c" , "d" ), ByteBufPayload .create ("e" , "f" )));
405
+ },
406
+ (BiConsumer <AssertSubscriber <Payload >, ClientSocketRule >) (as , rule ) -> {
407
+ LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator .instrumentDefault ();
408
+ int streamId = rule .getStreamIdForRequestType (REQUEST_CHANNEL );
409
+ ByteBuf frame = ErrorFrameFlyweight .encode (allocator , streamId , new RuntimeException ("test" ));
410
+
411
+ as .request (1 );
412
+
413
+ RaceTestUtils .race (() -> as .request (Long .MAX_VALUE ), () -> rule .connection .addToReceivedBuffer (frame ));
414
+ }
415
+ ),
416
+ Arguments .of (
417
+ (Runnable ) () -> System .out .println ("RequestResponse downstream cancellation" ),
418
+ (Function <ClientSocketRule , Publisher <Payload >>) (rule ) -> rule .socket .requestResponse (EmptyPayload .INSTANCE ),
419
+ (BiConsumer <AssertSubscriber <Payload >, ClientSocketRule >) (as , rule ) -> {
420
+ LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator .instrumentDefault ();
421
+ ByteBuf metadata = allocator .buffer ();
422
+ metadata .writeCharSequence ("abc" , CharsetUtil .UTF_8 );
423
+ ByteBuf data = allocator .buffer ();
424
+ data .writeCharSequence ("def" , CharsetUtil .UTF_8 );
425
+ int streamId = rule .getStreamIdForRequestType (REQUEST_RESPONSE );
426
+ ByteBuf frame = PayloadFrameFlyweight .encode (allocator , streamId , false , false , true , metadata , data );
427
+
428
+ RaceTestUtils .race (as ::cancel , () -> rule .connection .addToReceivedBuffer (frame ));
429
+ }
430
+ )
415
431
);
416
432
}
417
433
418
434
@ Test
435
+ @ Ignore ("Due to https://github.com/reactor/reactor-core/pull/2114" )
419
436
@ SuppressWarnings ("unchecked" )
420
437
public void checkNoLeaksOnRacingTest () {
421
438
@@ -433,7 +450,6 @@ public void checkNoLeaksOnRacingTest() {
433
450
434
451
public void checkNoLeaksOnRacing (LeaksTrackingByteBufAllocator allocator , Function <ClientSocketRule , Publisher <Payload >> initiator , BiConsumer <AssertSubscriber <Payload >, ClientSocketRule > runner ) {
435
452
for (int i = 0 ; i < 100000 ; i ++) {
436
- System .out .println (i );
437
453
ClientSocketRule clientSocketRule = new ClientSocketRule ();
438
454
try {
439
455
clientSocketRule .apply (new Statement () {
0 commit comments