Skip to content

Commit 9525e33

Browse files
committed
Merge pull request #2561 from akarnokd/JCToolsUpdate0129
Updating queue code from JCTools
2 parents ca02838 + f78903f commit 9525e33

File tree

7 files changed

+202
-97
lines changed

7 files changed

+202
-97
lines changed

src/main/java/rx/internal/util/unsafe/ConcurrentCircularArrayQueue.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,27 +60,33 @@ public abstract class ConcurrentCircularArrayQueue<E> extends ConcurrentCircular
6060
REF_ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class)
6161
+ (BUFFER_PAD << (REF_ELEMENT_SHIFT - SPARSE_SHIFT));
6262
}
63-
protected final int capacity;
6463
protected final long mask;
6564
// @Stable :(
6665
protected final E[] buffer;
6766

6867
@SuppressWarnings("unchecked")
6968
public ConcurrentCircularArrayQueue(int capacity) {
70-
this.capacity = Pow2.roundToPowerOfTwo(capacity);
71-
mask = this.capacity - 1;
69+
int actualCapacity = Pow2.roundToPowerOfTwo(capacity);
70+
mask = actualCapacity - 1;
7271
// pad data on either end with some empty slots.
73-
buffer = (E[]) new Object[(this.capacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
72+
buffer = (E[]) new Object[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
7473
}
7574

7675
/**
7776
* @param index desirable element index
7877
* @return the offset in bytes within the array for a given index.
7978
*/
8079
protected final long calcElementOffset(long index) {
80+
return calcElementOffset(index, mask);
81+
}
82+
/**
83+
* @param index desirable element index
84+
* @param mask bit mask (capacity - 1) applied to the index to wrap it into the buffer
85+
* @return the offset in bytes within the array for a given index.
86+
*/
87+
protected final long calcElementOffset(long index, long mask) {
8188
return REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);
8289
}
83-
8490
/**
8591
* A plain store (no ordering/fences) of an element to a given offset
8692
*
@@ -171,4 +177,10 @@ protected final E lvElement(E[] buffer, long offset) {
171177
public Iterator<E> iterator() {
172178
throw new UnsupportedOperationException();
173179
}
174-
}
180+
@Override
181+
public void clear() {
182+
// we have to test isEmpty because of the weaker poll() guarantee
183+
while (poll() != null || !isEmpty())
184+
;
185+
}
186+
}

src/main/java/rx/internal/util/unsafe/ConcurrentSequencedCircularArrayQueue.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,10 @@ public abstract class ConcurrentSequencedCircularArrayQueue<E> extends Concurren
3535

3636
public ConcurrentSequencedCircularArrayQueue(int capacity) {
3737
super(capacity);
38+
int actualCapacity = (int) (this.mask + 1);
3839
// pad data on either end with some empty slots.
39-
sequenceBuffer = new long[(this.capacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
40-
for (long i = 0; i < this.capacity; i++) {
40+
sequenceBuffer = new long[(actualCapacity << SPARSE_SHIFT) + BUFFER_PAD * 2];
41+
for (long i = 0; i < actualCapacity; i++) {
4142
soSequence(sequenceBuffer, calcSequenceOffset(i), i);
4243
}
4344
}
@@ -54,4 +55,4 @@ protected final long lvSequence(long[] buffer, long offset) {
5455
return UNSAFE.getLongVolatile(buffer, offset);
5556
}
5657

57-
}
58+
}

src/main/java/rx/internal/util/unsafe/MessagePassingQueue.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,26 +33,26 @@ interface MessagePassingQueue<M> {
3333

3434
/**
3535
* Called from a producer thread subject to the restrictions appropriate to the implementation and according to the
36-
* {@link Queue#offer(Object)} interface (but failure to offer doesn't necessitate queue is full).
36+
* {@link Queue#offer(Object)} interface.
3737
*
3838
* @param message
39-
* @return true if element was inserted into the queue, false if cannot enqueue
39+
* @return true if element was inserted into the queue, false iff full
4040
*/
4141
boolean offer(M message);
4242

4343
/**
4444
* Called from the consumer thread subject to the restrictions appropriate to the implementation and according to
45-
* the {@link Queue#poll()} interface (barring the hard requirement on null returns).
45+
* the {@link Queue#poll()} interface.
4646
*
47-
* @return a message from the queue if one is available, null otherwise(not necessarily empty)
47+
* @return a message from the queue if one is available, null iff empty
4848
*/
4949
M poll();
5050

5151
/**
5252
* Called from the consumer thread subject to the restrictions appropriate to the implementation and according to
53-
* the {@link Queue#peek()} interface (barring the hard requirement on null returns).
53+
* the {@link Queue#peek()} interface.
5454
*
55-
* @return a message from the queue if one is available, null otherwise(not necessarily empty)
55+
* @return a message from the queue if one is available, null iff empty
5656
*/
5757
M peek();
5858

@@ -71,4 +71,4 @@ interface MessagePassingQueue<M> {
7171
*/
7272
boolean isEmpty();
7373

74-
}
74+
}

src/main/java/rx/internal/util/unsafe/MpmcArrayQueue.java

Lines changed: 51 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ abstract class MpmcArrayQueueProducerField<E> extends MpmcArrayQueueL1Pad<E> {
3131
private final static long P_INDEX_OFFSET;
3232
static {
3333
try {
34-
P_INDEX_OFFSET =
35-
UNSAFE.objectFieldOffset(MpmcArrayQueueProducerField.class.getDeclaredField("producerIndex"));
34+
P_INDEX_OFFSET = UNSAFE.objectFieldOffset(MpmcArrayQueueProducerField.class
35+
.getDeclaredField("producerIndex"));
3636
} catch (NoSuchFieldException e) {
3737
throw new RuntimeException(e);
3838
}
@@ -65,8 +65,8 @@ abstract class MpmcArrayQueueConsumerField<E> extends MpmcArrayQueueL2Pad<E> {
6565
private final static long C_INDEX_OFFSET;
6666
static {
6767
try {
68-
C_INDEX_OFFSET =
69-
UNSAFE.objectFieldOffset(MpmcArrayQueueConsumerField.class.getDeclaredField("consumerIndex"));
68+
C_INDEX_OFFSET = UNSAFE.objectFieldOffset(MpmcArrayQueueConsumerField.class
69+
.getDeclaredField("consumerIndex"));
7070
} catch (NoSuchFieldException e) {
7171
throw new RuntimeException(e);
7272
}
@@ -87,26 +87,28 @@ protected final boolean casConsumerIndex(long expect, long newValue) {
8787
}
8888

8989
/**
90-
* A Multi-Producer-Multi-Consumer queue based on a {@link ConcurrentCircularArrayQueue}. This implies that any and all
91-
* threads may call the offer/poll/peek methods and correctness is maintained. <br>
90+
* A Multi-Producer-Multi-Consumer queue based on a {@link ConcurrentCircularArrayQueue}. This implies that
91+
* any and all threads may call the offer/poll/peek methods and correctness is maintained. <br>
9292
* This implementation follows patterns documented on the package level for False Sharing protection.<br>
9393
* The algorithm for offer/poll is an adaptation of the one put forward by D. Vyukov (See <a
94-
* href="http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue">here</a>). The original algorithm
95-
* uses an array of structs which should offer nice locality properties but is sadly not possible in Java (waiting on
96-
* Value Types or similar). The alternative explored here utilizes 2 arrays, one for each field of the struct. There is
97-
* a further alternative in the experimental project which uses iteration phase markers to achieve the same algo and is
98-
* closer structurally to the original, but sadly does not perform as well as this implementation.<br>
94+
* href="http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue">here</a>). The original
95+
* algorithm uses an array of structs which should offer nice locality properties but is sadly not possible in
96+
* Java (waiting on Value Types or similar). The alternative explored here utilizes 2 arrays, one for each
97+
* field of the struct. There is a further alternative in the experimental project which uses iteration phase
98+
* markers to achieve the same algo and is closer structurally to the original, but sadly does not perform as
99+
* well as this implementation.<br>
99100
* Tradeoffs to keep in mind:
100101
* <ol>
101-
* <li>Padding for false sharing: counter fields and queue fields are all padded as well as either side of both arrays.
102-
* We are trading memory to avoid false sharing(active and passive).
103-
* <li>2 arrays instead of one: The algorithm requires an extra array of longs matching the size of the elements array.
104-
* This is doubling/tripling the memory allocated for the buffer.
102+
* <li>Padding for false sharing: counter fields and queue fields are all padded as well as either side of
103+
* both arrays. We are trading memory to avoid false sharing (active and passive).
104+
* <li>2 arrays instead of one: The algorithm requires an extra array of longs matching the size of the
105+
* elements array. This is doubling/tripling the memory allocated for the buffer.
105106
* <li>Power of 2 capacity: Actual elements buffer (and sequence buffer) is the closest power of 2 larger or
106107
* equal to the requested capacity.
107108
* </ol>
108109
*
109-
* @param <E> type of the element stored in the {@link java.util.Queue}
110+
* @param <E>
111+
* type of the element stored in the {@link java.util.Queue}
110112
*/
111113
public class MpmcArrayQueue<E> extends MpmcArrayQueueConsumerField<E> {
112114
long p40, p41, p42, p43, p44, p45, p46;
@@ -123,10 +125,11 @@ public boolean offer(final E e) {
123125
}
124126

125127
// local load of field to avoid repeated loads after volatile reads
128+
final long capacity = mask + 1;
126129
final long[] lSequenceBuffer = sequenceBuffer;
127130
long currentProducerIndex;
128131
long seqOffset;
129-
132+
long cIndex = Long.MAX_VALUE;// start with bogus value, hope we don't need it
130133
while (true) {
131134
currentProducerIndex = lvProducerIndex(); // LoadLoad
132135
seqOffset = calcSequenceOffset(currentProducerIndex);
@@ -140,8 +143,10 @@ public boolean offer(final E e) {
140143
break;
141144
}
142145
// failed cas, retry 1
143-
} else if (delta < 0) {
144-
// poll has not moved this value forward
146+
} else if (delta < 0 && // poll has not moved this value forward
147+
currentProducerIndex - capacity <= cIndex && // test against cached cIndex
148+
currentProducerIndex - capacity <= (cIndex = lvConsumerIndex())) { // test against latest cIndex
149+
// Extra check required to ensure [Queue.offer == false iff queue is full]
145150
return false;
146151
}
147152

@@ -161,16 +166,17 @@ public boolean offer(final E e) {
161166

162167
/**
163168
* {@inheritDoc}
164-
* Because return null indicates queue is empty we cannot simply rely on next element visibility for poll and must
165-
* test producer index when next element is not visible.
169+
* <p>
170+
* Because return null indicates queue is empty we cannot simply rely on next element visibility for poll
171+
* and must test producer index when next element is not visible.
166172
*/
167173
@Override
168174
public E poll() {
169175
// local load of field to avoid repeated loads after volatile reads
170176
final long[] lSequenceBuffer = sequenceBuffer;
171177
long currentConsumerIndex;
172178
long seqOffset;
173-
179+
long pIndex = -1; // start with bogus value, hope we don't need it
174180
while (true) {
175181
currentConsumerIndex = lvConsumerIndex();// LoadLoad
176182
seqOffset = calcSequenceOffset(currentConsumerIndex);
@@ -183,12 +189,10 @@ public E poll() {
183189
break;
184190
}
185191
// failed cas, retry 1
186-
} else if (delta < 0) {
187-
// COMMENTED OUT: strict empty check.
188-
// if (currentConsumerIndex == lvProducerIndex()) {
189-
// return null;
190-
// }
191-
// next element is not visible, probably empty
192+
} else if (delta < 0 && // slot has not been moved by producer
193+
currentConsumerIndex >= pIndex && // test against cached pIndex
194+
currentConsumerIndex == (pIndex = lvProducerIndex())) { // update pIndex if we must
195+
// strict empty check, this ensures [Queue.poll() == null iff isEmpty()]
192196
return null;
193197
}
194198

@@ -202,22 +206,31 @@ public E poll() {
202206

203207
// Move sequence ahead by capacity, preparing it for next offer
204208
// (seeing this value from a consumer will lead to retry 2)
205-
soSequence(lSequenceBuffer, seqOffset, currentConsumerIndex + capacity);// StoreStore
209+
soSequence(lSequenceBuffer, seqOffset, currentConsumerIndex + mask + 1);// StoreStore
206210

207211
return e;
208212
}
209213

210214
@Override
211215
public E peek() {
212-
return lpElement(calcElementOffset(lvConsumerIndex()));
216+
long currConsumerIndex;
217+
E e;
218+
do {
219+
currConsumerIndex = lvConsumerIndex();
220+
// other consumers may have grabbed the element, or queue might be empty
221+
e = lpElement(calcElementOffset(currConsumerIndex));
222+
// only return null if queue is empty
223+
} while (e == null && currConsumerIndex != lvProducerIndex());
224+
return e;
213225
}
214226

215227
@Override
216228
public int size() {
217229
/*
218-
* It is possible for a thread to be interrupted or reschedule between the read of the producer and consumer
219-
* indices, therefore protection is required to ensure size is within valid range. In the event of concurrent
220-
* polls/offers to this method the size is OVER estimated as we read consumer index BEFORE the producer index.
230+
* It is possible for a thread to be interrupted or rescheduled between the read of the producer and
231+
* consumer indices, therefore protection is required to ensure size is within valid range. In the
232+
* event of concurrent polls/offers to this method the size is OVER estimated as we read consumer
233+
* index BEFORE the producer index.
221234
*/
222235
long after = lvConsumerIndex();
223236
while (true) {
@@ -229,13 +242,13 @@ public int size() {
229242
}
230243
}
231244
}
232-
245+
233246
@Override
234247
public boolean isEmpty() {
235-
// Order matters!
248+
// Order matters!
236249
// Loading consumer before producer allows for producer increments after consumer index is read.
237-
// This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is nothing we
238-
// can do to make this an exact method.
250+
// This ensures this method is conservative in its estimate. Note that as this is an MPMC there is
251+
// nothing we can do to make this an exact method.
239252
return (lvConsumerIndex() == lvProducerIndex());
240253
}
241-
}
254+
}

0 commit comments

Comments
 (0)