Skip to content

Commit 7f0ddca

Browse files
committed
Introduce new AMQP 1.0 address format
## What? Introduce a new address format (let's call it v2) for AMQP 1.0 source and target addresses. The old format (let's call it v1) is described in https://github.com/rabbitmq/rabbitmq-server/tree/v3.13.x/deps/rabbitmq_amqp1_0#routing-and-addressing The only v2 source address format is: ``` /queue/:queue ``` The 4 possible v2 target addresses formats are: ``` /exchange/:exchange/key/:routing-key /exchange/:exchange /queue/:queue <null> ``` where the last AMQP <null> value format requires that each message’s `to` field contains one of: ``` /exchange/:exchange/key/:routing-key /exchange/:exchange /queue/:queue ``` ## Why? The AMQP address v1 format comes with the following flaws: 1. Obscure address format: Without reading the documentation, the differences for example between source addresses ``` /amq/queue/:queue /queue/:queue :queue ``` are unknown to users. Hence, the address format is obscure. 2. Implicit creation of topologies Some address formats implicitly create queues (and bindings), such as source address ``` /exchange/:exchange/:binding-key ``` or target address ``` /queue/:queue ``` These queues and bindings are never deleted (by the AMQP 1.0 plugin.) Implicit creation of such topologies is also obscure. 3. Redundant address formats ``` /queue/:queue :queue ``` have the same meaning and are therefore redundant. 4. Properties section must be parsed to determine whether a routing key is present Target address ``` /exchange/:exchange ``` requires RabbitMQ to parse the properties section in order to check whether the message `subject` is set. If `subject` is not set, the routing key will default to the empty string. 5. Using `subject` as routing key misuses the purpose of this field. According to the AMQP spec, the message `subject` field's purpose is: > A common field for summary information about the message content and purpose. 6. Exchange names, queue names and routing keys must not contain the "/" (slash) character. The current 3.13 implemenation splits by "/" disallowing these characters in exchange, and queue names, and routing keys which is unnecessary prohibitive. 7. Clients must create a separate link per target exchange While this is reasonable working assumption, there might be rare use cases where it could make sense to create many exchanges (e.g. 1 exchange per queue, see #10708) and have a single application publish to all these exchanges. With the v1 address format, for an application to send to 500 different exchanges, it needs to create 500 links. Due to these disadvantages and thanks to #10559 which allows clients to explicitly create topologies, we can create a simpler, clearer, and better v2 address format. ## How? ### Design goals Following the 7 cons from v1, the design goals for v2 are: 1. The address format should be simple so that users have a chance to understand the meaning of the address without necessarily consulting the docs. 2. The address format should not implicitly create queues, bindings, or exchanges. Instead, topologies should be created either explicitly via the new management node prior to link attachment (see #10559), or in future, we might support the `dynamic` source or target properties so that RabbitMQ creates queues dynamically. 3. No redundant address formats. 4. The target address format should explicitly state whether the routing key is present, empty, or will be provided dynamically in each message. 5. `Subject` should not be used as routing key. Instead, a better fitting field should be used. 6. Exchange names, queue names, and routing keys should allow to contain valid UTF-8 encoded data including the "/" character. 7. Allow both target exchange and routing key to by dynamically provided within each message. Furthermore 8. v2 must co-exist with v1 for at least some time. Applications should be able to upgrade to RabbitMQ 4.0 while continuing to use v1. Examples include AMQP 1.0 shovels and plugins communicating between a 4.0 and a 3.13 cluster. Starting with 4.1, we should change the AMQP 1.0 shovel and plugin clients to use only the new v2 address format. This will allow AMQP 1.0 and plugins to communicate between a 4.1 and 4.2 cluster. We will deprecate v1 in 4.0 and remove support for v1 in a later 4.x version. ### Additional Context The address is usually a String, but can be of any type. The [AMQP Addressing extension](https://docs.oasis-open.org/amqp/addressing/v1.0/addressing-v1.0.html) suggests that addresses are URIs and are therefore hierarchical and could even contain query parameters: > An AMQP address is a URI reference as defined by RFC3986. > the path expression is a sequence of identifier segments that reflects a path through an > implementation specific relationship graph of AMQP nodes and their termini. > The path expression MUST resolve to a node’s terminus in an AMQP container. The [Using the AMQP Anonymous Terminus for Message Routing Version 1.0](https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html) extension allows for the target being `null` and the `To` property to contain the node address. This corresponds to AMQP 0.9.1 where clients can send each message on the same channel to a different `{exchange, routing-key}` destination. The following v2 address formats will be used. ### v2 addresses A new deprecated feature flag `amqp_address_v1` will be introduced in 4.0 which is permitted by default. Starting with 4.1, we should change the AMQP 1.0 shovel and plugin AMQP 1.0 clients to use only the new v2 address format. However, 4.1 server code must still understand the 4.0 AMQP 1.0 shovel and plugin AMQP 1.0 clients’ v1 address format. The new deprecated feature flag will therefore be denied by default in 4.2. This allows AMQP 1.0 shovels and plugins to work between * 4.0 and 3.13 clusters using v1 * 4.1 and 4.0 clusters using v2 from 4.1 to v4.0 and v1 from 4.0 to 4.1 * 4.2 and 4.1 clusters using v2 without having to support both v1 and v2 at the same time in the AMQP 1.0 shovel and plugin clients. While supporting both v1 and v2 in these clients is feasible, it's simpler to switch the client code directly from v1 to v2. ### v2 source addresses The source address format is ``` /queue/:queue ``` If the deprecated feature flag `amqp_address_v1` is permitted and the queue does not exist, the queue will be auto-created. If the deprecated feature flag `amqp_address_v1` is denied, the queue must exist. ### v2 target addresses v1 requires attaching a new link for each destination exchange. v2 will allow dynamic `{exchange, routing-key}` combinations for a given link. v2 therefore allows for the rare use cases where a single AMQP 1.0 publisher app needs to send to many different exchanges. Setting up a link per destination exchange could be cumbersome. Hence, v2 will support the dynamic `{exchange, routing-key}` combinations of AMQP 0.9.1. To achieve this, we make use of the "Anonymous Terminus for Message Routing" extension: The target address will contain the AMQP value null. The `To` field in each message must be set and contain either address format ``` /exchange/:exchange/key/:routing-key ``` or ``` /exchange/:exchange ``` when using the empty routing key. The `to` field requires an address type and is better suited than the `subject field. Note that each message will contain this `To` value for the anonymous terminus. Hence, we should save some bytes being sent across the network and stored on disk. Using a format ``` /e/:exchange/k/:routing-key ``` saves more bytes, but is too obscure. However, we use only `/key/` instead of `/routing-key/` so save a few bytes. This also simplifies the format because users don’t have to remember whether to use spell `routing-key` or `routing_key` or `routingkey`. The other allowed target address formats are: ``` /exchange/:exchange/key/:routing-key ``` where exchange and routing key are static on the given link. ``` /exchange/:exchange ``` where exchange and routing key are static on the given link, and routing key will be the empty string (useful for example for the fanout exchange). ``` /queue/:queue ``` This provides RabbitMQ beginners the illusion of sending a message directly to a queue without having to understand what exchanges and routing keys are. If the deprecated feature flag `amqp_address_v1` is permitted and the queue does not exist, the queue will be auto-created. If the deprecated feature flag `amqp_address_v1` is denied, the queue must exist. Besides the additional queue existence check, this queue target is different from ``` /exchange//key/:queue ``` in that queue specific optimisations might be done (in future) by RabbitMQ (for example different receiving queue types could grant different amounts of link credits to the sending clients). A write permission check to the amq.default exchange will be performed nevertheless. v2 will prohibit the v1 static link & dynamic routing-key combination where the routing key is sent in the message `subject` as that’s also obscure. For this use case, v2’s new anonymous terminus can be used where both exchange and routing key are defined in the message’s `To` field. (The bare message must not be modified because it could be signed.) The alias format ``` /topic/:topic ``` will also be removed. Sending to topic exchanges is arguably an advanced feature. Users can directly use the format ``` /exchange/amq.topic/key/:topic ``` which reduces the number of redundant address formats. ### v2 address format reference To sump up (and as stated at the top of this commit message): The only v2 source address format is: ``` /queue/:queue ``` The 4 possible v2 target addresses formats are: ``` /exchange/:exchange/key/:routing-key /exchange/:exchange /queue/:queue <null> ``` where the last AMQP <null> value format requires that each message’s `to` field contains one of: ``` /exchange/:exchange/key/:routing-key /exchange/:exchange /queue/:queue ``` Hence, all 8 listed design goals are reached.
1 parent dda1c50 commit 7f0ddca

