Skip to content

Commit 7123f45

Browse files
authored
Merge pull request #169 from rabbitmq/keep-management-in-same-state
Keep management in same initial state after recovery
2 parents 2bdcfaf + d5e9ff9 commit 7123f45

File tree

8 files changed

+162
-17
lines changed

8 files changed

+162
-17
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,33 @@
1717
1818
package com.rabbitmq.client.amqp.impl;
1919

20-
import static com.rabbitmq.client.amqp.Resource.State.*;
20+
import static com.rabbitmq.client.amqp.Resource.State.CLOSED;
21+
import static com.rabbitmq.client.amqp.Resource.State.CLOSING;
22+
import static com.rabbitmq.client.amqp.Resource.State.OPEN;
23+
import static com.rabbitmq.client.amqp.Resource.State.OPENING;
24+
import static com.rabbitmq.client.amqp.Resource.State.RECOVERING;
2125
import static com.rabbitmq.client.amqp.impl.ExceptionUtils.convert;
26+
import static com.rabbitmq.client.amqp.impl.Tuples.pair;
2227
import static com.rabbitmq.client.amqp.impl.Utils.supportFilterExpressions;
2328
import static com.rabbitmq.client.amqp.impl.Utils.supportSetToken;
2429
import static java.lang.System.nanoTime;
2530
import static java.time.Duration.ofNanos;
2631

