Skip to content

Fix head/tail false sharing issues #1374

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 21, 2014
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 73 additions & 49 deletions rxjava-core/src/main/java/rx/internal/util/MpscPaddedQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,77 @@
*/
package rx.internal.util;

import java.io.Serializable;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;

import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import rx.internal.util.MpscPaddedQueue.Node;

abstract class MpscLinkedQueuePad0<E> {
long p00, p01, p02, p03, p04, p05, p06, p07;
long p30, p31, p32, p33, p34, p35, p36, p37;
}

abstract class MpscLinkedQueueHeadRef<E> extends MpscLinkedQueuePad0<E> {
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<MpscLinkedQueueHeadRef, Node> UPDATER =
newUpdater(MpscLinkedQueueHeadRef.class, Node.class, "headRef");
private volatile Node<E> headRef;

protected final Node<E> headRef() {
return headRef;
}
protected final void headRef(Node<E> val) {
headRef = val;
}
protected final void lazySetHeadRef(Node<E> newVal) {
UPDATER.lazySet(this, newVal);
}
}

abstract class MpscLinkedQueuePad1<E> extends MpscLinkedQueueHeadRef<E> {
long p00, p01, p02, p03, p04, p05, p06, p07;
long p30, p31, p32, p33, p34, p35, p36, p37;
}

abstract class MpscLinkedQueueTailRef<E> extends MpscLinkedQueuePad1<E> {
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<MpscLinkedQueueTailRef, Node> UPDATER =
newUpdater(MpscLinkedQueueTailRef.class, Node.class, "tailRef");
private volatile Node<E> tailRef;
protected final Node<E> tailRef() {
return tailRef;
}
protected final void tailRef(Node<E> val) {
tailRef = val;
}
@SuppressWarnings("unchecked")
protected final Node<E> getAndSetTailRef(Node<E> newVal) {
return (Node<E>) UPDATER.getAndSet(this, newVal);
}
}
/**
* A multiple-producer single consumer queue implementation with padded reference to tail to avoid cache-line
* thrashing. Based on Netty's <a href='https://github.com/netty/netty/blob/master/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java'>MpscQueue implementation</a>
* but using {@code AtomicReferenceFieldUpdater} instead of {@code Unsafe}.
*
* but using {@code AtomicReferenceFieldUpdater} instead of {@code Unsafe}.<br>
* Original algorithm presented <a
* href="http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue"> on 1024
* Cores</a> by D. Vyukov.<br>
* Data structure modified to avoid false sharing between head and tail references as per implementation of
* MpscLinkedQueue on <a href="https://github.com/JCTools/JCTools">JCTools project</a>.
*
* @param <E> the element type
*/
public final class MpscPaddedQueue<E> extends AtomicReference<MpscPaddedQueue.Node<E>> {
/** */
private static final long serialVersionUID = 1L;
/** The padded tail reference. */
final PaddedNode<E> tail;

public final class MpscPaddedQueue<E> extends MpscLinkedQueueTailRef<E> {
long p00, p01, p02, p03, p04, p05, p06, p07;
long p30, p31, p32, p33, p34, p35, p36, p37;
/**
* Initializes the empty queue.
*/
public MpscPaddedQueue() {
Node<E> first = new Node<E>(null);
tail = new PaddedNode<E>();
tail.node = first;
set(first);
Node<E> stub = new Node<E>(null);
headRef(stub);
tailRef(stub);
}

/**
Expand All @@ -49,7 +95,7 @@ public MpscPaddedQueue() {
*/
public void offer(E v) {
Node<E> n = new Node<E>(v);
getAndSet(n).set(n);
getAndSetTailRef(n).next(n);
}

/**
Expand All @@ -63,25 +109,23 @@ public E poll() {
}
E v = n.value;
n.value = null; // do not retain this value as the node still stays in the queue
tail.lazySet(n);
lazySetHeadRef(n);
return v;
}

/**
* Check if there is a node available without changing anything.
* @return
*/
private Node<E> peekNode() {
for (;;) {
@SuppressWarnings(value = "unchecked")
Node<E> t = tail.node;
Node<E> n = t.get();
if (n != null || get() == t) {
Node<E> t = headRef();
Node<E> n = t.next();
if (n != null || headRef() == t) {
return n;
}
}
}

/**
* Clears the queue.
*/
Expand All @@ -92,46 +136,26 @@ public void clear() {
}
}
}
/** The front-padded node class housing the actual value. */
static abstract class PaddedNodeBase<E> extends FrontPadding {
private static final long serialVersionUID = 2L;
volatile Node<E> node;
@SuppressWarnings(value = "rawtypes")
static final AtomicReferenceFieldUpdater<PaddedNodeBase, Node> NODE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PaddedNodeBase.class, Node.class, "node");
public void lazySet(Node<E> newValue) {
NODE_UPDATER.lazySet(this, newValue);
}
}
/** Post-padding of the padded node base class. */
static final class PaddedNode<E> extends PaddedNodeBase<E> {
private static final long serialVersionUID = 3L;
/** Padding. */
public transient long p16, p17, p18, p19, p20, p21, p22; // 56 bytes (the remaining 8 is in the base)
/** Padding. */
public transient long p24, p25, p26, p27, p28, p29, p30, p31; // 64 bytes
}

/**
* Regular node with value and reference to the next node.
*/
static final class Node<E> implements Serializable {
private static final long serialVersionUID = 4L;
static final class Node<E> {
E value;
@SuppressWarnings(value = "rawtypes")
static final AtomicReferenceFieldUpdater<Node, Node> TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "tail");
volatile Node<E> tail;
private volatile Node<E> next;

public Node(E value) {
Node(E value) {
this.value = value;
}

public void set(Node<E> newTail) {
TAIL_UPDATER.lazySet(this, newTail);
void next(Node<E> newNext) {
TAIL_UPDATER.lazySet(this, newNext);
}

@SuppressWarnings(value = "unchecked")
public Node<E> get() {
return TAIL_UPDATER.get(this);
Node<E> next() {
return next;
}
}

Expand Down