Skip to content

Commit 20aee3f

Browse files
Merge pull request #11604 from rabbitmq/amqp-addr
Use different AMQP address format for v1 and v2
2 parents c9956b0 + 7b18bd7 commit 20aee3f

File tree

12 files changed

+448
-185
lines changed

12 files changed

+448
-185
lines changed

deps/rabbit/app.bzl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,7 @@ def all_beam_files(name = "all_beam_files"):
223223
"src/rabbit_trace.erl",
224224
"src/rabbit_tracking_store.erl",
225225
"src/rabbit_upgrade_preparation.erl",
226+
"src/rabbit_uri.erl",
226227
"src/rabbit_variable_queue.erl",
227228
"src/rabbit_version.erl",
228229
"src/rabbit_vhost.erl",
@@ -481,6 +482,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
481482
"src/rabbit_trace.erl",
482483
"src/rabbit_tracking_store.erl",
483484
"src/rabbit_upgrade_preparation.erl",
485+
"src/rabbit_uri.erl",
484486
"src/rabbit_variable_queue.erl",
485487
"src/rabbit_version.erl",
486488
"src/rabbit_vhost.erl",
@@ -762,6 +764,7 @@ def all_srcs(name = "all_srcs"):
762764
"src/rabbit_tracking.erl",
763765
"src/rabbit_tracking_store.erl",
764766
"src/rabbit_upgrade_preparation.erl",
767+
"src/rabbit_uri.erl",
765768
"src/rabbit_variable_queue.erl",
766769
"src/rabbit_version.erl",
767770
"src/rabbit_vhost.erl",

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ handle_http_req(<<"GET">>,
8080
_User,
8181
_ConnPid,
8282
PermCaches) ->
83-
QNameBin = uri_string:unquote(QNameBinQuoted),
83+
QNameBin = rabbit_uri:urldecode(QNameBinQuoted),
8484
QName = queue_resource(Vhost, QNameBin),
8585
case rabbit_amqqueue:with(
8686
QName,
@@ -110,7 +110,7 @@ handle_http_req(HttpMethod = <<"PUT">>,
110110
exclusive := Exclusive,
111111
arguments := QArgs0
112112
} = decode_queue(ReqPayload),
113-
QNameBin = uri_string:unquote(QNameBinQuoted),
113+
QNameBin = rabbit_uri:urldecode(QNameBinQuoted),
114114
Owner = case Exclusive of
115115
true -> ConnPid;
116116
false -> none
@@ -185,7 +185,7 @@ handle_http_req(<<"PUT">>,
185185
User = #user{username = Username},
186186
_ConnPid,
187187
{PermCache0, TopicPermCache}) ->
188-
XNameBin = uri_string:unquote(XNameBinQuoted),
188+
XNameBin = rabbit_uri:urldecode(XNameBinQuoted),
189189
#{type := XTypeBin,
190190
durable := Durable,
191191
auto_delete := AutoDelete,
@@ -226,7 +226,7 @@ handle_http_req(<<"DELETE">>,
226226
User,
227227
ConnPid,
228228
{PermCache0, TopicPermCache}) ->
229-
QNameBin = uri_string:unquote(QNameBinQuoted),
229+
QNameBin = rabbit_uri:urldecode(QNameBinQuoted),
230230
QName = queue_resource(Vhost, QNameBin),
231231
PermCache = check_resource_access(QName, read, User, PermCache0),
232232
try rabbit_amqqueue:with_exclusive_access_or_die(
@@ -254,7 +254,7 @@ handle_http_req(<<"DELETE">>,
254254
User = #user{username = Username},
255255
ConnPid,
256256
{PermCache0, TopicPermCache}) ->
257-
QNameBin = uri_string:unquote(QNameBinQuoted),
257+
QNameBin = rabbit_uri:urldecode(QNameBinQuoted),
258258
QName = queue_resource(Vhost, QNameBin),
259259
ok = prohibit_cr_lf(QNameBin),
260260
PermCache = check_resource_access(QName, configure, User, PermCache0),
@@ -274,7 +274,7 @@ handle_http_req(<<"DELETE">>,
274274
User = #user{username = Username},
275275
_ConnPid,
276276
{PermCache0, TopicPermCache}) ->
277-
XNameBin = uri_string:unquote(XNameBinQuoted),
277+
XNameBin = rabbit_uri:urldecode(XNameBinQuoted),
278278
XName = exchange_resource(Vhost, XNameBin),
279279
ok = prohibit_cr_lf(XNameBin),
280280
ok = prohibit_default_exchange(XName),
@@ -594,9 +594,9 @@ decode_binding_path_segment(Segment) ->
594594
end,
595595
case re:run(Segment, MP, [{capture, all_but_first, binary}]) of
596596
{match, [SrcQ, <<DstKindChar>>, DstQ, KeyQ, ArgsHash]} ->
597-
Src = uri_string:unquote(SrcQ),
598-
Dst = uri_string:unquote(DstQ),
599-
Key = uri_string:unquote(KeyQ),
597+
Src = rabbit_uri:urldecode(SrcQ),
598+
Dst = rabbit_uri:urldecode(DstQ),
599+
Key = rabbit_uri:urldecode(KeyQ),
600600
DstKind = destination_char_to_kind(DstKindChar),
601601
{Src, DstKind, Dst, Key, ArgsHash};
602602
nomatch ->

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 124 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,15 @@
1919
-rabbit_deprecated_feature(
2020
{amqp_address_v1,
2121
#{deprecation_phase => permitted_by_default,
22+
doc_url => "https://www.rabbitmq.com/docs/next/amqp#address",
2223
messages =>
2324
#{when_permitted =>
2425
"RabbitMQ AMQP address version 1 is deprecated. "
25-
"Clients should use RabbitMQ AMQP address version 2."}}
26+
"Clients should use RabbitMQ AMQP address version 2.",
27+
when_denied =>
28+
"RabbitMQ AMQP address version 1 is unsupported. "
29+
"Clients must use RabbitMQ AMQP address version 2."
30+
}}
2631
}).
2732

