@@ -70,10 +70,12 @@ final class AmqpConnection extends ResourceBase implements Connection {
70
70
private final Supplier <SessionHandler > sessionHandlerSupplier ;
71
71
private final ConnectionUtils .AffinityContext affinity ;
72
72
private final ConnectionSettings .AffinityStrategy affinityStrategy ;
73
+ private final String name ;
73
74
74
75
AmqpConnection (AmqpConnectionBuilder builder ) {
75
76
super (builder .listeners ());
76
77
this .id = ID_SEQUENCE .getAndIncrement ();
78
+ this .name = builder .name ();
77
79
this .environment = builder .environment ();
78
80
this .connectionSettings = builder .connectionSettings ().consolidate ();
79
81
this .sessionHandlerSupplier =
@@ -95,11 +97,13 @@ final class AmqpConnection extends ResourceBase implements Connection {
95
97
while (!Thread .currentThread ().isInterrupted ()) {
96
98
try {
97
99
Runnable recoveryTask = this .recoveryRequestQueue .take ();
100
+ LOGGER .debug ("Running recovery task for connection '{}'..." , this .name ());
98
101
recoveryTask .run ();
102
+ LOGGER .debug ("Recovery task for connection '{}' has run." , this .name ());
99
103
} catch (InterruptedException e ) {
100
104
return ;
101
105
} catch (Exception e ) {
102
- LOGGER .warn ("Error during connection recovery" , e );
106
+ LOGGER .warn ("Error during connection recovery for '{}'" , this . name () , e );
103
107
}
104
108
}
105
109
});
@@ -125,6 +129,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
125
129
this .affinityStrategy = null ;
126
130
}
127
131
this .management = createManagement ();
132
+ LOGGER .debug ("Opening native connection for connection '{}'..." , this .name ());
128
133
NativeConnectionWrapper ncw =
129
134
ConnectionUtils .enforceAffinity (
130
135
addrs -> {
@@ -139,6 +144,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
139
144
this .affinityStrategy ,
140
145
ConnectionUtils .NO_RETRY_STRATEGY );
141
146
this .sync (ncw );
147
+ LOGGER .debug ("Opened connection '{}' on node '{}'." , this .name (), this .connectionNodename ());
142
148
this .state (OPEN );
143
149
this .environment .metricsCollector ().openConnection ();
144
150
}
@@ -286,29 +292,36 @@ TopologyListener createTopologyListener(AmqpConnectionBuilder builder) {
286
292
BiConsumer <org .apache .qpid .protonj2 .client .Connection , DisconnectionEvent > result =
287
293
(conn , event ) -> {
288
294
ClientIOException ioex = event .failureCause ();
289
- LOGGER .debug ("Disconnect handler, error is the following:" , ioex );
295
+ LOGGER .debug ("Disconnect handler, error is the following: {} " , ioex . getMessage () );
290
296
if (this .state () == OPENING ) {
291
297
LOGGER .debug ("Connection is still opening, disconnect handler skipped" );
292
298
// the broker is not available when opening the connection
293
299
// nothing to do in this listener
294
300
return ;
295
301
}
296
302
if (this .recoveringConnection .get ()) {
297
- LOGGER .debug ("Filtering recovery task enqueueing, connection recovery in progress" );
303
+ LOGGER .debug (
304
+ "Filtering recovery task enqueueing, connection recovery of '{}' in progress" ,
305
+ this .name ());
298
306
return ;
299
307
}
300
308
AmqpException exception = ExceptionUtils .convert (event .failureCause ());
301
309
LOGGER .debug ("Converted native exception to {}" , exception .getClass ().getSimpleName ());
302
310
303
311
if (RECOVERY_PREDICATE .test (exception ) && this .state () != OPENING ) {
304
- LOGGER .debug ("Queueing recovery task, error is {}" , exception .getMessage ());
312
+ LOGGER .debug (
313
+ "Queueing recovery task for '{}', error is {}" ,
314
+ this .name (),
315
+ exception .getMessage ());
305
316
this .recoveryRequestQueue .add (
306
317
() ->
307
318
recoverAfterConnectionFailure (
308
319
recoveryConfiguration , name , exception , resultReference ));
309
320
} else {
310
321
LOGGER .debug (
311
- "Not recovering connection for error {}" , event .failureCause ().getMessage ());
322
+ "Not recovering connection '{}' for error {}" ,
323
+ this .name (),
324
+ event .failureCause ().getMessage ());
312
325
}
313
326
};
314
327
@@ -323,7 +336,9 @@ private void recoverAfterConnectionFailure(
323
336
AtomicReference <BiConsumer <org .apache .qpid .protonj2 .client .Connection , DisconnectionEvent >>
324
337
disconnectedHandlerReference ) {
325
338
LOGGER .info (
326
- "Connection to {} has been disconnected, trying to recover" , this .currentConnectionLabel ());
339
+ "Connection '{}' to '{}' has been disconnected, trying to recover" ,
340
+ this .name (),
341
+ this .currentConnectionLabel ());
327
342
this .state (RECOVERING , failureCause );
328
343
this .changeStateOfPublishers (RECOVERING , failureCause );
329
344
this .changeStateOfConsumers (RECOVERING , failureCause );
@@ -337,7 +352,7 @@ private void recoverAfterConnectionFailure(
337
352
recoverNativeConnection (
338
353
recoveryConfiguration , connectionName , disconnectedHandlerReference );
339
354
this .sync (ncw );
340
- LOGGER .debug ("Reconnected to {}" , this .currentConnectionLabel ());
355
+ LOGGER .debug ("Reconnected '{}' to {}" , this . name () , this .currentConnectionLabel ());
341
356
} catch (Exception ex ) {
342
357
if (ex instanceof InterruptedException ) {
343
358
Thread .currentThread ().interrupt ();
@@ -353,11 +368,11 @@ private void recoverAfterConnectionFailure(
353
368
354
369
try {
355
370
if (recoveryConfiguration .topology ()) {
356
- LOGGER .debug ("Recovering topology" );
371
+ LOGGER .debug ("Recovering topology of connection '{}'..." , this . name () );
357
372
this .recoverTopology ();
358
373
this .recoverConsumers ();
359
374
this .recoverPublishers ();
360
- LOGGER .debug ("Recovered topology" );
375
+ LOGGER .debug ("Recovered topology of connection '{}'." , this . name () );
361
376
}
362
377
363
378
LOGGER .info ("Recovered connection to {}" , this .currentConnectionLabel ());
@@ -631,6 +646,10 @@ String connectionNodename() {
631
646
return this .connectionNodename ;
632
647
}
633
648
649
+ String name () {
650
+ return this .name == null ? "<no-name>" : this .name ;
651
+ }
652
+
634
653
ConnectionUtils .AffinityContext affinity () {
635
654
return this .affinity ;
636
655
}
0 commit comments