25 files changed

+1533
-412
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -662,7 +662,11 @@ make_target(#{role := {receiver, _Source, _Pid}}) ->
662662
#'v1_0.target'{};
663663
make_target(#{role := {sender, #{address := Address} = Target}}) ->
664664
Durable = translate_terminus_durability(maps:get(durable, Target, none)),
665-
#'v1_0.target'{address = {utf8, Address},
665+
TargetAddr = case is_binary(Address) of
666+
true -> {utf8, Address};
667+
false -> Address
668+
end,
669+
#'v1_0.target'{address = TargetAddr,
666670
durable = {uint, Durable}}.
667671

668672
max_message_size(#{max_message_size := Size})

deps/amqp10_client/test/system_SUITE.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ init_per_suite(Config) ->
8383
]).
8484

8585
end_per_suite(Config) ->
86-
rabbit_ct_helpers:run_teardown_steps(Config,
86+
rabbit_ct_helpers:run_teardown_steps(
87+
Config,
8788
[
8889
fun stop_amqp10_client_app/1
8990
]).

deps/rabbit/BUILD.bazel

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1264,6 +1264,7 @@ rabbitmq_integration_suite(
12641264

12651265
rabbitmq_integration_suite(
12661266
name = "amqp_auth_SUITE",
1267+
shard_count = 2,
12671268
additional_beam = [
12681269
":test_event_recorder_beam",
12691270
],
@@ -1272,6 +1273,14 @@ rabbitmq_integration_suite(
12721273
],
12731274
)
12741275

1276+
rabbitmq_integration_suite(
1277+
name = "amqp_address_SUITE",
1278+
shard_count = 2,
1279+
runtime_deps = [
1280+
"//deps/rabbitmq_amqp_client:erlang_app",
1281+
],
1282+
)
1283+
12751284
rabbitmq_integration_suite(
12761285
name = "amqp_credit_api_v2_SUITE",
12771286
runtime_deps = [

deps/rabbit/app.bzl

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2212,3 +2212,12 @@ def test_suite_beam_files(name = "test_suite_beam_files"):
22122212
erlc_opts = "//:test_erlc_opts",
22132213
deps = ["//deps/rabbit_common:erlang_app"],
22142214
)
2215+
erlang_bytecode(
2216+
name = "amqp_address_SUITE_beam_files",
2217+
testonly = True,
2218+
srcs = ["test/amqp_address_SUITE.erl"],
2219+
outs = ["test/amqp_address_SUITE.beam"],
2220+
app_name = "rabbit",
2221+
erlc_opts = "//:test_erlc_opts",
2222+
deps = ["//deps/amqp10_common:erlang_app", "//deps/rabbitmq_amqp_client:erlang_app"],
2223+
)

deps/rabbit/src/mc.erl

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -146,20 +146,17 @@ init(Proto, Data, Anns) ->
146146
init(Proto, Data, Anns, #{}).
147147

148148
-spec init(protocol(), term(), annotations(), environment()) -> state().
149-
init(Proto, Data, Anns0, Env)
150-
when is_atom(Proto)
151-
andalso is_map(Anns0)
152-
andalso is_map(Env) ->
149+
init(Proto, Data, Anns0, Env) ->
153150
{ProtoData, ProtoAnns} = Proto:init(Data),
154-
Anns = case maps:size(Env) == 0 of
155-
true ->
156-
Anns0;
157-
false ->
158-
Anns0#{env => Env}
159-
end,
151+
Anns1 = case map_size(Env) == 0 of
152+
true -> Anns0;
153+
false -> Anns0#{env => Env}
154+
end,
155+
Anns2 = maps:merge(ProtoAnns, Anns1),
156+
Anns = set_received_at_timestamp(Anns2),
160157
#?MODULE{protocol = Proto,
161158
data = ProtoData,
162-
annotations = set_received_at_timestamp(maps:merge(ProtoAnns, Anns))}.
159+
annotations = Anns}.
163160

164161
-spec size(state()) ->
165162
{MetadataSize :: non_neg_integer(),
@@ -196,7 +193,7 @@ take_annotation(_Key, BasicMessage) ->
196193
-spec set_annotation(ann_key(), ann_value(), state()) ->
197194
state().
198195
set_annotation(Key, Value, #?MODULE{annotations = Anns} = State) ->
199-
State#?MODULE{annotations = maps:put(Key, Value, Anns)};
196+
State#?MODULE{annotations = Anns#{Key => Value}};
200197
set_annotation(Key, Value, BasicMessage) ->
201198
mc_compat:set_annotation(Key, Value, BasicMessage).
202199

@@ -313,7 +310,7 @@ property(_Property, _BasicMsg) ->
313310

314311
-spec set_ttl(undefined | non_neg_integer(), state()) -> state().
315312
set_ttl(Value, #?MODULE{annotations = Anns} = State) ->
316-
State#?MODULE{annotations = maps:put(ttl, Value, Anns)};
313+
State#?MODULE{annotations = Anns#{ttl => Value}};
317314
set_ttl(Value, BasicMsg) ->
318315
mc_compat:set_ttl(Value, BasicMsg).
319316

deps/rabbit/src/mc_amqp.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ property(user_id, #msg{properties = #'v1_0.properties'{user_id = UserId}}) ->
117117
UserId;
118118
property(subject, #msg{properties = #'v1_0.properties'{subject = Subject}}) ->
119119
Subject;
120+
property(to, #msg{properties = #'v1_0.properties'{to = To}}) ->
121+
To;
120122
property(_Prop, #msg{}) ->
121123
undefined.
122124

deps/rabbit/src/rabbit.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1709,8 +1709,7 @@ persist_static_configuration() ->
17091709
_ ->
17101710
?MAX_MSG_SIZE
17111711
end,
1712-
ok = persistent_term:put(max_message_size, MaxMsgSize),
1713-
ok = rabbit_amqp_management:persist_static_configuration().
1712+
ok = persistent_term:put(max_message_size, MaxMsgSize).
17141713

17151714
persist_static_configuration(Params) ->
17161715
App = ?MODULE,

0 commit comments

Comments
 (0)