Skip to content

Commit c2e023a

Browse files
authored
adds MpscUnboundedArrayQueue changes from JCTools (#968)
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 43d4d0f commit c2e023a

15 files changed

+690
-284
lines changed

rsocket-core/src/main/java/io/rsocket/internal/jctools/queues/BaseLinkedQueue.java

Lines changed: 62 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,34 +13,51 @@
1313
*/
1414
package io.rsocket.internal.jctools.queues;
1515

16-
import static io.rsocket.internal.jctools.util.UnsafeAccess.UNSAFE;
17-
import static io.rsocket.internal.jctools.util.UnsafeAccess.fieldOffset;
16+
import static io.rsocket.internal.jctools.queues.UnsafeAccess.UNSAFE;
17+
import static io.rsocket.internal.jctools.queues.UnsafeAccess.fieldOffset;
1818

1919
import java.util.AbstractQueue;
2020
import java.util.Iterator;
2121

2222
abstract class BaseLinkedQueuePad0<E> extends AbstractQueue<E> implements MessagePassingQueue<E> {
23-
long p00, p01, p02, p03, p04, p05, p06, p07;
24-
long p10, p11, p12, p13, p14, p15, p16;
23+
byte b000, b001, b002, b003, b004, b005, b006, b007; // 8b
24+
byte b010, b011, b012, b013, b014, b015, b016, b017; // 16b
25+
byte b020, b021, b022, b023, b024, b025, b026, b027; // 24b
26+
byte b030, b031, b032, b033, b034, b035, b036, b037; // 32b
27+
byte b040, b041, b042, b043, b044, b045, b046, b047; // 40b
28+
byte b050, b051, b052, b053, b054, b055, b056, b057; // 48b
29+
byte b060, b061, b062, b063, b064, b065, b066, b067; // 56b
30+
byte b070, b071, b072, b073, b074, b075, b076, b077; // 64b
31+
byte b100, b101, b102, b103, b104, b105, b106, b107; // 72b
32+
byte b110, b111, b112, b113, b114, b115, b116, b117; // 80b
33+
byte b120, b121, b122, b123, b124, b125, b126, b127; // 88b
34+
byte b130, b131, b132, b133, b134, b135, b136, b137; // 96b
35+
byte b140, b141, b142, b143, b144, b145, b146, b147; // 104b
36+
byte b150, b151, b152, b153, b154, b155, b156, b157; // 112b
37+
byte b160, b161, b162, b163, b164, b165, b166, b167; // 120b
38+
// byte b170,b171,b172,b173,b174,b175,b176,b177;//128b
39+
// * drop 8b as object header acts as padding and is >= 8b *
2540
}
2641

2742
// $gen:ordered-fields
2843
abstract class BaseLinkedQueueProducerNodeRef<E> extends BaseLinkedQueuePad0<E> {
2944
static final long P_NODE_OFFSET =
3045
fieldOffset(BaseLinkedQueueProducerNodeRef.class, "producerNode");
3146

32-
private LinkedQueueNode<E> producerNode;
47+
private volatile LinkedQueueNode<E> producerNode;
3348

3449
final void spProducerNode(LinkedQueueNode<E> newValue) {
35-
producerNode = newValue;
50+
UNSAFE.putObject(this, P_NODE_OFFSET, newValue);
51+
}
52+
53+
final void soProducerNode(LinkedQueueNode<E> newValue) {
54+
UNSAFE.putOrderedObject(this, P_NODE_OFFSET, newValue);
3655
}
3756

38-
@SuppressWarnings("unchecked")
3957
final LinkedQueueNode<E> lvProducerNode() {
40-
return (LinkedQueueNode<E>) UNSAFE.getObjectVolatile(this, P_NODE_OFFSET);
58+
return producerNode;
4159
}
4260

43-
@SuppressWarnings("unchecked")
4461
final boolean casProducerNode(LinkedQueueNode<E> expect, LinkedQueueNode<E> newValue) {
4562
return UNSAFE.compareAndSwapObject(this, P_NODE_OFFSET, expect, newValue);
4663
}
@@ -51,8 +68,22 @@ final LinkedQueueNode<E> lpProducerNode() {
5168
}
5269

5370
abstract class BaseLinkedQueuePad1<E> extends BaseLinkedQueueProducerNodeRef<E> {
54-
long p01, p02, p03, p04, p05, p06, p07;
55-
long p10, p11, p12, p13, p14, p15, p16, p17;
71+
byte b000, b001, b002, b003, b004, b005, b006, b007; // 8b
72+
byte b010, b011, b012, b013, b014, b015, b016, b017; // 16b
73+
byte b020, b021, b022, b023, b024, b025, b026, b027; // 24b
74+
byte b030, b031, b032, b033, b034, b035, b036, b037; // 32b
75+
byte b040, b041, b042, b043, b044, b045, b046, b047; // 40b
76+
byte b050, b051, b052, b053, b054, b055, b056, b057; // 48b
77+
byte b060, b061, b062, b063, b064, b065, b066, b067; // 56b
78+
byte b070, b071, b072, b073, b074, b075, b076, b077; // 64b
79+
byte b100, b101, b102, b103, b104, b105, b106, b107; // 72b
80+
byte b110, b111, b112, b113, b114, b115, b116, b117; // 80b
81+
byte b120, b121, b122, b123, b124, b125, b126, b127; // 88b
82+
byte b130, b131, b132, b133, b134, b135, b136, b137; // 96b
83+
byte b140, b141, b142, b143, b144, b145, b146, b147; // 104b
84+
byte b150, b151, b152, b153, b154, b155, b156, b157; // 112b
85+
byte b160, b161, b162, b163, b164, b165, b166, b167; // 120b
86+
byte b170, b171, b172, b173, b174, b175, b176, b177; // 128b
5687
}
5788

5889
// $gen:ordered-fields
@@ -77,16 +108,27 @@ final LinkedQueueNode<E> lpConsumerNode() {
77108
}
78109

79110
abstract class BaseLinkedQueuePad2<E> extends BaseLinkedQueueConsumerNodeRef<E> {
80-
long p01, p02, p03, p04, p05, p06, p07;
81-
long p10, p11, p12, p13, p14, p15, p16, p17;
111+
byte b000, b001, b002, b003, b004, b005, b006, b007; // 8b
112+
byte b010, b011, b012, b013, b014, b015, b016, b017; // 16b
113+
byte b020, b021, b022, b023, b024, b025, b026, b027; // 24b
114+
byte b030, b031, b032, b033, b034, b035, b036, b037; // 32b
115+
byte b040, b041, b042, b043, b044, b045, b046, b047; // 40b
116+
byte b050, b051, b052, b053, b054, b055, b056, b057; // 48b
117+
byte b060, b061, b062, b063, b064, b065, b066, b067; // 56b
118+
byte b070, b071, b072, b073, b074, b075, b076, b077; // 64b
119+
byte b100, b101, b102, b103, b104, b105, b106, b107; // 72b
120+
byte b110, b111, b112, b113, b114, b115, b116, b117; // 80b
121+
byte b120, b121, b122, b123, b124, b125, b126, b127; // 88b
122+
byte b130, b131, b132, b133, b134, b135, b136, b137; // 96b
123+
byte b140, b141, b142, b143, b144, b145, b146, b147; // 104b
124+
byte b150, b151, b152, b153, b154, b155, b156, b157; // 112b
125+
byte b160, b161, b162, b163, b164, b165, b166, b167; // 120b
126+
byte b170, b171, b172, b173, b174, b175, b176, b177; // 128b
82127
}
83128

84129
/**
85130
* A base data structure for concurrent linked queues. For convenience also pulled in common single
86131
* consumer methods since at this time there's no plan to implement MC.
87-
*
88-
* @param <E>
89-
* @author nitsanw
90132
*/
91133
abstract class BaseLinkedQueue<E> extends BaseLinkedQueuePad2<E> {
92134

@@ -158,8 +200,10 @@ public final int size() {
158200
* @see MessagePassingQueue#isEmpty()
159201
*/
160202
@Override
161-
public final boolean isEmpty() {
162-
return lvConsumerNode() == lvProducerNode();
203+
public boolean isEmpty() {
204+
LinkedQueueNode<E> consumerNode = lvConsumerNode();
205+
LinkedQueueNode<E> producerNode = lvProducerNode();
206+
return consumerNode == producerNode;
163207
}
164208

165209
protected E getSingleConsumerNodeValue(

0 commit comments

Comments
 (0)