Skip to content

Commit 94da41a

Browse files
authored
Merge pull request #89 from rabbitmq/block-on-management-init
Block on concurrent calls during management initialization
2 parents a924052 + c3f80b1 commit 94da41a

File tree

1 file changed

+49
-39
lines changed

1 file changed

+49
-39
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java

Lines changed: 49 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,9 @@ class AmqpManagement implements Management {
8282
private final TopologyListener topologyListener;
8383
private final Supplier<String> nameSupplier;
8484
private final AtomicReference<State> state = new AtomicReference<>(CREATED);
85-
private final AtomicBoolean initializing = new AtomicBoolean(false);
85+
// private final AtomicBoolean initializing = new AtomicBoolean(false);
86+
private volatile boolean initializing = false;
87+
private final Lock initializationLock = new ReentrantLock();
8688
private final Duration receiveLoopIdleTimeout;
8789
private final Lock instanceLock = new ReentrantLock();
8890

@@ -170,7 +172,7 @@ public UnbindSpecification unbind() {
170172

171173
@Override
172174
public void close() {
173-
if (this.initializing.get()) {
175+
if (this.initializing) {
174176
throw new AmqpException.AmqpResourceInvalidStateException(
175177
"Management is initializing, retry closing later.");
176178
}
@@ -203,45 +205,53 @@ public void close() {
203205

204206
void init() {
205207
if (this.state() != OPEN) {
206-
if (this.initializing.compareAndSet(false, true)) {
207-
LOGGER.debug("Initializing management ({}).", this);
208-
this.state(UNAVAILABLE);
208+
if (!this.initializing) {
209209
try {
210-
if (this.receiveLoop != null) {
211-
this.receiveLoop.cancel(true);
212-
this.receiveLoop = null;
210+
initializationLock.lock();
211+
if (!this.initializing) {
212+
this.initializing = true;
213+
LOGGER.debug("Initializing management ({}).", this);
214+
this.state(UNAVAILABLE);
215+
try {
216+
if (this.receiveLoop != null) {
217+
this.receiveLoop.cancel(true);
218+
this.receiveLoop = null;
219+
}
220+
LOGGER.debug("Creating management session ({}).", this);
221+
this.session = this.connection.nativeConnection().openSession();
222+
String linkPairName = "management-link-pair";
223+
Map<String, Object> properties = Collections.singletonMap("paired", Boolean.TRUE);
224+
LOGGER.debug("Creating management sender ({}).", this);
225+
this.sender =
226+
session.openSender(
227+
MANAGEMENT_NODE_ADDRESS,
228+
new SenderOptions()
229+
.deliveryMode(DeliveryMode.AT_MOST_ONCE)
230+
.linkName(linkPairName)
231+
.properties(properties));
232+
233+
LOGGER.debug("Creating management receiver ({}).", this);
234+
this.receiver =
235+
session.openReceiver(
236+
MANAGEMENT_NODE_ADDRESS,
237+
new ReceiverOptions()
238+
.deliveryMode(DeliveryMode.AT_MOST_ONCE)
239+
.linkName(linkPairName)
240+
.properties(properties)
241+
.creditWindow(100));
242+
243+
this.sender.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS);
244+
LOGGER.debug("Management sender created ({}).", this);
245+
this.receiver.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS);
246+
LOGGER.debug("Management receiver created ({}).", this);
247+
this.state(OPEN);
248+
this.initializing = false;
249+
} catch (Exception e) {
250+
throw new AmqpException(e);
251+
}
213252
}
214-
LOGGER.debug("Creating management session ({}).", this);
215-
this.session = this.connection.nativeConnection().openSession();
216-
String linkPairName = "management-link-pair";
217-
Map<String, Object> properties = Collections.singletonMap("paired", Boolean.TRUE);
218-
LOGGER.debug("Creating management sender ({}).", this);
219-
this.sender =
220-
session.openSender(
221-
MANAGEMENT_NODE_ADDRESS,
222-
new SenderOptions()
223-
.deliveryMode(DeliveryMode.AT_MOST_ONCE)
224-
.linkName(linkPairName)
225-
.properties(properties));
226-
227-
LOGGER.debug("Creating management receiver ({}).", this);
228-
this.receiver =
229-
session.openReceiver(
230-
MANAGEMENT_NODE_ADDRESS,
231-
new ReceiverOptions()
232-
.deliveryMode(DeliveryMode.AT_MOST_ONCE)
233-
.linkName(linkPairName)
234-
.properties(properties)
235-
.creditWindow(100));
236-
237-
this.sender.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS);
238-
LOGGER.debug("Management sender created ({}).", this);
239-
this.receiver.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS);
240-
LOGGER.debug("Management receiver created ({}).", this);
241-
this.state(OPEN);
242-
this.initializing.set(false);
243-
} catch (Exception e) {
244-
throw new AmqpException(e);
253+
} finally {
254+
initializationLock.unlock();
245255
}
246256
}
247257
}

0 commit comments

Comments
 (0)