Skip to content

Commit 24c20b6

Browse files
committed
Message with non-existing exchange in to field is rejected
Instead of closing the link. References rabbitmq/rabbitmq-server#12391
1 parent c9501a8 commit 24c20b6

File tree

4 files changed

+186
-27
lines changed

4 files changed

+186
-27
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,18 @@ private static void checkBrokerVersion(org.apache.qpid.protonj2.client.Connectio
244244
}
245245
}
246246

247+
private static String brokerVersion(org.apache.qpid.protonj2.client.Connection connection) {
248+
try {
249+
return (String) connection.properties().get("version");
250+
} catch (ClientException e) {
251+
throw ExceptionUtils.convert(e);
252+
}
253+
}
254+
255+
String brokerVersion() {
256+
return brokerVersion(this.nativeConnection);
257+
}
258+
247259
private static String extractNode(org.apache.qpid.protonj2.client.Connection connection)
248260
throws ClientException {
249261
String node = (String) connection.properties().get("node");

src/main/java/com/rabbitmq/client/amqp/impl/Utils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ private static String currentVersion(String currentVersion) {
202202
/**
203203
* https://stackoverflow.com/questions/6701948/efficient-way-to-compare-version-strings-in-java
204204
*/
205-
private static int versionCompare(String str1, String str2) {
205+
static int versionCompare(String str1, String str2) {
206206
String[] vals1 = str1.split("\\.");
207207
String[] vals2 = str2.split("\\.");
208208
int i = 0;

src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static com.rabbitmq.client.amqp.Management.ExchangeType.FANOUT;
2222
import static com.rabbitmq.client.amqp.Management.QueueType.*;
2323
import static com.rabbitmq.client.amqp.Management.QueueType.STREAM;
24+
import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_0_3;
2425
import static com.rabbitmq.client.amqp.impl.TestUtils.*;
2526
import static java.nio.charset.StandardCharsets.*;
2627
import static java.util.Collections.emptyMap;
@@ -30,6 +31,7 @@
3031
import static org.assertj.core.api.Assertions.*;
3132

3233
import com.rabbitmq.client.amqp.*;
34+
import com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersionAtLeast;
3335
import com.rabbitmq.client.amqp.impl.TestUtils.DisabledIfAddressV1Permitted;
3436
import com.rabbitmq.client.amqp.impl.TestUtils.Sync;
3537
import java.util.*;
@@ -378,34 +380,41 @@ void publisherSendingShouldThrowWhenQueueHasBeenDeleted() {
378380
}
379381

380382
@Test
383+
@BrokerVersionAtLeast(RABBITMQ_4_0_3)
381384
void publisherSendingShouldThrowWhenPublishingToNonExistingExchangeWithToProperty() {
382385
String doesNotExist = uuid();
383-
Sync closedSync = sync();
384-
AtomicReference<Throwable> closedException = new AtomicReference<>();
385-
Publisher publisher =
386-
connection
387-
.publisherBuilder()
388-
.listeners(closedListener(closedSync, ctx -> closedException.set(ctx.failureCause())))
389-
.build();
390-
AtomicReference<Exception> exception = new AtomicReference<>();
391-
waitAtMost(
392-
() -> {
393-
try {
394-
publisher.publish(
395-
publisher.message().toAddress().exchange(doesNotExist).message(), ctx -> {});
396-
return false;
397-
} catch (AmqpException.AmqpEntityDoesNotExistException e) {
398-
exception.set(e);
399-
return true;
400-
}
401-
});
402-
Assertions.assertThat(closedSync).completes();
403-
of(exception.get(), closedException.get())
404-
.forEach(
405-
e ->
406-
assertThat(e)
407-
.isInstanceOf(AmqpException.AmqpEntityDoesNotExistException.class)
408-
.hasMessageContaining(doesNotExist));
386+
connection.management().queue(name).exclusive(true).declare();
387+
Publisher publisher = connection.publisherBuilder().build();
388+
Sync consumedSync = sync();
389+
connection
390+
.consumerBuilder()
391+
.queue(name)
392+
.messageHandler(
393+
(ctx, msg) -> {
394+
ctx.accept();
395+
consumedSync.down();
396+
})
397+
.build();
398+
Sync acceptedSync = sync();
399+
publisher.publish(
400+
publisher.message().toAddress().queue(name).message(), ctx -> acceptedSync.down());
401+
Assertions.assertThat(acceptedSync).completes();
402+
Assertions.assertThat(consumedSync).completes();
403+
404+
acceptedSync.reset();
405+
consumedSync.reset();
406+
407+
Sync rejectedSync = sync();
408+
publisher.publish(
409+
publisher.message().toAddress().exchange(doesNotExist).message(),
410+
ctx -> rejectedSync.down());
411+
Assertions.assertThat(rejectedSync).completes();
412+
413+
Assertions.assertThat(consumedSync).hasNotCompleted();
414+
publisher.publish(
415+
publisher.message().toAddress().queue(name).message(), ctx -> acceptedSync.down());
416+
Assertions.assertThat(acceptedSync).completes();
417+
Assertions.assertThat(consumedSync).completes();
409418
}
410419

411420
@Test
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
// Copyright (c) 2024 Broadcom. All Rights Reserved.
2+
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
// If you have any questions regarding licensing, please contact us at
17+
18+
package com.rabbitmq.client.amqp.impl;
19+
20+
import com.rabbitmq.client.amqp.Environment;
21+
import java.lang.annotation.*;
22+
import java.util.function.Function;
23+
import org.junit.jupiter.api.extension.ConditionEvaluationResult;
24+
import org.junit.jupiter.api.extension.ExecutionCondition;
25+
import org.junit.jupiter.api.extension.ExtendWith;
26+
import org.junit.jupiter.api.extension.ExtensionContext;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
public final class TestConditions {
31+
32+
private static final Logger LOGGER = LoggerFactory.getLogger(TestConditions.class);
33+
34+
private TestConditions() {}
35+
36+
public enum BrokerVersion {
37+
RABBITMQ_4_0_3("4.0.3");
38+
39+
final String value;
40+
41+
BrokerVersion(String value) {
42+
this.value = value;
43+
}
44+
45+
@Override
46+
public String toString() {
47+
return this.value;
48+
}
49+
}
50+
51+
@Target({ElementType.TYPE, ElementType.METHOD})
52+
@Retention(RetentionPolicy.RUNTIME)
53+
@Documented
54+
@ExtendWith(BrokerVersionAtLeastCondition.class)
55+
public @interface BrokerVersionAtLeast {
56+
57+
BrokerVersion value();
58+
}
59+
60+
private static class BrokerVersionAtLeastCondition implements ExecutionCondition {
61+
62+
private final Function<ExtensionContext, String> versionProvider;
63+
64+
private BrokerVersionAtLeastCondition() {
65+
this.versionProvider =
66+
context -> {
67+
BrokerVersionAtLeast annotation =
68+
context.getElement().get().getAnnotation(BrokerVersionAtLeast.class);
69+
return annotation == null ? null : annotation.value().toString();
70+
};
71+
}
72+
73+
@Override
74+
public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) {
75+
if (context.getTestMethod().isEmpty()) {
76+
return ConditionEvaluationResult.enabled("Apply only to methods");
77+
}
78+
String expectedVersion = versionProvider.apply(context);
79+
if (expectedVersion == null) {
80+
return ConditionEvaluationResult.enabled("No broker version requirement");
81+
} else {
82+
String brokerVersion =
83+
context
84+
.getRoot()
85+
.getStore(ExtensionContext.Namespace.GLOBAL)
86+
.getOrComputeIfAbsent(
87+
"brokerVersion",
88+
k -> {
89+
try (Environment env = TestUtils.environmentBuilder().build()) {
90+
return ((AmqpConnection) env.connectionBuilder().build()).brokerVersion();
91+
}
92+
},
93+
String.class);
94+
95+
if (atLeastVersion(expectedVersion, brokerVersion)) {
96+
return ConditionEvaluationResult.enabled(
97+
"Broker version requirement met, expected "
98+
+ expectedVersion
99+
+ ", actual "
100+
+ brokerVersion);
101+
} else {
102+
return ConditionEvaluationResult.disabled(
103+
"Broker version requirement not met, expected "
104+
+ expectedVersion
105+
+ ", actual "
106+
+ brokerVersion);
107+
}
108+
}
109+
}
110+
}
111+
112+
private static boolean atLeastVersion(String expectedVersion, String currentVersion) {
113+
try {
114+
currentVersion = currentVersion(currentVersion);
115+
return "0.0.0".equals(currentVersion)
116+
|| Utils.versionCompare(currentVersion, expectedVersion) >= 0;
117+
} catch (RuntimeException e) {
118+
LOGGER.warn("Unable to parse broker version {}", currentVersion, e);
119+
throw e;
120+
}
121+
}
122+
123+
private static String currentVersion(String currentVersion) {
124+
// versions built from source: 3.7.0+rc.1.4.gedc5d96
125+
if (currentVersion.contains("+")) {
126+
currentVersion = currentVersion.substring(0, currentVersion.indexOf("+"));
127+
}
128+
// alpha (snapshot) versions: 3.7.0~alpha.449-1
129+
if (currentVersion.contains("~")) {
130+
currentVersion = currentVersion.substring(0, currentVersion.indexOf("~"));
131+
}
132+
// alpha (snapshot) versions: 3.7.1-alpha.40
133+
if (currentVersion.contains("-")) {
134+
currentVersion = currentVersion.substring(0, currentVersion.indexOf("-"));
135+
}
136+
return currentVersion;
137+
}
138+
}

0 commit comments

Comments
 (0)