2833
-define(PROTOCOL, amqp10).
@@ -2422,12 +2427,24 @@ ensure_source(#'v1_0.source'{address = Address,
24222427
durable = Durable},
24232428
Vhost, User, PermCache, TopicPermCache) ->
24242429
case Address of
2430+
{utf8, <<"/q/", QNameBinQuoted/binary>>} ->
2431+
%% The only possible v2 source address format is:
2432+
%% /q/:queue
2433+
try rabbit_uri:urldecode(QNameBinQuoted) of
2434+
QNameBin ->
2435+
QName = queue_resource(Vhost, QNameBin),
2436+
ok = exit_if_absent(QName),
2437+
{ok, QName, PermCache, TopicPermCache}
2438+
catch error:_ ->
2439+
{error, {bad_address, Address}}
2440+
end;
24252441
{utf8, SourceAddr} ->
24262442
case address_v1_permitted() of
2427-
true -> ensure_source_v1(
2428-
SourceAddr, Vhost, User, Durable, PermCache, TopicPermCache);
2429-
false -> ensure_source_v2(
2430-
SourceAddr, Vhost, PermCache, TopicPermCache)
2443+
true ->
2444+
ensure_source_v1(SourceAddr, Vhost, User, Durable,
2445+
PermCache, TopicPermCache);
2446+
false ->
2447+
{error, {amqp_address_v1_not_permitted, Address}}
24312448
end;
24322449
_ ->
24332450
{error, {bad_address, Address}}
@@ -2467,19 +2484,10 @@ ensure_source_v1(Address,
24672484
Err
24682485
end
24692486
end;
2470-
{error, _} ->
2471-
ensure_source_v2(Address, Vhost, PermCache0, TopicPermCache0)
2487+
{error, _} = Err ->
2488+
Err
24722489
end.
24732490

2474-
%% The only possible v2 source address format is:
2475-
%% /queue/:queue
2476-
ensure_source_v2(<<"/queue/", QNameBin/binary>>, Vhost, PermCache, TopicPermCache) ->
2477-
QName = queue_resource(Vhost, QNameBin),
2478-
ok = exit_if_absent(QName),
2479-
{ok, QName, PermCache, TopicPermCache};
2480-
ensure_source_v2(Address, _, _, _) ->
2481-
{error, {bad_address, Address}}.
2482-
24832491
-spec ensure_target(#'v1_0.target'{},
24842492
rabbit_types:vhost(),
24852493
rabbit_types:user(),
@@ -2495,29 +2503,28 @@ ensure_target(#'v1_0.target'{dynamic = true}, _, _, _) ->
24952503
ensure_target(#'v1_0.target'{address = Address,
24962504
durable = Durable},
24972505
Vhost, User, PermCache) ->
2498-
case address_v1_permitted() of
2499-
true ->
2500-
try_target_v1(Address, Vhost, User, Durable, PermCache);
2501-
false ->
2502-
try_target_v2(Address, Vhost, User, PermCache)
2503-
end.
2504-
2505-
try_target_v1(Address, Vhost, User, Durable, PermCache0) ->
2506-
case ensure_target_v1(Address, Vhost, User, Durable, PermCache0) of
2507-
{ok, XNameBin, RKey, QNameBin, PermCache} ->
2508-
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache);
2509-
{error, _} ->
2510-
try_target_v2(Address, Vhost, User, PermCache0)
2511-
end.
2512-
2513-
try_target_v2(Address, Vhost, User, PermCache) ->
2514-
case ensure_target_v2(Address, Vhost) of
2515-
{ok, to, RKey, QNameBin} ->
2516-
{ok, to, RKey, QNameBin, PermCache};
2517-
{ok, XNameBin, RKey, QNameBin} ->
2518-
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache);
2519-
{error, _} = Err ->
2520-
Err
2506+
case target_address_version(Address) of
2507+
2 ->
2508+
case ensure_target_v2(Address, Vhost) of
2509+
{ok, to, RKey, QNameBin} ->
2510+
{ok, to, RKey, QNameBin, PermCache};
2511+
{ok, XNameBin, RKey, QNameBin} ->
2512+
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache);
2513+
{error, _} = Err ->
2514+
Err
2515+
end;
2516+
1 ->
2517+
case address_v1_permitted() of
2518+
true ->
2519+
case ensure_target_v1(Address, Vhost, User, Durable, PermCache) of
2520+
{ok, XNameBin, RKey, QNameBin, PermCache1} ->
2521+
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache1);
2522+
{error, _} = Err ->
2523+
Err
2524+
end;
2525+
false ->
2526+
{error, {amqp_address_v1_not_permitted, Address}}
2527+
end
25212528
end.
25222529

