Skip to content

Commit d27f123

Browse files
committed
Small fixes after address format v2 changes
1 parent ee76460 commit d27f123

File tree

6 files changed

+20
-15
lines changed

6 files changed

+20
-15
lines changed

src/docs/asciidoc/usage.adoc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,21 +65,21 @@ include::{test-examples}/Api.java[tag=message-publishing]
6565
--------
6666
include::{test-examples}/Api.java[tag=target-address-exchange-key]
6767
--------
68-
<1> Translates to `/exchange/foo/key/bar`
68+
<1> Translates to `/e/foo/bar`
6969

7070
.Target address format: exchange
7171
[source,java,indent=0]
7272
--------
7373
include::{test-examples}/Api.java[tag=target-address-exchange]
7474
--------
75-
<1> Translates to `/exchange/foo`
75+
<1> Translates to `/e/foo`
7676

7777
.Target address format: queue
7878
[source,java,indent=0]
7979
--------
8080
include::{test-examples}/Api.java[tag=target-address-queue]
8181
--------
82-
<1> Translates to `/queue/some-queue`
82+
<1> Translates to `/q/some-queue`
8383

8484
.Target address format: address in `to` field
8585
[source,java,indent=0]

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
6161
private final MetricsCollector metricsCollector;
6262
private final SessionHandler sessionHandler;
6363
private final AtomicLong unsettledMessageCount = new AtomicLong(0);
64-
private final Runnable replenishCreditOperation = () -> replenishCreditIfNeeded();
64+
private final Runnable replenishCreditOperation = this::replenishCreditIfNeeded;
6565
// native receiver internal state, accessed only in the native executor/scheduler
6666
private ProtonReceiver protonReceiver;
6767
private volatile Scheduler protonExecutor;
@@ -78,7 +78,9 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
7878
.connection()
7979
.observationCollector()
8080
.subscribe(builder.queue(), builder.messageHandler());
81-
this.address = "/queue/" + builder.queue();
81+
DefaultAddressBuilder<?> addressBuilder = Utils.addressBuilder();
82+
addressBuilder.queue(builder.queue());
83+
this.address = addressBuilder.address();
8284
this.filters = Collections.unmodifiableMap(builder.filters());
8385
this.connection = builder.connection();
8486
this.sessionHandler = this.connection.createSessionHandler();

src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ class AmqpRpcClient implements RpcClient {
9797
}
9898

9999
if (builder.requestPostProcessor() == null) {
100-
DefaultAddressBuilder<?> addressBuilder = new DefaultAddressBuilder<>(null) {};
100+
DefaultAddressBuilder<?> addressBuilder = Utils.addressBuilder();
101101
addressBuilder.queue(replyTo);
102102
String replyToAddress = addressBuilder.address();
103103
this.requestPostProcessor =

src/main/java/com/rabbitmq/client/amqp/impl/Utils.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,10 @@ static void throwIfInterrupted() throws InterruptedException {
153153
}
154154
}
155155

156+
static DefaultAddressBuilder<?> addressBuilder() {
157+
return new DefaultAddressBuilder<>(null) {};
158+
}
159+
156160
private static class NameSupplier implements Supplier<String> {
157161

158162
private final String prefix;

src/test/java/com/rabbitmq/client/amqp/impl/AddressFormatTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ void exchangeKeyInToField(TestInfo info) {
193193
}
194194

195195
@Test
196-
void noToFieldDoesNotCloseAllConnectionPublishers() throws InterruptedException {
196+
void noToFieldDoesNotCloseAllConnectionPublishers() {
197197
Management management = connection.management();
198198
String q = management.queue().exclusive(true).declare().name();
199199
TestUtils.Sync sync = TestUtils.sync();

src/test/java/com/rabbitmq/client/amqp/impl/ClientTest.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ void deliveryCount() throws Exception {
9191
publisher.message("".getBytes(UTF_8)), context -> publishLatch.countDown()));
9292

9393
org.apache.qpid.protonj2.client.Connection protonConnection = connection(client);
94-
Receiver receiver = protonConnection.openReceiver("/queue/" + q, new ReceiverOptions());
94+
Receiver receiver = protonConnection.openReceiver("/q/" + q, new ReceiverOptions());
9595
int receivedMessages = 0;
9696
while (receiver.receive(100, TimeUnit.MILLISECONDS) != null) {
9797
receivedMessages++;
@@ -108,15 +108,15 @@ void largeMessage() throws Exception {
108108
connection(client, o -> o.traceFrames(false).maxFrameSize(maxFrameSize));
109109

110110
Sender sender =
111-
connection.openSender("/queue/" + q, new SenderOptions().deliveryMode(AT_LEAST_ONCE));
111+
connection.openSender("/q/" + q, new SenderOptions().deliveryMode(AT_LEAST_ONCE));
112112
byte[] body = new byte[maxFrameSize * 4];
113113
Arrays.fill(body, (byte) 'A');
114114
Tracker tracker = sender.send(Message.create(body));
115115
tracker.awaitSettlement();
116116

117117
Receiver receiver =
118118
connection.openReceiver(
119-
"/queue/" + q,
119+
"/q/" + q,
120120
new ReceiverOptions()
121121
.deliveryMode(AT_LEAST_ONCE)
122122
.autoSettle(false)
@@ -138,7 +138,7 @@ void largeMessageStreamSupport() throws Exception {
138138

139139
StreamSender sender =
140140
connection.openStreamSender(
141-
"/queue/" + q, new StreamSenderOptions().deliveryMode(AT_LEAST_ONCE));
141+
"/q/" + q, new StreamSenderOptions().deliveryMode(AT_LEAST_ONCE));
142142
StreamSenderMessage message = sender.beginMessage();
143143
byte[] body = new byte[maxFrameSize * 4];
144144
Arrays.fill(body, (byte) 'A');
@@ -155,7 +155,7 @@ void largeMessageStreamSupport() throws Exception {
155155

156156
StreamReceiver receiver =
157157
connection.openStreamReceiver(
158-
"/queue/" + q,
158+
"/q/" + q,
159159
new StreamReceiverOptions()
160160
.deliveryMode(AT_LEAST_ONCE)
161161
.autoAccept(false)
@@ -269,7 +269,7 @@ void queueDeletionImpactOnReceiver(TestInfo info) throws Exception {
269269

270270
org.apache.qpid.protonj2.client.Connection protonConnection = connection(client);
271271
Session session = protonConnection.openSession();
272-
Receiver receiver = session.openReceiver("/queue/" + queue);
272+
Receiver receiver = session.openReceiver("/q/" + queue);
273273
receiver.openFuture().get();
274274
Delivery delivery = receiver.tryReceive();
275275
assertThat(delivery).isNull();
@@ -295,8 +295,7 @@ void exchangeDeletionImpactOnSender(TestInfo info) throws Exception {
295295
org.apache.qpid.protonj2.client.Connection protonConnection = connection(client);
296296
Session session = protonConnection.openSession();
297297
Sender sender =
298-
session.openSender(
299-
"/exchange/" + exchange, new SenderOptions().deliveryMode(AT_LEAST_ONCE));
298+
session.openSender("/e/" + exchange, new SenderOptions().deliveryMode(AT_LEAST_ONCE));
300299
Tracker tracker = sender.send(Message.create(body));
301300
tracker.awaitSettlement(10, SECONDS);
302301
assertThat(tracker.remoteState()).isEqualTo(released());

0 commit comments

Comments
 (0)