Skip to content

Commit b6b61c4

Browse files
committed
Improve RabbitAmqpUtils.toAmqpMessage()
The `RabbitAmqpUtils.toAmqpMessage()` utility is used on the publisher side, so, it is natural to treat such a message as a reply. Therefore, the `correlationId` is set to `messageId` of `correlationId` is not present. The `replyTo` of the Spring message is set into `to` of the AMQP message * Mention in the `Address` JavaDocs that just `routingKey` can be treated differently by clients * Fix error message in the `RabbitAmqpMessageListenerAdapter`
1 parent 355be1e commit b6b61c4

File tree

3 files changed

+21
-11
lines changed

3 files changed

+21
-11
lines changed

spring-amqp/src/main/java/org/springframework/amqp/core/Address.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
import org.springframework.util.StringUtils;
2424

2525
/**
26-
* Represents an address for publication of an AMQP message. The AMQP 0-8 and 0-9
27-
* specifications have an unstructured string that is used as a "reply to" address.
26+
* Represents an address for publication of an AMQP message. The AMQP 0.9
27+
* specification has an unstructured string that is used as a "reply to" address.
2828
* There are however conventions in use and this class makes it easier to
2929
* follow these conventions, which can be easily summarised as:
3030
*
@@ -33,7 +33,10 @@
3333
* </pre>
3434
*
3535
* Here we also the exchange name to default to empty
36-
* (so just a routing key will work if you know the queue name).
36+
* (so just a routing key will work as a queue name).
37+
* <p>
38+
* For AMQP 1.0, only routing key is treated as target destination.
39+
*
3740
*
3841
* @author Mark Pollack
3942
* @author Mark Fisher
@@ -58,11 +61,11 @@ public class Address {
5861

5962
/**
6063
* Create an Address instance from a structured String with the form
61-
*
6264
* <pre class="code">
6365
* (exchange)/(routingKey)
6466
* </pre>
6567
* .
68+
* If exchange is parsed to empty string, then routing key is treated as a queue name.
6669
* @param address a structured string.
6770
*/
6871
public Address(String address) {
@@ -120,9 +123,9 @@ public boolean equals(Object o) {
120123

121124
@Override
122125
public int hashCode() {
123-
int result = this.exchangeName != null ? this.exchangeName.hashCode() : 0;
126+
int result = this.exchangeName.hashCode();
124127
int prime = 31; // NOSONAR magic #
125-
result = prime * result + (this.routingKey != null ? this.routingKey.hashCode() : 0);
128+
result = prime * result + this.routingKey.hashCode();
126129
return result;
127130
}
128131

spring-rabbitmq-client/src/main/java/org/springframework/amqp/rabbitmq/client/RabbitAmqpUtils.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.nio.charset.StandardCharsets;
2020
import java.util.Date;
2121
import java.util.Map;
22+
import java.util.Objects;
2223
import java.util.UUID;
2324

2425
import com.rabbitmq.client.amqp.Consumer;
@@ -72,8 +73,12 @@ public static Message fromAmqpMessage(com.rabbitmq.client.amqp.Message amqpMessa
7273
}
7374

7475
/**
75-
* Convert {@link com.rabbitmq.client.amqp.Message} into {@link Message}.
76-
* @param amqpMessage the {@link com.rabbitmq.client.amqp.Message} convert from.
76+
* Convert {@link Message} into {@link com.rabbitmq.client.amqp.Message}.
77+
* The {@link MessageProperties#getReplyTo()} is set into {@link com.rabbitmq.client.amqp.Message#to(String)}.
78+
* The {@link com.rabbitmq.client.amqp.Message#correlationId(long)} is set to
79+
* {@link MessageProperties#getCorrelationId()} if present, or to {@link MessageProperties#getMessageId()}.
80+
* @param message the {@link Message} convert from.
81+
* @param amqpMessage the {@link com.rabbitmq.client.amqp.Message} convert into.
7782
*/
7883
public static void toAmqpMessage(Message message, com.rabbitmq.client.amqp.Message amqpMessage) {
7984
MessageProperties messageProperties = message.getMessageProperties();
@@ -83,9 +88,11 @@ public static void toAmqpMessage(Message message, com.rabbitmq.client.amqp.Messa
8388
.contentEncoding(messageProperties.getContentEncoding())
8489
.contentType(messageProperties.getContentType())
8590
.messageId(messageProperties.getMessageId())
86-
.correlationId(messageProperties.getCorrelationId())
91+
.correlationId(
92+
Objects.requireNonNullElse(
93+
messageProperties.getCorrelationId(), messageProperties.getMessageId()))
8794
.priority(messageProperties.getPriority().byteValue())
88-
.replyTo(messageProperties.getReplyTo());
95+
.to(messageProperties.getReplyTo());
8996

9097
Map<String, @Nullable Object> headers = messageProperties.getHeaders();
9198
if (!headers.isEmpty()) {

spring-rabbitmq-client/src/main/java/org/springframework/amqp/rabbitmq/client/listener/RabbitAmqpMessageListenerAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public void onMessageBatch(List<Message> messages) {
120120
InvocationResult result = getHandlerAdapter()
121121
.invoke(converted, amqpAcknowledgment);
122122
if (result.getReturnValue() != null) {
123-
logger.warn("Replies are not currently supported with RabbitMQ AMQP 1.0 listeners");
123+
logger.warn("Replies for batches are not currently supported with RabbitMQ AMQP 1.0 listeners");
124124
}
125125
}
126126
catch (Exception ex) {

0 commit comments

Comments
 (0)