Skip to content

Commit 764cf0d

Browse files
committed
Schedule recovery probing
1 parent 0bcf300 commit 764cf0d

File tree

1 file changed

+48
-19
lines changed

1 file changed

+48
-19
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
6363
private final List<RpcServer> rpcServers = new CopyOnWriteArrayList<>();
6464
private final TopologyListener topologyListener;
6565
private volatile EntityRecovery entityRecovery;
66-
private final Future<?> recoveryLoop;
66+
private final Future<?> recoveryTask;
6767
private final BlockingQueue<Runnable> recoveryRequestQueue;
6868
private final AtomicBoolean recoveringConnection = new AtomicBoolean(false);
6969
private final DefaultConnectionSettings<?> connectionSettings;
@@ -90,23 +90,52 @@ final class AmqpConnection extends ResourceBase implements Connection {
9090

9191
if (recoveryConfiguration.activated()) {
9292
this.recoveryRequestQueue = new ArrayBlockingQueue<>(10);
93-
this.recoveryLoop =
94-
this.executorService()
95-
.submit(
93+
Duration recoveryCheckPeriod = Duration.ofMillis(100L);
94+
this.recoveryTask =
95+
this.scheduledExecutorService()
96+
.scheduleAtFixedRate(
9697
() -> {
97-
while (!Thread.currentThread().isInterrupted()) {
98-
try {
99-
Runnable recoveryTask = this.recoveryRequestQueue.take();
100-
LOGGER.debug("Running recovery task for connection '{}'...", this.name());
101-
recoveryTask.run();
102-
LOGGER.debug("Recovery task for connection '{}' has run.", this.name());
103-
} catch (InterruptedException e) {
104-
return;
105-
} catch (Exception e) {
106-
LOGGER.warn("Error during connection recovery for '{}'", this.name(), e);
107-
}
98+
Runnable recoveryTask = this.recoveryRequestQueue.poll();
99+
if (recoveryTask != null) {
100+
this.executorService()
101+
.submit(
102+
() -> {
103+
try {
104+
LOGGER.debug(
105+
"Running recovery task for connection '{}'...", this.name());
106+
recoveryTask.run();
107+
LOGGER.debug(
108+
"Recovery task for connection '{}' has run.", this.name());
109+
} catch (Exception e) {
110+
LOGGER.warn(
111+
"Error during connection recovery for '{}'", this.name(), e);
112+
}
113+
});
108114
}
109-
});
115+
},
116+
recoveryCheckPeriod.toMillis(),
117+
recoveryCheckPeriod.toMillis(),
118+
TimeUnit.MILLISECONDS);
119+
// this.recoveryLoop =
120+
// this.executorService()
121+
// .submit(
122+
// () -> {
123+
// while (!Thread.currentThread().isInterrupted()) {
124+
// try {
125+
// Runnable recoveryTask = this.recoveryRequestQueue.take();
126+
// LOGGER.debug("Running recovery task for connection '{}'...",
127+
// this.name());
128+
// recoveryTask.run();
129+
// LOGGER.debug("Recovery task for connection '{}' has run.",
130+
// this.name());
131+
// } catch (InterruptedException e) {
132+
// return;
133+
// } catch (Exception e) {
134+
// LOGGER.warn("Error during connection recovery for '{}'",
135+
// this.name(), e);
136+
// }
137+
// }
138+
// });
110139
disconnectHandler = recoveryDisconnectHandler(recoveryConfiguration, builder.name());
111140
} else {
112141
disconnectHandler =
@@ -116,7 +145,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
116145
this.close(failureCause);
117146
};
118147
this.recoveryRequestQueue = null;
119-
this.recoveryLoop = null;
148+
this.recoveryTask = null;
120149
}
121150
if (this.connectionSettings.affinity().activated()) {
122151
this.affinity =
@@ -668,8 +697,8 @@ private void close(Throwable cause) {
668697
if (this.closed.compareAndSet(false, true)) {
669698
this.state(CLOSING, cause);
670699
this.environment.removeConnection(this);
671-
if (this.recoveryLoop != null) {
672-
this.recoveryLoop.cancel(true);
700+
if (this.recoveryTask != null) {
701+
this.recoveryTask.cancel(true);
673702
}
674703
if (this.topologyListener instanceof AutoCloseable) {
675704
try {

0 commit comments

Comments
 (0)