22
22
import com .rabbitmq .client .amqp .AmqpException ;
23
23
import com .rabbitmq .client .amqp .Consumer ;
24
24
import com .rabbitmq .client .amqp .metrics .MetricsCollector ;
25
- import java .time .Duration ;
26
25
import java .util .concurrent .*;
27
26
import java .util .concurrent .atomic .AtomicBoolean ;
28
27
import java .util .concurrent .atomic .AtomicLong ;
@@ -59,9 +58,10 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
59
58
private final MetricsCollector metricsCollector ;
60
59
private final SessionHandler sessionHandler ;
61
60
private final AtomicLong unsettledCount = new AtomicLong (0 );
61
+ private final Runnable replenishCreditOperation = () -> replenishCreditIfNeeded ();
62
62
// native receiver internal state, accessed only in the native executor/scheduler
63
63
private ProtonReceiver protonReceiver ;
64
- private Scheduler protonExecutor ;
64
+ private volatile Scheduler protonExecutor ;
65
65
private DeliveryQueue protonDeliveryQueue ;
66
66
private ProtonSessionIncomingWindow sessionWindow ;
67
67
private ProtonLinkCreditState creditState ;
@@ -86,6 +86,42 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
86
86
this .metricsCollector .openConsumer ();
87
87
}
88
88
89
+ @ Override
90
+ public void pause () {
91
+ if (this .pauseStatus .compareAndSet (PauseStatus .UNPAUSED , PauseStatus .PAUSING )) {
92
+ try {
93
+ CountDownLatch latch = new CountDownLatch (1 );
94
+ this .echoedFlowAfterPauseLatch .set (latch );
95
+ this .protonExecutor .execute (this ::doPause );
96
+ try {
97
+ boolean echoed = latch .await (10 , TimeUnit .SECONDS );
98
+ if (echoed ) {
99
+ this .pauseStatus .set (PauseStatus .PAUSED );
100
+ } else {
101
+ LOGGER .warn ("Did not receive echoed flow to pause receiver" );
102
+ this .pauseStatus .set (PauseStatus .UNPAUSED );
103
+ }
104
+ } catch (InterruptedException e ) {
105
+ Thread .currentThread ().interrupt ();
106
+ }
107
+ } catch (Exception e ) {
108
+ this .pauseStatus .set (PauseStatus .UNPAUSED );
109
+ }
110
+ }
111
+ }
112
+
113
+ @ Override
114
+ public void unpause () {
115
+ checkOpen ();
116
+ if (this .pauseStatus .compareAndSet (PauseStatus .PAUSED , PauseStatus .UNPAUSED )) {
117
+ try {
118
+ this .nativeReceiver .addCredit (this .initialCredits );
119
+ } catch (ClientException e ) {
120
+ throw ExceptionUtils .convert (e );
121
+ }
122
+ }
123
+ }
124
+
89
125
@ Override
90
126
public void close () {
91
127
this .close (null );
@@ -121,61 +157,13 @@ private Runnable createReceiveTask(Receiver receiver, MessageHandler messageHand
121
157
this .unsettledCount .incrementAndGet ();
122
158
this .metricsCollector .consume ();
123
159
AmqpMessage message = new AmqpMessage (delivery .message ());
124
- AtomicBoolean disposed = new AtomicBoolean (false );
125
160
Consumer .Context context =
126
- new Consumer .Context () {
127
-
128
- @ Override
129
- public void accept () {
130
- if (disposed .compareAndSet (false , true )) {
131
- try {
132
- protonExecutor .execute (() -> replenishCreditIfNeeded ());
133
- delivery .disposition (DeliveryState .accepted (), true );
134
- unsettledCount .decrementAndGet ();
135
- metricsCollector .consumeDisposition (
136
- MetricsCollector .ConsumeDisposition .ACCEPTED );
137
- } catch (ClientIllegalStateException | ClientIOException e ) {
138
- LOGGER .debug ("message accept failed: {}" , e .getMessage ());
139
- } catch (ClientException e ) {
140
- throw ExceptionUtils .convert (e );
141
- }
142
- }
143
- }
144
-
145
- @ Override
146
- public void discard () {
147
- if (disposed .compareAndSet (false , true )) {
148
- try {
149
- protonExecutor .execute (() -> replenishCreditIfNeeded ());
150
- delivery .disposition (DeliveryState .rejected ("" , "" ), true );
151
- unsettledCount .decrementAndGet ();
152
- metricsCollector .consumeDisposition (
153
- MetricsCollector .ConsumeDisposition .DISCARDED );
154
- } catch (ClientIllegalStateException | ClientIOException e ) {
155
- LOGGER .debug ("message discard failed: {}" , e .getMessage ());
156
- } catch (ClientException e ) {
157
- throw ExceptionUtils .convert (e );
158
- }
159
- }
160
- }
161
-
162
- @ Override
163
- public void requeue () {
164
- if (disposed .compareAndSet (false , true )) {
165
- try {
166
- protonExecutor .execute (() -> replenishCreditIfNeeded ());
167
- delivery .disposition (DeliveryState .released (), true );
168
- unsettledCount .decrementAndGet ();
169
- metricsCollector .consumeDisposition (
170
- MetricsCollector .ConsumeDisposition .REQUEUED );
171
- } catch (ClientIllegalStateException | ClientIOException e ) {
172
- LOGGER .debug ("message requeue failed: {}" , e .getMessage ());
173
- } catch (ClientException e ) {
174
- throw ExceptionUtils .convert (e );
175
- }
176
- }
177
- }
178
- };
161
+ new DeliveryContext (
162
+ delivery ,
163
+ this .protonExecutor ,
164
+ this .metricsCollector ,
165
+ this .unsettledCount ,
166
+ this .replenishCreditOperation );
179
167
messageHandler .handle (context , message );
180
168
}
181
169
}
@@ -204,23 +192,13 @@ void recoverAfterConnectionFailure() {
204
192
this .nativeReceiver = createNativeReceiver (this .sessionHandler .sessionNoCheck (), this .address );
205
193
this .initStateFromNativeReceiver (this .nativeReceiver );
206
194
this .pauseStatus .set (PauseStatus .UNPAUSED );
195
+ this .unsettledCount .set (0 );
207
196
startReceivingLoop ();
208
197
}
209
198
210
199
private void close (Throwable cause ) {
211
200
if (this .closed .compareAndSet (false , true )) {
212
201
this .state (CLOSING , cause );
213
- if (cause == null ) {
214
- LOGGER .debug ("Pausing receiver link before detaching it" );
215
- // normal closing, pausing message dispatching
216
- this .pause ();
217
- LOGGER .debug ("Receiver link paused. Unsettled message(s): {}" , this .unsettledCount .get ());
218
- LOGGER .debug ("Waiting for unsettled messages to get settled if necessary" );
219
- waitForUnsettledMessagesToSettle ();
220
- if (this .unsettledCount .get () > 0 ) {
221
- LOGGER .debug ("Closing receiver link with {} unsettled message(s)" , this .unsettledCount );
222
- }
223
- }
224
202
this .connection .removeConsumer (this );
225
203
if (this .receiveLoop != null ) {
226
204
this .receiveLoop .cancel (true );
@@ -236,21 +214,6 @@ private void close(Throwable cause) {
236
214
}
237
215
}
238
216
239
- private void waitForUnsettledMessagesToSettle () {
240
- Duration timeout = Duration .ofSeconds (10 );
241
- Duration waitTime = Duration .ofMillis (10 );
242
- Duration waitedTime = Duration .ZERO ;
243
- while (this .unsettledCount .get () > 0 && waitedTime .compareTo (timeout ) <= 0 ) {
244
- try {
245
- Thread .sleep (waitTime .toMillis ());
246
- waitedTime = waitedTime .plus (waitTime );
247
- } catch (InterruptedException e ) {
248
- Thread .currentThread ().interrupt ();
249
- return ;
250
- }
251
- }
252
- }
253
-
254
217
long id () {
255
218
return this .id ;
256
219
}
@@ -294,7 +257,7 @@ private void initStateFromNativeReceiver(ClientReceiver receiver) {
294
257
}
295
258
296
259
private void replenishCreditIfNeeded () {
297
- if (!this .pausedOrPausing ()) {
260
+ if (!this .pausedOrPausing () && this . state () == OPEN ) {
298
261
int creditWindow = this .initialCredits ;
299
262
int currentCredit = protonReceiver .getCredit ();
300
263
if (currentCredit <= creditWindow * 0.5 ) {
@@ -311,57 +274,90 @@ private void replenishCreditIfNeeded() {
311
274
}
312
275
}
313
276
314
- void pause () {
315
- if (this .pauseStatus .compareAndSet (PauseStatus .UNPAUSED , PauseStatus .PAUSING )) {
316
- try {
317
- CountDownLatch latch = new CountDownLatch (1 );
318
- this .echoedFlowAfterPauseLatch .set (latch );
319
- this .protonExecutor .execute (this ::doPause );
320
- try {
321
- boolean echoed = latch .await (10 , TimeUnit .SECONDS );
322
- if (echoed ) {
323
- this .pauseStatus .set (PauseStatus .PAUSED );
324
- } else {
325
- LOGGER .warn ("Did not receive echoed flow to pause receiver" );
326
- this .pauseStatus .set (PauseStatus .UNPAUSED );
327
- }
328
- } catch (InterruptedException e ) {
329
- Thread .currentThread ().interrupt ();
330
- }
331
- } catch (Exception e ) {
332
- this .pauseStatus .set (PauseStatus .UNPAUSED );
333
- }
334
- }
335
- }
336
-
337
277
private void doPause () {
338
278
this .creditState .updateCredit (0 );
339
279
this .creditState .updateEcho (true );
340
280
this .sessionWindow .writeFlow (this .protonReceiver );
341
281
}
342
282
343
- void unpause () {
344
- checkOpen ();
345
- if (this .pauseStatus .compareAndSet (PauseStatus .PAUSED , PauseStatus .UNPAUSED )) {
346
- try {
347
- this .nativeReceiver .addCredit (this .initialCredits );
348
- } catch (ClientException e ) {
349
- throw ExceptionUtils .convert (e );
350
- }
351
- }
352
- }
353
-
354
283
boolean pausedOrPausing () {
355
284
return this .pauseStatus .get () != PauseStatus .UNPAUSED ;
356
285
}
357
286
358
- boolean paused () {
359
- return this .pauseStatus .get () == PauseStatus .PAUSED ;
360
- }
361
-
362
287
enum PauseStatus {
363
288
UNPAUSED ,
364
289
PAUSING ,
365
290
PAUSED
366
291
}
292
+
293
+ private static class DeliveryContext implements Consumer .Context {
294
+
295
+ private final AtomicBoolean settled = new AtomicBoolean (false );
296
+ private final Delivery delivery ;
297
+ private final Scheduler protonExecutor ;
298
+ private final MetricsCollector metricsCollector ;
299
+ private final AtomicLong unsettledCount ;
300
+ private final Runnable replenishCreditOperation ;
301
+
302
+ private DeliveryContext (
303
+ Delivery delivery ,
304
+ Scheduler protonExecutor ,
305
+ MetricsCollector metricsCollector ,
306
+ AtomicLong unsettledCount ,
307
+ Runnable replenishCreditOperation ) {
308
+ this .delivery = delivery ;
309
+ this .protonExecutor = protonExecutor ;
310
+ this .metricsCollector = metricsCollector ;
311
+ this .unsettledCount = unsettledCount ;
312
+ this .replenishCreditOperation = replenishCreditOperation ;
313
+ }
314
+
315
+ @ Override
316
+ public void accept () {
317
+ if (settled .compareAndSet (false , true )) {
318
+ try {
319
+ protonExecutor .execute (replenishCreditOperation );
320
+ delivery .disposition (DeliveryState .accepted (), true );
321
+ unsettledCount .decrementAndGet ();
322
+ metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .ACCEPTED );
323
+ } catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e ) {
324
+ LOGGER .debug ("message accept failed: {}" , e .getMessage ());
325
+ } catch (ClientException e ) {
326
+ throw ExceptionUtils .convert (e );
327
+ }
328
+ }
329
+ }
330
+
331
+ @ Override
332
+ public void discard () {
333
+ if (settled .compareAndSet (false , true )) {
334
+ try {
335
+ protonExecutor .execute (replenishCreditOperation );
336
+ delivery .disposition (DeliveryState .rejected ("" , "" ), true );
337
+ unsettledCount .decrementAndGet ();
338
+ metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .DISCARDED );
339
+ } catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e ) {
340
+ LOGGER .debug ("message discard failed: {}" , e .getMessage ());
341
+ } catch (ClientException e ) {
342
+ throw ExceptionUtils .convert (e );
343
+ }
344
+ }
345
+ }
346
+
347
+ @ Override
348
+ public void requeue () {
349
+ if (settled .compareAndSet (false , true )) {
350
+ try {
351
+ protonExecutor .execute (replenishCreditOperation );
352
+ delivery .disposition (DeliveryState .released (), true );
353
+ unsettledCount .decrementAndGet ();
354
+ metricsCollector .consumeDisposition (MetricsCollector .ConsumeDisposition .REQUEUED );
355
+ } catch (ClientIllegalStateException | RejectedExecutionException | ClientIOException e ) {
356
+ LOGGER .debug ("message requeue failed: {}" , e .getMessage ());
357
+ } catch (ClientException e ) {
358
+ throw ExceptionUtils .convert (e );
359
+ }
360
+ }
361
+ }
362
+ }
367
363
}
0 commit comments