Skip to content

Commit 89c8361

Browse files
committed
Always send a message with body
As per section 3.2 of the AMQP 1.0 specification, a message must have a body, even if it is empty. We use an empty array of bytes for a default, non-null body. See https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
1 parent a5fd6ec commit 89c8361

File tree

2 files changed

+7
-4
lines changed

2 files changed

+7
-4
lines changed

src/main/java/com/rabbitmq/model/amqp/AmqpMessage.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@
2929

3030
class AmqpMessage implements Message {
3131

32+
private static final byte[] EMPTY_BODY = new byte[0];
33+
3234
private final org.apache.qpid.protonj2.client.Message<byte[]> delegate;
3335

3436
AmqpMessage() {
35-
this(org.apache.qpid.protonj2.client.Message.create());
37+
this(org.apache.qpid.protonj2.client.Message.create(EMPTY_BODY));
3638
}
3739

3840
AmqpMessage(byte[] body) {

src/test/java/com/rabbitmq/model/amqp/ClientTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -292,26 +292,27 @@ void exchangeDeletionImpactOnSender(TestInfo info) throws Exception {
292292
Client client = client()) {
293293
connection.management().exchange().name(exchange).type(FANOUT).declare();
294294

295+
byte[] body = new byte[0];
295296
org.apache.qpid.protonj2.client.Connection protonConnection = connection(client);
296297
Session session = protonConnection.openSession();
297298
Sender sender =
298299
session.openSender(
299300
"/exchange/" + exchange, new SenderOptions().deliveryMode(AT_LEAST_ONCE));
300-
Tracker tracker = sender.send(Message.create());
301+
Tracker tracker = sender.send(Message.create(body));
301302
tracker.awaitSettlement(10, SECONDS);
302303
assertThat(tracker.remoteState()).isEqualTo(released());
303304

304305
connection.management().binding().sourceExchange(exchange).destinationQueue(q).bind();
305306

306-
tracker = sender.send(Message.create());
307+
tracker = sender.send(Message.create(body));
307308
tracker.awaitSettlement(10, SECONDS);
308309
assertThat(tracker.remoteState()).isEqualTo(DeliveryState.accepted());
309310

310311
connection.management().exchangeDeletion().delete(exchange);
311312
try {
312313
int count = 0;
313314
while (count++ < 10) {
314-
tracker = sender.send(Message.create());
315+
tracker = sender.send(Message.create(body));
315316
tracker.awaitSettlement(10, SECONDS);
316317
assertThat(tracker.remoteState()).isEqualTo(released());
317318
Thread.sleep(100);

0 commit comments

Comments
 (0)