Skip to content

Commit 0de9591

Browse files
committed
Use different AMQP address format for v1 and v2
to distinguish between v1 and v2 address formats. Previously, v1 and v2 address formats overlapped and behaved differently for example for: ``` /queue/:queue /exchange/:exchange ``` This PR changes the v2 format to: ``` /e/:exchange/:routing-key /e/:exchange /q/:queue ``` to distinguish between v1 and v2 addresses. This allows to call `rabbit_deprecated_features:is_permitted(amqp_address_v1)` only if we know that the user requests address format v1. Note that `rabbit_deprecated_features:is_permitted/1` should only be called when the old feature is actually used. Use percent encoding / decoding for address URI format v2. This allows to use any UTF-8 encoded characters including slashes (`/`) in routing keys, exchange names, and queue names and is more future safe.
1 parent 1bc0d89 commit 0de9591

File tree

9 files changed

+278
-173
lines changed

9 files changed

+278
-173
lines changed

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 130 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,15 @@
1919
-rabbit_deprecated_feature(
2020
{amqp_address_v1,
2121
#{deprecation_phase => permitted_by_default,
22+
doc_url => "https://www.rabbitmq.com/docs/next/amqp#address",
2223
messages =>
2324
#{when_permitted =>
2425
"RabbitMQ AMQP address version 1 is deprecated. "
25-
"Clients should use RabbitMQ AMQP address version 2."}}
26+
"Clients should use RabbitMQ AMQP address version 2.",
27+
when_denied =>
28+
"RabbitMQ AMQP address version 1 is unsupported. "
29+
"Clients must use RabbitMQ AMQP address version 2."
30+
}}
2631
}).
2732

