Skip to content

Commit dc85fa7

Browse files
committed
improves LeaksTrackingByteBufAllocator to wait for all buffer to be released
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 41eec02 commit dc85fa7

File tree

2 files changed

+140
-8
lines changed

2 files changed

+140
-8
lines changed

rsocket-core/src/test/java/io/rsocket/buffer/LeaksTrackingByteBufAllocator.java

Lines changed: 136 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,16 @@
11
package io.rsocket.buffer;
22

3+
import static java.util.concurrent.locks.LockSupport.parkNanos;
4+
35
import io.netty.buffer.ByteBuf;
46
import io.netty.buffer.ByteBufAllocator;
57
import io.netty.buffer.CompositeByteBuf;
8+
import io.netty.util.ResourceLeakDetector;
9+
import java.lang.reflect.Field;
10+
import java.time.Duration;
11+
import java.util.ArrayList;
12+
import java.util.Collections;
13+
import java.util.Set;
614
import java.util.concurrent.ConcurrentLinkedQueue;
715
import org.assertj.core.api.Assertions;
816

@@ -19,24 +27,89 @@ public class LeaksTrackingByteBufAllocator implements ByteBufAllocator {
1927
* @return
2028
*/
2129
public static LeaksTrackingByteBufAllocator instrument(ByteBufAllocator allocator) {
22-
return new LeaksTrackingByteBufAllocator(allocator);
30+
return new LeaksTrackingByteBufAllocator(allocator, Duration.ZERO, "");
31+
}
32+
33+
/**
34+
* Allows to instrument any given the instance of ByteBufAllocator
35+
*
36+
* @param allocator
37+
* @return
38+
*/
39+
public static LeaksTrackingByteBufAllocator instrument(
40+
ByteBufAllocator allocator, Duration awaitZeroRefCntDuration, String tag) {
41+
return new LeaksTrackingByteBufAllocator(allocator, awaitZeroRefCntDuration, tag);
2342
}
2443

2544
final ConcurrentLinkedQueue<ByteBuf> tracker = new ConcurrentLinkedQueue<>();
2645

2746
final ByteBufAllocator delegate;
2847

29-
private LeaksTrackingByteBufAllocator(ByteBufAllocator delegate) {
48+
final Duration awaitZeroRefCntDuration;
49+
50+
final String tag;
51+
52+
private LeaksTrackingByteBufAllocator(
53+
ByteBufAllocator delegate, Duration awaitZeroRefCntDuration, String tag) {
3054
this.delegate = delegate;
55+
this.awaitZeroRefCntDuration = awaitZeroRefCntDuration;
56+
this.tag = tag;
3157
}
3258

3359
public LeaksTrackingByteBufAllocator assertHasNoLeaks() {
3460
try {
35-
Assertions.assertThat(tracker)
36-
.allSatisfy(
37-
buf ->
38-
Assertions.assertThat(buf)
39-
.matches(bb -> bb.refCnt() == 0, "buffer should be released"));
61+
ArrayList<ByteBuf> unreleased = new ArrayList<>();
62+
for (ByteBuf bb : tracker) {
63+
if (bb.refCnt() != 0) {
64+
unreleased.add(bb);
65+
}
66+
}
67+
68+
final Duration awaitZeroRefCntDuration = this.awaitZeroRefCntDuration;
69+
if (!unreleased.isEmpty() && !awaitZeroRefCntDuration.isZero()) {
70+
final long startTime = System.currentTimeMillis();
71+
final long endTimeInMillis = startTime + awaitZeroRefCntDuration.toMillis();
72+
boolean hasUnreleased;
73+
while (System.currentTimeMillis() <= endTimeInMillis) {
74+
hasUnreleased = false;
75+
for (ByteBuf bb : unreleased) {
76+
if (bb.refCnt() != 0) {
77+
hasUnreleased = true;
78+
break;
79+
}
80+
}
81+
82+
if (!hasUnreleased) {
83+
System.out.println(tag + " all the buffers are released...");
84+
return this;
85+
}
86+
87+
System.out.println(tag + " await buffers to be released");
88+
for (int i = 0; i < 100; i++) {
89+
System.gc();
90+
parkNanos(1000);
91+
System.gc();
92+
}
93+
}
94+
}
95+
96+
Assertions.assertThat(unreleased)
97+
.allMatch(
98+
bb -> {
99+
final boolean checkResult = bb.refCnt() == 0;
100+
101+
if (!checkResult) {
102+
try {
103+
System.out.println(tag + " " + resolveTrackingInfo(bb));
104+
} catch (Exception e) {
105+
e.printStackTrace();
106+
}
107+
}
108+
109+
return checkResult;
110+
},
111+
tag);
112+
System.out.println(tag + " all the buffers are released...");
40113
} finally {
41114
tracker.clear();
42115
}
@@ -150,4 +223,60 @@ <T extends ByteBuf> T track(T buffer) {
150223

151224
return buffer;
152225
}
226+
227+
static final Class<?> simpleLeakAwareCompositeByteBufClass;
228+
static final Field leakFieldForComposite;
229+
static final Class<?> simpleLeakAwareByteBufClass;
230+
static final Field leakFieldForNormal;
231+
static final Field allLeaksField;
232+
233+
static {
234+
try {
235+
{
236+
final Class<?> aClass = Class.forName("io.netty.buffer.SimpleLeakAwareCompositeByteBuf");
237+
final Field leakField = aClass.getDeclaredField("leak");
238+
239+
leakField.setAccessible(true);
240+
241+
simpleLeakAwareCompositeByteBufClass = aClass;
242+
leakFieldForComposite = leakField;
243+
}
244+
245+
{
246+
final Class<?> aClass = Class.forName("io.netty.buffer.SimpleLeakAwareByteBuf");
247+
final Field leakField = aClass.getDeclaredField("leak");
248+
249+
leakField.setAccessible(true);
250+
251+
simpleLeakAwareByteBufClass = aClass;
252+
leakFieldForNormal = leakField;
253+
}
254+
255+
{
256+
final Class<?> aClass =
257+
Class.forName("io.netty.util.ResourceLeakDetector$DefaultResourceLeak");
258+
final Field field = aClass.getDeclaredField("allLeaks");
259+
260+
field.setAccessible(true);
261+
262+
allLeaksField = field;
263+
}
264+
} catch (Exception e) {
265+
throw new RuntimeException(e);
266+
}
267+
}
268+
269+
@SuppressWarnings("unchecked")
270+
static Set<Object> resolveTrackingInfo(ByteBuf byteBuf) throws Exception {
271+
if (ResourceLeakDetector.getLevel().ordinal()
272+
>= ResourceLeakDetector.Level.ADVANCED.ordinal()) {
273+
if (simpleLeakAwareCompositeByteBufClass.isInstance(byteBuf)) {
274+
return (Set<Object>) allLeaksField.get(leakFieldForComposite.get(byteBuf));
275+
} else if (simpleLeakAwareByteBufClass.isInstance(byteBuf)) {
276+
return (Set<Object>) allLeaksField.get(leakFieldForNormal.get(byteBuf));
277+
}
278+
}
279+
280+
return Collections.emptySet();
281+
}
153282
}

rsocket-core/src/test/java/io/rsocket/core/AbstractSocketRule.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.rsocket.buffer.LeaksTrackingByteBufAllocator;
2424
import io.rsocket.test.util.TestDuplexConnection;
2525
import io.rsocket.test.util.TestSubscriber;
26+
import java.time.Duration;
2627
import org.junit.rules.ExternalResource;
2728
import org.junit.runner.Description;
2829
import org.junit.runners.model.Statement;
@@ -42,7 +43,9 @@ public Statement apply(final Statement base, Description description) {
4243
return new Statement() {
4344
@Override
4445
public void evaluate() throws Throwable {
45-
allocator = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
46+
allocator =
47+
LeaksTrackingByteBufAllocator.instrument(
48+
ByteBufAllocator.DEFAULT, Duration.ofSeconds(5), "");
4649
connectSub = TestSubscriber.create();
4750
init();
4851
base.evaluate();

0 commit comments

Comments
 (0)