Skip to content

Commit 06314d4

Browse files
committed
Use HTTP API v2
1 parent b6dfc9f commit 06314d4

File tree

13 files changed

+507
-413
lines changed

13 files changed

+507
-413
lines changed

deps/rabbit/src/rabbit.erl

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1709,7 +1709,15 @@ persist_static_configuration() ->
17091709
_ ->
17101710
?MAX_MSG_SIZE
17111711
end,
1712-
ok = persistent_term:put(max_message_size, MaxMsgSize).
1712+
ok = persistent_term:put(max_message_size, MaxMsgSize),
1713+
1714+
%% This regex matches for example
1715+
%% src=e1;dstq=q2;key=my-key;args=
1716+
%% Source, destination and binding key values must be percent encoded.
1717+
%% Binding args use the URL safe Base 64 Alphabet: https://datatracker.ietf.org/doc/html/rfc4648#section-5
1718+
{ok, MP} = re:compile(
1719+
<<"^src=([0-9A-Za-z\-.\_\~%]+);dst([eq])=([0-9A-Za-z\-.\_\~%]+);key=([0-9A-Za-z\-.\_\~%]*);args=([0-9A-Za-z\-\_]*)$">>),
1720+
ok = persistent_term:put(mp_binding_uri_path_segment, MP).
17131721

17141722
persist_static_configuration(Params) ->
17151723
App = ?MODULE,

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 273 additions & 255 deletions
Large diffs are not rendered by default.