25232530
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) ->
@@ -2539,29 +2546,24 @@ check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) ->
25392546
exit_not_found(XName)
25402547
end.
25412548

2542-
ensure_target_v1({utf8, Address}, Vhost, User, Durable, PermCache0) ->
2543-
case rabbit_routing_parser:parse_endpoint(Address, true) of
2544-
{ok, Dest} ->
2545-
{QNameBin, PermCache} = ensure_terminus(
2546-
target, Dest, Vhost, User, Durable, PermCache0),
2547-
{XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest),
2548-
XNameBin = unicode:characters_to_binary(XNameList1),
2549-
RoutingKey = case RK of
2550-
undefined -> subject;
2551-
[] -> subject;
2552-
_ -> unicode:characters_to_binary(RK)
2553-
end,
2554-
{ok, XNameBin, RoutingKey, QNameBin, PermCache};
2555-
{error, _} = Err ->
2556-
Err
2557-
end;
2558-
ensure_target_v1(Address, _, _, _, _) ->
2559-
{error, {bad_address, Address}}.
2549+
address_v1_permitted() ->
2550+
rabbit_deprecated_features:is_permitted(amqp_address_v1).
2551+
2552+
target_address_version({utf8, <<"/e/", _/binary>>}) ->
2553+
2;
2554+
target_address_version({utf8, <<"/q/", _/binary>>}) ->
2555+
2;
2556+
target_address_version(undefined) ->
2557+
%% anonymous terminus
2558+
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-anonymous-relay
2559+
2;
2560+
target_address_version(_Address) ->
2561+
1.
25602562

