16
16
17
17
package io .rsocket .internal ;
18
18
19
- import static org . assertj . core . api . Assertions . assertThat ;
19
+ import java . time . Duration ;
20
20
21
21
import io .netty .buffer .ByteBuf ;
22
22
import io .netty .buffer .ByteBufAllocator ;
23
23
import io .netty .buffer .Unpooled ;
24
- import io .netty .buffer .UnpooledByteBufAllocator ;
25
24
import io .netty .util .CharsetUtil ;
26
25
import io .netty .util .ReferenceCountUtil ;
27
26
import io .rsocket .buffer .LeaksTrackingByteBufAllocator ;
28
27
import io .rsocket .internal .subscriber .AssertSubscriber ;
29
- import java .time .Duration ;
30
28
import org .junit .jupiter .api .AfterAll ;
31
29
import org .junit .jupiter .api .BeforeAll ;
32
30
import org .junit .jupiter .api .Disabled ;
33
- import org .junit .jupiter .api .DisplayName ;
34
- import org .junit .jupiter .api .RepeatedTest ;
35
- import org .junit .jupiter .api .Test ;
36
- import org .junit .jupiter .api .Timeout ;
37
31
import org .junit .jupiter .params .ParameterizedTest ;
38
32
import org .junit .jupiter .params .provider .ValueSource ;
39
33
import reactor .core .Fuseable ;
40
34
import reactor .core .publisher .Hooks ;
41
- import reactor .core .publisher .Operators ;
42
35
import reactor .core .scheduler .Schedulers ;
43
36
import reactor .test .StepVerifier ;
44
37
import reactor .test .util .RaceTestUtils ;
45
38
39
+ import static org .assertj .core .api .Assertions .assertThat ;
40
+
46
41
public class UnboundedProcessorTest {
47
42
48
43
@ BeforeAll
@@ -138,7 +133,7 @@ public void ensureUnboundedProcessorDisposesQueueProperly(boolean withFusionEnab
138
133
assertSubscriber .request (1 );
139
134
});
140
135
141
- assertSubscriber .values ().forEach (ReferenceCountUtil ::safeRelease );
136
+ assertSubscriber .values ().forEach (ReferenceCountUtil ::release );
142
137
143
138
allocator .assertHasNoLeaks ();
144
139
}
@@ -167,7 +162,6 @@ public void smokeTest1(boolean withFusionEnabled) {
167
162
unboundedProcessor .subscribe (assertSubscriber );
168
163
169
164
RaceTestUtils .race (
170
- Schedulers .boundedElastic (),
171
165
() -> {
172
166
unboundedProcessor .onNext (buffer1 );
173
167
unboundedProcessor .onNextPrioritized (buffer2 );
@@ -284,10 +278,11 @@ public void smokeTest3(boolean withFusionEnabled) {
284
278
}
285
279
}
286
280
287
- @ Test
288
- @ DisplayName (
289
- "Ensures that racing between onNext | dispose | subscribe(cancelled) | terminal will not cause any issues and leaks" )
290
- public void smokeTest31 () {
281
+ @ ParameterizedTest (
282
+ name =
283
+ "Ensures that racing between onNext | dispose | subscribe(cancelled) | terminal will not cause any issues and leaks; mode[fusionEnabled={0}]" )
284
+ @ ValueSource (booleans = {true , false })
285
+ public void smokeTest31 (boolean withFusionEnabled ) {
291
286
final LeaksTrackingByteBufAllocator allocator =
292
287
LeaksTrackingByteBufAllocator .instrument (ByteBufAllocator .DEFAULT );
293
288
final RuntimeException runtimeException = new RuntimeException ("test" );
@@ -300,7 +295,8 @@ public void smokeTest31() {
300
295
final ByteBuf buffer4 = allocator .buffer (4 );
301
296
302
297
final AssertSubscriber <ByteBuf > assertSubscriber =
303
- new AssertSubscriber <ByteBuf >(0 ).requestedFusionMode (Fuseable .ANY );
298
+ new AssertSubscriber <ByteBuf >(0 )
299
+ .requestedFusionMode (withFusionEnabled ? Fuseable .ANY : Fuseable .NONE );
304
300
305
301
RaceTestUtils .race (
306
302
Schedulers .boundedElastic (),
@@ -329,44 +325,43 @@ public void smokeTest31() {
329
325
}
330
326
}
331
327
332
- @ RepeatedTest (
328
+ @ ParameterizedTest (
333
329
name =
334
- "Ensures that racing between onNext + dispose | downstream async drain should not cause any issues and leaks" ,
335
- value = 10000 )
336
- @ Timeout (60 )
337
- public void ensuresAsyncFusionAndDisposureHasNoDeadlock () {
330
+ "Ensures that racing between onNext + dispose | downstream async drain should not cause any issues and leaks; mode[fusionEnabled={0}]" )
331
+ @ ValueSource (booleans = {true , false })
332
+ public void ensuresAsyncFusionAndDisposureHasNoDeadlock (boolean withFusionEnabled ) {
338
333
final LeaksTrackingByteBufAllocator allocator =
339
- LeaksTrackingByteBufAllocator .instrument (UnpooledByteBufAllocator .DEFAULT );
340
- final UnboundedProcessor < ByteBuf > unboundedProcessor = new UnboundedProcessor <>();
341
-
342
- final ByteBuf buffer1 = allocator . buffer ( 1 );
343
- final ByteBuf buffer2 = allocator .buffer (2 );
344
- final ByteBuf buffer3 = allocator .buffer (3 );
345
- final ByteBuf buffer4 = allocator .buffer (4 );
346
- final ByteBuf buffer5 = allocator .buffer (5 );
347
- final ByteBuf buffer6 = allocator .buffer (6 );
348
-
349
- final AssertSubscriber < ByteBuf > assertSubscriber =
350
- new AssertSubscriber <>( Operators . enableOnDiscard ( null , ReferenceCountUtil :: safeRelease ));
351
-
352
- unboundedProcessor . subscribe ( assertSubscriber );
353
-
354
- RaceTestUtils . race (
355
- () -> {
356
- unboundedProcessor . onNext ( buffer1 );
357
- unboundedProcessor . onNext ( buffer2 );
358
- unboundedProcessor .onNext (buffer3 );
359
- unboundedProcessor .onNext (buffer4 );
360
- unboundedProcessor .onNext (buffer5 );
361
- unboundedProcessor .onNext (buffer6 );
362
- unboundedProcessor .dispose ( );
363
- },
364
- unboundedProcessor :: dispose );
365
-
366
- assertSubscriber
367
- . await ( Duration . ofSeconds ( 50 ))
368
- . values ()
369
- . forEach ( ReferenceCountUtil :: safeRelease );
334
+ LeaksTrackingByteBufAllocator .instrument (ByteBufAllocator .DEFAULT );
335
+
336
+ for ( int i = 0 ; i < 10000 ; i ++) {
337
+ final UnboundedProcessor < ByteBuf > unboundedProcessor = new UnboundedProcessor <>( );
338
+ final ByteBuf buffer1 = allocator .buffer (1 );
339
+ final ByteBuf buffer2 = allocator .buffer (2 );
340
+ final ByteBuf buffer3 = allocator .buffer (3 );
341
+ final ByteBuf buffer4 = allocator .buffer (4 );
342
+ final ByteBuf buffer5 = allocator .buffer (5 );
343
+ final ByteBuf buffer6 = allocator . buffer ( 6 );
344
+
345
+ final AssertSubscriber <ByteBuf > assertSubscriber =
346
+ new AssertSubscriber < ByteBuf >()
347
+ . requestedFusionMode ( withFusionEnabled ? Fuseable . ANY : Fuseable . NONE );
348
+
349
+ unboundedProcessor . subscribe ( assertSubscriber );
350
+
351
+ RaceTestUtils . race (
352
+ () -> {
353
+ unboundedProcessor .onNext (buffer1 );
354
+ unboundedProcessor .onNext (buffer2 );
355
+ unboundedProcessor .onNext (buffer3 );
356
+ unboundedProcessor .onNext (buffer4 );
357
+ unboundedProcessor .onNext ( buffer5 );
358
+ unboundedProcessor . onNext ( buffer6 );
359
+ unboundedProcessor . dispose ( );
360
+ },
361
+ unboundedProcessor :: dispose );
362
+
363
+ assertSubscriber . await ( Duration . ofSeconds ( 50 )). values (). forEach ( ReferenceCountUtil :: release );
364
+ }
370
365
371
366
allocator .assertHasNoLeaks ();
372
367
}
0 commit comments