Skip to content

Commit 7c0240f

Browse files
committed
Add exchange deletion checks
1 parent ad8595f commit 7c0240f

File tree

9 files changed

+171
-101
lines changed

9 files changed

+171
-101
lines changed

deps/rabbit/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1279,7 +1279,7 @@ rabbitmq_integration_suite(
12791279
":test_event_recorder_beam",
12801280
],
12811281
runtime_deps = [
1282-
"//deps/amqp10_client:erlang_app",
1282+
"//deps/rabbitmq_amqp_client:erlang_app",
12831283
],
12841284
)
12851285

deps/rabbit/src/rabbit_access_control.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ check_resource_access(User = #user{username = Username,
189189
check_access(
190190
fun() -> Module:check_resource_access(
191191
auth_user(User, Impl), Resource, Permission, Context) end,
192-
Module, "~s access to ~s refused for user '~s'",
192+
Module, "~s access to ~ts refused for user '~ts'",
193193
[Permission, rabbit_misc:rs(Resource), Username]);
194194
(_, Else) -> Else
195195
end, ok, Modules).
@@ -202,7 +202,7 @@ check_topic_access(User = #user{username = Username,
202202
check_access(
203203
fun() -> Module:check_topic_access(
204204
auth_user(User, Impl), Resource, Permission, Context) end,
205-
Module, "~s access to topic '~s' in exchange ~s refused for user '~s'",
205+
Module, "~s access to topic '~ts' in exchange ~s refused for user '~ts'",
206206
[Permission, maps:get(routing_key, Context), rabbit_misc:rs(Resource), Username]);
207207
(_, Else) -> Else
208208
end, ok, Modules).

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -148,22 +148,19 @@ handle_http_req(<<"DELETE">>,
148148
{<<"200">>, [], RespPayload};
149149

150150
handle_http_req(<<"DELETE">>,
151-
[<<"exchanges">>, XNameBinQ],
151+
[<<"exchanges">>, XNameBinQuoted],
152152
_Query,
153153
null,
154154
Vhost,
155-
#user{username = Username},
155+
User = #user{username = Username},
156156
_ConnPid) ->
157-
XNameBin = uri_string:unquote(XNameBinQ),
157+
XNameBin = uri_string:unquote(XNameBinQuoted),
158158
XName = rabbit_misc:r(Vhost, exchange, XNameBin),
159-
ok = case rabbit_exchange:delete(XName, false, Username) of
160-
ok ->
161-
ok;
162-
{error, not_found} ->
163-
ok
164-
%% %% TODO return deletion failure
165-
%% {error, in_use} ->
166-
end,
159+
ok = prohibit_cr_lf(XNameBin),
160+
ok = prohibit_default_exchange(XNameBin),
161+
ok = prohibit_reserved_amq(XName),
162+
ok = check_resource_access(XName, configure, User),
163+
_ = rabbit_exchange:delete(XName, false, Username),
167164
{<<"204">>, [], null};
168165

169166
handle_http_req(<<"POST">>,
@@ -457,13 +454,12 @@ prohibit_reserved_amq(#resource{}) ->
457454
rabbit_types:user()) -> ok.
458455
check_resource_access(Resource, Perm, User) ->
459456
try rabbit_access_control:check_resource_access(User, Resource, Perm, #{})
460-
catch exit:#amqp_error{name = not_allowed} ->
457+
catch exit:#amqp_error{name = access_refused,
458+
explanation = Explanation} ->
461459
%% For authorization failures, let's be more strict: Close the entire
462-
%% AMQP session instead of only returning a HTTP Status Code 403.
460+
%% AMQP session instead of only returning an HTTP Status Code 403.
463461
rabbit_amqp_util:protocol_error(
464-
?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS,
465-
"~s access refused for user '~ts' to ~ts",
466-
[Perm, User, rabbit_misc:rs(Resource)])
462+
?V_1_0_AMQP_ERROR_UNAUTHORIZED_ACCESS, Explanation, [])
467463
end.
468464

469465
-spec throw(binary(), io:format(), [term()]) -> no_return().

deps/rabbit/src/rabbit_amqp_util.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
-spec protocol_error(term(), io:format(), [term()]) ->
1414
no_return().
1515
protocol_error(Condition, Msg, Args) ->
16-
Description = list_to_binary(lists:flatten(io_lib:format(Msg, Args))),
16+
Description = unicode:characters_to_binary(lists:flatten(io_lib:format(Msg, Args))),
1717
Reason = #'v1_0.error'{condition = Condition,
1818
description = {utf8, Description}},
1919
exit(Reason).