25612563
%% The possible v2 target address formats are:
2562-
%% /exchange/:exchange/key/:routing-key
2563-
%% /exchange/:exchange
2564-
%% /queue/:queue
2564+
%% /e/:exchange/:routing-key
2565+
%% /e/:exchange
2566+
%% /q/:queue
25652567
%% <null>
25662568
ensure_target_v2({utf8, String}, Vhost) ->
25672569
case parse_target_v2_string(String) of
@@ -2576,42 +2578,64 @@ ensure_target_v2({utf8, String}, Vhost) ->
25762578
ensure_target_v2(undefined, _) ->
25772579
%% anonymous terminus
25782580
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-anonymous-relay
2579-
{ok, to, to, undefined};
2580-
ensure_target_v2(Address, _) ->
2581-
{error, {bad_address, Address}}.
2581+
{ok, to, to, undefined}.
2582+
2583+
parse_target_v2_string(String) ->
2584+
try parse_target_v2_string0(String)
2585+
catch error:_ ->
2586+
{error, bad_address}
2587+
end.
25822588

2583-
parse_target_v2_string(<<"/exchange/", Rest/binary>>) ->
2584-
case split_exchange_target(Rest) of
2585-
{?DEFAULT_EXCHANGE_NAME, _} ->
2589+
parse_target_v2_string0(<<"/e/", Rest/binary>>) ->
2590+
Key = cp_slash,
2591+
Pattern = try persistent_term:get(Key)
2592+
catch error:badarg ->
2593+
Cp = binary:compile_pattern(<<"/">>),
2594+
ok = persistent_term:put(Key, Cp),
2595+
Cp
2596+
end,
2597+
case binary:split(Rest, Pattern, [global]) of
2598+
[?DEFAULT_EXCHANGE_NAME | _] ->
25862599
{error, bad_address};
2587-
{<<"amq.default">>, _} ->
2600+
[<<"amq.default">> | _] ->
25882601
{error, bad_address};
2589-
{XNameBin, RKey} ->
2590-
{ok, XNameBin, RKey, undefined}
2602+
[XNameBinQuoted] ->
2603+
XNameBin = rabbit_uri:urldecode(XNameBinQuoted),
2604+
{ok, XNameBin, <<>>, undefined};
2605+
[XNameBinQuoted, RKeyQuoted] ->
2606+
XNameBin = rabbit_uri:urldecode(XNameBinQuoted),
2607+
RKey = rabbit_uri:urldecode(RKeyQuoted),
2608+
{ok, XNameBin, RKey, undefined};
2609+
_ ->
2610+
{error, bad_address}
25912611
end;
2592-
parse_target_v2_string(<<"/queue/">>) ->
2612+
parse_target_v2_string0(<<"/q/">>) ->
25932613
%% empty queue name is invalid
25942614
{error, bad_address};
2595-
parse_target_v2_string(<<"/queue/", QNameBin/binary>>) ->
2615+
parse_target_v2_string0(<<"/q/", QNameBinQuoted/binary>>) ->
2616+
QNameBin = rabbit_uri:urldecode(QNameBinQuoted),
25962617
{ok, ?DEFAULT_EXCHANGE_NAME, QNameBin, QNameBin};
2597-
parse_target_v2_string(_) ->
2618+
parse_target_v2_string0(_) ->
25982619
{error, bad_address}.
25992620

