19
19
import java .util .Collections ;
20
20
import java .util .Iterator ;
21
21
import java .util .List ;
22
+ import java .util .Map ;
22
23
import java .util .Set ;
23
24
import java .util .UUID ;
24
25
@@ -94,14 +95,14 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager {
94
95
* Counter on how many messages are prefetched into internal messageQueue.
95
96
*/
96
97
protected int messagesPrefetched = 0 ;
97
-
98
+
98
99
/**
99
100
* Counter on how many messages have been explicitly requested.
100
101
* TODO: Consider renaming this class and several other variables now that
101
102
* this logic factors in message requests as well as prefetching.
102
103
*/
103
104
protected int messagesRequested = 0 ;
104
-
105
+
105
106
/**
106
107
* States of the prefetch thread
107
108
*/
@@ -121,7 +122,7 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager {
121
122
* 25ms delayInterval.
122
123
*/
123
124
protected ExponentialBackoffStrategy backoffStrategy = new ExponentialBackoffStrategy (25 ,25 ,2000 );
124
-
125
+
125
126
SQSMessageConsumerPrefetch (SQSSessionCallbackScheduler sqsSessionRunnable , Acknowledger acknowledger ,
126
127
NegativeAcknowledger negativeAcknowledger , SQSQueueDestination sqsDestination ,
127
128
AmazonSQSMessagingClientWrapper amazonSQSClient , int numberOfMessagesToPrefetch ) {
@@ -139,16 +140,16 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager {
139
140
MessageListener getMessageListener () {
140
141
return messageListener ;
141
142
}
142
-
143
+
143
144
void setMessageConsumer (SQSMessageConsumer messageConsumer ) {
144
145
this .messageConsumer = messageConsumer ;
145
146
}
146
-
147
+
147
148
@ Override
148
149
public SQSMessageConsumer getMessageConsumer () {
149
150
return messageConsumer ;
150
151
}
151
-
152
+
152
153
/**
153
154
* Sets the message listener.
154
155
* <P>
@@ -168,7 +169,7 @@ protected void setMessageListener(MessageListener messageListener) {
168
169
if (!running || isClosed ()) {
169
170
return ;
170
171
}
171
-
172
+
172
173
List <MessageManager > allPrefetchedMessages = new ArrayList <MessageManager >(messageQueue );
173
174
sqsSessionRunnable .scheduleCallBacks (messageListener , allPrefetchedMessages );
174
175
messageQueue .clear ();
@@ -179,7 +180,7 @@ protected void setMessageListener(MessageListener messageListener) {
179
180
messageListenerReady ();
180
181
}
181
182
}
182
-
183
+
183
184
/**
184
185
* Determine the number of messages we should attempt to fetch from SQS.
185
186
* Returns the difference between the number of messages needed (either for
@@ -189,7 +190,7 @@ private int numberOfMessagesToFetch() {
189
190
int numberOfMessagesNeeded = Math .max (numberOfMessagesToPrefetch , messagesRequested );
190
191
return Math .max (numberOfMessagesNeeded - messagesPrefetched , 0 );
191
192
}
192
-
193
+
193
194
/**
194
195
* Runs until the message consumer is closed and in-progress SQS
195
196
* <code>receiveMessage</code> call returns.
@@ -210,7 +211,7 @@ public void run() {
210
211
if (isClosed ()) {
211
212
break ;
212
213
}
213
-
214
+
214
215
synchronized (stateLock ) {
215
216
waitForStart ();
216
217
waitForPrefetch ();
@@ -238,7 +239,7 @@ public void run() {
238
239
}
239
240
}
240
241
}
241
-
242
+
242
243
/**
243
244
* Call <code>receiveMessage</code> with the given wait time.
244
245
*/
@@ -259,7 +260,7 @@ protected List<Message> getMessages(int batchSize, int waitTimeSeconds) throws J
259
260
ReceiveMessageResult receivedMessageResult = amazonSQSClient .receiveMessage (receiveMessageRequest );
260
261
return receivedMessageResult .getMessages ();
261
262
}
262
-
263
+
263
264
/**
264
265
* Converts the received message to JMS message, and pushes to messages to
265
266
* either callback scheduler for asynchronous message delivery or to
@@ -278,14 +279,14 @@ protected void processReceivedMessages(List<Message> messages) {
278
279
nackMessages .add (message .getReceiptHandle ());
279
280
}
280
281
}
281
-
282
+
282
283
synchronized (stateLock ) {
283
284
if (messageListener != null ) {
284
285
sqsSessionRunnable .scheduleCallBacks (messageListener , messageManagers );
285
286
} else {
286
287
messageQueue .addAll (messageManagers );
287
288
}
288
-
289
+
289
290
messagesPrefetched += messageManagers .size ();
290
291
notifyStateChange ();
291
292
}
@@ -359,9 +360,9 @@ protected javax.jms.Message convertToJMSMessage(Message message) throws JMSExcep
359
360
throw new JMSException ("Not a supported JMS message type" );
360
361
}
361
362
}
362
-
363
+
363
364
jmsMessage .setJMSDestination (sqsDestination );
364
-
365
+
365
366
MessageAttributeValue replyToQueueNameAttribute = message .getMessageAttributes ().get (
366
367
SQSMessage .JMS_SQS_REPLY_TO_QUEUE_NAME );
367
368
MessageAttributeValue replyToQueueUrlAttribute = message .getMessageAttributes ().get (
@@ -372,16 +373,27 @@ protected javax.jms.Message convertToJMSMessage(Message message) throws JMSExcep
372
373
Destination replyToQueue = new SQSQueueDestination (replyToQueueName , replyToQueueUrl );
373
374
jmsMessage .setJMSReplyTo (replyToQueue );
374
375
}
375
-
376
+
376
377
MessageAttributeValue correlationIdAttribute = message .getMessageAttributes ().get (
377
378
SQSMessage .JMS_SQS_CORRELATION_ID );
378
379
if (correlationIdAttribute != null ) {
379
380
jmsMessage .setJMSCorrelationID (correlationIdAttribute .getStringValue ());
380
381
}
381
-
382
+
383
+ jmsMessage .setJMSTimestamp (getJMSTimestamp (message ));
382
384
return jmsMessage ;
383
385
}
384
386
387
+ private long getJMSTimestamp (Message message ) {
388
+ Map <String , String > systemAttributes = message .getAttributes ();
389
+ String timestamp = systemAttributes .get (SQSMessagingClientConstants .SENT_TIMESTAMP );
390
+ if (timestamp != null ) {
391
+ return Long .parseLong (timestamp );
392
+ } else {
393
+ return 0L ;
394
+ }
395
+ }
396
+
385
397
protected void nackQueueMessages () {
386
398
// Also nack messages already in the messageQueue
387
399
synchronized (stateLock ) {
@@ -407,7 +419,7 @@ protected void waitForStart() throws InterruptedException {
407
419
}
408
420
}
409
421
}
410
-
422
+
411
423
@ Override
412
424
public void messageDispatched () {
413
425
synchronized (stateLock ) {
@@ -429,7 +441,7 @@ public void messageListenerReady() {
429
441
}
430
442
}
431
443
}
432
-
444
+
433
445
void requestMessage () {
434
446
synchronized (stateLock ) {
435
447
messagesRequested ++;
@@ -443,7 +455,7 @@ private void unrequestMessage() {
443
455
notifyStateChange ();
444
456
}
445
457
}
446
-
458
+
447
459
public static class MessageManager {
448
460
449
461
private final PrefetchManager prefetchManager ;
@@ -467,7 +479,7 @@ public javax.jms.Message getMessage() {
467
479
javax .jms .Message receive () throws JMSException {
468
480
return receive (0 );
469
481
}
470
-
482
+
471
483
javax .jms .Message receive (long timeout ) throws JMSException {
472
484
if (cannotDeliver ()) {
473
485
return null ;
@@ -476,7 +488,7 @@ javax.jms.Message receive(long timeout) throws JMSException {
476
488
if (timeout < 0 ) {
477
489
timeout = 0 ;
478
490
}
479
-
491
+
480
492
MessageManager messageManager = null ;
481
493
synchronized (stateLock ) {
482
494
requestMessage ();
@@ -486,7 +498,7 @@ javax.jms.Message receive(long timeout) throws JMSException {
486
498
messageManager = messageQueue .pollFirst ();
487
499
} else {
488
500
long startTime = System .currentTimeMillis ();
489
-
501
+
490
502
long waitTime = 0 ;
491
503
while (messageQueue .isEmpty () && !isClosed () &&
492
504
(timeout == 0 || (waitTime = getWaitTime (timeout , startTime )) > 0 )) {
@@ -525,7 +537,7 @@ javax.jms.Message receiveNoWait() throws JMSException {
525
537
if (cannotDeliver ()) {
526
538
return null ;
527
539
}
528
-
540
+
529
541
MessageManager messageManager ;
530
542
synchronized (stateLock ) {
531
543
if (messageQueue .isEmpty () && numberOfMessagesToPrefetch == 0 ) {
@@ -572,22 +584,22 @@ void close() {
572
584
messageListener = null ;
573
585
}
574
586
}
575
-
587
+
576
588
/**
577
589
* Helper that notifies PrefetchThread that message is dispatched and AutoAcknowledge
578
590
*/
579
591
private javax .jms .Message messageHandler (MessageManager messageManager ) throws JMSException {
580
592
if (messageManager == null ) {
581
593
return null ;
582
- }
594
+ }
583
595
javax .jms .Message message = messageManager .getMessage ();
584
-
596
+
585
597
// Notify PrefetchThread that message is dispatched
586
598
this .messageDispatched ();
587
599
acknowledger .notifyMessageReceived ((SQSMessage ) message );
588
600
return message ;
589
601
}
590
-
602
+
591
603
private boolean cannotDeliver () throws JMSException {
592
604
if (!running ) {
593
605
return true ;
@@ -602,7 +614,7 @@ private boolean cannotDeliver() throws JMSException {
602
614
}
603
615
return false ;
604
616
}
605
-
617
+
606
618
/**
607
619
* Sleeps for the configured time.
608
620
*/
@@ -613,7 +625,7 @@ protected void sleep(long sleepTimeMillis) throws InterruptedException {
613
625
throw e ;
614
626
}
615
627
}
616
-
628
+
617
629
protected boolean isClosed () {
618
630
return closed ;
619
631
}
@@ -641,7 +653,7 @@ List<SQSMessageIdentifier> purgePrefetchedMessagesWithGroups(Set<String> affecte
641
653
642
654
notifyStateChange ();
643
655
}
644
-
656
+
645
657
return purgedMessages ;
646
658
}
647
659
}
0 commit comments