2833
-define(PROTOCOL, amqp10).
@@ -2422,12 +2427,20 @@ ensure_source(#'v1_0.source'{address = Address,
24222427
durable = Durable},
24232428
Vhost, User, PermCache, TopicPermCache) ->
24242429
case Address of
2430+
{utf8, <<"/q/", QNameBinQuoted/binary>>} ->
2431+
%% The only possible v2 source address format is:
2432+
%% /q/:queue
2433+
QNameBin = unquote(QNameBinQuoted),
2434+
QName = queue_resource(Vhost, QNameBin),
2435+
ok = exit_if_absent(QName),
2436+
{ok, QName, PermCache, TopicPermCache};
24252437
{utf8, SourceAddr} ->
24262438
case address_v1_permitted() of
2427-
true -> ensure_source_v1(
2428-
SourceAddr, Vhost, User, Durable, PermCache, TopicPermCache);
2429-
false -> ensure_source_v2(
2430-
SourceAddr, Vhost, PermCache, TopicPermCache)
2439+
true ->
2440+
ensure_source_v1(SourceAddr, Vhost, User, Durable,
2441+
PermCache, TopicPermCache);
2442+
false ->
2443+
{error, {amqp_address_v1_not_permitted, Address}}
24312444
end;
24322445
_ ->
24332446
{error, {bad_address, Address}}
@@ -2467,19 +2480,10 @@ ensure_source_v1(Address,
24672480
Err
24682481
end
24692482
end;
2470-
{error, _} ->
2471-
ensure_source_v2(Address, Vhost, PermCache0, TopicPermCache0)
2483+
{error, _} = Err ->
2484+
Err
24722485
end.
24732486

2474-
%% The only possible v2 source address format is:
2475-
%% /queue/:queue
2476-
ensure_source_v2(<<"/queue/", QNameBin/binary>>, Vhost, PermCache, TopicPermCache) ->
2477-
QName = queue_resource(Vhost, QNameBin),
2478-
ok = exit_if_absent(QName),
2479-
{ok, QName, PermCache, TopicPermCache};
2480-
ensure_source_v2(Address, _, _, _) ->
2481-
{error, {bad_address, Address}}.
2482-
24832487
-spec ensure_target(#'v1_0.target'{},
24842488
rabbit_types:vhost(),
24852489
rabbit_types:user(),
@@ -2495,29 +2499,28 @@ ensure_target(#'v1_0.target'{dynamic = true}, _, _, _) ->
24952499
ensure_target(#'v1_0.target'{address = Address,
24962500
durable = Durable},
24972501
Vhost, User, PermCache) ->
2498-
case address_v1_permitted() of
2499-
true ->
2500-
try_target_v1(Address, Vhost, User, Durable, PermCache);
2501-
false ->
2502-
try_target_v2(Address, Vhost, User, PermCache)
2503-
end.
2504-
2505-
try_target_v1(Address, Vhost, User, Durable, PermCache0) ->
2506-
case ensure_target_v1(Address, Vhost, User, Durable, PermCache0) of
2507-
{ok, XNameBin, RKey, QNameBin, PermCache} ->
2508-
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache);
2509-
{error, _} ->
2510-
try_target_v2(Address, Vhost, User, PermCache0)
2511-
end.
2512-
2513-
try_target_v2(Address, Vhost, User, PermCache) ->
2514-
case ensure_target_v2(Address, Vhost) of
2515-
{ok, to, RKey, QNameBin} ->
2516-
{ok, to, RKey, QNameBin, PermCache};
2517-
{ok, XNameBin, RKey, QNameBin} ->
2518-
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache);
2519-
{error, _} = Err ->
2520-
Err
2502+
case target_address_version(Address) of
2503+
2 ->
2504+
case ensure_target_v2(Address, Vhost) of
2505+
{ok, to, RKey, QNameBin} ->
2506+
{ok, to, RKey, QNameBin, PermCache};
2507+
{ok, XNameBin, RKey, QNameBin} ->
2508+
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache);
2509+
{error, _} = Err ->
2510+
Err
2511+
end;
2512+
1 ->
2513+
case address_v1_permitted() of
2514+
true ->
2515+
case ensure_target_v1(Address, Vhost, User, Durable, PermCache) of
2516+
{ok, XNameBin, RKey, QNameBin, PermCache1} ->
2517+
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache1);
2518+
{error, _} = Err ->
2519+
Err
2520+
end;
2521+
false ->
2522+
{error, {amqp_address_v1_not_permitted, Address}}
2523+
end
25212524
end.
25222525

25232526
check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) ->
@@ -2539,29 +2542,24 @@ check_exchange(XNameBin, RKey, QNameBin, User, Vhost, PermCache0) ->
25392542
exit_not_found(XName)
25402543
end.
25412544

2542-
ensure_target_v1({utf8, Address}, Vhost, User, Durable, PermCache0) ->
2543-
case rabbit_routing_parser:parse_endpoint(Address, true) of
2544-
{ok, Dest} ->
2545-
{QNameBin, PermCache} = ensure_terminus(
2546-
target, Dest, Vhost, User, Durable, PermCache0),
2547-
{XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest),
2548-
XNameBin = unicode:characters_to_binary(XNameList1),
2549-
RoutingKey = case RK of
2550-
undefined -> subject;
2551-
[] -> subject;
2552-
_ -> unicode:characters_to_binary(RK)
2553-
end,
2554-
{ok, XNameBin, RoutingKey, QNameBin, PermCache};
2555-
{error, _} = Err ->
2556-
Err
2557-
end;
2558-
ensure_target_v1(Address, _, _, _, _) ->
2559-
{error, {bad_address, Address}}.
2545+
address_v1_permitted() ->
2546+
rabbit_deprecated_features:is_permitted(amqp_address_v1).
2547+
2548+
target_address_version({utf8, <<"/e/", _/binary>>}) ->
2549+
2;
2550+
target_address_version({utf8, <<"/q/", _/binary>>}) ->
2551+
2;
2552+
target_address_version(undefined) ->
2553+
%% anonymous terminus
2554+
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-anonymous-relay
2555+
2;
2556+
target_address_version(_Address) ->
2557+
1.
25602558