deps/rabbit/src/rabbit_amqp_reader.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -491,8 +491,7 @@ handle_1_0_connection_frame(
491491
%% "the value in idle-time-out SHOULD be half the peer's actual timeout threshold" [2.4.5]
492492
idle_time_out = {uint, ReceiveTimeoutMillis div 2},
493493
container_id = {utf8, rabbit_nodes:cluster_name()},
494-
offered_capabilities = {array, symbol, [{symbol, <<"AMQP_MANAGEMENT_V1_0">>},
495-
{symbol, <<"LINK_PAIR_V1_0">>}]},
494+
offered_capabilities = {array, symbol, [{symbol, <<"LINK_PAIR_V1_0">>}]},
496495
properties = server_properties()}),
497496
State;
498497
handle_1_0_connection_frame(#'v1_0.close'{}, State0) ->

deps/rabbit/src/rabbit_amqp_session.erl

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,14 @@
4949
%% we grant depending on how fast target queue(s) actually confirm messages.
5050
-define(LINK_CREDIT_RCV, 128).
5151

52+
%% Pure HTTP clients of a future HTTP API (v2) would call endpoints as follows:
53+
%% GET /v2/vhosts/:vhost/queues/:queue
54+
%%
55+
%% Here, we use the terminus address /management/v2 so that AMQP 1.0 clients declare the HTTP API version
56+
%% at link attachment time. The vhost is already determined at AMQP connection open time.
57+
%% Therefore, there is no need to send the HTTP API version and the vhost in every HTTP over AMQP request.
58+
-define(MANAGEMENT_NODE_ADDRESS, <<"/management/v2">>).
59+
5260
-export([start_link/8,
5361
process_frame/2,
5462
list_local/0,
@@ -735,7 +743,7 @@ handle_control(#'v1_0.attach'{
735743
name = Name = {utf8, LinkName},
736744
handle = Handle = ?UINT(HandleInt),
737745
source = Source = #'v1_0.source'{address = ClientTerminusAddress},
738-
target = Target = #'v1_0.target'{address = {utf8, <<"$management">>}},
746+
target = Target = #'v1_0.target'{address = {utf8, ?MANAGEMENT_NODE_ADDRESS}},
739747
initial_delivery_count = DeliveryCount = ?UINT(DeliveryCountInt),
740748
properties = Properties
741749
} = Attach,
@@ -792,7 +800,7 @@ handle_control(#'v1_0.attach'{
792800
role = ?AMQP_ROLE_RECEIVER,
793801
name = Name = {utf8, LinkName},
794802
handle = Handle = ?UINT(HandleInt),
795-
source = Source = #'v1_0.source'{address = {utf8, <<"$management">>}},
803+
source = Source = #'v1_0.source'{address = {utf8, ?MANAGEMENT_NODE_ADDRESS}},
796804
target = Target = #'v1_0.target'{address = ClientTerminusAddress},
797805
rcv_settle_mode = RcvSettleMode,
798806
max_message_size = MaybeMaxMessageSize,
@@ -1680,7 +1688,7 @@ incoming_mgmt_link_transfer(
16801688
Settled = default(MaybeSettled, false),
16811689
validate_transfer_rcv_settle_mode(RcvSettleMode, Settled),
16821690
validate_message_size(Request, IncomingMaxMessageSize),
1683-
Response = rabbit_amqp_management:process_request(Request, Vhost, User, ReaderPid),
1691+
Response = rabbit_amqp_management:handle_request(Request, Vhost, User, ReaderPid),
16841692

16851693
Transfer = #'v1_0.transfer'{
16861694
handle = ?UINT(OutgoingHandleInt),
@@ -1690,7 +1698,7 @@ incoming_mgmt_link_transfer(
16901698
settled = true},
16911699
?DEBUG("~s Outbound content:~n ~tp~n",
16921700
[?MODULE, [amqp10_framing:pprint(Section) ||
1693-
Section <- amqp10_framing:decode_bin(iolist_to_binary(Respon))]]),
1701+
Section <- amqp10_framing:decode_bin(iolist_to_binary(Response))]]),
16941702
validate_message_size(Response, OutgoingMaxMessageSize),
16951703
Frames = transfer_frames(Transfer, Response, MaxFrameSize),
16961704
PendingTransfer = #pending_management_transfer{frames = Frames},
@@ -1917,7 +1925,7 @@ ensure_target(#'v1_0.target'{address = Address,
19171925
{ok, Dest} ->
19181926
QNameBin = ensure_terminus(target, Dest, Vhost, User, Durable),
19191927
{XNameList1, RK} = rabbit_routing_parser:parse_routing(Dest),
1920-
XNameBin = list_to_binary(XNameList1),
1928+
XNameBin = unicode:characters_to_binary(XNameList1),
19211929
XName = rabbit_misc:r(Vhost, exchange, XNameBin),
19221930
{ok, X} = rabbit_exchange:lookup(XName),
19231931
check_internal_exchange(X),
@@ -1933,7 +1941,7 @@ ensure_target(#'v1_0.target'{address = Address,
19331941
RoutingKey = case RK of
19341942
undefined -> undefined;
19351943
[] -> undefined;
1936-
_ -> list_to_binary(RK)
1944+
_ -> unicode:characters_to_binary(RK)
19371945
end,
19381946
{ok, Exchange, RoutingKey, QNameBin};
19391947
{error, _} = E ->
@@ -2117,8 +2125,8 @@ ensure_source(#'v1_0.source'{address = Address,
21172125
true = string:equal(QNameList, QNameBin),
21182126
{ok, QNameBin};
21192127
{XNameList, RoutingKeyList} ->
2120-
RoutingKey = list_to_binary(RoutingKeyList),
2121-
XNameBin = list_to_binary(XNameList),
2128+
RoutingKey = unicode:characters_to_binary(RoutingKeyList),
2129+
XNameBin = unicode:characters_to_binary(XNameList),
21222130
XName = rabbit_misc:r(Vhost, exchange, XNameBin),
21232131
QName = rabbit_misc:r(Vhost, queue, QNameBin),
21242132
Binding = #binding{source = XName,
@@ -2322,19 +2330,19 @@ ensure_terminus(target, {queue, undefined}, _, _, _) ->
23222330
%% Default exchange exists.
23232331
undefined;
23242332
ensure_terminus(_, {queue, QNameList}, Vhost, User, Durability) ->
2325-
declare_queue(list_to_binary(QNameList), Vhost, User, Durability);
2333+
declare_queue(unicode:characters_to_binary(QNameList), Vhost, User, Durability);
23262334
ensure_terminus(_, {amqqueue, QNameList}, Vhost, _, _) ->
23272335
%% Target "/amq/queue/" is handled specially due to AMQP legacy:
23282336
%% "Queue names starting with "amq." are reserved for pre-declared and
23292337
%% standardised queues. The client MAY declare a queue starting with "amq."
23302338
%% if the passive option is set, or the queue already exists."
2331-
QNameBin = list_to_binary(QNameList),
2339+
QNameBin = unicode:characters_to_binary(QNameList),
23322340
ok = exit_if_absent(queue, Vhost, QNameBin),
23332341
QNameBin.
23342342

2335-
exit_if_absent(Type, Vhost, Name) ->
2336-
ResourceName = rabbit_misc:r(Vhost, Type, rabbit_data_coercion:to_binary(Name)),
2337-
Mod = case Type of
2343+
exit_if_absent(Kind, Vhost, Name) ->
2344+
ResourceName = rabbit_misc:r(Vhost, Kind, unicode:characters_to_binary(Name)),
2345+
Mod = case Kind of
23382346
exchange -> rabbit_exchange;
23392347
queue -> rabbit_amqqueue
23402348
end,

deps/rabbit/src/rabbit_channel.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2420,6 +2420,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
24202420
Args0),
24212421
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
24222422
Durable = DurableDeclare andalso not ExclusiveDeclare,
2423+
Kind = queue,
24232424
ActualNameBin = case StrippedQueueNameBin of
24242425
<<>> ->
24252426
case rabbit_amqqueue:is_server_named_allowed(Args) of
@@ -2431,9 +2432,9 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
24312432
"Cannot declare a server-named queue for type ~tp",
24322433
[rabbit_amqqueue:get_queue_type(Args)])
24332434
end;
2434-
Other -> check_name('queue', Other)
2435+
Other -> check_name(Kind, Other)
24352436
end,
2436-
QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
2437+
QueueName = rabbit_misc:r(VHostPath, Kind, ActualNameBin),
24372438
check_configure_permitted(QueueName, User, AuthzContext),
24382439
rabbit_core_metrics:queue_declared(QueueName),
24392440
case rabbit_amqqueue:with(

deps/rabbit/test/amqp_client_SUITE.erl

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,7 @@ init_per_testcase(T, Config)
167167
T =:= roundtrip_with_drain_quorum_queue orelse
168168
T =:= timed_get_quorum_queue orelse
169169
T =:= available_messages_quorum_queue ->
170-
case rabbit_ct_broker_helpers:rpc(
171-
Config, rabbit_feature_flags, is_enabled, [credit_api_v2]) of
170+
case rpc(Config, rabbit_feature_flags, is_enabled, [credit_api_v2]) of
172171
true ->
173172
rabbit_ct_helpers:testcase_started(Config, T);
174173
false ->
@@ -2322,9 +2321,8 @@ async_notify(SenderSettleMode, QType, Config) ->
23222321
%% If it is a stream we need to wait until there is a local member
23232322
%% on the node we want to subscibe from before proceeding.
23242323
rabbit_ct_helpers:await_condition(
2325-
fun() -> rabbit_ct_broker_helpers:rpc(
2326-
Config, 0, ?MODULE, has_local_member,
2327-
[rabbit_misc:r(<<"/">>, queue, QName)])
2324+
fun() -> rpc(Config, 0, ?MODULE, has_local_member,
2325+
[rabbit_misc:r(<<"/">>, queue, QName)])
23282326
end, 30_000);
23292327
_ ->
23302328
ok

deps/rabbit_common/src/rabbit_routing_parser.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ parse_endpoint(Destination, AllowAnonymousQueue) when is_binary(Destination) ->
2121
parse_endpoint(unicode:characters_to_list(Destination),
2222
AllowAnonymousQueue);
2323
parse_endpoint(Destination, AllowAnonymousQueue) when is_list(Destination) ->
24-
case re:split(Destination, "/", [{return, list}]) of
24+
case re:split(Destination, "/", [unicode, {return, list}]) of
2525
[Name] ->
2626
{ok, {queue, unescape(Name)}};
2727
["", Type | Rest]

0 commit comments

Comments
 (0)