Skip to content

adds MpscUnboundedArrayQueue changes from JCTools #968

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,34 +13,51 @@
*/
package io.rsocket.internal.jctools.queues;

import static io.rsocket.internal.jctools.util.UnsafeAccess.UNSAFE;
import static io.rsocket.internal.jctools.util.UnsafeAccess.fieldOffset;
import static io.rsocket.internal.jctools.queues.UnsafeAccess.UNSAFE;
import static io.rsocket.internal.jctools.queues.UnsafeAccess.fieldOffset;

import java.util.AbstractQueue;
import java.util.Iterator;

abstract class BaseLinkedQueuePad0<E> extends AbstractQueue<E> implements MessagePassingQueue<E> {
long p00, p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16;
byte b000, b001, b002, b003, b004, b005, b006, b007; // 8b
byte b010, b011, b012, b013, b014, b015, b016, b017; // 16b
byte b020, b021, b022, b023, b024, b025, b026, b027; // 24b
byte b030, b031, b032, b033, b034, b035, b036, b037; // 32b
byte b040, b041, b042, b043, b044, b045, b046, b047; // 40b
byte b050, b051, b052, b053, b054, b055, b056, b057; // 48b
byte b060, b061, b062, b063, b064, b065, b066, b067; // 56b
byte b070, b071, b072, b073, b074, b075, b076, b077; // 64b
byte b100, b101, b102, b103, b104, b105, b106, b107; // 72b
byte b110, b111, b112, b113, b114, b115, b116, b117; // 80b
byte b120, b121, b122, b123, b124, b125, b126, b127; // 88b
byte b130, b131, b132, b133, b134, b135, b136, b137; // 96b
byte b140, b141, b142, b143, b144, b145, b146, b147; // 104b
byte b150, b151, b152, b153, b154, b155, b156, b157; // 112b
byte b160, b161, b162, b163, b164, b165, b166, b167; // 120b
// byte b170,b171,b172,b173,b174,b175,b176,b177;//128b
// * drop 8b as object header acts as padding and is >= 8b *
}

// $gen:ordered-fields
abstract class BaseLinkedQueueProducerNodeRef<E> extends BaseLinkedQueuePad0<E> {
static final long P_NODE_OFFSET =
fieldOffset(BaseLinkedQueueProducerNodeRef.class, "producerNode");

private LinkedQueueNode<E> producerNode;
private volatile LinkedQueueNode<E> producerNode;

final void spProducerNode(LinkedQueueNode<E> newValue) {
producerNode = newValue;
UNSAFE.putObject(this, P_NODE_OFFSET, newValue);
}

final void soProducerNode(LinkedQueueNode<E> newValue) {
UNSAFE.putOrderedObject(this, P_NODE_OFFSET, newValue);
}

@SuppressWarnings("unchecked")
final LinkedQueueNode<E> lvProducerNode() {
return (LinkedQueueNode<E>) UNSAFE.getObjectVolatile(this, P_NODE_OFFSET);
return producerNode;
}

@SuppressWarnings("unchecked")
final boolean casProducerNode(LinkedQueueNode<E> expect, LinkedQueueNode<E> newValue) {
return UNSAFE.compareAndSwapObject(this, P_NODE_OFFSET, expect, newValue);
}
Expand All @@ -51,8 +68,22 @@ final LinkedQueueNode<E> lpProducerNode() {
}

abstract class BaseLinkedQueuePad1<E> extends BaseLinkedQueueProducerNodeRef<E> {
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;
byte b000, b001, b002, b003, b004, b005, b006, b007; // 8b
byte b010, b011, b012, b013, b014, b015, b016, b017; // 16b
byte b020, b021, b022, b023, b024, b025, b026, b027; // 24b
byte b030, b031, b032, b033, b034, b035, b036, b037; // 32b
byte b040, b041, b042, b043, b044, b045, b046, b047; // 40b
byte b050, b051, b052, b053, b054, b055, b056, b057; // 48b
byte b060, b061, b062, b063, b064, b065, b066, b067; // 56b
byte b070, b071, b072, b073, b074, b075, b076, b077; // 64b
byte b100, b101, b102, b103, b104, b105, b106, b107; // 72b
byte b110, b111, b112, b113, b114, b115, b116, b117; // 80b
byte b120, b121, b122, b123, b124, b125, b126, b127; // 88b
byte b130, b131, b132, b133, b134, b135, b136, b137; // 96b
byte b140, b141, b142, b143, b144, b145, b146, b147; // 104b
byte b150, b151, b152, b153, b154, b155, b156, b157; // 112b
byte b160, b161, b162, b163, b164, b165, b166, b167; // 120b
byte b170, b171, b172, b173, b174, b175, b176, b177; // 128b
}

// $gen:ordered-fields
Expand All @@ -77,16 +108,27 @@ final LinkedQueueNode<E> lpConsumerNode() {
}

abstract class BaseLinkedQueuePad2<E> extends BaseLinkedQueueConsumerNodeRef<E> {
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;
byte b000, b001, b002, b003, b004, b005, b006, b007; // 8b
byte b010, b011, b012, b013, b014, b015, b016, b017; // 16b
byte b020, b021, b022, b023, b024, b025, b026, b027; // 24b
byte b030, b031, b032, b033, b034, b035, b036, b037; // 32b
byte b040, b041, b042, b043, b044, b045, b046, b047; // 40b
byte b050, b051, b052, b053, b054, b055, b056, b057; // 48b
byte b060, b061, b062, b063, b064, b065, b066, b067; // 56b
byte b070, b071, b072, b073, b074, b075, b076, b077; // 64b
byte b100, b101, b102, b103, b104, b105, b106, b107; // 72b
byte b110, b111, b112, b113, b114, b115, b116, b117; // 80b
byte b120, b121, b122, b123, b124, b125, b126, b127; // 88b
byte b130, b131, b132, b133, b134, b135, b136, b137; // 96b
byte b140, b141, b142, b143, b144, b145, b146, b147; // 104b
byte b150, b151, b152, b153, b154, b155, b156, b157; // 112b
byte b160, b161, b162, b163, b164, b165, b166, b167; // 120b
byte b170, b171, b172, b173, b174, b175, b176, b177; // 128b
}

/**
* A base data structure for concurrent linked queues. For convenience also pulled in common single
* consumer methods since at this time there's no plan to implement MC.
*
* @param <E>
* @author nitsanw
*/
abstract class BaseLinkedQueue<E> extends BaseLinkedQueuePad2<E> {

Expand Down Expand Up @@ -158,8 +200,10 @@ public final int size() {
* @see MessagePassingQueue#isEmpty()
*/
@Override
public final boolean isEmpty() {
return lvConsumerNode() == lvProducerNode();
public boolean isEmpty() {
LinkedQueueNode<E> consumerNode = lvConsumerNode();
LinkedQueueNode<E> producerNode = lvProducerNode();
return consumerNode == producerNode;
}

protected E getSingleConsumerNodeValue(
Expand Down
Loading