Skip to content

Commit 17f0aa1

Browse files
committed
'Version 1.0.2 of the Amazon SQS Java Messaging Library'
1 parent 20b280e commit 17f0aa1

File tree

53 files changed

+1588
-255
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+1588
-255
lines changed

NOTICE.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
Amazon SQS Java Messaging Library
2-
Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
33

44
This product includes software developed by
55
Amazon Technologies, Inc (http://www.amazon.com/).

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ for communicating with Amazon Simple Queue Service. This project builds on top o
1616
<dependency>
1717
<groupId>com.amazonaws</groupId>
1818
<artifactId>amazon-sqs-java-messaging-lib</artifactId>
19-
<version>1.0.1</version>
19+
<version>1.0.2</version>
2020
<type>jar</type>
2121
</dependency>
2222
```

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
<groupId>com.amazonaws</groupId>
88
<artifactId>amazon-sqs-java-messaging-lib</artifactId>
9-
<version>1.0.1</version>
9+
<version>1.0.2</version>
1010
<packaging>jar</packaging>
1111
<name>Amazon SQS Java Messaging Library</name>
1212
<description>The Amazon SQS Java Messaging Library holds the Java Message Service compatible classes, that are used
@@ -38,7 +38,7 @@
3838
</developers>
3939

4040
<properties>
41-
<aws-java-sdk.version>1.11.18</aws-java-sdk.version>
41+
<aws-java-sdk.version>1.11.106</aws-java-sdk.version>
4242
</properties>
4343

4444
<dependencies>

src/main/java/com/amazon/sqs/javamessaging/AmazonSQSMessagingClientWrapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
* Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License").
55
* You may not use this file except in compliance with the License.

src/main/java/com/amazon/sqs/javamessaging/PrefetchManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
* Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License").
55
* You may not use this file except in compliance with the License.

src/main/java/com/amazon/sqs/javamessaging/SQSConnection.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
* Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License").
55
* You may not use this file except in compliance with the License.

src/main/java/com/amazon/sqs/javamessaging/SQSConnectionFactory.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
* Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License").
55
* You may not use this file except in compliance with the License.
@@ -131,8 +131,7 @@ public Builder(String region) {
131131

132132
public Builder() {
133133
clientConfiguration = new ClientConfiguration();
134-
clientConfiguration.setUserAgent(
135-
clientConfiguration.getUserAgent() + SQSMessagingClientConstants.APPENDED_USER_AGENT_HEADER_VERSION );
134+
clientConfiguration.setUserAgentPrefix(clientConfiguration.getUserAgentPrefix() + SQSMessagingClientConstants.APPENDED_USER_AGENT_HEADER_VERSION );
136135

137136
// Set default numberOfMessagesToPrefetch to MIN_BATCH.
138137
this.numberOfMessagesToPrefetch = SQSMessagingClientConstants.MIN_BATCH;
@@ -220,11 +219,11 @@ public ClientConfiguration getClientConfiguration() {
220219

221220
public void setClientConfiguration(ClientConfiguration clientConfig) {
222221
clientConfiguration = new ClientConfiguration( clientConfig );
223-
if( clientConfig.getUserAgent() == null || clientConfig.getUserAgent().isEmpty() ) {
224-
clientConfig.setUserAgent( ClientConfiguration.DEFAULT_USER_AGENT );
222+
if( clientConfig.getUserAgentPrefix() == null || clientConfig.getUserAgentPrefix().isEmpty() ) {
223+
clientConfig.setUserAgentPrefix( ClientConfiguration.DEFAULT_USER_AGENT );
225224
}
226-
clientConfiguration.setUserAgent(
227-
clientConfig.getUserAgent() + SQSMessagingClientConstants.APPENDED_USER_AGENT_HEADER_VERSION );
225+
clientConfiguration.setUserAgentPrefix(
226+
clientConfig.getUserAgentPrefix() + SQSMessagingClientConstants.APPENDED_USER_AGENT_HEADER_VERSION );
228227
}
229228

230229
public int getNumberOfMessagesToPrefetch() {

src/main/java/com/amazon/sqs/javamessaging/SQSConnectionMetaData.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
* Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License").
55
* You may not use this file except in compliance with the License.

src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumer.java

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
* Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License").
55
* You may not use this file except in compliance with the License.
@@ -15,6 +15,7 @@
1515
package com.amazon.sqs.javamessaging;
1616

1717
import java.util.List;
18+
import java.util.Set;
1819
import java.util.concurrent.ExecutorService;
1920
import java.util.concurrent.Executors;
2021
import java.util.concurrent.ThreadFactory;
@@ -31,7 +32,6 @@
3132
import org.apache.commons.logging.Log;
3233
import org.apache.commons.logging.LogFactory;
3334

34-
3535
import com.amazon.sqs.javamessaging.acknowledge.Acknowledger;
3636
import com.amazon.sqs.javamessaging.acknowledge.NegativeAcknowledger;
3737
import com.amazon.sqs.javamessaging.acknowledge.SQSMessageIdentifier;
@@ -57,11 +57,8 @@ public class SQSMessageConsumer implements MessageConsumer, QueueReceiver {
5757
protected volatile boolean closed = false;
5858

5959
private final SQSQueueDestination sqsDestination;
60-
private final Acknowledger acknowledger;
6160
private final SQSSession parentSQSSession;
6261

63-
private final NegativeAcknowledger negativeAcknowledger;
64-
6562
private final SQSSessionCallbackScheduler sqsSessionRunnable;
6663

6764
/**
@@ -92,11 +89,9 @@ public class SQSMessageConsumer implements MessageConsumer, QueueReceiver {
9289
SQSMessageConsumerPrefetch sqsMessageConsumerPrefetch) {
9390
this.parentSQSSession = parentSQSSession;
9491
this.sqsDestination = destination;
95-
this.acknowledger = acknowledger;
9692
this.sqsSessionRunnable = sqsSessionRunnable;
9793
this.sqsMessageConsumerPrefetch = sqsMessageConsumerPrefetch;
9894
this.sqsMessageConsumerPrefetch.setMessageConsumer(this);
99-
this.negativeAcknowledger = negativeAcknowledger;
10095

10196
prefetchExecutor = Executors.newSingleThreadExecutor(threadFactory);
10297
prefetchExecutor.execute(sqsMessageConsumerPrefetch);
@@ -249,21 +244,6 @@ void doClose() {
249244
closed = true;
250245
}
251246

252-
253-
/**
254-
* Set the message visibility as zero for the list of messages which are not
255-
* acknowledged and delete them from the list of unacknowledged messages.
256-
*/
257-
void recover() throws JMSException {
258-
List<SQSMessageIdentifier> unAckedMessages = acknowledger.getUnAckMessages();
259-
if (!unAckedMessages.isEmpty()) {
260-
negativeAcknowledger.bulkAction(unAckedMessages, unAckedMessages.size());
261-
acknowledger.forgetUnAckMessages();
262-
}
263-
264-
265-
}
266-
267247
boolean isClosed() {
268248
return closed;
269249
}
@@ -293,4 +273,8 @@ private void checkClosed() throws IllegalStateException {
293273
throw new IllegalStateException("Consumer is closed");
294274
}
295275
}
276+
277+
List<SQSMessageIdentifier> purgePrefetchedMessagesWithGroups(Set<String> affectedGroups) throws JMSException {
278+
return sqsMessageConsumerPrefetch.purgePrefetchedMessagesWithGroups(affectedGroups);
279+
}
296280
}

src/main/java/com/amazon/sqs/javamessaging/SQSMessageConsumerPrefetch.java

Lines changed: 55 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2010-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
* Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License").
55
* You may not use this file except in compliance with the License.
@@ -16,17 +16,20 @@
1616

1717
import java.util.ArrayDeque;
1818
import java.util.ArrayList;
19+
import java.util.Iterator;
1920
import java.util.List;
21+
import java.util.Set;
22+
import java.util.UUID;
2023

2124
import javax.jms.JMSException;
2225
import javax.jms.MessageListener;
2326

24-
2527
import org.apache.commons.logging.Log;
2628
import org.apache.commons.logging.LogFactory;
2729

2830
import com.amazon.sqs.javamessaging.acknowledge.Acknowledger;
2931
import com.amazon.sqs.javamessaging.acknowledge.NegativeAcknowledger;
32+
import com.amazon.sqs.javamessaging.acknowledge.SQSMessageIdentifier;
3033
import com.amazon.sqs.javamessaging.message.SQSBytesMessage;
3134
import com.amazon.sqs.javamessaging.message.SQSMessage;
3235
import com.amazon.sqs.javamessaging.message.SQSObjectMessage;
@@ -156,9 +159,10 @@ protected void setMessageListener(MessageListener messageListener) {
156159
if (!running || isClosed()) {
157160
return;
158161
}
159-
while (!messageQueue.isEmpty()) {
160-
sqsSessionRunnable.scheduleCallBack(messageListener, messageQueue.pollFirst());
161-
}
162+
163+
List<MessageManager> allPrefetchedMessages = new ArrayList<MessageManager>(messageQueue);
164+
sqsSessionRunnable.scheduleCallBacks(messageListener, allPrefetchedMessages);
165+
messageQueue.clear();
162166
}
163167
}
164168

@@ -203,7 +207,7 @@ public void run() {
203207
} catch (Throwable e) {
204208
LOG.error("Unexpected exception when prefetch messages:", e);
205209
nackQueueMessages = true;
206-
throw e;
210+
throw new RuntimeException(e);
207211
} finally {
208212
if (isClosed() || nackQueueMessages) {
209213
nackQueueMessages();
@@ -225,6 +229,11 @@ protected List<Message> getMessages(int prefetchBatchSize) throws InterruptedExc
225229
.withAttributeNames(ALL)
226230
.withMessageAttributeNames(ALL)
227231
.withWaitTimeSeconds(WAIT_TIME_SECONDS);
232+
//if the receive request is for FIFO queue, provide a unique receive request attempt it, so that
233+
//failed calls retried by SDK will claim the same messages
234+
if (sqsDestination.isFifo()) {
235+
receiveMessageRequest.withReceiveRequestAttemptId(UUID.randomUUID().toString());
236+
}
228237
List<Message> messages = null;
229238
try {
230239
ReceiveMessageResult receivedMessageResult = amazonSQSClient.receiveMessage(receiveMessageRequest);
@@ -250,27 +259,26 @@ protected List<Message> getMessages(int prefetchBatchSize) throws InterruptedExc
250259
*/
251260
protected void processReceivedMessages(List<Message> messages) {
252261
List<String> nackMessages = new ArrayList<String>();
262+
List<MessageManager> messageManagers = new ArrayList<MessageManager>();
253263
for (Message message : messages) {
254264
try {
255265
javax.jms.Message jmsMessage = convertToJMSMessage(message);
256-
257-
if (messageListener != null) {
258-
sqsSessionRunnable.scheduleCallBack(messageListener, new MessageManager(this, jmsMessage));
259-
synchronized (stateLock) {
260-
messagesPrefetched++;
261-
notifyStateChange();
262-
}
263-
} else {
264-
synchronized (stateLock) {
265-
messageQueue.addLast(new MessageManager(this, jmsMessage));
266-
messagesPrefetched++;
267-
notifyStateChange();
268-
}
269-
}
266+
messageManagers.add(new MessageManager(this, jmsMessage));
270267
} catch (JMSException e) {
271268
nackMessages.add(message.getReceiptHandle());
272269
}
273270
}
271+
272+
synchronized (stateLock) {
273+
if (messageListener != null) {
274+
sqsSessionRunnable.scheduleCallBacks(messageListener, messageManagers);
275+
} else {
276+
messageQueue.addAll(messageManagers);
277+
}
278+
279+
messagesPrefetched += messageManagers.size();
280+
notifyStateChange();
281+
}
274282

275283
// Nack any messages that cannot be serialized to JMSMessage.
276284
try {
@@ -516,4 +524,31 @@ protected void sleep(long sleepTimeMillis) throws InterruptedException {
516524
protected boolean isClosed() {
517525
return closed;
518526
}
527+
528+
List<SQSMessageIdentifier> purgePrefetchedMessagesWithGroups(Set<String> affectedGroups) throws JMSException {
529+
List<SQSMessageIdentifier> purgedMessages = new ArrayList<SQSMessageIdentifier>();
530+
synchronized (stateLock) {
531+
//let's walk over the prefetched messages
532+
Iterator<MessageManager> managerIterator = messageQueue.iterator();
533+
while (managerIterator.hasNext()) {
534+
MessageManager messageManager = managerIterator.next();
535+
SQSMessage prefetchedMessage = (SQSMessage)messageManager.getMessage();
536+
SQSMessageIdentifier messageIdentifier = SQSMessageIdentifier.fromSQSMessage(prefetchedMessage);
537+
538+
//is the prefetch entry for one of the affected group ids?
539+
if (affectedGroups.contains(messageIdentifier.getGroupId())) {
540+
//we will purge this prefetched message
541+
purgedMessages.add(messageIdentifier);
542+
//remove from prefetch queue
543+
managerIterator.remove();
544+
//we are done with it and can prefetch more messages
545+
this.messagesPrefetched--;
546+
}
547+
}
548+
549+
notifyStateChange();
550+
}
551+
552+
return purgedMessages;
553+
}
519554
}

0 commit comments

Comments
 (0)