2600-
%% Empty exchange name (default exchange) is valid.
2601-
split_exchange_target(Target) ->
2602-
Key = cp_amqp_target_address,
2603-
Pattern = try persistent_term:get(Key)
2604-
catch error:badarg ->
2605-
Cp = binary:compile_pattern(<<"/key/">>),
2606-
ok = persistent_term:put(Key, Cp),
2607-
Cp
2608-
end,
2609-
case binary:split(Target, Pattern) of
2610-
[XNameBin] ->
2611-
{XNameBin, <<>>};
2612-
[XNameBin, RoutingKey] ->
2613-
{XNameBin, RoutingKey}
2614-
end.
2621+
ensure_target_v1({utf8, Address}, Vhost, User, Durable, PermCache0) ->
2622+
case rabbit_routing_parser:parse_endpoint(Address, true) of
2623+
{ok, Dest} ->
2624+
{QNameBin, PermCache} = ensure_terminus(
2625+
target, Dest, Vhost, User, Durable, PermCache0),
2626+
{XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest),
2627+
XNameBin = unicode:characters_to_binary(XNameList1),
2628+
RoutingKey = case RK of
2629+
undefined -> subject;
2630+
[] -> subject;
2631+
_ -> unicode:characters_to_binary(RK)
2632+
end,
2633+
{ok, XNameBin, RoutingKey, QNameBin, PermCache};
2634+
{error, _} = Err ->
2635+
Err
2636+
end;
2637+
ensure_target_v1(Address, _, _, _, _) ->
2638+
{error, {bad_address, Address}}.
26152639

26162640
handle_outgoing_mgmt_link_flow_control(
26172641
#management_link{delivery_count = DeliveryCountSnd} = Link0,
@@ -3355,14 +3379,24 @@ error_not_found(Resource) ->
33553379
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
33563380
description = {utf8, Description}}.
33573381

3358-
address_v1_permitted() ->
3359-
rabbit_deprecated_features:is_permitted(amqp_address_v1).
3360-
33613382
-spec cap_credit(rabbit_queue_type:credit()) ->
33623383
0..?LINK_CREDIT_RCV_FROM_QUEUE_MAX.
33633384
cap_credit(DesiredCredit) ->
33643385
min(DesiredCredit, ?LINK_CREDIT_RCV_FROM_QUEUE_MAX).
33653386

3387+
ensure_mc_cluster_compat(Mc) ->
3388+
IsEnabled = rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1),
3389+
case IsEnabled of
3390+
true ->
3391+
Mc;
3392+
false ->
3393+
McEnv = #{message_containers_store_amqp_v1 => IsEnabled},
3394+
%% other nodes in the cluster may not understand the new internal
3395+
%% amqp mc format - in this case we convert to AMQP legacy format
3396+
%% for compatibility
3397+
mc:convert(mc_amqpl, Mc, McEnv)
3398+
end.
3399+
33663400
format_status(
33673401
#{state := #state{cfg = Cfg,
33683402
outgoing_pending = OutgoingPending,
@@ -3407,16 +3441,3 @@ format_status(
34073441
permission_cache => PermissionCache,
34083442
topic_permission_cache => TopicPermissionCache},
34093443
maps:update(state, State, Status).
3410-
3411-
ensure_mc_cluster_compat(Mc) ->
3412-
IsEnabled = rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1),
3413-
case IsEnabled of
3414-
true ->
3415-
Mc;
3416-
false ->
3417-
McEnv = #{message_containers_store_amqp_v1 => IsEnabled},
3418-
%% other nodes in the cluster may not understand the new internal
3419-
%% amqp mc format - in this case we convert to AMQP legacy format
3420-
%% for compatibility
3421-
mc:convert(mc_amqpl, Mc, McEnv)
3422-
end.

0 commit comments

Comments
 (0)