deps/rabbit/test/amqp_auth_SUITE.erl

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,11 @@ groups() ->
4747
vhost_absent,
4848
vhost_connection_limit,
4949
user_connection_limit,
50-
vhost_queue_limit
50+
vhost_queue_limit,
51+
52+
%% AMQP Management operations against HTTP API v2
53+
declare_exchange,
54+
delete_exchange
5155
]
5256
}
5357
].
@@ -537,6 +541,46 @@ vhost_queue_limit(Config) ->
537541
ok = close_connection_sync(C2),
538542
ok = rabbit_ct_broker_helpers:clear_vhost_limit(Config, 0, Vhost).
539543

544+
declare_exchange(Config) ->
545+
{Conn, _Session, LinkPair} = init_pair(Config),
546+
XName = <<"📮"/utf8>>,
547+
ExpectedErr = error_unauthorized(
548+
<<"configure access to exchange '", XName/binary,
549+
"' in vhost 'test vhost' refused for user 'test user'">>),
550+
?assertEqual({error, {session_ended, ExpectedErr}},
551+
rabbitmq_amqp_client:declare_exchange(LinkPair, XName, #{})),
552+
ok = close_connection_sync(Conn).
553+
554+
delete_exchange(Config) ->
555+
{Conn1, _, LinkPair1} = init_pair(Config),
556+
XName = <<"📮"/utf8>>,
557+
ok = set_permissions(Config, XName, <<>>, <<>>),
558+
ok = rabbitmq_amqp_client:declare_exchange(LinkPair1, XName, #{}),
559+
ok = clear_permissions(Config),
560+
ExpectedErr = error_unauthorized(
561+
<<"configure access to exchange '", XName/binary,
562+
"' in vhost 'test vhost' refused for user 'test user'">>),
563+
?assertEqual({error, {session_ended, ExpectedErr}},
564+
rabbitmq_amqp_client:delete_exchange(LinkPair1, XName)),
565+
ok = close_connection_sync(Conn1),
566+
567+
ok = set_permissions(Config, XName, <<>>, <<>>),
568+
Init = {_, _, LinkPair2} = init_pair(Config),
569+
ok = rabbitmq_amqp_client:delete_exchange(LinkPair2, XName),
570+
ok = cleanup_pair(Init).
571+
572+
init_pair(Config) ->
573+
OpnConf = connection_config(Config),
574+
{ok, Connection} = amqp10_client:open_connection(OpnConf),
575+
{ok, Session} = amqp10_client:begin_session_sync(Connection),
576+
{ok, LinkPair} = rabbitmq_amqp_client:attach_management_link_pair_sync(Session, <<"mgmt link pair">>),
577+
{Connection, Session, LinkPair}.
578+
579+
cleanup_pair({Connection, Session, LinkPair}) ->
580+
ok = rabbitmq_amqp_client:detach_management_link_pair_sync(LinkPair),
581+
ok = amqp10_client:end_session(Session),
582+
ok = amqp10_client:close_connection(Connection).
583+
540584
connection_config(Config) ->
541585
Vhost = ?config(test_vhost, Config),
542586
connection_config(Config, Vhost).

deps/rabbitmq_amqp_client/app.bzl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ def all_srcs(name = "all_srcs"):
2222
srcs = ["src/rabbitmq_amqp_client.erl"],
2323
)
2424
filegroup(name = "private_hdrs")
25-
filegroup(name = "public_hdrs", srcs = [
26-
"include/rabbitmq_amqp_client.hrl",
27-
])
25+
filegroup(
26+
name = "public_hdrs",
27+
srcs = ["include/rabbitmq_amqp_client.hrl"],
28+
)
2829
filegroup(name = "priv")
2930
filegroup(
3031
name = "license_files",
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1-
-record(link_pair, {outgoing_link :: amqp10_client:link_ref(),
1+
-record(link_pair, {session :: pid(),
2+
outgoing_link :: amqp10_client:link_ref(),
23
incoming_link :: amqp10_client:link_ref()}).
34
-type link_pair() :: #link_pair{}.

deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl

Lines changed: 48 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313

1414
-export[attach_management_link_pair_sync/2,
1515
detach_management_link_pair_sync/1,
16-
declare_queue/2,
17-
declare_exchange/2,
16+
declare_queue/3,
17+
declare_exchange/3,
1818
bind_queue/5,
1919
bind_exchange/5,
2020
unbind_queue/5,
@@ -61,7 +61,8 @@ attach_management_link_pair_sync(Session, Name) ->
6161
{ok, IncomingRef} ?= attach(Session, IncomingAttachArgs),
6262
ok ?= await_attached(OutgoingRef),
6363
ok ?= await_attached(IncomingRef),
64-
{ok, #link_pair{outgoing_link = OutgoingRef,
64+
{ok, #link_pair{session = Session,
65+
outgoing_link = OutgoingRef,
6566
incoming_link = IncomingRef}}
6667
end.
6768

@@ -117,28 +118,26 @@ await_detached(Ref) ->
117118
{error, timeout}
118119
end.
119120

120-
-spec declare_queue(link_pair(), queue_properties()) ->
121+
-spec declare_queue(link_pair(), binary(), queue_properties()) ->
121122
{ok, map()} | {error, term()}.
122-
declare_queue(LinkPair, QueueProperties) ->
123-
{QName, Body0} = maps:fold(
124-
fun(name, V, {undefined, L}) when is_binary(V) ->
125-
{V, L};
126-
(durable, V, {N, L}) when is_boolean(V) ->
127-
{N, [{{utf8, <<"durable">>}, {boolean, V}} | L]};
128-
(exclusive, V, {N, L}) when is_boolean(V) ->
129-
{N, [{{utf8, <<"exclusive">>}, {boolean, V}} | L]};
130-
(auto_delete, V, {N, L}) when is_boolean(V) ->
131-
{N, [{{utf8, <<"auto_delete">>}, {boolean, V}} | L]};
132-
(arguments, V, {N, L0}) ->
133-
KVList = maps:fold(
134-
fun(K = <<"x-", _/binary>>, TaggedVal = {T, _}, L)
135-
when is_atom(T) ->
136-
[{{utf8, K}, TaggedVal} | L]
137-
end, [], V),
138-
{N, [{{utf8, <<"arguments">>}, {map, KVList}} | L0]}
139-
end, {undefined, []}, QueueProperties),
123+
declare_queue(LinkPair, QueueName, QueueProperties) ->
124+
Body0 = maps:fold(
125+
fun(durable, V, L) when is_boolean(V) ->
126+
[{{utf8, <<"durable">>}, {boolean, V}} | L];
127+
(exclusive, V, L) when is_boolean(V) ->
128+
[{{utf8, <<"exclusive">>}, {boolean, V}} | L];
129+
(auto_delete, V, L) when is_boolean(V) ->
130+
[{{utf8, <<"auto_delete">>}, {boolean, V}} | L];
131+
(arguments, V, L) ->
132+
KVList = maps:fold(
133+
fun(K = <<"x-", _/binary>>, TaggedVal = {T, _}, L0)
134+
when is_atom(T) ->
135+
[{{utf8, K}, TaggedVal} | L0]
136+
end, [], V),
137+
[{{utf8, <<"arguments">>}, {map, KVList}} | L]
138+
end, [], QueueProperties),
140139
Body = {map, Body0},
141-
QNameQuoted = uri_string:quote(QName),
140+
QNameQuoted = uri_string:quote(QueueName),
142141
Props = #{subject => <<"PUT">>,
143142
to => <<"/queues/", QNameQuoted/binary>>},
144143

@@ -316,31 +315,29 @@ purge_or_delete_queue(LinkPair, QueueName, PathSuffix) ->
316315
Err
317316
end.
318317

319-
-spec declare_exchange(link_pair(), exchange_properties()) ->
318+
-spec declare_exchange(link_pair(), binary(), exchange_properties()) ->
320319
ok | {error, term()}.
321-
declare_exchange(LinkPair, ExchangeProperties) ->
322-
{XName, Body0} = maps:fold(
323-
fun(name, V, {undefined, L}) when is_binary(V) ->
324-
{V, L};
325-
(type, V, {N, L}) when is_binary(V) ->
326-
{N, [{{utf8, <<"type">>}, {utf8, V}} | L]};
327-
(durable, V, {N, L}) when is_boolean(V) ->
328-
{N, [{{utf8, <<"durable">>}, {boolean, V}} | L]};
329-
(auto_delete, V, {N, L}) when is_boolean(V) ->
330-
{N, [{{utf8, <<"auto_delete">>}, {boolean, V}} | L]};
331-
(internal, V, {N, L}) when is_boolean(V) ->
332-
{N, [{{utf8, <<"internal">>}, {boolean, V}} | L]};
333-
(arguments, V, {N, L0}) ->
334-
KVList = maps:fold(
335-
fun(K = <<"x-", _/binary>>, TaggedVal = {T, _}, L)
336-
when is_atom(T) ->
337-
[{{utf8, K}, TaggedVal} | L]
338-
end, [], V),
339-
{N, [{{utf8, <<"arguments">>}, {map, KVList}} | L0]}
340-
end, {undefined, []}, ExchangeProperties),
320+
declare_exchange(LinkPair, ExchangeName, ExchangeProperties) ->
321+
Body0 = maps:fold(
322+
fun(type, V, L) when is_binary(V) ->
323+
[{{utf8, <<"type">>}, {utf8, V}} | L];
324+
(durable, V, L) when is_boolean(V) ->
325+
[{{utf8, <<"durable">>}, {boolean, V}} | L];
326+
(auto_delete, V, L) when is_boolean(V) ->
327+
[{{utf8, <<"auto_delete">>}, {boolean, V}} | L];
328+
(internal, V, L) when is_boolean(V) ->
329+
[{{utf8, <<"internal">>}, {boolean, V}} | L];
330+
(arguments, V, L) ->
331+
KVList = maps:fold(
332+
fun(K = <<"x-", _/binary>>, TaggedVal = {T, _}, L0)
333+
when is_atom(T) ->
334+
[{{utf8, K}, TaggedVal} | L0]
335+
end, [], V),
336+
[{{utf8, <<"arguments">>}, {map, KVList}} | L]
337+
end, [], ExchangeProperties),
341338
Body = {map, Body0},
342339

343-
XNameQuoted = uri_string:quote(XName),
340+
XNameQuoted = uri_string:quote(ExchangeName),
344341
Props = #{subject => <<"PUT">>,
345342
to => <<"/exchanges/", XNameQuoted/binary>>},
346343

@@ -377,7 +374,8 @@ delete_exchange(LinkPair, ExchangeName) ->
377374

378375
-spec request(link_pair(), amqp10_msg:amqp10_properties(), amqp10_prim()) ->
379376
{ok, Response :: amqp10_msg:amqp10_msg()} | {error, term()}.
380-
request(#link_pair{outgoing_link = OutgoingLink,
377+
request(#link_pair{session = Session,
378+
outgoing_link = OutgoingLink,
381379
incoming_link = IncomingLink}, Properties, Body) ->
382380
MessageId = message_id(),
383381
Properties1 = Properties#{message_id => {binary, MessageId},
@@ -389,9 +387,11 @@ request(#link_pair{outgoing_link = OutgoingLink,
389387
ok ->
390388
receive {amqp10_msg, IncomingLink, Response} ->
391389
#{correlation_id := MessageId} = amqp10_msg:properties(Response),
392-
{ok, Response}
390+
{ok, Response};
391+
{amqp10_event, {session, Session, {ended, Reason}}} ->
392+
{error, {session_ended, Reason}}
393393
after ?TIMEOUT ->
394-
{error, response_timeout}
394+
{error, timeout}
395395
end;
396396
Err ->
397397
Err

0 commit comments

Comments
 (0)