Skip to content

Commit 3ec6df8

Browse files
committed
Add comment about using request message ID for correlation
HTTP over AMQP 1.0 extension specification, 5.1: To associate a response with a request, the correlation-id value of the response properties MUST be set to the message-id value of the request properties.
1 parent d9e7a88 commit 3ec6df8

File tree

3 files changed

+22
-14
lines changed

3 files changed

+22
-14
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpManagement.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@
4141
import org.slf4j.Logger;
4242
import org.slf4j.LoggerFactory;
4343

44+
/**
45+
* Follow the <a
46+
* href="https://github.com/oasis-tcs/amqp-specs/blob/master/http-over-amqp-v1.0-wd06a.docx">HTTP
47+
* Semantics and Content over AMQP Version 1.0</a> extension specification.
48+
*
49+
* @see <a
50+
* href="https://github.com/oasis-tcs/amqp-specs/blob/master/http-over-amqp-v1.0-wd06a.docx">HTTP
51+
* Semantics and Content over AMQP Version 1.0</a>
52+
*/
4453
class AmqpManagement implements Management {
4554

4655
private static final AtomicLong ID_SEQUENCE = new AtomicLong(0);
@@ -341,8 +350,7 @@ private Response<Map<String, Object>> declare(
341350
checkAvailable();
342351
UUID requestId = messageId();
343352
try {
344-
Message<?> request =
345-
Message.create(body).messageId(requestId).to(target).subject(operation).replyTo(REPLY_TO);
353+
Message<?> request = Message.create(body).to(target).subject(operation);
346354

347355
OutstandingRequest outstandingRequest = this.request(request, requestId);
348356
outstandingRequest.block();
@@ -355,6 +363,10 @@ private Response<Map<String, Object>> declare(
355363
}
356364

357365
OutstandingRequest request(Message<?> request, UUID requestId) throws ClientException {
366+
// HTTP over AMQP 1.0 extension specification, 5.1:
367+
// To associate a response with a request, the correlation-id value of the response properties
368+
// MUST be set to the message-id value of the request properties.
369+
request.messageId(requestId).replyTo(REPLY_TO);
358370
OutstandingRequest outstandingRequest = new OutstandingRequest(this.rpcTimeout);
359371
LOGGER.debug("Enqueueing request {}", requestId);
360372
this.outstandingRequests.put(requestId, outstandingRequest);
@@ -382,12 +394,7 @@ private Map<String, Object> delete(String target, int expectedResponseCode) {
382394
checkAvailable();
383395
UUID requestId = messageId();
384396
try {
385-
Message<?> request =
386-
Message.create((Map<?, ?>) null)
387-
.messageId(requestId)
388-
.to(target)
389-
.subject(DELETE)
390-
.replyTo(REPLY_TO);
397+
Message<?> request = Message.create((Map<?, ?>) null).to(target).subject(DELETE);
391398

392399
OutstandingRequest outstandingRequest = request(request, requestId);
393400
outstandingRequest.block();
@@ -497,12 +504,7 @@ private static Optional<String> matchBinding(
497504
private OutstandingRequest get(String target) throws ClientException {
498505
checkAvailable();
499506
UUID requestId = messageId();
500-
Message<?> request =
501-
Message.create((Map<?, ?>) null)
502-
.messageId(requestId)
503-
.to(target)
504-
.subject(GET)
505-
.replyTo(REPLY_TO);
507+
Message<?> request = Message.create((Map<?, ?>) null).to(target).subject(GET);
506508

507509
OutstandingRequest outstandingRequest = request(request, requestId);
508510
outstandingRequest.block();

src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ class AmqpRpcClient implements RpcClient {
100100
DefaultAddressBuilder<?> addressBuilder = Utils.addressBuilder();
101101
addressBuilder.queue(replyTo);
102102
String replyToAddress = addressBuilder.address();
103+
// HTTP over AMQP 1.0 extension specification, 5.1:
104+
// To associate a response with a request, the correlation-id value of the response properties
105+
// MUST be set to the message-id value of the request properties.
103106
this.requestPostProcessor =
104107
(request, correlationId) -> request.replyTo(replyToAddress).messageId(correlationId);
105108
} else {

src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcServer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ public Message message(byte[] body) {
7171
}
7272
};
7373
if (builder.correlationIdExtractor() == null) {
74+
// HTTP over AMQP 1.0 extension specification, 5.1:
75+
// To associate a response with a request, the correlation-id value of the response properties
76+
// MUST be set to the message-id value of the request properties.
7477
this.correlationIdExtractor = Message::messageId;
7578
} else {
7679
this.correlationIdExtractor = builder.correlationIdExtractor();

0 commit comments

Comments
 (0)