Skip to content

Commit 1843841

Browse files
Merge pull request #1374 from nitsanw/master
Fix head/tail false sharing issues
2 parents a270165 + ab398c8 commit 1843841

File tree

1 file changed

+73
-49
lines changed

1 file changed

+73
-49
lines changed

rxjava-core/src/main/java/rx/internal/util/MpscPaddedQueue.java

Lines changed: 73 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -15,31 +15,77 @@
1515
*/
1616
package rx.internal.util;
1717

18-
import java.io.Serializable;
19-
import java.util.concurrent.atomic.AtomicReference;
18+
import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
19+
2020
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
2121

22+
import rx.internal.util.MpscPaddedQueue.Node;
23+
24+
abstract class MpscLinkedQueuePad0<E> {
25+
long p00, p01, p02, p03, p04, p05, p06, p07;
26+
long p30, p31, p32, p33, p34, p35, p36, p37;
27+
}
28+
29+
abstract class MpscLinkedQueueHeadRef<E> extends MpscLinkedQueuePad0<E> {
30+
@SuppressWarnings("rawtypes")
31+
private static final AtomicReferenceFieldUpdater<MpscLinkedQueueHeadRef, Node> UPDATER =
32+
newUpdater(MpscLinkedQueueHeadRef.class, Node.class, "headRef");
33+
private volatile Node<E> headRef;
34+
35+
protected final Node<E> headRef() {
36+
return headRef;
37+
}
38+
protected final void headRef(Node<E> val) {
39+
headRef = val;
40+
}
41+
protected final void lazySetHeadRef(Node<E> newVal) {
42+
UPDATER.lazySet(this, newVal);
43+
}
44+
}
45+
46+
abstract class MpscLinkedQueuePad1<E> extends MpscLinkedQueueHeadRef<E> {
47+
long p00, p01, p02, p03, p04, p05, p06, p07;
48+
long p30, p31, p32, p33, p34, p35, p36, p37;
49+
}
50+
51+
abstract class MpscLinkedQueueTailRef<E> extends MpscLinkedQueuePad1<E> {
52+
@SuppressWarnings("rawtypes")
53+
private static final AtomicReferenceFieldUpdater<MpscLinkedQueueTailRef, Node> UPDATER =
54+
newUpdater(MpscLinkedQueueTailRef.class, Node.class, "tailRef");
55+
private volatile Node<E> tailRef;
56+
protected final Node<E> tailRef() {
57+
return tailRef;
58+
}
59+
protected final void tailRef(Node<E> val) {
60+
tailRef = val;
61+
}
62+
@SuppressWarnings("unchecked")
63+
protected final Node<E> getAndSetTailRef(Node<E> newVal) {
64+
return (Node<E>) UPDATER.getAndSet(this, newVal);
65+
}
66+
}
2267
/**
2368
* A multiple-producer single consumer queue implementation with padded reference to tail to avoid cache-line
2469
* thrashing. Based on Netty's <a href='https://github.com/netty/netty/blob/master/common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java'>MpscQueue implementation</a>
25-
* but using {@code AtomicReferenceFieldUpdater} instead of {@code Unsafe}.
26-
*
70+
* but using {@code AtomicReferenceFieldUpdater} instead of {@code Unsafe}.<br>
71+
* Original algorithm presented <a
72+
* href="http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue"> on 1024
73+
* Cores</a> by D. Vyukov.<br>
74+
* Data structure modified to avoid false sharing between head and tail references as per implementation of
75+
* MpscLinkedQueue on <a href="https://github.com/JCTools/JCTools">JCTools project</a>.
76+
*
2777
* @param <E> the element type
2878
*/
29-
public final class MpscPaddedQueue<E> extends AtomicReference<MpscPaddedQueue.Node<E>> {
30-
/** */
31-
private static final long serialVersionUID = 1L;
32-
/** The padded tail reference. */
33-
final PaddedNode<E> tail;
34-
79+
public final class MpscPaddedQueue<E> extends MpscLinkedQueueTailRef<E> {
80+
long p00, p01, p02, p03, p04, p05, p06, p07;
81+
long p30, p31, p32, p33, p34, p35, p36, p37;
3582
/**
3683
* Initializes the empty queue.
3784
*/
3885
public MpscPaddedQueue() {
39-
Node<E> first = new Node<E>(null);
40-
tail = new PaddedNode<E>();
41-
tail.node = first;
42-
set(first);
86+
Node<E> stub = new Node<E>(null);
87+
headRef(stub);
88+
tailRef(stub);
4389
}
4490

4591
/**
@@ -49,7 +95,7 @@ public MpscPaddedQueue() {
4995
*/
5096
public void offer(E v) {
5197
Node<E> n = new Node<E>(v);
52-
getAndSet(n).set(n);
98+
getAndSetTailRef(n).next(n);
5399
}
54100

55101
/**
@@ -63,25 +109,23 @@ public E poll() {
63109
}
64110
E v = n.value;
65111
n.value = null; // do not retain this value as the node still stays in the queue
66-
tail.lazySet(n);
112+
lazySetHeadRef(n);
67113
return v;
68114
}
69-
115+
70116
/**
71117
* Check if there is a node available without changing anything.
72118
* @return
73119
*/
74120
private Node<E> peekNode() {
75121
for (;;) {
76-
@SuppressWarnings(value = "unchecked")
77-
Node<E> t = tail.node;
78-
Node<E> n = t.get();
79-
if (n != null || get() == t) {
122+
Node<E> t = headRef();
123+
Node<E> n = t.next();
124+
if (n != null || headRef() == t) {
80125
return n;
81126
}
82127
}
83128
}
84-
85129
/**
86130
* Clears the queue.
87131
*/
@@ -92,46 +136,26 @@ public void clear() {
92136
}
93137
}
94138
}
95-
/** The front-padded node class housing the actual value. */
96-
static abstract class PaddedNodeBase<E> extends FrontPadding {
97-
private static final long serialVersionUID = 2L;
98-
volatile Node<E> node;
99-
@SuppressWarnings(value = "rawtypes")
100-
static final AtomicReferenceFieldUpdater<PaddedNodeBase, Node> NODE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PaddedNodeBase.class, Node.class, "node");
101-
public void lazySet(Node<E> newValue) {
102-
NODE_UPDATER.lazySet(this, newValue);
103-
}
104-
}
105-
/** Post-padding of the padded node base class. */
106-
static final class PaddedNode<E> extends PaddedNodeBase<E> {
107-
private static final long serialVersionUID = 3L;
108-
/** Padding. */
109-
public transient long p16, p17, p18, p19, p20, p21, p22; // 56 bytes (the remaining 8 is in the base)
110-
/** Padding. */
111-
public transient long p24, p25, p26, p27, p28, p29, p30, p31; // 64 bytes
112-
}
113139

114140
/**
115141
* Regular node with value and reference to the next node.
116142
*/
117-
static final class Node<E> implements Serializable {
118-
private static final long serialVersionUID = 4L;
143+
static final class Node<E> {
119144
E value;
120145
@SuppressWarnings(value = "rawtypes")
121146
static final AtomicReferenceFieldUpdater<Node, Node> TAIL_UPDATER = AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "tail");
122-
volatile Node<E> tail;
147+
private volatile Node<E> next;
123148

124-
public Node(E value) {
149+
Node(E value) {
125150
this.value = value;
126151
}
127152

128-
public void set(Node<E> newTail) {
129-
TAIL_UPDATER.lazySet(this, newTail);
153+
void next(Node<E> newNext) {
154+
TAIL_UPDATER.lazySet(this, newNext);
130155
}
131156

132-
@SuppressWarnings(value = "unchecked")
133-
public Node<E> get() {
134-
return TAIL_UPDATER.get(this);
157+
Node<E> next() {
158+
return next;
135159
}
136160
}
137161

0 commit comments

Comments
 (0)