@@ -90,52 +90,27 @@ final class AmqpConnection extends ResourceBase implements Connection {
90
90
91
91
if (recoveryConfiguration .activated ()) {
92
92
this .recoveryRequestQueue = new ArrayBlockingQueue <>(10 );
93
- Duration recoveryCheckPeriod = Duration .ofMillis (100L );
94
93
this .recoveryTask =
95
- this .scheduledExecutorService ()
96
- .scheduleAtFixedRate (
94
+ this .executorService ()
95
+ .submit (
97
96
() -> {
98
- Runnable recoveryTask = this .recoveryRequestQueue .poll ();
99
- if (recoveryTask != null ) {
100
- this .executorService ()
101
- .submit (
102
- () -> {
103
- try {
104
- LOGGER .debug (
105
- "Running recovery task for connection '{}'..." , this .name ());
106
- recoveryTask .run ();
107
- LOGGER .debug (
108
- "Recovery task for connection '{}' has run." , this .name ());
109
- } catch (Exception e ) {
110
- LOGGER .warn (
111
- "Error during connection recovery for '{}'" , this .name (), e );
112
- }
113
- });
97
+ LOGGER .debug ("Starting recovery loop for '{}'" , this .name ());
98
+ boolean keepGoing = true ;
99
+ while (keepGoing ) {
100
+ try {
101
+ Runnable recoveryTask = this .recoveryRequestQueue .take ();
102
+ LOGGER .debug ("Running recovery task for connection '{}'..." , this .name ());
103
+ recoveryTask .run ();
104
+ LOGGER .debug ("Recovery task for connection '{}' has run." , this .name ());
105
+ } catch (InterruptedException e ) {
106
+ LOGGER .info ("Recovery loop for '{}' has been interrupted." , this .name ());
107
+ keepGoing = false ;
108
+ } catch (Exception e ) {
109
+ LOGGER .warn ("Error during connection recovery for '{}'" , this .name (), e );
110
+ }
114
111
}
115
- },
116
- recoveryCheckPeriod .toMillis (),
117
- recoveryCheckPeriod .toMillis (),
118
- TimeUnit .MILLISECONDS );
119
- // this.recoveryLoop =
120
- // this.executorService()
121
- // .submit(
122
- // () -> {
123
- // while (!Thread.currentThread().isInterrupted()) {
124
- // try {
125
- // Runnable recoveryTask = this.recoveryRequestQueue.take();
126
- // LOGGER.debug("Running recovery task for connection '{}'...",
127
- // this.name());
128
- // recoveryTask.run();
129
- // LOGGER.debug("Recovery task for connection '{}' has run.",
130
- // this.name());
131
- // } catch (InterruptedException e) {
132
- // return;
133
- // } catch (Exception e) {
134
- // LOGGER.warn("Error during connection recovery for '{}'",
135
- // this.name(), e);
136
- // }
137
- // }
138
- // });
112
+ LOGGER .info ("Closing recovery loop for '{}'." , this .name ());
113
+ });
139
114
disconnectHandler = recoveryDisconnectHandler (recoveryConfiguration , builder .name ());
140
115
} else {
141
116
disconnectHandler =
@@ -171,7 +146,8 @@ final class AmqpConnection extends ResourceBase implements Connection {
171
146
this .affinity ,
172
147
this .environment .affinityCache (),
173
148
this .affinityStrategy ,
174
- ConnectionUtils .NO_RETRY_STRATEGY );
149
+ ConnectionUtils .NO_RETRY_STRATEGY ,
150
+ this .name ());
175
151
this .sync (ncw );
176
152
LOGGER .debug ("Opened connection '{}' on node '{}'." , this .name (), this .connectionNodename ());
177
153
this .state (OPEN );
@@ -259,17 +235,18 @@ private NativeConnectionWrapper connect(
259
235
Address address = connectionSettings .selectAddress (addresses );
260
236
StopWatch stopWatch = new StopWatch ();
261
237
try {
262
- LOGGER .trace ("Connecting..." );
238
+ LOGGER .trace ("Connecting '{}' to {} ..." , this . name (), address );
263
239
org .apache .qpid .protonj2 .client .Connection connection =
264
240
this .environment .client ().connect (address .host (), address .port (), connectionOptions );
241
+ LOGGER .debug ("Created native connection instance for '{}'" , this .name ());
265
242
ExceptionUtils .wrapGet (connection .openFuture ());
266
- LOGGER .trace ("Connection attempt succeeded" );
243
+ LOGGER .debug ("Connection attempt '{}' succeeded" , this . name () );
267
244
checkBrokerVersion (connection );
268
245
return new NativeConnectionWrapper (connection , extractNode (connection ), address );
269
246
} catch (ClientException e ) {
270
247
throw ExceptionUtils .convert (e );
271
248
} finally {
272
- LOGGER .trace ("Connection attempt took {}" , stopWatch .stop ());
249
+ LOGGER .debug ("Connection attempt for '{}' took {}" , this . name () , stopWatch .stop ());
273
250
}
274
251
}
275
252
@@ -302,7 +279,9 @@ private static String extractNode(org.apache.qpid.protonj2.client.Connection con
302
279
TopologyListener createTopologyListener (AmqpConnectionBuilder builder ) {
303
280
TopologyListener topologyListener ;
304
281
if (builder .recoveryConfiguration ().topology ()) {
305
- RecordingTopologyListener rtl = new RecordingTopologyListener (this .executorService ());
282
+ RecordingTopologyListener rtl =
283
+ new RecordingTopologyListener (
284
+ "topology-listener-connection-" + this .name (), this .executorService ());
306
285
this .entityRecovery = new EntityRecovery (this , rtl );
307
286
topologyListener = rtl ;
308
287
} else {
@@ -346,9 +325,12 @@ TopologyListener createTopologyListener(AmqpConnectionBuilder builder) {
346
325
this .name (),
347
326
exception .getMessage ());
348
327
this .recoveryRequestQueue .add (
349
- () ->
328
+ () -> {
329
+ if (!this .recoveringConnection .get ()) {
350
330
recoverAfterConnectionFailure (
351
- recoveryConfiguration , name , exception , resultReference ));
331
+ recoveryConfiguration , name , exception , resultReference );
332
+ }
333
+ });
352
334
} else {
353
335
LOGGER .debug (
354
336
"Not recovering connection '{}' for error {}" ,
@@ -364,7 +346,7 @@ TopologyListener createTopologyListener(AmqpConnectionBuilder builder) {
364
346
private void recoverAfterConnectionFailure (
365
347
AmqpConnectionBuilder .AmqpRecoveryConfiguration recoveryConfiguration ,
366
348
String connectionName ,
367
- AmqpException failureCause ,
349
+ Exception failureCause ,
368
350
AtomicReference <BiConsumer <org .apache .qpid .protonj2 .client .Connection , DisconnectionEvent >>
369
351
disconnectedHandlerReference ) {
370
352
LOGGER .info (
@@ -380,41 +362,59 @@ private void recoverAfterConnectionFailure(
380
362
this .connectionAddress = null ;
381
363
LOGGER .debug ("Releasing management resource of connection '{}'." , this .name ());
382
364
this .releaseManagementResources ();
383
- try {
384
- this .recoveringConnection .set (true );
385
- LOGGER .debug ("Connection attempt for '{}'." , this .name ());
386
- NativeConnectionWrapper ncw =
387
- recoverNativeConnection (
388
- recoveryConfiguration , connectionName , disconnectedHandlerReference );
389
- this .sync (ncw );
390
- LOGGER .debug ("Reconnected '{}' to {}" , this .name (), this .currentConnectionLabel ());
391
- } catch (Exception ex ) {
392
- if (ex instanceof InterruptedException ) {
393
- Thread .currentThread ().interrupt ();
365
+ if (this .recoveringConnection .compareAndSet (false , true )) {
366
+ try {
367
+ this .recoveringConnection .set (true );
368
+ LOGGER .debug ("Connection attempt for '{}'." , this .name ());
369
+ NativeConnectionWrapper ncw =
370
+ recoverNativeConnection (
371
+ recoveryConfiguration , connectionName , disconnectedHandlerReference );
372
+ this .sync (ncw );
373
+ LOGGER .debug ("Reconnected '{}' to {}" , this .name (), this .currentConnectionLabel ());
374
+ } catch (Exception ex ) {
375
+ if (ex instanceof InterruptedException ) {
376
+ Thread .currentThread ().interrupt ();
377
+ }
378
+ this .close (ex );
379
+ return ;
380
+ } finally {
381
+ this .recoveringConnection .set (false );
394
382
}
395
- this .closed .set (true );
396
- this .changeStateOfPublishers (CLOSED , ex );
397
- this .changeStateOfConsumers (CLOSED , ex );
398
- this .state (CLOSED , ex );
383
+ } else {
384
+ LOGGER .debug ("Connection '{}' already recovering, returning." , this .name ());
399
385
return ;
400
- } finally {
401
- this .recoveringConnection .set (false );
402
386
}
403
387
404
388
try {
405
389
if (recoveryConfiguration .topology ()) {
390
+ this .management .init ();
406
391
LOGGER .debug ("Recovering topology of connection '{}'..." , this .name ());
407
392
this .recoverTopology ();
408
393
this .recoverConsumers ();
409
394
this .recoverPublishers ();
410
395
LOGGER .debug ("Recovered topology of connection '{}'." , this .name ());
411
396
}
412
-
413
- LOGGER .info ("Recovered connection to {}" , this .currentConnectionLabel ());
397
+ LOGGER .info ("Recovered connection '{}' to {}" , this .name (), this .currentConnectionLabel ());
414
398
this .state (OPEN );
415
399
} catch (Exception ex ) {
416
400
// likely InterruptedException or IO exception
417
- LOGGER .info ("Error while trying to recover connection" , ex );
401
+ LOGGER .info (
402
+ "Error while trying to recover topology for connection '{}': {}" ,
403
+ this .name (),
404
+ ex .getMessage ());
405
+ if (RECOVERY_PREDICATE .test (ex )) {
406
+ LOGGER .debug (
407
+ "Error during topology recoverable, queueing recovery task for '{}', error is {}" ,
408
+ this .name (),
409
+ ex .getMessage ());
410
+ this .recoveryRequestQueue .add (
411
+ () -> {
412
+ if (!this .recoveringConnection .get ()) {
413
+ recoverAfterConnectionFailure (
414
+ recoveryConfiguration , name , ex , disconnectedHandlerReference );
415
+ }
416
+ });
417
+ }
418
418
}
419
419
}
420
420
@@ -428,10 +428,14 @@ private NativeConnectionWrapper recoverNativeConnection(
428
428
while (true ) {
429
429
Duration delay = recoveryConfiguration .backOffDelayPolicy ().delay (attempt );
430
430
if (BackOffDelayPolicy .TIMEOUT .equals (delay )) {
431
+ LOGGER .debug ("Reached timeout to recover '{}'" , this .name ());
431
432
throw new AmqpException ("Recovery retry timed out" );
432
433
} else {
433
434
try {
435
+ LOGGER .debug (
436
+ "Waiting for {} before trying to recover connection '{}'" , delay , this .name ());
434
437
Thread .sleep (delay .toMillis ());
438
+ LOGGER .debug ("Done waiting for '{}' recovery." , this .name ());
435
439
} catch (InterruptedException ex ) {
436
440
Thread .currentThread ().interrupt ();
437
441
LOGGER .info ("Thread interrupted while waiting during connection recovery" );
@@ -440,38 +444,40 @@ private NativeConnectionWrapper recoverNativeConnection(
440
444
}
441
445
442
446
try {
443
- NativeConnectionWrapper result =
444
- ConnectionUtils .enforceAffinity (
445
- addrs -> {
446
- NativeConnectionWrapper wrapper =
447
- connect (
448
- this .connectionSettings ,
449
- connectionName ,
450
- disconnectedHandlerReference .get (),
451
- addrs );
452
- this .nativeConnection = wrapper .connection ();
453
- return wrapper ;
454
- },
455
- this .management ,
456
- this .affinity ,
457
- this .environment .affinityCache (),
458
- this .affinityStrategy ,
459
- new ConnectionUtils .RetryStrategy () {
460
- @ Override
461
- public <T > T maybeRetry (Supplier <T > task ) {
462
- return RetryUtils .callAndMaybeRetry (
463
- task ::get ,
464
- e -> true ,
465
- recoveryConfiguration .backOffDelayPolicy (),
466
- "Connection affinity operation" );
467
- }
468
- });
469
- return result ;
447
+ LOGGER . debug ( "Trying to create native connection for '{}'." , this . name ());
448
+ return ConnectionUtils .enforceAffinity (
449
+ addrs -> {
450
+ NativeConnectionWrapper wrapper =
451
+ connect (
452
+ this .connectionSettings ,
453
+ connectionName ,
454
+ disconnectedHandlerReference .get (),
455
+ addrs );
456
+ this .nativeConnection = wrapper .connection ();
457
+ return wrapper ;
458
+ },
459
+ this .management ,
460
+ this .affinity ,
461
+ this .environment .affinityCache (),
462
+ this .affinityStrategy ,
463
+ new ConnectionUtils .RetryStrategy () {
464
+ @ Override
465
+ public <T > T maybeRetry (Supplier <T > task ) {
466
+ return RetryUtils .callAndMaybeRetry (
467
+ task ::get ,
468
+ e -> true ,
469
+ recoveryConfiguration .backOffDelayPolicy (),
470
+ "Connection affinity operation" );
471
+ }
472
+ },
473
+ this . name ()) ;
470
474
} catch (Exception ex ) {
471
475
LOGGER .info ("Error while trying to recover connection" , ex );
472
476
if (!RECOVERY_PREDICATE .test (ex )) {
473
477
LOGGER .info (
474
- "Stopping connection recovery, exception is not recoverable: {}" , ex .getMessage ());
478
+ "Stopping connection '{}' recovery, exception is not recoverable: {}" ,
479
+ this .name (),
480
+ ex .getMessage ());
475
481
throw new AmqpException ("Could not recover connection after fatal exception" , ex );
476
482
}
477
483
}
@@ -721,7 +727,10 @@ private void close(Throwable cause) {
721
727
consumer .close ();
722
728
}
723
729
try {
724
- this .nativeConnection .close ();
730
+ org .apache .qpid .protonj2 .client .Connection nc = this .nativeConnection ;
731
+ if (nc != null ) {
732
+ nc .close ();
733
+ }
725
734
} catch (Exception e ) {
726
735
LOGGER .warn ("Error while closing native connection" , e );
727
736
}
0 commit comments