File tree Expand file tree Collapse file tree 1 file changed +4
-3
lines changed
rsocket-core/src/test/java/io/rsocket/internal Expand file tree Collapse file tree 1 file changed +4
-3
lines changed Original file line number Diff line number Diff line change 25
25
import io .netty .util .ReferenceCountUtil ;
26
26
import io .rsocket .buffer .LeaksTrackingByteBufAllocator ;
27
27
import io .rsocket .internal .subscriber .AssertSubscriber ;
28
+ import java .time .Duration ;
28
29
import org .junit .jupiter .api .BeforeAll ;
29
30
import org .junit .jupiter .api .RepeatedTest ;
30
31
import org .junit .jupiter .api .Timeout ;
@@ -146,10 +147,10 @@ public void ensureUnboundedProcessorDisposesQueueProperly(boolean withFusionEnab
146
147
147
148
@ RepeatedTest (
148
149
name =
149
- "Ensures that racing between onNext | dispose | cancel | request(n) will not cause any issues and leaks in async backFused mode " ,
150
+ "Ensures that racing between onNext + dispose | downstream async drain) should not cause any issues and leaks" ,
150
151
value = 100000 )
151
152
@ Timeout (10 )
152
- public void ensureUnboundedProcessorDisposesQueueProperlyAsyncMode () {
153
+ public void ensuresAsyncFusionAndDisposureHasNoDeadlock () {
153
154
final LeaksTrackingByteBufAllocator allocator =
154
155
LeaksTrackingByteBufAllocator .instrument (ByteBufAllocator .DEFAULT );
155
156
final UnboundedProcessor <ByteBuf > unboundedProcessor = new UnboundedProcessor <>();
@@ -175,7 +176,7 @@ public void ensureUnboundedProcessorDisposesQueueProperlyAsyncMode() {
175
176
unboundedProcessor ::dispose ,
176
177
Schedulers .elastic ());
177
178
178
- assertSubscriber .values ().forEach (ReferenceCountUtil ::safeRelease );
179
+ assertSubscriber .await ( Duration . ofSeconds ( 5 )). values ().forEach (ReferenceCountUtil ::safeRelease );
179
180
180
181
allocator .assertHasNoLeaks ();
181
182
}
You can’t perform that action at this time.
0 commit comments