Skip to content

Commit f18c1ad

Browse files
committed
Stops Recovery When Client Closes
Adds a boolean flag, manuallyClosed to keep track of whether or not the AutoRecoveringConnection has been told to close by the application. If flag is set before recovery attempt, autorecovery fails out. If flag is set mid recovery attempt, creates the new connection, then immediately aborts it.
1 parent 6cd0fda commit f18c1ad

File tree

1 file changed

+64
-16
lines changed

1 file changed

+64
-16
lines changed

src/com/rabbitmq/client/impl/recovery/AutorecoveringConnection.java

Lines changed: 64 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,14 @@ public class AutorecoveringConnection implements Connection, Recoverable, Networ
6767
private final Map<String, RecordedConsumer> consumers = new ConcurrentHashMap<String, RecordedConsumer>();
6868
private final List<ConsumerRecoveryListener> consumerRecoveryListeners = new ArrayList<ConsumerRecoveryListener>();
6969
private final List<QueueRecoveryListener> queueRecoveryListeners = new ArrayList<QueueRecoveryListener>();
70-
70+
71+
// Used to block connection recovery attempts after close() is invoked.
72+
private volatile boolean manuallyClosed = false;
73+
74+
// This lock guards the manuallyClosed flag and the delegate connection. Guarding these two ensures that a new connection can never
75+
// be created after application code has initiated shutdown.
76+
private Object recoveryLock = new Object();
77+
7178
public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, Address[] addrs) {
7279
this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addrs);
7380
this.params = params;
@@ -181,48 +188,69 @@ public boolean isOpen() {
181188
* @see com.rabbitmq.client.Connection#close()
182189
*/
183190
public void close() throws IOException {
191+
synchronized(recoveryLock) {
192+
this.manuallyClosed = true;
193+
}
184194
delegate.close();
185195
}
186196

187197
/**
188198
* @see Connection#close(int)
189199
*/
190200
public void close(int timeout) throws IOException {
201+
synchronized(recoveryLock) {
202+
this.manuallyClosed = true;
203+
}
191204
delegate.close(timeout);
192205
}
193206

194207
/**
195208
* @see Connection#close(int, String, int)
196209
*/
197210
public void close(int closeCode, String closeMessage, int timeout) throws IOException {
211+
synchronized(recoveryLock) {
212+
this.manuallyClosed = true;
213+
}
198214
delegate.close(closeCode, closeMessage, timeout);
199215
}
200216

201217
/**
202218
* @see com.rabbitmq.client.Connection#abort()
203219
*/
204220
public void abort() {
221+
synchronized(recoveryLock) {
222+
this.manuallyClosed = true;
223+
}
205224
delegate.abort();
206225
}
207226

208227
/**
209228
* @see Connection#abort(int, String, int)
210229
*/
211230
public void abort(int closeCode, String closeMessage, int timeout) {
231+
synchronized(recoveryLock) {
232+
this.manuallyClosed = true;
233+
}
212234
delegate.abort(closeCode, closeMessage, timeout);
213235
}
214236

215237
/**
216238
* @see Connection#abort(int, String)
217239
*/
218240
public void abort(int closeCode, String closeMessage) {
241+
synchronized(recoveryLock) {
242+
this.manuallyClosed = true;
243+
}
219244
delegate.abort(closeCode, closeMessage);
220245
}
221246

222247
/**
223248
* @see Connection#abort(int)
224249
*/
225250
public void abort(int timeout) {
251+
synchronized(recoveryLock) {
252+
this.manuallyClosed = true;
253+
}
226254
delegate.abort(timeout);
227255
}
228256

@@ -261,7 +289,10 @@ public void clearBlockedListeners() {
261289
* @see com.rabbitmq.client.Connection#close(int, String)
262290
*/
263291
public void close(int closeCode, String closeMessage) throws IOException {
264-
delegate.close(closeCode, closeMessage);
292+
synchronized(recoveryLock) {
293+
this.manuallyClosed = true;
294+
}
295+
delegate.close(closeCode, closeMessage);
265296
}
266297

267298
/**
@@ -404,16 +435,18 @@ public void removeConsumerRecoveryListener(ConsumerRecoveryListener listener) {
404435

405436
synchronized private void beginAutomaticRecovery() throws InterruptedException, IOException, TopologyRecoveryException {
406437
Thread.sleep(this.params.getNetworkRecoveryInterval());
407-
this.recoverConnection();
408-
this.recoverShutdownListeners();
409-
this.recoverBlockedListeners();
410-
this.recoverChannels();
411-
if(this.params.isTopologyRecoveryEnabled()) {
412-
this.recoverEntities();
413-
this.recoverConsumers();
414-
}
438+
if (!this.recoverConnection())
439+
return;
440+
441+
this.recoverShutdownListeners();
442+
this.recoverBlockedListeners();
443+
this.recoverChannels();
444+
if(this.params.isTopologyRecoveryEnabled()) {
445+
this.recoverEntities();
446+
this.recoverConsumers();
447+
}
415448

416-
this.notifyRecoveryListeners();
449+
this.notifyRecoveryListeners();
417450
}
418451

419452
private void recoverShutdownListeners() {
@@ -428,18 +461,33 @@ private void recoverBlockedListeners() {
428461
}
429462
}
430463

431-
private void recoverConnection() throws IOException, InterruptedException {
432-
boolean recovering = true;
433-
while (recovering) {
464+
// Returns true if the connection was recovered,
465+
// false if application initiated shutdown while attempting recovery.
466+
private boolean recoverConnection() throws IOException, InterruptedException {
467+
while (!manuallyClosed)
468+
{
434469
try {
435-
this.delegate = this.cf.newConnection();
436-
recovering = false;
470+
RecoveryAwareAMQConnection newConn = this.cf.newConnection();
471+
synchronized(recoveryLock) {
472+
if (!manuallyClosed) {
473+
// This is the standard case.
474+
this.delegate = newConn;
475+
return true;
476+
}
477+
}
478+
// This is the once in a blue moon case.
479+
// Application code just called close as the connection
480+
// was being re-established. So we attempt to close the newly created connection.
481+
newConn.abort();
482+
return false;
437483
} catch (Exception e) {
438484
// TODO: exponential back-off
439485
Thread.sleep(this.params.getNetworkRecoveryInterval());
440486
this.getExceptionHandler().handleConnectionRecoveryException(this, e);
441487
}
442488
}
489+
490+
return false;
443491
}
444492

445493
private void recoverChannels() {

0 commit comments

Comments
 (0)