28
28
import java .util .concurrent .atomic .AtomicBoolean ;
29
29
import java .util .concurrent .atomic .AtomicLong ;
30
30
import java .util .concurrent .atomic .AtomicReference ;
31
+ import java .util .concurrent .locks .Lock ;
32
+ import java .util .concurrent .locks .ReentrantLock ;
31
33
import java .util .function .Supplier ;
32
34
import java .util .stream .Collectors ;
33
35
import java .util .stream .IntStream ;
@@ -72,6 +74,8 @@ class AmqpManagement implements Management {
72
74
private final Supplier <String > nameSupplier ;
73
75
private final AtomicReference <State > state = new AtomicReference <>(CREATED );
74
76
private final AtomicBoolean initializing = new AtomicBoolean (false );
77
+ private final Duration receiveLoopIdleTimeout ;
78
+ private final Lock instanceLock = new ReentrantLock ();
75
79
76
80
AmqpManagement (AmqpManagementParameters parameters ) {
77
81
this .id = ID_SEQUENCE .getAndIncrement ();
@@ -81,6 +85,10 @@ class AmqpManagement implements Management {
81
85
? TopologyListener .NO_OP
82
86
: parameters .topologyListener ();
83
87
this .nameSupplier = parameters .nameSupplier ();
88
+ this .receiveLoopIdleTimeout =
89
+ parameters .receiveLoopIdleTimeout () == null
90
+ ? Duration .ofSeconds (20 )
91
+ : parameters .receiveLoopIdleTimeout ();
84
92
}
85
93
86
94
@ Override
@@ -190,6 +198,10 @@ void init() {
190
198
LOGGER .debug ("Initializing management ({})." , this );
191
199
this .state (UNAVAILABLE );
192
200
try {
201
+ if (this .receiveLoop != null ) {
202
+ this .receiveLoop .cancel (true );
203
+ this .receiveLoop = null ;
204
+ }
193
205
LOGGER .debug ("Creating management session ({})." , this );
194
206
this .session = this .connection .nativeConnection ().openSession ();
195
207
String linkPairName = "management-link-pair" ;
@@ -217,49 +229,6 @@ void init() {
217
229
LOGGER .debug ("Management sender created ({})." , this );
218
230
this .receiver .openFuture ().get (this .rpcTimeout .toMillis (), MILLISECONDS );
219
231
LOGGER .debug ("Management receiver created ({})." , this );
220
- Runnable receiveTask =
221
- () -> {
222
- try {
223
- while (!Thread .currentThread ().isInterrupted ()) {
224
- Delivery delivery = receiver .receive (100 , MILLISECONDS );
225
- if (delivery != null ) {
226
- Object correlationId = delivery .message ().correlationId ();
227
- if (correlationId instanceof UUID ) {
228
- OutstandingRequest request = outstandingRequests .remove (correlationId );
229
- if (request != null ) {
230
- request .complete (delivery .message ());
231
- } else {
232
- LOGGER .info ("Could not find outstanding request {}" , correlationId );
233
- }
234
- } else {
235
- LOGGER .info ("Could not correlate inbound message with management request" );
236
- }
237
- }
238
- }
239
- } catch (ClientConnectionRemotelyClosedException
240
- | ClientLinkRemotelyClosedException e ) {
241
- // receiver is closed
242
- } catch (ClientSessionRemotelyClosedException e ) {
243
- this .state (UNAVAILABLE );
244
- LOGGER .info (
245
- "Management session closed in receive loop: {} ({})" , e .getMessage (), this );
246
- AmqpException exception = ExceptionUtils .convert (e );
247
- this .failRequests (exception );
248
- if (exception instanceof AmqpException .AmqpSecurityException ) {
249
- LOGGER .debug (
250
- "Recovering AMQP management because the failure was a security exception ({})." ,
251
- this );
252
- this .init ();
253
- }
254
- } catch (ClientException e ) {
255
- java .util .function .Consumer <String > log =
256
- this .closed .get () ? m -> LOGGER .debug (m , e ) : m -> LOGGER .info (m , e );
257
- log .accept ("Error while polling AMQP receiver" );
258
- }
259
- };
260
- LOGGER .debug ("Starting management receive loop ({})." , this );
261
- this .receiveLoop = this .connection .executorService ().submit (receiveTask );
262
- LOGGER .debug ("Management initialized ({})." , this );
263
232
this .state (OPEN );
264
233
this .initializing .set (false );
265
234
} catch (Exception e ) {
@@ -269,6 +238,58 @@ void init() {
269
238
}
270
239
}
271
240
241
+ private Runnable receiveTask () {
242
+ return () -> {
243
+ try {
244
+ Duration waitDuration = Duration .ofMillis (100 );
245
+ long idleTime = 0 ;
246
+ while (!Thread .currentThread ().isInterrupted ()) {
247
+ Delivery delivery = receiver .receive (waitDuration .toMillis (), MILLISECONDS );
248
+ if (delivery != null ) {
249
+ idleTime = 0 ;
250
+ Object correlationId = delivery .message ().correlationId ();
251
+ if (correlationId instanceof UUID ) {
252
+ OutstandingRequest request = outstandingRequests .remove (correlationId );
253
+ if (request != null ) {
254
+ request .complete (delivery .message ());
255
+ } else {
256
+ LOGGER .info ("Could not find outstanding request {}" , correlationId );
257
+ }
258
+ } else {
259
+ LOGGER .info ("Could not correlate inbound message with management request" );
260
+ }
261
+ } else {
262
+ idleTime += waitDuration .toMillis ();
263
+ if (idleTime > receiveLoopIdleTimeout .toMillis ()) {
264
+ LOGGER .debug (
265
+ "Management receive loop has been idle for more than {}, finishing it." ,
266
+ this .receiveLoopIdleTimeout );
267
+ this .receiveLoop = null ;
268
+ return ;
269
+ }
270
+ }
271
+ }
272
+ } catch (ClientConnectionRemotelyClosedException | ClientLinkRemotelyClosedException e ) {
273
+ // receiver is closed
274
+ } catch (ClientSessionRemotelyClosedException e ) {
275
+ this .state (UNAVAILABLE );
276
+ LOGGER .info ("Management session closed in receive loop: {} ({})" , e .getMessage (), this );
277
+ AmqpException exception = ExceptionUtils .convert (e );
278
+ this .failRequests (exception );
279
+ if (exception instanceof AmqpException .AmqpSecurityException ) {
280
+ LOGGER .debug (
281
+ "Recovering AMQP management because the failure was a security exception ({})." ,
282
+ this );
283
+ this .init ();
284
+ }
285
+ } catch (ClientException e ) {
286
+ java .util .function .Consumer <String > log =
287
+ this .closed .get () ? m -> LOGGER .debug (m , e ) : m -> LOGGER .info (m , e );
288
+ log .accept ("Error while polling AMQP receiver" );
289
+ }
290
+ };
291
+ }
292
+
272
293
private void failRequests (AmqpException exception ) {
273
294
Iterator <Map .Entry <UUID , OutstandingRequest >> iterator =
274
295
this .outstandingRequests .entrySet ().iterator ();
@@ -284,6 +305,7 @@ void releaseResources() {
284
305
this .markUnavailable ();
285
306
if (this .receiveLoop != null ) {
286
307
this .receiveLoop .cancel (true );
308
+ this .receiveLoop = null ;
287
309
}
288
310
}
289
311
@@ -338,6 +360,22 @@ OutstandingRequest request(Message<?> request, UUID requestId) throws ClientExce
338
360
this .outstandingRequests .put (requestId , outstandingRequest );
339
361
LOGGER .debug ("Sending request {}" , requestId );
340
362
this .sender .send (request );
363
+ Future <?> loop = this .receiveLoop ;
364
+ if (loop == null ) {
365
+ this .instanceLock .lock ();
366
+ try {
367
+ loop = this .receiveLoop ;
368
+ if (loop == null ) {
369
+ Runnable receiveTask = receiveTask ();
370
+ LOGGER .debug ("Starting management receive loop ({})." , this );
371
+ this .receiveLoop =
372
+ this .connection .environment ().managementExecutorService ().submit (receiveTask );
373
+ LOGGER .debug ("Management initialized ({})." , this );
374
+ }
375
+ } finally {
376
+ this .instanceLock .unlock ();
377
+ }
378
+ }
341
379
return outstandingRequest ;
342
380
}
343
381
@@ -548,6 +586,11 @@ TopologyListener recovery() {
548
586
return this .topologyListener ;
549
587
}
550
588
589
+ // for testing
590
+ boolean hasReceiveLoop () {
591
+ return this .receiveLoop != null ;
592
+ }
593
+
551
594
private static class DefaultQueueInfo implements QueueInfo {
552
595
553
596
private final String name ;
0 commit comments