Skip to content

Commit 99f7a27

Browse files
committed
Refactor management state management
To avoid triggering 2 initializations at the same time.
1 parent 014f5a9 commit 99f7a27

File tree

2 files changed

+128
-91
lines changed

2 files changed

+128
-91
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java

Lines changed: 127 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
1818
package com.rabbitmq.client.amqp.impl;
1919

20+
import static com.rabbitmq.client.amqp.impl.AmqpManagement.State.*;
2021
import static java.util.concurrent.TimeUnit.MILLISECONDS;
2122

2223
import com.rabbitmq.client.amqp.AmqpException;
@@ -64,14 +65,15 @@ class AmqpManagement implements Management {
6465
private volatile Session session;
6566
private volatile Sender sender;
6667
private volatile Receiver receiver;
67-
private final AtomicBoolean initialized = new AtomicBoolean(false);
6868
private final AtomicBoolean closed = new AtomicBoolean(false);
6969
private final Duration rpcTimeout = Duration.ofSeconds(10);
7070
private final ConcurrentMap<UUID, OutstandingRequest> outstandingRequests =
7171
new ConcurrentHashMap<>();
7272
private volatile Future<?> receiveLoop;
7373
private final TopologyListener topologyListener;
7474
private final Supplier<String> nameSupplier;
75+
private final AtomicReference<State> state = new AtomicReference<>(CREATED);
76+
private final AtomicBoolean initializing = new AtomicBoolean(false);
7577

7678
AmqpManagement(AmqpManagementParameters parameters) {
7779
this.id = ID_SEQUENCE.getAndIncrement();
@@ -153,103 +155,118 @@ public UnbindSpecification unbind() {
153155

154156
@Override
155157
public void close() {
156-
if (this.closed.compareAndSet(false, true) && this.initialized.get()) {
158+
if (this.initializing.get()) {
159+
throw new AmqpException.AmqpResourceInvalidStateException(
160+
"Management is initializing, retry closing later.");
161+
}
162+
if (this.closed.compareAndSet(false, true)) {
163+
this.state(CLOSED);
157164
this.releaseResources();
158-
try {
159-
this.receiver.close();
160-
} catch (Exception e) {
161-
LOGGER.debug("Error while closing management receiver: {}", e.getMessage());
165+
if (this.receiver != null) {
166+
try {
167+
this.receiver.close();
168+
} catch (Exception e) {
169+
LOGGER.debug("Error while closing management receiver: {}", e.getMessage());
170+
}
162171
}
163-
try {
164-
this.sender.close();
165-
} catch (Exception e) {
166-
LOGGER.debug("Error while closing management sender: {}", e.getMessage());
172+
if (this.sender != null) {
173+
try {
174+
this.sender.close();
175+
} catch (Exception e) {
176+
LOGGER.debug("Error while closing management sender: {}", e.getMessage());
177+
}
167178
}
168-
try {
169-
this.session.close();
170-
} catch (Exception e) {
171-
LOGGER.debug("Error while closing management session: {}", e.getMessage());
179+
if (this.session != null) {
180+
try {
181+
this.session.close();
182+
} catch (Exception e) {
183+
LOGGER.debug("Error while closing management session: {}", e.getMessage());
184+
}
172185
}
173186
}
174187
}
175188

176189
void init() {
177-
if (!this.initialized.get()) {
178-
LOGGER.debug("Initializing management ({}).", this);
179-
try {
180-
LOGGER.debug("Creating management session ({}).", this);
181-
this.session = this.connection.nativeConnection().openSession();
182-
String linkPairName = "management-link-pair";
183-
Map<String, Object> properties = Collections.singletonMap("paired", Boolean.TRUE);
184-
LOGGER.debug("Creating management sender ({}).", this);
185-
this.sender =
186-
session.openSender(
187-
MANAGEMENT_NODE_ADDRESS,
188-
new SenderOptions()
189-
.deliveryMode(DeliveryMode.AT_MOST_ONCE)
190-
.linkName(linkPairName)
191-
.properties(properties));
192-
193-
LOGGER.debug("Creating management receiver ({}).", this);
194-
this.receiver =
195-
session.openReceiver(
196-
MANAGEMENT_NODE_ADDRESS,
197-
new ReceiverOptions()
198-
.deliveryMode(DeliveryMode.AT_MOST_ONCE)
199-
.linkName(linkPairName)
200-
.properties(properties)
201-
.creditWindow(100));
202-
203-
this.sender.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS);
204-
LOGGER.debug("Management sender created ({}).", this);
205-
this.receiver.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS);
206-
LOGGER.debug("Management receiver created ({}).", this);
207-
Runnable receiveTask =
208-
() -> {
209-
try {
210-
while (!Thread.currentThread().isInterrupted()) {
211-
Delivery delivery = receiver.receive(100, MILLISECONDS);
212-
if (delivery != null) {
213-
Object correlationId = delivery.message().correlationId();
214-
if (correlationId instanceof UUID) {
215-
OutstandingRequest request = outstandingRequests.remove(correlationId);
216-
if (request != null) {
217-
request.complete(delivery.message());
190+
if (this.state() != OPEN) {
191+
if (this.initializing.compareAndSet(false, true)) {
192+
LOGGER.debug("Initializing management ({}).", this);
193+
this.state(UNAVAILABLE);
194+
try {
195+
LOGGER.debug("Creating management session ({}).", this);
196+
this.session = this.connection.nativeConnection().openSession();
197+
String linkPairName = "management-link-pair";
198+
Map<String, Object> properties = Collections.singletonMap("paired", Boolean.TRUE);
199+
LOGGER.debug("Creating management sender ({}).", this);
200+
this.sender =
201+
session.openSender(
202+
MANAGEMENT_NODE_ADDRESS,
203+
new SenderOptions()
204+
.deliveryMode(DeliveryMode.AT_MOST_ONCE)
205+
.linkName(linkPairName)
206+
.properties(properties));
207+
208+
LOGGER.debug("Creating management receiver ({}).", this);
209+
this.receiver =
210+
session.openReceiver(
211+
MANAGEMENT_NODE_ADDRESS,
212+
new ReceiverOptions()
213+
.deliveryMode(DeliveryMode.AT_MOST_ONCE)
214+
.linkName(linkPairName)
215+
.properties(properties)
216+
.creditWindow(100));
217+
218+
this.sender.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS);
219+
LOGGER.debug("Management sender created ({}).", this);
220+
this.receiver.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS);
221+
LOGGER.debug("Management receiver created ({}).", this);
222+
Runnable receiveTask =
223+
() -> {
224+
try {
225+
while (!Thread.currentThread().isInterrupted()) {
226+
Delivery delivery = receiver.receive(100, MILLISECONDS);
227+
if (delivery != null) {
228+
Object correlationId = delivery.message().correlationId();
229+
if (correlationId instanceof UUID) {
230+
OutstandingRequest request = outstandingRequests.remove(correlationId);
231+
if (request != null) {
232+
request.complete(delivery.message());
233+
} else {
234+
LOGGER.info("Could not find outstanding request {}", correlationId);
235+
}
218236
} else {
219-
LOGGER.info("Could not find outstanding request {}", correlationId);
237+
LOGGER.info("Could not correlate inbound message with management request");
220238
}
221-
} else {
222-
LOGGER.info("Could not correlate inbound message with management request");
223239
}
224240
}
241+
} catch (ClientConnectionRemotelyClosedException
242+
| ClientLinkRemotelyClosedException e) {
243+
// receiver is closed
244+
} catch (ClientSessionRemotelyClosedException e) {
245+
this.state(UNAVAILABLE);
246+
LOGGER.info(
247+
"Management session closed in receive loop: {} ({})", e.getMessage(), this);
248+
AmqpException exception = ExceptionUtils.convert(e);
249+
this.failRequests(exception);
250+
if (exception instanceof AmqpException.AmqpSecurityException) {
251+
LOGGER.debug(
252+
"Recovering AMQP management because the failure was a security exception ({}).",
253+
this);
254+
this.init();
255+
}
256+
} catch (ClientException e) {
257+
java.util.function.Consumer<String> log =
258+
this.closed.get() ? m -> LOGGER.debug(m, e) : m -> LOGGER.warn(m, e);
259+
log.accept("Error while polling AMQP receiver");
225260
}
226-
} catch (ClientConnectionRemotelyClosedException
227-
| ClientLinkRemotelyClosedException e) {
228-
// receiver is closed
229-
} catch (ClientSessionRemotelyClosedException e) {
230-
LOGGER.info(
231-
"Management session closed in receive loop: {} ({})", e.getMessage(), this);
232-
AmqpException exception = ExceptionUtils.convert(e);
233-
this.releaseResources();
234-
this.failRequests(exception);
235-
if (exception instanceof AmqpException.AmqpSecurityException) {
236-
LOGGER.debug(
237-
"Recovering AMQP management because the failure was a security exception ({}).",
238-
this);
239-
this.init();
240-
}
241-
} catch (ClientException e) {
242-
java.util.function.Consumer<String> log =
243-
this.closed.get() ? m -> LOGGER.debug(m, e) : m -> LOGGER.warn(m, e);
244-
log.accept("Error while polling AMQP receiver");
245-
}
246-
};
247-
LOGGER.debug("Starting management receive loop ({}).", this);
248-
this.receiveLoop = this.connection.executorService().submit(receiveTask);
249-
LOGGER.debug("Management initialized ({}).", this);
250-
this.initialized.set(true);
251-
} catch (Exception e) {
252-
throw new AmqpException(e);
261+
};
262+
LOGGER.debug("Starting management receive loop ({}).", this);
263+
this.receiveLoop = this.connection.executorService().submit(receiveTask);
264+
LOGGER.debug("Management initialized ({}).", this);
265+
this.state(OPEN);
266+
this.initializing.set(false);
267+
} catch (Exception e) {
268+
throw new AmqpException(e);
269+
}
253270
}
254271
}
255272
}
@@ -266,7 +283,7 @@ private void failRequests(AmqpException exception) {
266283
}
267284

268285
void releaseResources() {
269-
this.initialized.set(false);
286+
this.markUnavailable();
270287
if (this.receiveLoop != null) {
271288
this.receiveLoop.cancel(true);
272289
}
@@ -641,10 +658,11 @@ public String toString() {
641658
}
642659

643660
private void checkAvailable() {
644-
if (this.closed.get()) {
645-
throw new AmqpException("Management is closed");
646-
} else if (!this.initialized.get()) {
647-
throw new AmqpException("Management is not available");
661+
if (this.state() == CLOSED) {
662+
throw new AmqpException.AmqpResourceClosedException("Management is closed");
663+
} else if (this.state() != OPEN) {
664+
throw new AmqpException.AmqpResourceInvalidStateException(
665+
"Management is not open, current state is %s", this.state().name());
648666
}
649667
}
650668

@@ -671,4 +689,23 @@ T body() {
671689
return this.body;
672690
}
673691
}
692+
693+
private State state() {
694+
return this.state.get();
695+
}
696+
697+
private void state(State state) {
698+
this.state.set(state);
699+
}
700+
701+
void markUnavailable() {
702+
this.state(UNAVAILABLE);
703+
}
704+
705+
enum State {
706+
CREATED,
707+
OPEN,
708+
UNAVAILABLE,
709+
CLOSED
710+
}
674711
}

src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ void publisherShouldThrowWhenQueueDoesNotExist() {
341341
}
342342

343343
@Test
344-
void publisherSendingShouldThrowWhenQueueHasBeenDeleted() throws Exception {
344+
void publisherSendingShouldThrowWhenQueueHasBeenDeleted() {
345345
connection.management().queue(name).declare();
346346
Sync closedSync = sync();
347347
AtomicReference<Throwable> closedException = new AtomicReference<>();

0 commit comments

Comments
 (0)