25612559
%% The possible v2 target address formats are:
2562-
%% /exchange/:exchange/key/:routing-key
2563-
%% /exchange/:exchange
2564-
%% /queue/:queue
2560+
%% /e/:exchange/:routing-key
2561+
%% /e/:exchange
2562+
%% /q/:queue
25652563
%% <null>
25662564
ensure_target_v2({utf8, String}, Vhost) ->
25672565
case parse_target_v2_string(String) of
@@ -2576,43 +2574,77 @@ ensure_target_v2({utf8, String}, Vhost) ->
25762574
ensure_target_v2(undefined, _) ->
25772575
%% anonymous terminus
25782576
%% https://docs.oasis-open.org/amqp/anonterm/v1.0/cs01/anonterm-v1.0-cs01.html#doc-anonymous-relay
2579-
{ok, to, to, undefined};
2580-
ensure_target_v2(Address, _) ->
2581-
{error, {bad_address, Address}}.
2577+
{ok, to, to, undefined}.
25822578

2583-
parse_target_v2_string(<<"/exchange/", Rest/binary>>) ->
2584-
case split_exchange_target(Rest) of
2585-
{?DEFAULT_EXCHANGE_NAME, _} ->
2579+
parse_target_v2_string(<<"/e/", Rest/binary>>) ->
2580+
Key = cp_slash,
2581+
Pattern = try persistent_term:get(Key)
2582+
catch error:badarg ->
2583+
Cp = binary:compile_pattern(<<"/">>),
2584+
ok = persistent_term:put(Key, Cp),
2585+
Cp
2586+
end,
2587+
case binary:split(Rest, Pattern, [global]) of
2588+
[?DEFAULT_EXCHANGE_NAME | _] ->
25862589
{error, bad_address};
2587-
{<<"amq.default">>, _} ->
2590+
[<<"amq.default">> | _] ->
25882591
{error, bad_address};
2589-
{XNameBin, RKey} ->
2590-
{ok, XNameBin, RKey, undefined}
2592+
[XNameBinQuoted] ->
2593+
XNameBin = unquote(XNameBinQuoted),
2594+
{ok, XNameBin, <<>>, undefined};
2595+
[XNameBinQuoted, RKeyQuoted] ->
2596+
XNameBin = unquote(XNameBinQuoted),
2597+
RKey = unquote(RKeyQuoted),
2598+
{ok, XNameBin, RKey, undefined};
2599+
_ ->
2600+
{error, bad_address}
25912601
end;
2592-
parse_target_v2_string(<<"/queue/">>) ->
2602+
parse_target_v2_string(<<"/q/">>) ->
25932603
%% empty queue name is invalid
25942604
{error, bad_address};
2595-
parse_target_v2_string(<<"/queue/", QNameBin/binary>>) ->
2605+
parse_target_v2_string(<<"/q/", QNameBinQuoted/binary>>) ->
2606+
QNameBin = unquote(QNameBinQuoted),
25962607
{ok, ?DEFAULT_EXCHANGE_NAME, QNameBin, QNameBin};
25972608
parse_target_v2_string(_) ->
25982609
{error, bad_address}.
25992610

