Skip to content

Commit 2cded99

Browse files
garyrussellartembilan
authored andcommitted
GH-1325: Queue API Deprecations and Improvements
Resolves #1325
1 parent 0757aac commit 2cded99

File tree

7 files changed

+126
-31
lines changed

7 files changed

+126
-31
lines changed

spring-amqp/src/main/java/org/springframework/amqp/core/AnonymousQueue.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -56,15 +56,16 @@ public AnonymousQueue(org.springframework.amqp.core.NamingStrategy namingStrateg
5656

5757
/**
5858
* Construct a queue with a name provided by the supplied naming strategy with the
59-
* supplied arguments.
59+
* supplied arguments. By default, these queues will be created on the node the
60+
* application is connected to.
6061
* @param namingStrategy the naming strategy.
6162
* @param arguments the arguments.
6263
* @since 2.1
6364
*/
6465
public AnonymousQueue(org.springframework.amqp.core.NamingStrategy namingStrategy, Map<String, Object> arguments) {
6566
super(namingStrategy.generateName(), false, true, true, arguments);
66-
if (!getArguments().containsKey(X_QUEUE_MASTER_LOCATOR)) {
67-
setMasterLocator("client-local");
67+
if (!getArguments().containsKey(X_QUEUE_LEADER_LOCATOR)) {
68+
setLeaderLocator("client-local");
6869
}
6970
}
7071

spring-amqp/src/main/java/org/springframework/amqp/core/Queue.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,9 +35,17 @@ public class Queue extends AbstractDeclarable implements Cloneable {
3535
/**
3636
* Argument key for the master locator.
3737
* @since 2.1
38+
* @deprecated in favor of {@link #X_QUEUE_LEADER_LOCATOR}.
3839
*/
40+
@Deprecated
3941
public static final String X_QUEUE_MASTER_LOCATOR = "x-queue-master-locator";
4042

43+
/**
44+
* Argument key for the queue leader locator.
45+
* @since 2.1
46+
*/
47+
public static final String X_QUEUE_LEADER_LOCATOR = "x-queue-master-locator";
48+
4149
private final String name;
4250

4351
private final boolean durable;
@@ -161,13 +169,29 @@ public String getActualName() {
161169
* Set the master locator strategy argument for this queue.
162170
* @param locator the locator; null to clear the argument.
163171
* @since 2.1
172+
* @deprecated in favor of {@link #setLeaderLocator(String)}.
164173
*/
174+
@Deprecated
165175
public final void setMasterLocator(@Nullable String locator) {
166176
if (locator == null) {
167-
removeArgument(X_QUEUE_MASTER_LOCATOR);
177+
removeArgument(X_QUEUE_LEADER_LOCATOR);
178+
}
179+
else {
180+
addArgument(X_QUEUE_LEADER_LOCATOR, locator);
181+
}
182+
}
183+
184+
/**
185+
* Set the leader locator strategy argument for this queue.
186+
* @param locator the locator; null to clear the argument.
187+
* @since 2.3.7
188+
*/
189+
public final void setLeaderLocator(@Nullable String locator) {
190+
if (locator == null) {
191+
removeArgument(X_QUEUE_LEADER_LOCATOR);
168192
}
169193
else {
170-
addArgument(X_QUEUE_MASTER_LOCATOR, locator);
194+
addArgument(X_QUEUE_LEADER_LOCATOR, locator);
171195
}
172196
}
173197

spring-amqp/src/main/java/org/springframework/amqp/core/QueueBuilder.java

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,6 +22,8 @@
2222
* Builds a Spring AMQP Queue using a fluent API.
2323
*
2424
* @author Maciej Walkowiak
25+
* @author Gary Russell
26+
*
2527
* @since 1.6
2628
*
2729
*/
@@ -228,11 +230,25 @@ public QueueBuilder lazy() {
228230
* or {@link MasterLocator#random}.
229231
* @return the builder.
230232
* @since 2.2
233+
* @deprecated in favor of {@link #leaderLocator(LeaderLocator)}.
231234
*/
235+
@Deprecated
232236
public QueueBuilder masterLocator(MasterLocator locator) {
233237
return withArgument("x-queue-master-locator", locator.getValue());
234238
}
235239

240+
/**
241+
* Set the master locator mode which determines which node a queue master will be
242+
* located on a cluster of nodes.
243+
* @param locator {@link MasterLocator#minMasters}, {@link MasterLocator#clientLocal}
244+
* or {@link MasterLocator#random}.
245+
* @return the builder.
246+
* @since 2.2
247+
*/
248+
public QueueBuilder leaderLocator(LeaderLocator locator) {
249+
return withArgument("x-queue-master-locator", locator.getValue());
250+
}
251+
236252
/**
237253
* Set the 'x-single-active-consumer' queue argument.
238254
* @return the builder.
@@ -270,6 +286,9 @@ public Queue build() {
270286
return new Queue(this.name, this.durable, this.exclusive, this.autoDelete, getArguments());
271287
}
272288

289+
/**
290+
* Overflow argument values.
291+
*/
273292
public enum Overflow {
274293

275294
/**
@@ -288,12 +307,20 @@ public enum Overflow {
288307
this.value = value;
289308
}
290309

310+
/**
311+
* Return the value.
312+
* @return the value.
313+
*/
291314
public String getValue() {
292315
return this.value;
293316
}
294317

295318
}
296319

320+
/**
321+
* @deprecated in favor of {@link LeaderLocator}.
322+
*/
323+
@Deprecated
297324
public enum MasterLocator {
298325

299326
/**
@@ -317,6 +344,49 @@ public enum MasterLocator {
317344
this.value = value;
318345
}
319346

347+
/**
348+
* Return the value.
349+
* @return the value.
350+
*/
351+
public String getValue() {
352+
return this.value;
353+
}
354+
355+
}
356+
357+
/**
358+
* Locate the queue leader.
359+
*
360+
* @since 2.3.7
361+
*
362+
*/
363+
public enum LeaderLocator {
364+
365+
/**
366+
* Deploy on the node with the fewest queue leaders.
367+
*/
368+
minLeaders("min-masters"),
369+
370+
/**
371+
* Deploy on the node we are connected to.
372+
*/
373+
clientLocal("client-local"),
374+
375+
/**
376+
* Deploy on a random node.
377+
*/
378+
random("random");
379+
380+
private final String value;
381+
382+
LeaderLocator(String value) {
383+
this.value = value;
384+
}
385+
386+
/**
387+
* Return the value.
388+
* @return the value.
389+
*/
320390
public String getValue() {
321391
return this.value;
322392
}

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/FixedReplyQueueDeadLetterTests.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2020 the original author or authors.
2+
* Copyright 2014-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,7 +35,7 @@
3535
import org.springframework.amqp.core.ExchangeBuilder;
3636
import org.springframework.amqp.core.Queue;
3737
import org.springframework.amqp.core.QueueBuilder;
38-
import org.springframework.amqp.core.QueueBuilder.MasterLocator;
38+
import org.springframework.amqp.core.QueueBuilder.LeaderLocator;
3939
import org.springframework.amqp.core.QueueBuilder.Overflow;
4040
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
4141
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@@ -111,7 +111,7 @@ void testQueueArgs1() throws MalformedURLException, URISyntaxException, Interrup
111111
assertThat(arguments.get("x-dead-letter-routing-key")).isEqualTo("reply.dlrk");
112112
assertThat(arguments.get("x-max-priority")).isEqualTo(4);
113113
assertThat(arguments.get("x-queue-mode")).isEqualTo("lazy");
114-
assertThat(arguments.get("x-queue-master-locator")).isEqualTo("min-masters");
114+
assertThat(arguments.get(Queue.X_QUEUE_LEADER_LOCATOR)).isEqualTo(LeaderLocator.minLeaders.getValue());
115115
assertThat(arguments.get("x-single-active-consumer")).isEqualTo(Boolean.TRUE);
116116
}
117117

@@ -130,7 +130,7 @@ void testQueueArgs2() throws MalformedURLException, URISyntaxException, Interrup
130130
assertThat(arguments.get("x-dead-letter-routing-key")).isEqualTo("reply.dlrk");
131131
assertThat(arguments.get("x-max-priority")).isEqualTo(4);
132132
assertThat(arguments.get("x-queue-mode")).isEqualTo("lazy");
133-
assertThat(arguments.get("x-queue-master-locator")).isEqualTo("client-local");
133+
assertThat(arguments.get(Queue.X_QUEUE_LEADER_LOCATOR)).isEqualTo(LeaderLocator.clientLocal.getValue());
134134
}
135135

136136
@Test
@@ -148,7 +148,7 @@ void testQueueArgs3() throws MalformedURLException, URISyntaxException, Interrup
148148
assertThat(arguments.get("x-dead-letter-routing-key")).isEqualTo("reply.dlrk");
149149
assertThat(arguments.get("x-max-priority")).isEqualTo(4);
150150
assertThat(arguments.get("x-queue-mode")).isEqualTo("lazy");
151-
assertThat(arguments.get("x-queue-master-locator")).isEqualTo("random");
151+
assertThat(arguments.get(Queue.X_QUEUE_LEADER_LOCATOR)).isEqualTo(LeaderLocator.random.getValue());
152152

153153
ExchangeInfo exchange = client.getExchange("/", "dlx.test.requestEx");
154154
assertThat(exchange.getArguments().get("alternate-exchange")).isEqualTo("alternate");
@@ -294,7 +294,7 @@ public Queue allArgs1() {
294294
.deadLetterRoutingKey("reply.dlrk")
295295
.maxPriority(4)
296296
.lazy()
297-
.masterLocator(MasterLocator.minMasters)
297+
.leaderLocator(LeaderLocator.minLeaders)
298298
.singleActiveConsumer()
299299
.build();
300300
}
@@ -311,7 +311,7 @@ public Queue allArgs2() {
311311
.deadLetterRoutingKey("reply.dlrk")
312312
.maxPriority(4)
313313
.lazy()
314-
.masterLocator(MasterLocator.clientLocal)
314+
.leaderLocator(LeaderLocator.clientLocal)
315315
.build();
316316
}
317317

@@ -327,7 +327,7 @@ public Queue allArgs3() {
327327
.deadLetterRoutingKey("reply.dlrk")
328328
.maxPriority(4)
329329
.lazy()
330-
.masterLocator(MasterLocator.random)
330+
.leaderLocator(LeaderLocator.random)
331331
.build();
332332
}
333333

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -367,7 +367,7 @@ public void testRetry() throws Exception {
367367
}
368368

369369
@Test
370-
public void testMasterLocator() throws Exception {
370+
public void testLeaderLocator() throws Exception {
371371
CachingConnectionFactory cf = new CachingConnectionFactory(
372372
RabbitAvailableCondition.getBrokerRunning().getConnectionFactory());
373373
RabbitAdmin admin = new RabbitAdmin(cf);
@@ -376,14 +376,14 @@ public void testMasterLocator() throws Exception {
376376
Client client = new Client("http://guest:guest@localhost:15672/api");
377377
AnonymousQueue queue1 = queue;
378378
QueueInfo info = await().until(() -> client.getQueue("/", queue1.getName()), inf -> inf != null);
379-
assertThat(info.getArguments().get(Queue.X_QUEUE_MASTER_LOCATOR)).isEqualTo("client-local");
379+
assertThat(info.getArguments().get(Queue.X_QUEUE_LEADER_LOCATOR)).isEqualTo("client-local");
380380

381381
queue = new AnonymousQueue();
382-
queue.setMasterLocator(null);
382+
queue.setLeaderLocator(null);
383383
admin.declareQueue(queue);
384384
AnonymousQueue queue2 = queue;
385385
info = await().until(() -> client.getQueue("/", queue2.getName()), inf -> inf != null);
386-
assertThat(info.getArguments().get(Queue.X_QUEUE_MASTER_LOCATOR)).isNull();
386+
assertThat(info.getArguments().get(Queue.X_QUEUE_LEADER_LOCATOR)).isNull();
387387
cf.destroy();
388388
}
389389

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/TransactionalEventListenerTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ public void publish() {
112112
publisher.publishEvent("test");
113113
}
114114

115+
@SuppressWarnings("serial")
115116
@Bean
116117
PlatformTransactionManager transactionManager(AtomicBoolean committed) {
117118
return new AbstractPlatformTransactionManager() {

src/reference/asciidoc/amqp.adoc

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -698,12 +698,12 @@ See <<cf-pub-conf-ret>>.
698698
===== Queue Affinity and the `LocalizedQueueConnectionFactory`
699699

700700
When using HA queues in a cluster, for the best performance, you may want to connect to the physical broker
701-
where the master queue resides.
701+
where the lead queue resides.
702702
The `CachingConnectionFactory` can be configured with multiple broker addresses.
703703
This is to fail over and the client attempts to connect in order.
704-
The `LocalizedQueueConnectionFactory` uses the REST API provided by the admin plugin to determine on which node the queue is mastered.
704+
The `LocalizedQueueConnectionFactory` uses the REST API provided by the management plugin to determine which node is the lead for the queue.
705705
It then creates (or retrieves from a cache) a `CachingConnectionFactory` that connects to just that node.
706-
If the connection fails, the new master node is determined and the consumer connects to it.
706+
If the connection fails, the new lead node is determined and the consumer connects to it.
707707
The `LocalizedQueueConnectionFactory` is configured with a default connection factory, in case the physical location of the queue cannot be determined, in which case it connects as normal to the cluster.
708708

709709
The `LocalizedQueueConnectionFactory` is a `RoutingConnectionFactory` and the `SimpleMessageListenerContainer` uses the queue names as the lookup key as discussed in <<routing-connection-factory>> above.
@@ -748,8 +748,7 @@ public LocalizedQueueConnectionFactory queueAffinityCF(
748748
====
749749

750750
Notice that the first three parameters are arrays of `addresses`, `adminUris`, and `nodes`.
751-
These are positional in that, when a container attempts to connect to a queue, it uses the admin API to determine on which node the queue is
752-
mastered and connects to the address in the same array position as that node.
751+
These are positional in that, when a container attempts to connect to a queue, it uses the admin API to determine which node is the lead for the queue and connects to the address in the same array position as that node.
753752

754753
[[cf-pub-conf-ret]]
755754
===== Publisher Confirms and Returns
@@ -4415,7 +4414,7 @@ A `<rabbit:queue/>` with an empty (or missing) `name` attribute always creates a
44154414

44164415
See <<anonymous-queue>> to understand why `AnonymousQueue` is preferred over broker-generated queue names as well as
44174416
how to control the format of the name.
4418-
Starting with version 2.1, anonymous queues are declared with argument `x-queue-master-locator` set to `client-local` by default.
4417+
Starting with version 2.1, anonymous queues are declared with argument `Queue.X_QUEUE_LEADER_LOCATOR` set to `client-local` by default.
44194418
This ensures that the queue is declared on the node to which the application is connected.
44204419
Declarative queues must have fixed names because they might be referenced elsewhere in the context -- such as in the
44214420
listener shown in the following example:
@@ -4549,7 +4548,7 @@ With Spring Framework 3.2 and later, this can be declared a little more succinct
45494548
----
45504549
====
45514550

4552-
When you use Java configuration, the `x-queue-master-locator` is supported as a first class property through the `setMasterLocator()` method on the `Queue` class.
4551+
When you use Java configuration, the `Queue.X_QUEUE_LEADER_LOCATOR` argument is supported as a first class property through the `setLeaderLocator()` method on the `Queue` class.
45534552
Starting with version 2.1, anonymous queues are declared with this property set to `client-local` by default.
45544553
This ensures that the queue is declared on the node the application is connected to.
45554554

@@ -4757,7 +4756,7 @@ public Queue allArgs1() {
47574756
.deadLetterRoutingKey("dlrk")
47584757
.maxPriority(4)
47594758
.lazy()
4760-
.masterLocator(MasterLocator.minMasters)
4759+
.leaderLocator(LeaderLocator.minLeaders)
47614760
.singleActiveConsumer()
47624761
.build();
47634762
}
@@ -5053,9 +5052,9 @@ The third example creates names such as `custom.gen-MRBv9sqISkuCiPfOYfpo4g`.
50535052

50545053
You can also provide your own naming strategy bean.
50555054

5056-
Starting with version 2.1, anonymous queues are declared with argument `x-queue-master-locator` set to `client-local` by default.
5055+
Starting with version 2.1, anonymous queues are declared with argument `Queue.X_QUEUE_LEADER_LOCATOR` set to `client-local` by default.
50575056
This ensures that the queue is declared on the node to which the application is connected.
5058-
You can revert to the previous behavior by calling `queue.setMasterLocator(null)` after constructing the instance.
5057+
You can revert to the previous behavior by calling `queue.setLeaderLocator(null)` after constructing the instance.
50595058

50605059
[[broker-events]]
50615060
==== Broker Event Listener

0 commit comments

Comments
 (0)