Skip to content

Commit 908ab2e

Browse files
committed
provides leaks tracking tooling
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 626dd81 commit 908ab2e

File tree

1 file changed

+193
-0
lines changed

1 file changed

+193
-0
lines changed
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
package io.rsocket.buffer;
2+
3+
import io.netty.buffer.ByteBuf;
4+
import io.netty.buffer.ByteBufAllocator;
5+
import io.netty.buffer.CompositeByteBuf;
6+
import org.assertj.core.api.Assertions;
7+
8+
import java.lang.reflect.Field;
9+
import java.lang.reflect.Modifier;
10+
import java.util.List;
11+
import java.util.concurrent.ConcurrentLinkedQueue;
12+
13+
public class LeaksTrackingByteBufAllocator implements ByteBufAllocator {
14+
15+
public static LeaksTrackingByteBufAllocator instrumentDefault() {
16+
if (ByteBufAllocator.DEFAULT instanceof LeaksTrackingByteBufAllocator) {
17+
return (LeaksTrackingByteBufAllocator) ByteBufAllocator.DEFAULT;
18+
}
19+
20+
final LeaksTrackingByteBufAllocator instrumented = new LeaksTrackingByteBufAllocator(ByteBufAllocator.DEFAULT);
21+
try {
22+
Field field = ByteBufAllocator.class.getField("DEFAULT");
23+
setFinalStatic(field, instrumented);
24+
} catch (Exception e) {
25+
throw new RuntimeException(e);
26+
}
27+
return instrumented;
28+
}
29+
30+
public static void deinstrumentDefault() {
31+
if (ByteBufAllocator.DEFAULT instanceof LeaksTrackingByteBufAllocator) {
32+
LeaksTrackingByteBufAllocator instrumented = (LeaksTrackingByteBufAllocator) ByteBufAllocator.DEFAULT;
33+
try {
34+
Field field = ByteBufAllocator.class.getField("DEFAULT");
35+
setFinalStatic(field, instrumented.delegate);
36+
} catch (Exception e) {
37+
throw new RuntimeException(e);
38+
}
39+
}
40+
}
41+
42+
static void setFinalStatic(Field field, Object newValue) throws Exception {
43+
field.setAccessible(true);
44+
45+
Field modifiersField = Field.class.getDeclaredField("modifiers");
46+
modifiersField.setAccessible(true);
47+
modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
48+
49+
field.set(null, newValue);
50+
}
51+
52+
public static LeaksTrackingByteBufAllocator instrument(ByteBufAllocator allocator) {
53+
return new LeaksTrackingByteBufAllocator(allocator);
54+
}
55+
56+
final ConcurrentLinkedQueue<ByteBuf> tracker = new ConcurrentLinkedQueue<>();
57+
58+
final ByteBufAllocator delegate;
59+
60+
private LeaksTrackingByteBufAllocator(ByteBufAllocator delegate) {
61+
this.delegate = delegate;
62+
}
63+
64+
public LeaksTrackingByteBufAllocator assertHasNoLeaks() {
65+
Assertions.assertThat(tracker)
66+
.allSatisfy(buf -> {
67+
ByteBuf unwrap = buf.unwrap();
68+
if (unwrap instanceof CompositeByteBuf) {
69+
if (buf.refCnt() > 0) {
70+
List<ByteBuf> decomposed = ((CompositeByteBuf) unwrap).decompose(0, unwrap.readableBytes());
71+
for (int i = 0; i < decomposed.size(); i++) {
72+
Assertions.assertThat(decomposed.get(i))
73+
.matches(bb -> bb.refCnt() == 0, "Got unreleased CompositeByteBuf");
74+
}
75+
}
76+
77+
} else {
78+
Assertions.assertThat(buf)
79+
.matches(bb -> bb.refCnt() == 0, "buffer should be released");
80+
}
81+
});
82+
tracker.clear();
83+
return this;
84+
}
85+
86+
// Delegating logic with tracking of buffers
87+
88+
@Override
89+
public ByteBuf buffer() {
90+
return track(delegate.buffer());
91+
}
92+
93+
@Override
94+
public ByteBuf buffer(int initialCapacity) {
95+
return track(delegate.buffer(initialCapacity));
96+
}
97+
98+
@Override
99+
public ByteBuf buffer(int initialCapacity, int maxCapacity) {
100+
return track(delegate.buffer(initialCapacity, maxCapacity));
101+
}
102+
103+
@Override
104+
public ByteBuf ioBuffer() {
105+
return track(delegate.ioBuffer());
106+
}
107+
108+
@Override
109+
public ByteBuf ioBuffer(int initialCapacity) {
110+
return track(delegate.ioBuffer(initialCapacity));
111+
}
112+
113+
@Override
114+
public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) {
115+
return track(delegate.ioBuffer(initialCapacity, maxCapacity));
116+
}
117+
118+
@Override
119+
public ByteBuf heapBuffer() {
120+
return track(delegate.heapBuffer());
121+
}
122+
123+
@Override
124+
public ByteBuf heapBuffer(int initialCapacity) {
125+
return track(delegate.heapBuffer(initialCapacity));
126+
}
127+
128+
@Override
129+
public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
130+
return track(delegate.heapBuffer(initialCapacity, maxCapacity));
131+
}
132+
133+
@Override
134+
public ByteBuf directBuffer() {
135+
return track(delegate.directBuffer());
136+
}
137+
138+
@Override
139+
public ByteBuf directBuffer(int initialCapacity) {
140+
return track(delegate.directBuffer(initialCapacity));
141+
}
142+
143+
@Override
144+
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
145+
return track(delegate.directBuffer(initialCapacity, maxCapacity));
146+
}
147+
148+
@Override
149+
public CompositeByteBuf compositeBuffer() {
150+
return track(delegate.compositeBuffer());
151+
}
152+
153+
@Override
154+
public CompositeByteBuf compositeBuffer(int maxNumComponents) {
155+
return track(delegate.compositeBuffer(maxNumComponents));
156+
}
157+
158+
@Override
159+
public CompositeByteBuf compositeHeapBuffer() {
160+
return track(delegate.compositeHeapBuffer());
161+
}
162+
163+
@Override
164+
public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
165+
return track(delegate.compositeHeapBuffer(maxNumComponents));
166+
}
167+
168+
@Override
169+
public CompositeByteBuf compositeDirectBuffer() {
170+
return track(delegate.compositeDirectBuffer());
171+
}
172+
173+
@Override
174+
public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
175+
return track(delegate.compositeDirectBuffer(maxNumComponents));
176+
}
177+
178+
@Override
179+
public boolean isDirectBufferPooled() {
180+
return delegate.isDirectBufferPooled();
181+
}
182+
183+
@Override
184+
public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
185+
return delegate.calculateNewCapacity(minNewCapacity, maxCapacity);
186+
}
187+
188+
<T extends ByteBuf> T track(T buffer) {
189+
tracker.offer(buffer);
190+
191+
return buffer;
192+
}
193+
}

0 commit comments

Comments
 (0)