Skip to content

Block on concurrent calls during management initialization #89

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 49 additions & 39 deletions src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ class AmqpManagement implements Management {
private final TopologyListener topologyListener;
private final Supplier<String> nameSupplier;
private final AtomicReference<State> state = new AtomicReference<>(CREATED);
private final AtomicBoolean initializing = new AtomicBoolean(false);
// private final AtomicBoolean initializing = new AtomicBoolean(false);
private volatile boolean initializing = false;
private final Lock initializationLock = new ReentrantLock();
private final Duration receiveLoopIdleTimeout;
private final Lock instanceLock = new ReentrantLock();

Expand Down Expand Up @@ -170,7 +172,7 @@ public UnbindSpecification unbind() {

@Override
public void close() {
if (this.initializing.get()) {
if (this.initializing) {
throw new AmqpException.AmqpResourceInvalidStateException(
"Management is initializing, retry closing later.");
}
Expand Down Expand Up @@ -203,45 +205,53 @@ public void close() {

void init() {
if (this.state() != OPEN) {
if (this.initializing.compareAndSet(false, true)) {
LOGGER.debug("Initializing management ({}).", this);
this.state(UNAVAILABLE);
if (!this.initializing) {
try {
if (this.receiveLoop != null) {
this.receiveLoop.cancel(true);
this.receiveLoop = null;
initializationLock.lock();
if (!this.initializing) {
this.initializing = true;
LOGGER.debug("Initializing management ({}).", this);
this.state(UNAVAILABLE);
try {
if (this.receiveLoop != null) {
this.receiveLoop.cancel(true);
this.receiveLoop = null;
}
LOGGER.debug("Creating management session ({}).", this);
this.session = this.connection.nativeConnection().openSession();
String linkPairName = "management-link-pair";
Map<String, Object> properties = Collections.singletonMap("paired", Boolean.TRUE);
LOGGER.debug("Creating management sender ({}).", this);
this.sender =
session.openSender(
MANAGEMENT_NODE_ADDRESS,
new SenderOptions()
.deliveryMode(DeliveryMode.AT_MOST_ONCE)
.linkName(linkPairName)
.properties(properties));

LOGGER.debug("Creating management receiver ({}).", this);
this.receiver =
session.openReceiver(
MANAGEMENT_NODE_ADDRESS,
new ReceiverOptions()
.deliveryMode(DeliveryMode.AT_MOST_ONCE)
.linkName(linkPairName)
.properties(properties)
.creditWindow(100));

this.sender.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS);
LOGGER.debug("Management sender created ({}).", this);
this.receiver.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS);
LOGGER.debug("Management receiver created ({}).", this);
this.state(OPEN);
this.initializing = false;
} catch (Exception e) {
throw new AmqpException(e);
}
}
LOGGER.debug("Creating management session ({}).", this);
this.session = this.connection.nativeConnection().openSession();
String linkPairName = "management-link-pair";
Map<String, Object> properties = Collections.singletonMap("paired", Boolean.TRUE);
LOGGER.debug("Creating management sender ({}).", this);
this.sender =
session.openSender(
MANAGEMENT_NODE_ADDRESS,
new SenderOptions()
.deliveryMode(DeliveryMode.AT_MOST_ONCE)
.linkName(linkPairName)
.properties(properties));

LOGGER.debug("Creating management receiver ({}).", this);
this.receiver =
session.openReceiver(
MANAGEMENT_NODE_ADDRESS,
new ReceiverOptions()
.deliveryMode(DeliveryMode.AT_MOST_ONCE)
.linkName(linkPairName)
.properties(properties)
.creditWindow(100));

this.sender.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS);
LOGGER.debug("Management sender created ({}).", this);
this.receiver.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS);
LOGGER.debug("Management receiver created ({}).", this);
this.state(OPEN);
this.initializing.set(false);
} catch (Exception e) {
throw new AmqpException(e);
} finally {
initializationLock.unlock();
}
}
}
Expand Down