2600-
%% Empty exchange name (default exchange) is valid.
2601-
split_exchange_target(Target) ->
2602-
Key = cp_amqp_target_address,
2603-
Pattern = try persistent_term:get(Key)
2604-
catch error:badarg ->
2605-
Cp = binary:compile_pattern(<<"/key/">>),
2606-
ok = persistent_term:put(Key, Cp),
2607-
Cp
2608-
end,
2609-
case binary:split(Target, Pattern) of
2610-
[XNameBin] ->
2611-
{XNameBin, <<>>};
2612-
[XNameBin, RoutingKey] ->
2613-
{XNameBin, RoutingKey}
2611+
ensure_target_v1({utf8, Address}, Vhost, User, Durable, PermCache0) ->
2612+
case rabbit_routing_parser:parse_endpoint(Address, true) of
2613+
{ok, Dest} ->
2614+
{QNameBin, PermCache} = ensure_terminus(
2615+
target, Dest, Vhost, User, Durable, PermCache0),
2616+
{XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest),
2617+
XNameBin = unicode:characters_to_binary(XNameList1),
2618+
RoutingKey = case RK of
2619+
undefined -> subject;
2620+
[] -> subject;
2621+
_ -> unicode:characters_to_binary(RK)
2622+
end,
2623+
{ok, XNameBin, RoutingKey, QNameBin, PermCache};
2624+
{error, _} = Err ->
2625+
Err
2626+
end;
2627+
ensure_target_v1(Address, _, _, _, _) ->
2628+
{error, {bad_address, Address}}.
2629+
2630+
%% uri_string:unquote/1 is implemented inefficiently because it always creates
2631+
%% a new binary. We optimise for the common case: When no character is percent
2632+
%% encoded, we avoid a new binary being created.
2633+
unquote(Bin) ->
2634+
case is_quoted(Bin) of
2635+
true ->
2636+
uri_string:unquote(Bin);
2637+
false ->
2638+
Bin
26142639
end.
26152640

2641+
is_quoted(<<>>) ->
2642+
false;
2643+
is_quoted(<<$%, _/binary>>) ->
2644+
true;
2645+
is_quoted(<<_, Rest/binary>>) ->
2646+
is_quoted(Rest).
2647+
26162648
handle_outgoing_mgmt_link_flow_control(
26172649
#management_link{delivery_count = DeliveryCountSnd} = Link0,
26182650
#'v1_0.flow'{handle = Handle = ?UINT(HandleInt),
@@ -3355,14 +3387,24 @@ error_not_found(Resource) ->
33553387
condition = ?V_1_0_AMQP_ERROR_NOT_FOUND,
33563388
description = {utf8, Description}}.
33573389

3358-
address_v1_permitted() ->
3359-
rabbit_deprecated_features:is_permitted(amqp_address_v1).
3360-
33613390
-spec cap_credit(rabbit_queue_type:credit()) ->
33623391
0..?LINK_CREDIT_RCV_FROM_QUEUE_MAX.
33633392
cap_credit(DesiredCredit) ->
33643393
min(DesiredCredit, ?LINK_CREDIT_RCV_FROM_QUEUE_MAX).
33653394

3395+
ensure_mc_cluster_compat(Mc) ->
3396+
IsEnabled = rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1),
3397+
case IsEnabled of
3398+
true ->
3399+
Mc;
3400+
false ->
3401+
McEnv = #{message_containers_store_amqp_v1 => IsEnabled},
3402+
%% other nodes in the cluster may not understand the new internal
3403+
%% amqp mc format - in this case we convert to AMQP legacy format
3404+
%% for compatibility
3405+
mc:convert(mc_amqpl, Mc, McEnv)
3406+
end.
3407+
33663408
format_status(
33673409
#{state := #state{cfg = Cfg,
33683410
outgoing_pending = OutgoingPending,
@@ -3407,16 +3449,3 @@ format_status(
34073449
permission_cache => PermissionCache,
34083450
topic_permission_cache => TopicPermissionCache},
34093451
maps:update(state, State, Status).
3410-
3411-
ensure_mc_cluster_compat(Mc) ->
3412-
IsEnabled = rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1),
3413-
case IsEnabled of
3414-
true ->
3415-
Mc;
3416-
false ->
3417-
McEnv = #{message_containers_store_amqp_v1 => IsEnabled},
3418-
%% other nodes in the cluster may not understand the new internal
3419-
%% amqp mc format - in this case we convert to AMQP legacy format
3420-
%% for compatibility
3421-
mc:convert(mc_amqpl, Mc, McEnv)
3422-
end.

0 commit comments

Comments
 (0)