@@ -53,7 +53,6 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
53
53
54
54
private volatile ClientReceiver nativeReceiver ;
55
55
private final AtomicBoolean closed = new AtomicBoolean (false );
56
- private volatile Future <?> receiveLoop ;
57
56
private final int initialCredits ;
58
57
private final MessageHandler messageHandler ;
59
58
private final Long id ;
@@ -70,6 +69,9 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
70
69
private final SessionHandler sessionHandler ;
71
70
private final AtomicLong unsettledMessageCount = new AtomicLong (0 );
72
71
private final Runnable replenishCreditOperation = this ::replenishCreditIfNeeded ;
72
+ private final ExecutorService dispatchingExecutorService ;
73
+ private final java .util .function .Consumer <Delivery > nativeHandler ;
74
+ private final java .util .function .Consumer <ClientException > nativeReceiverCloseHandler ;
73
75
// native receiver internal state, accessed only in the native executor/scheduler
74
76
private ProtonReceiver protonReceiver ;
75
77
private volatile Scheduler protonExecutor ;
@@ -96,16 +98,34 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
96
98
ofNullable (builder .subscriptionListener ()).orElse (NO_OP_SUBSCRIPTION_LISTENER );
97
99
this .connection = builder .connection ();
98
100
this .sessionHandler = this .connection .createSessionHandler ();
101
+
102
+ this .dispatchingExecutorService = connection .dispatchingExecutorService ();
103
+ this .nativeHandler = createNativeHandler (messageHandler );
104
+ this .nativeReceiverCloseHandler =
105
+ e ->
106
+ this .dispatchingExecutorService .submit (
107
+ () -> {
108
+ // get result to make spotbugs happy
109
+ boolean ignored = maybeCloseConsumerOnException (this , e );
110
+ });
99
111
this .nativeReceiver =
100
112
this .createNativeReceiver (
101
113
this .sessionHandler .session (),
102
114
this .address ,
103
115
this .linkProperties ,
104
116
this .filters ,
105
- this .subscriptionListener );
117
+ this .subscriptionListener ,
118
+ this .nativeHandler ,
119
+ this .nativeReceiverCloseHandler );
106
120
this .initStateFromNativeReceiver (this .nativeReceiver );
107
121
this .metricsCollector = this .connection .metricsCollector ();
108
- this .startReceivingLoop ();
122
+ try {
123
+ this .nativeReceiver .addCredit (this .initialCredits );
124
+ } catch (ClientException e ) {
125
+ AmqpException ex = ExceptionUtils .convert (e );
126
+ this .close (ex );
127
+ throw ex ;
128
+ }
109
129
this .state (OPEN );
110
130
this .metricsCollector .openConsumer ();
111
131
}
@@ -163,7 +183,9 @@ private ClientReceiver createNativeReceiver(
163
183
String address ,
164
184
Map <String , Object > properties ,
165
185
Map <String , DescribedType > filters ,
166
- SubscriptionListener subscriptionListener ) {
186
+ SubscriptionListener subscriptionListener ,
187
+ java .util .function .Consumer <Delivery > nativeHandler ,
188
+ java .util .function .Consumer <ClientException > closeHandler ) {
167
189
try {
168
190
filters = new LinkedHashMap <>(filters );
169
191
StreamOptions streamOptions = AmqpConsumerBuilder .streamOptions (filters );
@@ -173,6 +195,8 @@ private ClientReceiver createNativeReceiver(
173
195
.deliveryMode (DeliveryMode .AT_LEAST_ONCE )
174
196
.autoAccept (false )
175
197
.autoSettle (false )
198
+ .handler (nativeHandler )
199
+ .closeHandler (closeHandler )
176
200
.creditWindow (0 )
177
201
.properties (properties );
178
202
Map <String , Object > localSourceFilters = Collections .emptyMap ();
@@ -201,6 +225,37 @@ private ClientReceiver createNativeReceiver(
201
225
}
202
226
}
203
227
228
+ private java .util .function .Consumer <Delivery > createNativeHandler (MessageHandler handler ) {
229
+ return delivery -> {
230
+ this .unsettledMessageCount .incrementAndGet ();
231
+ this .metricsCollector .consume ();
232
+ this .dispatchingExecutorService .submit (
233
+ () -> {
234
+ AmqpMessage message ;
235
+ try {
236
+ message = new AmqpMessage (delivery .message ());
237
+ } catch (ClientException e ) {
238
+ LOGGER .warn ("Error while decoding message: {}" , e .getMessage ());
239
+ try {
240
+ delivery .disposition (DeliveryState .rejected ("" , "" ), true );
241
+ } catch (ClientException ex ) {
242
+ LOGGER .warn ("Error while rejecting non-decoded message: {}" , ex .getMessage ());
243
+ }
244
+ return ;
245
+ }
246
+ Consumer .Context context =
247
+ new DeliveryContext (
248
+ delivery ,
249
+ this .protonExecutor ,
250
+ this .metricsCollector ,
251
+ this .unsettledMessageCount ,
252
+ this .replenishCreditOperation ,
253
+ this );
254
+ handler .handle (context , message );
255
+ });
256
+ };
257
+ }
258
+
204
259
private Runnable createReceiveTask (Receiver receiver , MessageHandler messageHandler ) {
205
260
return () -> {
206
261
try {
@@ -217,7 +272,8 @@ private Runnable createReceiveTask(Receiver receiver, MessageHandler messageHand
217
272
this .protonExecutor ,
218
273
this .metricsCollector ,
219
274
this .unsettledMessageCount ,
220
- this .replenishCreditOperation );
275
+ this .replenishCreditOperation ,
276
+ this );
221
277
messageHandler .handle (context , message );
222
278
}
223
279
}
@@ -237,11 +293,6 @@ private Runnable createReceiveTask(Receiver receiver, MessageHandler messageHand
237
293
};
238
294
}
239
295
240
- private void startReceivingLoop () {
241
- Runnable receiveTask = createReceiveTask (nativeReceiver , messageHandler );
242
- this .receiveLoop = this .connection .environment ().consumerExecutorService ().submit (receiveTask );
243
- }
244
-
245
296
void recoverAfterConnectionFailure () {
246
297
this .nativeReceiver =
247
298
RetryUtils .callAndMaybeRetry (
@@ -251,7 +302,9 @@ void recoverAfterConnectionFailure() {
251
302
this .address ,
252
303
this .linkProperties ,
253
304
this .filters ,
254
- this .subscriptionListener ),
305
+ this .subscriptionListener ,
306
+ this .nativeHandler ,
307
+ this .nativeReceiverCloseHandler ),
255
308
e -> {
256
309
boolean shouldRetry =
257
310
e instanceof AmqpException .AmqpResourceClosedException
@@ -267,22 +320,24 @@ void recoverAfterConnectionFailure() {
267
320
this .initStateFromNativeReceiver (this .nativeReceiver );
268
321
this .pauseStatus .set (PauseStatus .UNPAUSED );
269
322
this .unsettledMessageCount .set (0 );
270
- startReceivingLoop ();
323
+ try {
324
+ this .nativeReceiver .addCredit (this .initialCredits );
325
+ } catch (ClientException e ) {
326
+ throw ExceptionUtils .convert (e );
327
+ }
271
328
}
272
329
273
330
private void close (Throwable cause ) {
274
331
if (this .closed .compareAndSet (false , true )) {
275
332
this .state (CLOSING , cause );
276
333
this .connection .removeConsumer (this );
277
- if (this .receiveLoop != null ) {
278
- this .receiveLoop .cancel (true );
279
- }
280
334
try {
281
335
this .nativeReceiver .close ();
282
336
this .sessionHandler .close ();
283
337
} catch (Exception e ) {
284
338
LOGGER .warn ("Error while closing receiver" , e );
285
339
}
340
+
286
341
this .state (CLOSED , cause );
287
342
this .metricsCollector .closeConsumer ();
288
343
}
@@ -372,18 +427,21 @@ private static class DeliveryContext implements Consumer.Context {
372
427
private final MetricsCollector metricsCollector ;
373
428
private final AtomicLong unsettledMessageCount ;
374
429
private final Runnable replenishCreditOperation ;
430
+ private final AmqpConsumer consumer ;
375
431
376
432
private DeliveryContext (
377
433
Delivery delivery ,
378
434
Scheduler protonExecutor ,
379
435
MetricsCollector metricsCollector ,
380
436
AtomicLong unsettledMessageCount ,
381
- Runnable replenishCreditOperation ) {
437
+ Runnable replenishCreditOperation ,
438
+ AmqpConsumer consumer ) {
382
439
this .delivery = delivery ;
383
440
this .protonExecutor = protonExecutor ;
384
441
this .metricsCollector = metricsCollector ;
385
442
this .unsettledMessageCount = unsettledMessageCount ;
386
443
this .replenishCreditOperation = replenishCreditOperation ;
444
+ this .consumer = consumer ;
387
445
}
388
446
389
447
@ Override
@@ -394,10 +452,8 @@ public void accept() {
394
452
delivery .disposition (DeliveryState .accepted (), true );
395
453
unsettledMessageCount .decrementAndGet ();
396
454
metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .ACCEPTED );
397
- } catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e ) {
398
- LOGGER .debug ("message accept failed: {}" , e .getMessage ());
399
- } catch (ClientException e ) {
400
- throw ExceptionUtils .convert (e );
455
+ } catch (Exception e ) {
456
+ handleException (e , "accept" );
401
457
}
402
458
}
403
459
}
@@ -410,10 +466,8 @@ public void discard() {
410
466
delivery .disposition (DeliveryState .rejected ("" , "" ), true );
411
467
unsettledMessageCount .decrementAndGet ();
412
468
metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .DISCARDED );
413
- } catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e ) {
414
- LOGGER .debug ("message discard failed: {}" , e .getMessage ());
415
- } catch (ClientException e ) {
416
- throw ExceptionUtils .convert (e );
469
+ } catch (Exception e ) {
470
+ handleException (e , "discard" );
417
471
}
418
472
}
419
473
}
@@ -428,10 +482,8 @@ public void discard(Map<String, Object> annotations) {
428
482
delivery .disposition (DeliveryState .modified (true , true , annotations ), true );
429
483
unsettledMessageCount .decrementAndGet ();
430
484
metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .DISCARDED );
431
- } catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e ) {
432
- LOGGER .debug ("message discard (modified) failed: {}" , e .getMessage ());
433
- } catch (ClientException e ) {
434
- throw ExceptionUtils .convert (e );
485
+ } catch (Exception e ) {
486
+ handleException (e , "discard (modified)" );
435
487
}
436
488
}
437
489
}
@@ -444,10 +496,8 @@ public void requeue() {
444
496
delivery .disposition (DeliveryState .released (), true );
445
497
unsettledMessageCount .decrementAndGet ();
446
498
metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .REQUEUED );
447
- } catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e ) {
448
- LOGGER .debug ("message requeue failed: {}" , e .getMessage ());
449
- } catch (ClientException e ) {
450
- throw ExceptionUtils .convert (e );
499
+ } catch (Exception e ) {
500
+ handleException (e , "requeue" );
451
501
}
452
502
}
453
503
}
@@ -462,12 +512,34 @@ public void requeue(Map<String, Object> annotations) {
462
512
delivery .disposition (DeliveryState .modified (false , false , annotations ), true );
463
513
unsettledMessageCount .decrementAndGet ();
464
514
metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .REQUEUED );
465
- } catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e ) {
466
- LOGGER .debug ("message requeue (modified) failed: {}" , e .getMessage ());
467
- } catch (ClientException e ) {
468
- throw ExceptionUtils .convert (e );
515
+ } catch (Exception e ) {
516
+ handleException (e , "requeue (modified)" );
469
517
}
470
518
}
471
519
}
520
+
521
+ private void handleException (Exception ex , String operation ) {
522
+ if (maybeCloseConsumerOnException (this .consumer , ex )) {
523
+ return ;
524
+ }
525
+ if (ex instanceof ClientIllegalStateException
526
+ || ex instanceof RejectedExecutionException
527
+ || ex instanceof ClientIOException ) {
528
+ LOGGER .debug ("message {} failed: {}" , operation , ex .getMessage ());
529
+ } else if (ex instanceof ClientException ) {
530
+ throw ExceptionUtils .convert ((ClientException ) ex );
531
+ }
532
+ }
533
+ }
534
+
535
+ private static boolean maybeCloseConsumerOnException (AmqpConsumer consumer , Exception ex ) {
536
+ if (ex instanceof ClientLinkRemotelyClosedException ) {
537
+ ClientLinkRemotelyClosedException e = (ClientLinkRemotelyClosedException ) ex ;
538
+ if (ExceptionUtils .notFound (e ) || ExceptionUtils .resourceDeleted (e )) {
539
+ consumer .close (ExceptionUtils .convert (e ));
540
+ return true ;
541
+ }
542
+ }
543
+ return false ;
472
544
}
473
545
}
0 commit comments