Skip to content

Commit d855c84

Browse files
author
Patel, Mihir
committed
'Version 1.0.0 of the Amazon SQS Java Messaging Library'
1 parent 8566c1f commit d855c84

12 files changed

+653
-733
lines changed

NOTICE.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@ Amazon Technologies, Inc (http://www.amazon.com/).
88
THIRD PARTY COMPONENTS
99
**********************
1010
This software includes third party software subject to the following copyrights:
11-
- Apache ActiveMQ (see: org.apache.activemq.util.TypeConversionSupport), Copyright 2005-2013 Apache Software Foundation.
11+
- Apache ActiveMQ (see: org.apache.activemq.util.TypeConversionSupport, org.apache.activemq.command.ActiveMQStreamMessage). Copyright 2005-2013 Apache Software Foundation
1212

13-
The licenses for these third party components are included in LICENSE.txt
13+
The licenses for these third party components are included in LICENSE.txt

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Amazon SQS Java Messaging Library
22
========================================
33
The **Amazon SQS Java Messaging Library** holds the Java Message Service compatible classes, that are used
4-
for communicating with Amazon Simple Queue Service. This project builds on top of the AWS SDK for Java to use Amazon SQS as the JMS (as defined in 1.1 specification) provider for the messaging applications without running any additional software.
4+
for communicating with Amazon Simple Queue Service. This project builds on top of the AWS SDK for Java to use Amazon SQS as the JMS (as defined in 1.1 specification) provider for the messaging applications without running any additional software.
55

66
* You can download release builds through the [releases section of this](https://github.com/awslabs/amazon-sqs-java-messaging-lib) project.
77
* For more information on using the amazon-sqs-java-messaging-lib, see our getting started guide to SQS [here](http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSGettingStartedGuide/Welcome.html).
@@ -11,7 +11,7 @@ for communicating with Amazon Simple Queue Service. This project builds on top o
1111
* **Sign up for AWS** — Before you begin, you need an AWS account. For more information about creating an AWS account and retrieving your AWS credentials, see [AWS Account and Credentials](http://docs.aws.amazon.com/AWSSdkDocsJava/latest/DeveloperGuide/java-dg-setup.html) in the AWS SDK for Java Developer Guide.
1212
* **Sign up for Amazon SQS** — Go to the Amazon [SQS console](https://console.aws.amazon.com/sqs/home?region=us-east-1) to sign up for the service.
1313
* **Minimum requirements** — To use the sample application, you'll need Java 1.7+ and [Maven 3](http://maven.apache.org/). For more information about the requirements, see the [Getting Started](http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSGettingStartedGuide/Welcome.html) section of the Amazon SQS Developer Guide.
14-
* **Download** — Download the [latest preview release](https://github.com/awslabs/amazon-sqs-java-messaging-lib/releases) or pick it up from Maven:
14+
* **Download** — Download the [latest preview release or](https://github.com/awslabs/amazon-sqs-java-messaging-lib/releases) pick it up from Maven:
1515
```xml
1616
<dependency>
1717
<groupId>com.amazonaws</groupId>
@@ -25,3 +25,4 @@ for communicating with Amazon Simple Queue Service. This project builds on top o
2525
##Feedback
2626
* Give us feedback [here](https://github.com/awslabs/amazon-sqs-java-messaging-lib/issues).
2727
* If you'd like to contribute a new feature or bug fix, we'd love to see Github pull requests from you.
28+

src/main/java/com/amazonaws/sqsjms/PrefetchManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,6 @@ public interface PrefetchManager {
2525
* called.
2626
*/
2727
public void messageDispatched();
28+
29+
public SQSMessageConsumer getMessageConsumer();
2830
}

src/main/java/com/amazonaws/sqsjms/SQSConnection.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,11 +194,13 @@ public void start() throws JMSException {
194194
@Override
195195
public void stop() throws JMSException {
196196
checkClosed();
197-
actionOnConnectionTaken = true;
197+
198198
if (!running) {
199199
return;
200200
}
201-
201+
202+
actionOnConnectionTaken = true;
203+
202204
if (SQSSession.SESSION_THREAD_FACTORY.wasThreadCreatedWithThisThreadGroup(Thread.currentThread())) {
203205
throw new IllegalStateException(
204206
"MessageListener must not attempt to stop its own Connection to prevent potential deadlock issues");

src/main/java/com/amazonaws/sqsjms/SQSMessageConsumer.java

Lines changed: 22 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,6 @@ public class SQSMessageConsumer implements MessageConsumer, QueueReceiver {
6060
*/
6161
private final SQSMessageConsumerPrefetch sqsMessageConsumerPrefetch;
6262

63-
/** Used to synchronize on calls from message listener on its own consumer */
64-
private final Object callBackSynchronizer;
65-
6663
SQSMessageConsumer(SQSConnection parentSQSConnection, SQSSession parentSQSSession,
6764
SQSSessionCallbackScheduler sqsSessionRunnable, SQSDestination destination,
6865
Acknowledger acknowledger, NegativeAcknowledger negativeAcknowledger, ThreadFactory threadFactory) {
@@ -84,12 +81,11 @@ public class SQSMessageConsumer implements MessageConsumer, QueueReceiver {
8481
this.acknowledger = acknowledger;
8582
this.sqsSessionRunnable = sqsSessionRunnable;
8683
this.sqsMessageConsumerPrefetch = sqsMessageConsumerPrefetch;
84+
this.sqsMessageConsumerPrefetch.setMessageConsumer(this);
8785
this.negativeAcknowledger = negativeAcknowledger;
8886

8987
prefetchExecutor = Executors.newSingleThreadExecutor(threadFactory);
9088
prefetchExecutor.execute(sqsMessageConsumerPrefetch);
91-
92-
callBackSynchronizer = sqsSessionRunnable.getSynchronizer();
9389
}
9490

9591

@@ -139,15 +135,13 @@ public void close() throws JMSException {
139135
if (closed) {
140136
return;
141137
}
142-
synchronized (callBackSynchronizer) {
143-
if (Thread.currentThread() == sqsSessionRunnable.getCurrentThread()) {
144-
sqsSessionRunnable.setConsumerCloseAfterCallback(this);
145-
return;
146-
}
147-
148-
doClose();
149-
callBackSynchronizer.notifyAll();
138+
139+
if (parentSQSSession.isActiveCallbackSessionThread()) {
140+
sqsSessionRunnable.setConsumerCloseAfterCallback(this);
141+
return;
150142
}
143+
144+
doClose();
151145
}
152146

153147
void doClose() {
@@ -160,21 +154,24 @@ void doClose() {
160154
parentSQSSession.removeConsumer(this);
161155

162156
try {
163-
LOG.info("Shutting down " + SQSSession.CONSUMER_PREFETCH_EXECUTER_NAME + " executor");
164-
165-
/** Shut down executor. */
166-
prefetchExecutor.shutdown();
157+
if (!prefetchExecutor.isShutdown()) {
158+
LOG.info("Shutting down " + SQSSession.CONSUMER_PREFETCH_EXECUTER_NAME + " executor");
159+
/** Shut down executor. */
160+
prefetchExecutor.shutdown();
161+
}
162+
163+
parentSQSSession.waitForConsumerCallbackToComplete(this);
167164

168165
if (!prefetchExecutor.awaitTermination(PREFETCH_EXECUTOR_GRACEFUL_SHUTDOWN_TIME, TimeUnit.SECONDS)) {
169166

170-
LOG.warn("Can't terminate executor service " +
171-
SQSSession.CONSUMER_PREFETCH_EXECUTER_NAME + " after " + 60 +
172-
" seconds, some running threads will be shutdown immediately");
167+
LOG.warn("Can't terminate executor service " + SQSSession.CONSUMER_PREFETCH_EXECUTER_NAME +
168+
" after " + 60 + " seconds, some running threads will be shutdown immediately");
173169
prefetchExecutor.shutdownNow();
174170
}
175171
} catch (InterruptedException e) {
176172
LOG.error("Interrupted while closing the consumer.", e);
177173
}
174+
178175
closed = true;
179176
}
180177

@@ -196,22 +193,16 @@ public String getMessageSelector() throws JMSException {
196193
throw new JMSException(SQSJMSClientConstants.UNSUPPORTED_METHOD);
197194
}
198195

199-
/** This call blocks until message listener in progress have completed. */
200-
protected void stop() {
196+
/** This stops the prefetching */
197+
protected void stopPrefetch() {
201198
if (!closed) {
202-
synchronized (callBackSynchronizer) {
203-
sqsMessageConsumerPrefetch.stop();
204-
callBackSynchronizer.notifyAll();
205-
}
199+
sqsMessageConsumerPrefetch.stop();
206200
}
207201
}
208202

209-
protected void start() {
203+
protected void startPrefetch() {
210204
if (!closed) {
211-
synchronized (callBackSynchronizer) {
212-
sqsMessageConsumerPrefetch.start();
213-
callBackSynchronizer.notifyAll();
214-
}
205+
sqsMessageConsumerPrefetch.start();
215206
}
216207
}
217208

src/main/java/com/amazonaws/sqsjms/SQSMessageConsumerPrefetch.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager {
5555
private final Acknowledger acknowledger;
5656
private final NegativeAcknowledger negativeAcknowledger;
5757
private volatile MessageListener messageListener;
58+
59+
private SQSMessageConsumer messageConsumer;
5860

5961
private final SQSSessionCallbackScheduler sqsSessionRunnable;
6062

@@ -100,6 +102,15 @@ public class SQSMessageConsumerPrefetch implements Runnable, PrefetchManager {
100102
MessageListener getMessageListener() {
101103
return messageListener;
102104
}
105+
106+
void setMessageConsumer(SQSMessageConsumer messageConsumer) {
107+
this.messageConsumer = messageConsumer;
108+
}
109+
110+
@Override
111+
public SQSMessageConsumer getMessageConsumer() {
112+
return messageConsumer;
113+
}
103114

104115
protected void setMessageListener(MessageListener messageListener) {
105116
this.messageListener = messageListener;

0 commit comments

Comments
 (0)