Skip to content

Keep management in same initial state after recovery #169

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 59 additions & 9 deletions src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,33 @@
// [email protected].
package com.rabbitmq.client.amqp.impl;

import static com.rabbitmq.client.amqp.Resource.State.*;
import static com.rabbitmq.client.amqp.Resource.State.CLOSED;
import static com.rabbitmq.client.amqp.Resource.State.CLOSING;
import static com.rabbitmq.client.amqp.Resource.State.OPEN;
import static com.rabbitmq.client.amqp.Resource.State.OPENING;
import static com.rabbitmq.client.amqp.Resource.State.RECOVERING;
import static com.rabbitmq.client.amqp.impl.ExceptionUtils.convert;
import static com.rabbitmq.client.amqp.impl.Tuples.pair;
import static com.rabbitmq.client.amqp.impl.Utils.supportFilterExpressions;
import static com.rabbitmq.client.amqp.impl.Utils.supportSetToken;
import static java.lang.System.nanoTime;
import static java.time.Duration.ofNanos;

import com.rabbitmq.client.amqp.*;
import com.rabbitmq.client.amqp.Address;
import com.rabbitmq.client.amqp.AmqpException;
import com.rabbitmq.client.amqp.Connection;
import com.rabbitmq.client.amqp.ConnectionSettings;
import com.rabbitmq.client.amqp.Consumer;
import com.rabbitmq.client.amqp.ConsumerBuilder;
import com.rabbitmq.client.amqp.Management;
import com.rabbitmq.client.amqp.ObservationCollector;
import com.rabbitmq.client.amqp.Publisher;
import com.rabbitmq.client.amqp.PublisherBuilder;
import com.rabbitmq.client.amqp.RpcClient;
import com.rabbitmq.client.amqp.RpcClientBuilder;
import com.rabbitmq.client.amqp.RpcServer;
import com.rabbitmq.client.amqp.RpcServerBuilder;
import com.rabbitmq.client.amqp.impl.Tuples.Pair;
import com.rabbitmq.client.amqp.impl.Utils.RunnableWithException;
import com.rabbitmq.client.amqp.impl.Utils.StopWatch;
import com.rabbitmq.client.amqp.metrics.MetricsCollector;
Expand Down Expand Up @@ -89,7 +107,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
private final List<RpcClient> rpcClients = new CopyOnWriteArrayList<>();
private final List<RpcServer> rpcServers = new CopyOnWriteArrayList<>();
private final TopologyListener topologyListener;
private volatile EntityRecovery entityRecovery;
private final EntityRecovery entityRecovery;
private final AtomicBoolean recoveringConnection = new AtomicBoolean(false);
private final DefaultConnectionSettings<?> connectionSettings;
private final Supplier<SessionHandler> sessionHandlerSupplier;
Expand Down Expand Up @@ -118,7 +136,9 @@ final class AmqpConnection extends ResourceBase implements Connection {
AmqpConnectionBuilder.AmqpRecoveryConfiguration recoveryConfiguration =
builder.recoveryConfiguration();

this.topologyListener = createTopologyListener(builder);
Pair<TopologyListener, EntityRecovery> topologyInfra = createTopologyInfrastructure(builder);
this.topologyListener = topologyInfra.v1();
this.entityRecovery = topologyInfra.v2();

Executor de =
builder.dispatchingExecutor() == null
Expand Down Expand Up @@ -344,20 +364,25 @@ private static String extractNode(org.apache.qpid.protonj2.client.Connection con
return node;
}

TopologyListener createTopologyListener(AmqpConnectionBuilder builder) {
Pair<TopologyListener, EntityRecovery> createTopologyInfrastructure(
AmqpConnectionBuilder builder) {
TopologyListener topologyListener;
EntityRecovery entityRecovery;
if (builder.recoveryConfiguration().topology()) {
RecordingTopologyListener rtl =
new RecordingTopologyListener(
"topology-listener-connection-" + this.name(), this.environment.recoveryEventLoop());
this.entityRecovery = new EntityRecovery(this, rtl);
entityRecovery = new EntityRecovery(this, rtl);
topologyListener = rtl;
} else {
topologyListener = TopologyListener.NO_OP;
entityRecovery = null;
}
return builder.topologyListener() == null
? topologyListener
: TopologyListener.compose(List.of(builder.topologyListener(), topologyListener));
topologyListener =
builder.topologyListener() == null
? topologyListener
: TopologyListener.compose(List.of(builder.topologyListener(), topologyListener));
return pair(topologyListener, entityRecovery);
}

private BiConsumer<org.apache.qpid.protonj2.client.Connection, DisconnectionEvent>
Expand Down Expand Up @@ -455,12 +480,21 @@ private void recoverAfterConnectionFailure(
LOGGER.debug("Reconnected '{}' to {}", this.name(), this.currentConnectionLabel());
try {
if (recoveryConfiguration.topology()) {
boolean managementPreviouslyClosed = this.management.isClosed();
this.management.init();
LOGGER.debug("Recovering topology of connection '{}'...", this.name());
this.recoverTopology();
this.recoverConsumers();
this.recoverPublishers();
LOGGER.debug("Recovered topology of connection '{}'.", this.name());
if (managementPreviouslyClosed) {
LOGGER.debug("Management was closed before recovery, closing it again");
try {
this.closeManagement();
} catch (Exception e) {
LOGGER.info("Error while (re)closing management after recovery");
}
}
}
LOGGER.info(
"Recovered connection '{}' to {}", this.name(), this.currentConnectionLabel());
Expand Down Expand Up @@ -924,6 +958,22 @@ public String toString() {
return this.name();
}

private void authenticate(String username, String password) {
State state = this.state();
if (state == OPEN) {
LOGGER.debug("Setting new token for connection {}", this.name);
long start = nanoTime();
((AmqpManagement) management()).setToken(password);
LOGGER.debug(
"Set new token for connection {} in {} ms",
this.name,
ofNanos(nanoTime() - start).toMillis());
} else {
LOGGER.debug(
"Could not set new token for connection {} because its state is {}", this.name(), state);
}
}

static class NativeConnectionWrapper {

private final org.apache.qpid.protonj2.client.Connection connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
// [email protected].
package com.rabbitmq.client.amqp.impl;

import static com.rabbitmq.client.amqp.Resource.State.*;
import static com.rabbitmq.client.amqp.Resource.State.CLOSED;
import static com.rabbitmq.client.amqp.Resource.State.CLOSING;
import static com.rabbitmq.client.amqp.Resource.State.OPEN;
import static com.rabbitmq.client.amqp.impl.AmqpConsumerBuilder.*;
import static com.rabbitmq.client.amqp.metrics.MetricsCollector.ConsumeDisposition.*;
import static java.time.Duration.ofSeconds;
Expand Down
18 changes: 15 additions & 3 deletions src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,7 @@ public void close() {
"Management is initializing, retry closing later.");
}
if (this.closed.compareAndSet(false, true)) {
this.state(CLOSED);
this.releaseResources(null);
this.releaseResources(null, CLOSED);
if (this.receiver != null) {
try {
this.receiver.close();
Expand Down Expand Up @@ -293,6 +292,7 @@ void init() {
this.receiver.openFuture().get(this.rpcTimeout.toMillis(), MILLISECONDS);
LOGGER.debug("Management receiver created ({}).", this);
this.state(OPEN);
this.closed.set(false);
} catch (Exception e) {
LOGGER.info("Error during management {} initialization: {}", cName, e.getMessage());
throw ExceptionUtils.convert(e);
Expand Down Expand Up @@ -381,7 +381,15 @@ private void failRequests(AmqpException exception) {
}

void releaseResources(AmqpException e) {
this.markUnavailable();
this.releaseResources(e, null);
}

void releaseResources(AmqpException e, State state) {
if (state == null) {
this.markUnavailable();
} else {
this.state(state);
}
if (this.receiveLoop != null) {
this.receiveLoop.cancel(true);
this.receiveLoop = null;
Expand Down Expand Up @@ -872,4 +880,8 @@ public long messageCount() {
return this.messageCount;
}
}

boolean isClosed() {
return this.closed.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
// [email protected].
package com.rabbitmq.client.amqp.impl;

public final class Tuples {
final class Tuples {

private Tuples() {}

Expand Down
16 changes: 16 additions & 0 deletions src/test/java/com/rabbitmq/client/amqp/impl/ManagementTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,22 @@ void receiveLoopShouldStopAfterBeingIdle() {
assertThat(management.queueInfo(info2.name())).hasName(info2.name());
}

@Test
void getManagementFromConnectionAfterManagementHasBeenClosed() {
AmqpManagement m1 = (AmqpManagement) connection.management();
String q = m1.queue().exclusive(true).declare().name();
assertThat(m1.queueInfo(q)).isEmpty();
assertThat(m1.isClosed()).isFalse();
m1.close();
assertThat(m1.isClosed()).isTrue();
assertThatThrownBy(() -> m1.queueInfo(q))
.isInstanceOf(AmqpException.AmqpResourceClosedException.class);
AmqpManagement m2 = (AmqpManagement) connection.management();
assertThat(m2.isClosed()).isFalse();
assertThat(m2.queueInfo(q)).isEmpty();
assertThat(m2).isSameAs(m1);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
@BrokerVersionAtLeast(RABBITMQ_4_1_0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import com.rabbitmq.client.amqp.*;
import com.rabbitmq.client.amqp.AmqpException.AmqpResourceClosedException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
Expand Down Expand Up @@ -656,7 +657,26 @@ void shouldRecoverEvenIfManagementIsClosed() {
closeConnectionAndWaitForRecovery();
publisher.publish(publisher.message(), ctx -> {});
assertThat(consumeSync).completes();
management.queueInfo(queueInfo.name());
assertThatThrownBy(() -> management.queueInfo(queueInfo.name()))
.isInstanceOf(AmqpResourceClosedException.class);
assertThat(connection.management().queueInfo(queueInfo.name())).isEmpty();
}
}

@Test
void managementShouldStayClosedAfterRecoveryIfClosedBefore() {
try (Connection connection = connection()) {
AmqpManagement management = (AmqpManagement) connection.management();

String q = management.queue().exclusive(true).declare().name();

management.close();

closeConnectionAndWaitForRecovery();
assertThat(management.isClosed()).isTrue();
assertThatThrownBy(() -> management.queueInfo(q))
.isInstanceOf(AmqpResourceClosedException.class);
assertThat(connection.management().queueInfo(q)).isEmpty();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package com.rabbitmq.client.amqp.oauth2;

import static com.rabbitmq.client.amqp.impl.TestUtils.waitAtMost;
import static com.rabbitmq.client.amqp.impl.Tuples.pair;
import static com.rabbitmq.client.amqp.oauth2.TokenCredentialsManager.DEFAULT_REFRESH_DELAY_STRATEGY;
import static com.rabbitmq.client.amqp.oauth2.Tuples.pair;
import static java.time.Duration.ofMillis;
import static java.time.Duration.ofSeconds;
import static java.util.stream.Collectors.toList;
Expand All @@ -30,7 +30,6 @@
import com.rabbitmq.client.amqp.impl.Assertions;
import com.rabbitmq.client.amqp.impl.TestUtils;
import com.rabbitmq.client.amqp.impl.TestUtils.Sync;
import com.rabbitmq.client.amqp.impl.Tuples;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
Expand Down
46 changes: 46 additions & 0 deletions src/test/java/com/rabbitmq/client/amqp/oauth2/Tuples.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) 2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// If you have any questions regarding licensing, please contact us at
// [email protected].
package com.rabbitmq.client.amqp.oauth2;

final class Tuples {

private Tuples() {}

public static <A, B> Pair<A, B> pair(A v1, B v2) {
return new Pair<>(v1, v2);
}

public static class Pair<A, B> {

private final A v1;
private final B v2;

private Pair(A v1, B v2) {
this.v1 = v1;
this.v2 = v2;
}

public A v1() {
return this.v1;
}

public B v2() {
return this.v2;
}
}
}