27-
import com.rabbitmq.client.amqp.*;
32+
import com.rabbitmq.client.amqp.Address;
33+
import com.rabbitmq.client.amqp.AmqpException;
34+
import com.rabbitmq.client.amqp.Connection;
35+
import com.rabbitmq.client.amqp.ConnectionSettings;
36+
import com.rabbitmq.client.amqp.Consumer;
37+
import com.rabbitmq.client.amqp.ConsumerBuilder;
38+
import com.rabbitmq.client.amqp.Management;
2839
import com.rabbitmq.client.amqp.ObservationCollector;
40+
import com.rabbitmq.client.amqp.Publisher;
41+
import com.rabbitmq.client.amqp.PublisherBuilder;
42+
import com.rabbitmq.client.amqp.RpcClient;
43+
import com.rabbitmq.client.amqp.RpcClientBuilder;
44+
import com.rabbitmq.client.amqp.RpcServer;
45+
import com.rabbitmq.client.amqp.RpcServerBuilder;
46+
import com.rabbitmq.client.amqp.impl.Tuples.Pair;
2947
import com.rabbitmq.client.amqp.impl.Utils.RunnableWithException;
3048
import com.rabbitmq.client.amqp.impl.Utils.StopWatch;
3149
import com.rabbitmq.client.amqp.metrics.MetricsCollector;
@@ -89,7 +107,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
89107
private final List<RpcClient> rpcClients = new CopyOnWriteArrayList<>();
90108
private final List<RpcServer> rpcServers = new CopyOnWriteArrayList<>();
91109
private final TopologyListener topologyListener;
92-
private volatile EntityRecovery entityRecovery;
110+
private final EntityRecovery entityRecovery;
93111
private final AtomicBoolean recoveringConnection = new AtomicBoolean(false);
94112
private final DefaultConnectionSettings<?> connectionSettings;
95113
private final Supplier<SessionHandler> sessionHandlerSupplier;
@@ -118,7 +136,9 @@ final class AmqpConnection extends ResourceBase implements Connection {
118136
AmqpConnectionBuilder.AmqpRecoveryConfiguration recoveryConfiguration =
119137
builder.recoveryConfiguration();
120138

121-
this.topologyListener = createTopologyListener(builder);
139+
Pair<TopologyListener, EntityRecovery> topologyInfra = createTopologyInfrastructure(builder);
140+
this.topologyListener = topologyInfra.v1();
141+
this.entityRecovery = topologyInfra.v2();
122142

123143
Executor de =
124144
builder.dispatchingExecutor() == null
@@ -344,20 +364,25 @@ private static String extractNode(org.apache.qpid.protonj2.client.Connection con
344364
return node;
345365
}
346366

347-
TopologyListener createTopologyListener(AmqpConnectionBuilder builder) {
367+
Pair<TopologyListener, EntityRecovery> createTopologyInfrastructure(
368+
AmqpConnectionBuilder builder) {
348369
TopologyListener topologyListener;
370+
EntityRecovery entityRecovery;
349371
if (builder.recoveryConfiguration().topology()) {
350372
RecordingTopologyListener rtl =
351373
new RecordingTopologyListener(
352374
"topology-listener-connection-" + this.name(), this.environment.recoveryEventLoop());
353-
this.entityRecovery = new EntityRecovery(this, rtl);
375+
entityRecovery = new EntityRecovery(this, rtl);
354376
topologyListener = rtl;
355377
} else {
356378
topologyListener = TopologyListener.NO_OP;
379+
entityRecovery = null;
357380
}
358-
return builder.topologyListener() == null
359-
? topologyListener
360-
: TopologyListener.compose(List.of(builder.topologyListener(), topologyListener));
381+
topologyListener =
382+
builder.topologyListener() == null
383+
? topologyListener
384+
: TopologyListener.compose(List.of(builder.topologyListener(), topologyListener));
385+
return pair(topologyListener, entityRecovery);
361386
}
362387

363388
private BiConsumer<org.apache.qpid.protonj2.client.Connection, DisconnectionEvent>
@@ -455,12 +480,21 @@ private void recoverAfterConnectionFailure(
455480
LOGGER.debug("Reconnected '{}' to {}", this.name(), this.currentConnectionLabel());
456481
try {
457482
if (recoveryConfiguration.topology()) {
483+
boolean managementPreviouslyClosed = this.management.isClosed();
458484
this.management.init();
459485
LOGGER.debug("Recovering topology of connection '{}'...", this.name());
460486
this.recoverTopology();
461487
this.recoverConsumers();
462488
this.recoverPublishers();
463489
LOGGER.debug("Recovered topology of connection '{}'.", this.name());
490+
if (managementPreviouslyClosed) {
491+
LOGGER.debug("Management was closed before recovery, closing it again");
492+
try {
493+
this.closeManagement();
494+
} catch (Exception e) {
495+
LOGGER.info("Error while (re)closing management after recovery");
496+
}
497+
}
464498
}
465499
LOGGER.info(
466500
"Recovered connection '{}' to {}", this.name(), this.currentConnectionLabel());
@@ -924,6 +958,22 @@ public String toString() {
924958
return this.name();
925959
}
926960

961+
private void authenticate(String username, String password) {
962+
State state = this.state();
963+
if (state == OPEN) {
964+
LOGGER.debug("Setting new token for connection {}", this.name);
965+
long start = nanoTime();
966+
((AmqpManagement) management()).setToken(password);
967+
LOGGER.debug(
968+
"Set new token for connection {} in {} ms",
969+
this.name,
970+
ofNanos(nanoTime() - start).toMillis());
971+
} else {
972+
LOGGER.debug(
973+
"Could not set new token for connection {} because its state is {}", this.name(), state);
974+
}
975+
}
976+
927977
static class NativeConnectionWrapper {
928978

929979
private final org.apache.qpid.protonj2.client.Connection connection;

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
1818
package com.rabbitmq.client.amqp.impl;
1919

20-
import static com.rabbitmq.client.amqp.Resource.State.*;
20+
import static com.rabbitmq.client.amqp.Resource.State.CLOSED;
21+
import static com.rabbitmq.client.amqp.Resource.State.CLOSING;
22+
import static com.rabbitmq.client.amqp.Resource.State.OPEN;
2123
import static com.rabbitmq.client.amqp.impl.AmqpConsumerBuilder.*;
2224
import static com.rabbitmq.client.amqp.metrics.MetricsCollector.ConsumeDisposition.*;
2325
import static java.time.Duration.ofSeconds;

src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,8 +220,7 @@ public void close() {
220220
"Management is initializing, retry closing later.");
221221
}
222222
if (this.closed.compareAndSet(false, true)) {
223-
this.state(CLOSED);
224-
this.releaseResources(null);
223+
this.releaseResources(null, CLOSED);
225224
if (this.receiver != null) {
226225
try {
227226
this.receiver.close();
@@ -293,6 +292,7 @@ void init() {
293292
this.receiver.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS);
294293
LOGGER.debug("Management receiver created ({}).", this);
295294
this.state(OPEN);
295+
this.closed.set(false);
296296
} catch (Exception e) {
297297
LOGGER.info("Error during management {} initialization: {}", cName, e.getMessage());
298298
throw ExceptionUtils.convert(e);
@@ -381,7 +381,15 @@ private void failRequests(AmqpException exception) {
381381
}
382382

383383
void releaseResources(AmqpException e) {
384-
this.markUnavailable();
384+
this.releaseResources(e, null);
385+
}
386+
387+
void releaseResources(AmqpException e, State state) {
388+
if (state == null) {
389+
this.markUnavailable();
390+
} else {
391+
this.state(state);
392+
}
385393
if (this.receiveLoop != null) {
386394
this.receiveLoop.cancel(true);
387395
this.receiveLoop = null;
@@ -872,4 +880,8 @@ public long messageCount() {
872880
return this.messageCount;
873881
}
874882
}
883+
884+
boolean isClosed() {
885+
return this.closed.get();
886+
}
875887
}

src/test/java/com/rabbitmq/client/amqp/impl/Tuples.java renamed to src/main/java/com/rabbitmq/client/amqp/impl/Tuples.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
1818
package com.rabbitmq.client.amqp.impl;
1919

20-
public final class Tuples {
20+
final class Tuples {
2121

2222
private Tuples() {}
2323

src/test/java/com/rabbitmq/client/amqp/impl/ManagementTest.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,22 @@ void receiveLoopShouldStopAfterBeingIdle() {
9797
assertThat(management.queueInfo(info2.name())).hasName(info2.name());
9898
}
9999

100+
@Test
101+
void getManagementFromConnectionAfterManagementHasBeenClosed() {
102+
AmqpManagement m1 = (AmqpManagement) connection.management();
103+
String q = m1.queue().exclusive(true).declare().name();
104+
assertThat(m1.queueInfo(q)).isEmpty();
105+
assertThat(m1.isClosed()).isFalse();
106+
m1.close();
107+
assertThat(m1.isClosed()).isTrue();
108+
assertThatThrownBy(() -> m1.queueInfo(q))
109+
.isInstanceOf(AmqpException.AmqpResourceClosedException.class);
110+
AmqpManagement m2 = (AmqpManagement) connection.management();
111+
assertThat(m2.isClosed()).isFalse();
112+
assertThat(m2.queueInfo(q)).isEmpty();
113+
assertThat(m2).isSameAs(m1);
114+
}
115+
100116
@ParameterizedTest
101117
@ValueSource(booleans = {true, false})
102118
@BrokerVersionAtLeast(RABBITMQ_4_1_0)

src/test/java/com/rabbitmq/client/amqp/impl/TopologyRecoveryTest.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import static org.assertj.core.api.Assertions.assertThatThrownBy;
2828

2929
import com.rabbitmq.client.amqp.*;
30+
import com.rabbitmq.client.amqp.AmqpException.AmqpResourceClosedException;
3031
import java.util.List;
3132
import java.util.Set;
3233
import java.util.concurrent.*;
@@ -656,7 +657,26 @@ void shouldRecoverEvenIfManagementIsClosed() {
656657
closeConnectionAndWaitForRecovery();
657658
publisher.publish(publisher.message(), ctx -> {});
658659
assertThat(consumeSync).completes();
659-
management.queueInfo(queueInfo.name());
660+
assertThatThrownBy(() -> management.queueInfo(queueInfo.name()))
661+
.isInstanceOf(AmqpResourceClosedException.class);
662+
assertThat(connection.management().queueInfo(queueInfo.name())).isEmpty();
663+
}
664+
}
665+
666+
@Test
667+
void managementShouldStayClosedAfterRecoveryIfClosedBefore() {
668+
try (Connection connection = connection()) {
669+
AmqpManagement management = (AmqpManagement) connection.management();
670+
671+
String q = management.queue().exclusive(true).declare().name();
672+
673+
management.close();
674+
675+
closeConnectionAndWaitForRecovery();
676+
assertThat(management.isClosed()).isTrue();
677+
assertThatThrownBy(() -> management.queueInfo(q))
678+
.isInstanceOf(AmqpResourceClosedException.class);
679+
assertThat(connection.management().queueInfo(q)).isEmpty();
660680
}
661681
}
662682

src/test/java/com/rabbitmq/client/amqp/oauth2/TokenCredentialsManagerTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
package com.rabbitmq.client.amqp.oauth2;
1919

2020
import static com.rabbitmq.client.amqp.impl.TestUtils.waitAtMost;
21-
import static com.rabbitmq.client.amqp.impl.Tuples.pair;
2221
import static com.rabbitmq.client.amqp.oauth2.TokenCredentialsManager.DEFAULT_REFRESH_DELAY_STRATEGY;
22+
import static com.rabbitmq.client.amqp.oauth2.Tuples.pair;
2323
import static java.time.Duration.ofMillis;
2424
import static java.time.Duration.ofSeconds;
2525
import static java.util.stream.Collectors.toList;
@@ -30,7 +30,6 @@
3030
import com.rabbitmq.client.amqp.impl.Assertions;
3131
import com.rabbitmq.client.amqp.impl.TestUtils;
3232
import com.rabbitmq.client.amqp.impl.TestUtils.Sync;
33-
import com.rabbitmq.client.amqp.impl.Tuples;
3433
import java.time.Duration;
3534
import java.time.Instant;
3635
import java.util.List;
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.client.amqp.oauth2;
19+
20+
final class Tuples {
21+
22+
private Tuples() {}
23+
24+
public static <A, B> Pair<A, B> pair(A v1, B v2) {
25+
return new Pair<>(v1, v2);
26+
}
27+
28+
public static class Pair<A, B> {
29+
30+
private final A v1;
31+
private final B v2;
32+
33+
private Pair(A v1, B v2) {
34+
this.v1 = v1;
35+
this.v2 = v2;
36+
}
37+
38+
public A v1() {
39+
return this.v1;
40+
}
41+
42+
public B v2() {
43+
return this.v2;
44+
}
45+
}
46+
}

0 commit comments

Comments
 (0)