Skip to content

Commit 363884e

Browse files
authored
Add a lock-based message passing queue to help debug a problem with Recycler (#11972)
Motivation: The Recycler has been observed to produce infinite loops on ARM CPUs. It is not clear if this is caused by a JDK bug, a JCTools bug, or a Recycler bug. Having the ability to switch out the JCTools queue implementation will aid us in the investigation. Modification: Implement MessagePassingQueue as a synchronized ArrayDeque, and add a system property to enable the use of this implementation in Recycler. Result: We should now be able to rule out the possibility of a bug in either JCTools or the Recycler.
1 parent f1eb920 commit 363884e

File tree

2 files changed

+121
-9
lines changed

2 files changed

+121
-9
lines changed

common/src/main/java/io/netty/util/Recycler.java

Lines changed: 121 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@
2323
import io.netty.util.internal.logging.InternalLoggerFactory;
2424
import org.jctools.queues.MessagePassingQueue;
2525

26+
import java.util.ArrayDeque;
27+
import java.util.Queue;
2628
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2729

30+
import static io.netty.util.internal.PlatformDependent.newMpscQueue;
2831
import static java.lang.Math.max;
2932
import static java.lang.Math.min;
3033

@@ -50,6 +53,7 @@ public String toString() {
5053
private static final int DEFAULT_MAX_CAPACITY_PER_THREAD;
5154
private static final int RATIO;
5255
private static final int DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD;
56+
private static final boolean BLOCKING_POOL;
5357

5458
static {
5559
// In the future, we might have different maxCapacity for different object types.
@@ -69,15 +73,19 @@ public String toString() {
6973
// bursts.
7074
RATIO = max(0, SystemPropertyUtil.getInt("io.netty.recycler.ratio", 8));
7175

76+
BLOCKING_POOL = SystemPropertyUtil.getBoolean("io.netty.recycler.blocking", false);
77+
7278
if (logger.isDebugEnabled()) {
7379
if (DEFAULT_MAX_CAPACITY_PER_THREAD == 0) {
7480
logger.debug("-Dio.netty.recycler.maxCapacityPerThread: disabled");
7581
logger.debug("-Dio.netty.recycler.ratio: disabled");
7682
logger.debug("-Dio.netty.recycler.chunkSize: disabled");
83+
logger.debug("-Dio.netty.recycler.blocking: disabled");
7784
} else {
7885
logger.debug("-Dio.netty.recycler.maxCapacityPerThread: {}", DEFAULT_MAX_CAPACITY_PER_THREAD);
7986
logger.debug("-Dio.netty.recycler.ratio: {}", RATIO);
8087
logger.debug("-Dio.netty.recycler.chunkSize: {}", DEFAULT_QUEUE_CHUNK_SIZE_PER_THREAD);
88+
logger.debug("-Dio.netty.recycler.blocking: {}", BLOCKING_POOL);
8189
}
8290
}
8391
}
@@ -252,9 +260,11 @@ private static final class LocalPool<T> {
252260
@SuppressWarnings("unchecked")
253261
LocalPool(int maxCapacity, int ratioInterval, int chunkSize) {
254262
this.ratioInterval = ratioInterval;
255-
// If the queue is of type MessagePassingQueue we can use a special LocalPoolQueue implementation.
256-
pooledHandles = (MessagePassingQueue<DefaultHandle<T>>) PlatformDependent
257-
.newMpscQueue(chunkSize, maxCapacity);
263+
if (BLOCKING_POOL) {
264+
pooledHandles = new BlockingMessageQueue<DefaultHandle<T>>(maxCapacity);
265+
} else {
266+
pooledHandles = (MessagePassingQueue<DefaultHandle<T>>) newMpscQueue(chunkSize, maxCapacity);
267+
}
258268
ratioCounter = ratioInterval; // Start at interval so the first one will be recycled.
259269
}
260270

@@ -279,4 +289,112 @@ DefaultHandle<T> newHandle() {
279289
return null;
280290
}
281291
}
292+
293+
/**
294+
* This is an implementation of {@link MessagePassingQueue}, similar to what might be returned from
295+
* {@link PlatformDependent#newMpscQueue(int)}, but intended to be used for debugging purpose.
296+
* The implementation relies on synchronised monitor locks for thread-safety.
297+
* The {@code drain} and {@code fill} bulk operations are not supported by this implementation.
298+
*/
299+
private static final class BlockingMessageQueue<T> implements MessagePassingQueue<T> {
300+
private final Queue<T> deque;
301+
private final int maxCapacity;
302+
303+
BlockingMessageQueue(int maxCapacity) {
304+
this.maxCapacity = maxCapacity;
305+
// This message passing queue is backed by an ArrayDeque instance,
306+
// made thread-safe by synchronising on `this` BlockingMessageQueue instance.
307+
// Why ArrayDeque?
308+
// We use ArrayDeque instead of LinkedList or LinkedBlockingQueue because it's more space efficient.
309+
// We use ArrayDeque instead of ArrayList because we need the queue APIs.
310+
// We use ArrayDeque instead of ConcurrentLinkedQueue because CLQ is unbounded and has O(n) size().
311+
// We use ArrayDeque instead of ArrayBlockingQueue because ABQ allocates its max capacity up-front,
312+
// and these queues will usually have large capacities, in potentially great numbers (one per thread),
313+
// but often only have comparatively few items in them.
314+
deque = new ArrayDeque<T>();
315+
}
316+
317+
@Override
318+
public synchronized boolean offer(T e) {
319+
if (deque.size() == maxCapacity) {
320+
return false;
321+
}
322+
return deque.offer(e);
323+
}
324+
325+
@Override
326+
public synchronized T poll() {
327+
return deque.poll();
328+
}
329+
330+
@Override
331+
public synchronized T peek() {
332+
return deque.peek();
333+
}
334+
335+
@Override
336+
public synchronized int size() {
337+
return deque.size();
338+
}
339+
340+
@Override
341+
public synchronized void clear() {
342+
deque.clear();
343+
}
344+
345+
@Override
346+
public synchronized boolean isEmpty() {
347+
return deque.isEmpty();
348+
}
349+
350+
@Override
351+
public int capacity() {
352+
return maxCapacity;
353+
}
354+
355+
@Override
356+
public boolean relaxedOffer(T e) {
357+
return offer(e);
358+
}
359+
360+
@Override
361+
public T relaxedPoll() {
362+
return poll();
363+
}
364+
365+
@Override
366+
public T relaxedPeek() {
367+
return peek();
368+
}
369+
370+
@Override
371+
public int drain(Consumer<T> c, int limit) {
372+
throw new UnsupportedOperationException();
373+
}
374+
375+
@Override
376+
public int fill(Supplier<T> s, int limit) {
377+
throw new UnsupportedOperationException();
378+
}
379+
380+
@Override
381+
public int drain(Consumer<T> c) {
382+
throw new UnsupportedOperationException();
383+
}
384+
385+
@Override
386+
public int fill(Supplier<T> s) {
387+
throw new UnsupportedOperationException();
388+
}
389+
390+
@Override
391+
public void drain(Consumer<T> c, WaitStrategy wait, ExitCondition exit) {
392+
throw new UnsupportedOperationException();
393+
}
394+
395+
@Override
396+
public void fill(Supplier<T> s, WaitStrategy wait, ExitCondition exit) {
397+
throw new UnsupportedOperationException();
398+
}
399+
}
282400
}

common/src/test/java/io/netty/util/RecyclerTest.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,6 @@ private static Recycler<HandledObject> newRecycler(int maxCapacityPerThread) {
4343
return newRecycler(maxCapacityPerThread, 8, maxCapacityPerThread >> 1);
4444
}
4545

46-
private static Recycler<HandledObject> newRecycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
47-
int ratio, int maxDelayedQueuesPerThread,
48-
int delayedQueueRatio, int chunkSize) {
49-
return newRecycler(maxCapacityPerThread, ratio, chunkSize);
50-
}
51-
5246
private static Recycler<HandledObject> newRecycler(int maxCapacityPerThread, int ratio, int chunkSize) {
5347
return new Recycler<HandledObject>(maxCapacityPerThread, ratio, chunkSize) {
5448
@Override

0 commit comments

Comments
 (0)