Skip to content

Commit 8566c1f

Browse files
committed
'Version 1.0.0 of the Amazon SQS Java Messaging Library'
1 parent cc6a348 commit 8566c1f

30 files changed

+7160
-146
lines changed

src/main/java/com/amazonaws/sqsjms/AmazonSQSClientJMSWrapper.java

Lines changed: 126 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
3636
import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
3737
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
38+
import com.amazonaws.services.sqs.model.CreateQueueRequest;
3839
import com.amazonaws.services.sqs.model.GetQueueUrlRequest;
3940
import com.amazonaws.services.sqs.model.GetQueueUrlResult;
4041
import com.amazonaws.services.sqs.model.QueueDoesNotExistException;
@@ -47,14 +48,17 @@
4748
* This is a JMS Wrapper of AmazonSQSClient. This class changes all
4849
* AmazonServiceException and AmazonClientException into
4950
* JMSException/JMSSecurityException.
50-
*
51-
* @author jinsrhee
5251
*/
5352
public class AmazonSQSClientJMSWrapper {
5453
private static final Log LOG = LogFactory.getLog(AmazonSQSClientJMSWrapper.class);
5554

5655
private static final Set<String> SECURITY_EXCEPTION_ERROR_CODES;
5756
static {
57+
/**
58+
* List of exceptions that can classified as security. These exceptions
59+
* are not thrown during connection-set-up rather after the service
60+
* calls of the AmazonSQSClient
61+
*/
5862
SECURITY_EXCEPTION_ERROR_CODES = new HashSet<String>();
5963
SECURITY_EXCEPTION_ERROR_CODES.add("MissingClientTokenId");
6064
SECURITY_EXCEPTION_ERROR_CODES.add("InvalidClientTokenId");
@@ -63,7 +67,13 @@ public class AmazonSQSClientJMSWrapper {
6367
}
6468

6569
private final AmazonSQS amazonSQSClient;
66-
70+
71+
/**
72+
* @param amazonSQSClient
73+
* The AWS SDK Client for SQS.
74+
* @throws JMSException
75+
* if the client is null
76+
*/
6777
public AmazonSQSClientJMSWrapper(AmazonSQS amazonSQSClient) throws JMSException {
6878
if (amazonSQSClient == null) {
6979
throw new JMSException("Amazon SQS client cannot be null");
@@ -75,11 +85,18 @@ public AmazonSQSClientJMSWrapper(AmazonSQS amazonSQSClient) throws JMSException
7585
* If one uses any other AWS SDK operations other than explicitly listed
7686
* here, the exceptions thrown by those operations will not be wrapped as
7787
* JMSExceptions.
88+
* @return amazonSQSClient
7889
*/
7990
public AmazonSQS getAmazonSQSClient() {
8091
return amazonSQSClient;
8192
}
8293

94+
/**
95+
* @param endpoint
96+
* The endpoint (ex: "sqs.us-east-1.amazonaws.com") of the region
97+
* specific AWS endpoint this client will communicate with.
98+
* @throws JMSException
99+
*/
83100
public void setEndpoint(String endpoint) throws JMSException {
84101
try {
85102
amazonSQSClient.setEndpoint(endpoint);
@@ -89,6 +106,13 @@ public void setEndpoint(String endpoint) throws JMSException {
89106
}
90107
}
91108

109+
/**
110+
* @param region
111+
* The region this client will communicate with. See
112+
* {@link Region#getRegion(com.amazonaws.regions.Regions)} for
113+
* accessing a given region.
114+
* @throws JMSException
115+
*/
92116
public void setRegion(Region region) throws JMSException {
93117
try {
94118
amazonSQSClient.setRegion(region);
@@ -98,22 +122,41 @@ public void setRegion(Region region) throws JMSException {
98122
}
99123
}
100124

125+
/**
126+
* @param deleteMessageRequest
127+
* Container for the necessary parameters to execute the
128+
* deleteMessage service method on AmazonSQS.
129+
* @throws JMSException
130+
*/
101131
public void deleteMessage(DeleteMessageRequest deleteMessageRequest) throws JMSException {
102132
try {
103133
amazonSQSClient.deleteMessage(deleteMessageRequest);
104134
} catch (AmazonClientException e) {
105135
throw handleException(e, "deleteMessage");
106136
}
107137
}
108-
138+
139+
/**
140+
* @param deleteMessageBatchRequest
141+
* Container for the necessary parameters to execute the
142+
* deleteMessageBatch service method on AmazonSQS. This is the
143+
* batch version of deleteMessage. Max batch size is 10.
144+
* @throws JMSException
145+
*/
109146
public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest deleteMessageBatchRequest) throws JMSException {
110147
try {
111148
return amazonSQSClient.deleteMessageBatch(deleteMessageBatchRequest);
112149
} catch (AmazonClientException e) {
113150
throw handleException(e, "deleteMessageBatch");
114151
}
115152
}
116-
153+
154+
/**
155+
* @param sendMessageRequest
156+
* Container for the necessary parameters to execute the
157+
* sendMessage service method on AmazonSQS.
158+
* @throws JMSException
159+
*/
117160
public SendMessageResult sendMessage(SendMessageRequest sendMessageRequest) throws JMSException {
118161
try {
119162
return amazonSQSClient.sendMessage(sendMessageRequest);
@@ -141,11 +184,25 @@ public boolean queueExists(String queueName) throws JMSException {
141184
throw handleException(e, "getQueueUrl");
142185
}
143186
}
144-
187+
188+
/**
189+
* @param queueName
190+
* @return The response from the GetQueueUrl service method, as returned by
191+
* AmazonSQS, which will include queue`s URL
192+
* @throws JMSException
193+
*/
145194
public GetQueueUrlResult getQueueUrl(String queueName) throws JMSException {
146195
return getQueueUrl(new GetQueueUrlRequest(queueName));
147196
}
148-
197+
198+
/**
199+
* @param getQueueUrlRequest
200+
* Container for the necessary parameters to execute the
201+
* getQueueUrl service method on AmazonSQS.
202+
* @return The response from the GetQueueUrl service method, as returned by
203+
* AmazonSQS, which will include queue`s URL
204+
* @throws JMSException
205+
*/
149206
public GetQueueUrlResult getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) throws JMSException {
150207
try {
151208
return amazonSQSClient.getQueueUrl(getQueueUrlRequest);
@@ -154,30 +211,80 @@ public GetQueueUrlResult getQueueUrl(GetQueueUrlRequest getQueueUrlRequest) thro
154211
}
155212
}
156213

214+
/**
215+
* This function creates the queue with the default queue attributes.
216+
*
217+
* @param queueName
218+
* @return The response from the createQueue service method, as returned by
219+
* AmazonSQS. This call creates a new queue, or returns the URL of
220+
* an existing one.
221+
* @throws JMSException
222+
*/
157223
public CreateQueueResult createQueue(String queueName) throws JMSException {
158224
try {
159225
return amazonSQSClient.createQueue(queueName);
160226
} catch (AmazonClientException e) {
161227
throw handleException(e, "createQueue");
162228
}
163229
}
164-
165-
public void changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) throws JMSException {
230+
231+
/**
232+
* @param createQueueRequest
233+
* Container for the necessary parameters to execute the
234+
* createQueue service method on AmazonSQS.
235+
* @return The response from the createQueue service method, as returned by
236+
* AmazonSQS. This call creates a new queue, or returns the URL of
237+
* an existing one.
238+
* @throws JMSException
239+
*/
240+
public CreateQueueResult createQueue(CreateQueueRequest createQueueRequest) throws JMSException {
166241
try {
167-
amazonSQSClient.changeMessageVisibility(changeMessageVisibilityRequest);
242+
return amazonSQSClient.createQueue(createQueueRequest);
168243
} catch (AmazonClientException e) {
169-
throw handleException(e, "changeMessageVisibility");
244+
throw handleException(e, "createQueue");
170245
}
171246
}
172247

248+
/**
249+
* @param receiveMessageRequest
250+
* Container for the necessary parameters to execute the
251+
* receiveMessage service method on AmazonSQS.
252+
* @return The response from the ReceiveMessage service method, as returned
253+
* by AmazonSQS.
254+
* @throws JMSException
255+
*/
173256
public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageRequest) throws JMSException {
174257
try {
175258
return amazonSQSClient.receiveMessage(receiveMessageRequest);
176259
} catch (AmazonClientException e) {
177260
throw handleException(e, "receiveMessage");
178261
}
179262
}
180-
263+
264+
/**
265+
* @param changeMessageVisibilityRequest
266+
* Container for the necessary parameters to execute the
267+
* changeMessageVisibility service method on AmazonSQS.
268+
* @return The response from the changeMessageVisibility service method, as
269+
* returned by AmazonSQS.
270+
* @throws JMSException
271+
*/
272+
public void changeMessageVisibility(ChangeMessageVisibilityRequest changeMessageVisibilityRequest) throws JMSException {
273+
try {
274+
amazonSQSClient.changeMessageVisibility(changeMessageVisibilityRequest);
275+
} catch (AmazonClientException e) {
276+
throw handleException(e, "changeMessageVisibility");
277+
}
278+
}
279+
280+
/**
281+
* @param changeMessageVisibilityBatchRequest
282+
* Container for the necessary parameters to execute the
283+
* changeMessageVisibilityBatch service method on AmazonSQS.
284+
* @return The response from the changeMessageVisibilityBatch service
285+
* method, as returned by AmazonSQS.
286+
* @throws JMSException
287+
*/
181288
public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest)
182289
throws JMSException {
183290
try {
@@ -188,19 +295,20 @@ public ChangeMessageVisibilityBatchResult changeMessageVisibilityBatch(ChangeMes
188295
}
189296

190297
/**
191-
* Create generic error message for AmazonServiceException.
192-
* Message include ActionCall, RequestId, HTTPStatusCode, AmazonErrorCode.
298+
* Create generic error message for AmazonServiceException. Message include
299+
* ActionCall, RequestId, HTTPStatusCode, AmazonErrorCode.
193300
*/
194301
private String logAndGetAmazonServiceException(AmazonServiceException ase, String action) {
195-
String errorMessage = "AmazonServiceException: " + action + ". RequestId: " + ase.getRequestId() + "\nHTTPStatusCode: "
196-
+ ase.getStatusCode() + " AmazonErrorCode: " + ase.getErrorCode();
302+
String errorMessage = "AmazonServiceException: " + action + ". RequestId: " + ase.getRequestId() +
303+
"\nHTTPStatusCode: " + ase.getStatusCode() + " AmazonErrorCode: " +
304+
ase.getErrorCode();
197305
LOG.error(errorMessage, ase);
198306
return errorMessage;
199307
}
200308

201309
/**
202-
* Create generic error message for AmazonClientException.
203-
* Message include ActionCall.
310+
* Create generic error message for AmazonClientException. Message include
311+
* ActionCall.
204312
*/
205313
private String logAndGetAmazonClientException(AmazonClientException ace, String action) {
206314
String errorMessage = "AmazonClientException: " + action + ".";

src/main/java/com/amazonaws/sqsjms/BulkSQSOperation.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,19 @@
2424

2525
import com.amazonaws.sqsjms.acknowledge.SQSMessageIdentifier;
2626

27+
/**
28+
* This is used by different acknowledgers that requires partitioning of the
29+
* list, and execute actions on the partitions
30+
*/
2731
public abstract class BulkSQSOperation {
32+
2833
/**
29-
* Bulk action on List of messageIdentifier. Up to the indexOfMessage.
34+
* Bulk action on list of message identifiers up to the indexOfMessage.
3035
*/
3136
public void bulkAction(List<SQSMessageIdentifier> messageIdentifierList, int indexOfMessage)
3237
throws JMSException {
3338
Map<String, List<String>> receiptHandleWithSameQueueUrl = new HashMap<String, List<String>>();
34-
39+
3540
// Add all messages up to and including requested message into Map.
3641
// Map contains key as queueUrl and value as list receiptHandles from
3742
// that queueUrl.

src/main/java/com/amazonaws/sqsjms/PrefetchManager.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,14 @@
1515
package com.amazonaws.sqsjms;
1616

1717
/**
18-
* This interface is helper to notify when the prefetch should be stopped.
18+
* This interface is helper to notify when the prefetch should be resumed.
1919
*/
2020
public interface PrefetchManager {
2121

2222
/**
23-
* Notify the prefetchThread that the message is dispatched from messageQueue when user calls for receive.
23+
* Notify the prefetchThread that the message is dispatched from
24+
* messageQueue when user calls for receive or message listener onMessage is
25+
* called.
2426
*/
2527
public void messageDispatched();
26-
}
28+
}

src/main/java/com/amazonaws/sqsjms/SQSBytesMessage.java

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.DataOutputStream;
2121
import java.io.EOFException;
2222
import java.io.IOException;
23+
import java.util.Arrays;
2324

2425
import javax.jms.BytesMessage;
2526
import javax.jms.JMSException;
@@ -53,8 +54,8 @@ public class SQSBytesMessage extends SQSMessage implements BytesMessage {
5354
SQSBytesMessage(Acknowledger acknowledger, String queueUrl, Message sqsMessage) throws JMSException {
5455
super(acknowledger, queueUrl, sqsMessage);
5556
try {
56-
bytes = Base64.decode(sqsMessage.getBody());
57-
dataOut.write(bytes);
57+
/** Bytes is set by the reset() */
58+
dataOut.write(Base64.decode(sqsMessage.getBody()));
5859
reset();
5960
} catch (IOException e) {
6061
LOG.error("IOException: Message cannot be written", e);
@@ -77,7 +78,7 @@ public long getBodyLength() throws JMSException {
7778
checkCanRead();
7879
return bytes.length;
7980
}
80-
81+
8182
@Override
8283
public boolean readBoolean() throws JMSException {
8384
checkCanRead();
@@ -218,7 +219,8 @@ public int readBytes(byte[] value) throws JMSException {
218219
@Override
219220
public int readBytes(byte[] value, int length) throws JMSException {
220221
if (length < 0) {
221-
throw new IndexOutOfBoundsException("Length bytes to read can't be smaller than 0 but was " + length);
222+
throw new IndexOutOfBoundsException("Length bytes to read can't be smaller than 0 but was " +
223+
length);
222224
}
223225
checkCanRead();
224226
try {
@@ -385,24 +387,21 @@ public void writeObject(Object value) throws JMSException {
385387
} else if (value instanceof byte[]) {
386388
writeBytes((byte[]) value);
387389
} else {
388-
throw new MessageFormatException("Cannot write non-primitive type:" + value.getClass());
390+
throw new MessageFormatException("Cannot write non-primitive type: " + value.getClass());
389391
}
390392
}
391393

392394
/**
393-
* Puts the message body in read-only mode and repositions the stream of bytes to the beginning.
395+
* Puts the message body in read-only mode and repositions the stream of
396+
* bytes to the beginning.
394397
*/
395398
@Override
396399
public void reset() throws JMSException {
397-
try {
398-
if (dataOut != null) {
399-
dataOut.flush();
400-
bytes = bytesOut.toByteArray();
401-
dataOut = null;
402-
bytesOut = null;
403-
}
404-
} catch (IOException e) {
405-
throw convertExceptionToJMSException(e);
400+
401+
if (dataOut != null) {
402+
bytes = bytesOut.toByteArray();
403+
dataOut = null;
404+
bytesOut = null;
406405
}
407406
dataIn = new DataInputStream(new ByteArrayInputStream(bytes));
408407
}
@@ -423,7 +422,20 @@ public void clearBody() throws JMSException {
423422
dataOut = new DataOutputStream(bytesOut);
424423
setBodyWritePermissions(true);
425424
}
426-
425+
426+
/**
427+
* Reads the body of message, which can be either the body returned from the
428+
* the receives message as bytes or the bytes put in bytesOut if it is a
429+
* sent message.
430+
*/
431+
byte[] getBodyAsBytes() throws JMSException {
432+
if (bytes != null) {
433+
return Arrays.copyOf(bytes, bytes.length);
434+
} else {
435+
return bytesOut.toByteArray();
436+
}
437+
}
438+
427439
private void checkCanRead() throws JMSException {
428440
if (bytes == null) {
429441
throw new MessageNotReadableException("Message is not readable");

0 commit comments

Comments
 (0)