Skip to content

Commit 02e568c

Browse files
committed
Declare + delete exchange
1 parent 4642879 commit 02e568c

File tree

4 files changed

+202
-22
lines changed

4 files changed

+202
-22
lines changed

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 70 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -59,28 +59,55 @@ process_http_request(<<"POST">>,
5959
Vhost,
6060
#user{username = Username},
6161
_) ->
62-
#{name := QNameBin,
63-
durable := Durable,
64-
auto_delete := AutoDelete,
65-
owner := Owner,
66-
arguments := QArgs} = decode_queue(ReqPayload),
67-
QType = rabbit_amqqueue:get_queue_type(QArgs),
68-
QName = rabbit_misc:r(Vhost, queue, QNameBin),
69-
Q0 = amqqueue:new(QName, none, Durable, AutoDelete, Owner,
70-
QArgs, Vhost, #{user => Username}, QType),
71-
{new, _Q} = rabbit_queue_type:declare(Q0, node()),
62+
{Type, Id, Self1, Target} =
63+
case decode_entity(ReqPayload) of
64+
#{type := <<"queue">> = Type0,
65+
name := QNameBin,
66+
durable := Durable,
67+
auto_delete := AutoDelete,
68+
owner := Owner,
69+
arguments := QArgs} ->
70+
QType = rabbit_amqqueue:get_queue_type(QArgs),
71+
QName = rabbit_misc:r(Vhost, queue, QNameBin),
72+
Q0 = amqqueue:new(QName, none, Durable, AutoDelete, Owner,
73+
QArgs, Vhost, #{user => Username}, QType),
74+
{new, _Q} = rabbit_queue_type:declare(Q0, node()),
75+
Self0 = <<"/$management/queues/", QNameBin/binary>>,
76+
Target0 = <<"/queue/", QNameBin/binary>>,
77+
{Type0, QNameBin, Self0, Target0};
78+
#{type := <<"exchange">> = Type0,
79+
name := XNameBin,
80+
exchange_type := XType,
81+
durable := Durable,
82+
auto_delete := AutoDelete,
83+
internal := Internal,
84+
arguments := XArgs} ->
85+
XName = rabbit_misc:r(Vhost, exchange, XNameBin),
86+
CheckedXType = rabbit_exchange:check_type(XType),
87+
_X = rabbit_exchange:declare(XName,
88+
CheckedXType,
89+
Durable,
90+
AutoDelete,
91+
Internal,
92+
XArgs,
93+
Username),
94+
Self0 = <<"/$management/exchanges/", XNameBin/binary>>,
95+
Target0 = <<"/exchange/", XNameBin/binary>>,
96+
{Type0, XNameBin, Self0, Target0}
97+
98+
end,
99+
Self = {utf8, Self1},
72100
Props = #'v1_0.properties'{
73101
subject = {utf8, <<"201">>},
74102
content_type = {symbol, <<"application/amqp-management+amqp;type=entity-collection">>}
75103
},
76-
Self = {utf8, <<"/$management/queues/", QNameBin/binary>>},
77104
AppProps = #'v1_0.application_properties'{
78105
%% TODO include vhost in URI?
79106
content = [{{utf8, <<"location">>}, Self}]},
80-
RespPayload = {map, [{{utf8, <<"type">>}, {utf8, <<"queue">>}},
81-
{{utf8, <<"id">>}, {utf8, QNameBin}},
107+
RespPayload = {map, [{{utf8, <<"type">>}, {utf8, Type}},
108+
{{utf8, <<"id">>}, {utf8, Id}},
82109
{{utf8, <<"self">>}, Self},
83-
{{utf8, <<"target">>}, {utf8, <<"/queue/", QNameBin/binary>>}},
110+
{{utf8, <<"target">>}, {utf8, Target}},
84111
{{utf8, <<"management">>}, {utf8, <<"$management">>}}
85112
]},
86113
{Props, AppProps, RespPayload};
@@ -164,12 +191,33 @@ process_http_request(<<"DELETE">>,
164191
},
165192
AppProps = #'v1_0.application_properties'{content = []},
166193
RespPayload = {map, [{{utf8, <<"message_count">>}, {ulong, NumMsgs}}]},
194+
{Props, AppProps, RespPayload};
195+
196+
process_http_request(<<"DELETE">>,
197+
<<"/$management/exchanges/", XNameBin/binary>>,
198+
undefined,
199+
[],
200+
Vhost,
201+
#user{username = Username},
202+
_ConnectionPid) ->
203+
XName = rabbit_misc:r(Vhost, exchange, XNameBin),
204+
ok = case rabbit_exchange:delete(XName, false, Username) of
205+
ok ->
206+
ok;
207+
{error, not_found} ->
208+
ok
209+
%% %% TODO return deletion failure
210+
%% {error, in_use} ->
211+
end,
212+
Props = #'v1_0.properties'{subject = {utf8, <<"204">>}},
213+
AppProps = #'v1_0.application_properties'{content = []},
214+
RespPayload = {map, []},
167215
{Props, AppProps, RespPayload}.
168216

169-
decode_queue({map, KVList}) ->
217+
decode_entity({map, KVList}) ->
170218
lists:foldl(
171-
fun({{utf8, <<"type">>}, {utf8, <<"queue">>}}, Acc) ->
172-
Acc;
219+
fun({{utf8, <<"type">>}, {utf8, V}}, Acc) ->
220+
Acc#{type => V};
173221
({{utf8, <<"name">>}, {utf8, V}}, Acc) ->
174222
Acc#{name => V};
175223
({{utf8, <<"durable">>}, V}, Acc)
@@ -184,6 +232,11 @@ decode_queue({map, KVList}) ->
184232
({{utf8, <<"auto_delete">>}, V}, Acc)
185233
when is_boolean(V) ->
186234
Acc#{auto_delete => V};
235+
({{utf8, <<"exchange_type">>}, {utf8, V}}, Acc) ->
236+
Acc#{exchange_type => V};
237+
({{utf8, <<"internal">>}, V}, Acc)
238+
when is_boolean(V) ->
239+
Acc#{internal => V};
187240
({{utf8, <<"arguments">>}, {map, List}}, Acc) ->
188241
Args = [{Key, longstr, V}
189242
|| {{utf8, Key = <<"x-", _/binary>>},

deps/rabbit/src/rabbit_exchange.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ serial(X) ->
9494
-spec declare
9595
(name(), type(), boolean(), boolean(), boolean(),
9696
rabbit_framing:amqp_table(), rabbit_types:username())
97-
-> rabbit_types:exchange().
97+
-> rabbit_types:exchange() | {error, term()}.
9898

9999
declare(XName, Type, Durable, AutoDelete, Internal, Args, Username) ->
100100
X = rabbit_exchange_decorator:set(

deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,10 @@
1313
bind_queue/5,
1414
% unbind_queue/5,
1515
purge_queue/2,
16-
delete_queue/2].
16+
delete_queue/2,
17+
declare_exchange/2,
18+
delete_exchange/2
19+
].
1720

1821
-define(TIMEOUT, 10_000).
1922
-define(MANAGEMENT_NODE_ADDRESS, <<"$management">>).
@@ -30,6 +33,13 @@
3033
auto_delete => boolean(),
3134
arguments => x_args()}.
3235

36+
-type exchange_properties() :: #{name := binary(),
37+
type => binary(),
38+
durable => boolean(),
39+
auto_delete => boolean(),
40+
internal => boolean(),
41+
arguments => x_args()}.
42+
3343
-export_type([link_pair/0]).
3444

3545
-spec attach_management_link_pair_sync(pid(), binary()) ->
@@ -240,6 +250,91 @@ delete_queue(#link_pair{outgoing_link = OutgoingLink,
240250
{ok, #{message_count => Count}}
241251
end.
242252

253+
-spec declare_exchange(link_pair(), exchange_properties()) ->
254+
{ok, map()} | {error, term()}.
255+
declare_exchange(#link_pair{outgoing_link = OutgoingLink,
256+
incoming_link = IncomingLink},
257+
ExchangeProperties) ->
258+
Body0 = maps:fold(
259+
fun(name, V, Acc) when is_binary(V) ->
260+
[{{utf8, <<"name">>}, {utf8, V}} | Acc];
261+
(type, V, Acc) when is_binary(V) ->
262+
[{{utf8, <<"exchange_type">>}, {utf8, V}} | Acc];
263+
(durable, V, Acc) when is_boolean(V) ->
264+
[{{utf8, <<"durable">>}, {boolean, V}} | Acc];
265+
(auto_delete, V, Acc) when is_boolean(V) ->
266+
[{{utf8, <<"auto_delete">>}, {boolean, V}} | Acc];
267+
(internal, V, Acc) when is_boolean(V) ->
268+
[{{utf8, <<"internal">>}, {boolean, V}} | Acc];
269+
(arguments, V, Acc) ->
270+
KVList = maps:fold(
271+
fun(K = <<"x-", _/binary>>, TaggedVal = {T, _}, L)
272+
when is_atom(T) ->
273+
[{{utf8, K}, TaggedVal} | L]
274+
end, [], V),
275+
[{{utf8, <<"arguments">>}, {map, KVList}} | Acc]
276+
end, [{{utf8, <<"type">>}, {utf8, <<"exchange">>}}], ExchangeProperties),
277+
Body1 = {map, Body0},
278+
Body = iolist_to_binary(amqp10_framing:encode_bin(Body1)),
279+
280+
MessageId = message_id(),
281+
HttpMethod = <<"POST">>,
282+
HttpRequestTarget = <<"/$management/entities">>,
283+
ContentType = <<"application/amqp-management+amqp;type=entity">>,
284+
Props = #{message_id => {binary, MessageId},
285+
to => HttpRequestTarget,
286+
subject => HttpMethod,
287+
reply_to => <<"$me">>,
288+
content_type => ContentType},
289+
Req0 = amqp10_msg:new(<<>>, Body, true),
290+
Req = amqp10_msg:set_properties(Props, Req0),
291+
292+
ok = amqp10_client:flow_link_credit(IncomingLink, 1, never),
293+
maybe
294+
ok ?= amqp10_client:send_msg(OutgoingLink, Req),
295+
{ok, Resp} ?= receive {amqp10_msg, IncomingLink, Message} -> {ok, Message}
296+
after ?TIMEOUT -> {error, response_timeout}
297+
end,
298+
#{correlation_id := MessageId,
299+
subject := <<"201">>,
300+
content_type := <<"application/amqp-management+amqp;type=entity-collection">>
301+
} = amqp10_msg:properties(Resp),
302+
#{<<"http:response">> := <<"1.1">>,
303+
<<"location">> := _ExchangeURI
304+
} = amqp10_msg:application_properties(Resp),
305+
RespBody = amqp10_msg:body_bin(Resp),
306+
[{map, KVList}] = amqp10_framing:decode_bin(RespBody),
307+
{ok, proplists:to_map(KVList)}
308+
end.
309+
310+
-spec delete_exchange(link_pair(), binary()) ->
311+
ok | {error, term()}.
312+
delete_exchange(#link_pair{outgoing_link = OutgoingLink,
313+
incoming_link = IncomingLink},
314+
ExchangeName) ->
315+
MessageId = message_id(),
316+
HttpMethod = <<"DELETE">>,
317+
HttpRequestTarget = <<"/$management/exchanges/", ExchangeName/binary>>,
318+
Props = #{message_id => {binary, MessageId},
319+
to => HttpRequestTarget,
320+
subject => HttpMethod,
321+
reply_to => <<"$me">>},
322+
Req0 = amqp10_msg:new(<<>>, <<>>, true),
323+
Req = amqp10_msg:set_properties(Props, Req0),
324+
325+
ok = amqp10_client:flow_link_credit(IncomingLink, 1, never),
326+
maybe
327+
ok ?= amqp10_client:send_msg(OutgoingLink, Req),
328+
{ok, Resp} ?= receive {amqp10_msg, IncomingLink, Message} -> {ok, Message}
329+
after ?TIMEOUT -> {error, response_timeout}
330+
end,
331+
#{correlation_id := MessageId,
332+
subject := <<"204">>
333+
} = amqp10_msg:properties(Resp),
334+
#{<<"http:response">> := <<"1.1">> } = amqp10_msg:application_properties(Resp),
335+
ok
336+
end.
337+
243338
%% "The message producer is usually responsible for setting the message-id in
244339
%% such a way that it is assured to be globally unique." [3.2.4]
245340
-spec message_id() -> binary().

deps/rabbitmq_amqp_client/test/management_SUITE.erl

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ all() ->
2424

2525
groups() ->
2626
[
27-
{tests, [], [queue]}
27+
{tests, [shuffle], [all_operations]}
2828
].
2929

3030
init_per_suite(Config) ->
@@ -55,7 +55,7 @@ init_per_testcase(Testcase, Config) ->
5555
end_per_testcase(Testcase, Config) ->
5656
rabbit_ct_helpers:testcase_finished(Config, Testcase).
5757

58-
queue(Config) ->
58+
all_operations(Config) ->
5959
OpnConf = connection_config(Config),
6060
{ok, Connection} = amqp10_client:open_connection(OpnConf),
6161
receive {amqp10_event, {connection, C, opened}}
@@ -95,7 +95,39 @@ queue(Config) ->
9595
ok = amqp10_client:send_msg(Sender2, Msg2),
9696
ok = wait_for_accepted(DTag2),
9797

98-
?assertEqual({ok, #{message_count => 2}},
98+
XName = <<"my-fanout-exchange">>,
99+
X = #{name => XName,
100+
type => <<"fanout">>,
101+
durable => true,
102+
auto_delete => false,
103+
internal => false,
104+
arguments => #{}},
105+
{ok, #{{utf8, <<"target">>} := {utf8, TargetAddr3}} } = rabbitmq_amqp_client:declare_exchange(LinkPair, X),
106+
107+
SourceExchange = <<"amq.direct">>,
108+
?assertEqual(ok, rabbitmq_amqp_client:bind_queue(LinkPair, QName, XName, <<"ignored">>, #{})),
109+
110+
{ok, Sender3} = amqp10_client:attach_sender_link(Session, <<"sender 3">>, TargetAddr3),
111+
ok = wait_for_credit(Sender3),
112+
flush(credited),
113+
DTag3 = <<"tag 3">>,
114+
Msg3 = amqp10_msg:new(DTag3, <<"m3">>, false),
115+
ok = amqp10_client:send_msg(Sender3, Msg3),
116+
ok = wait_for_accepted(DTag3),
117+
118+
?assertEqual(ok, rabbitmq_amqp_client:delete_exchange(LinkPair, XName)),
119+
%% When we publish the next message, we expect:
120+
%% 1. that the message is released because the exchange doesn't exist anymore, and
121+
DTag4 = <<"tag 4">>,
122+
ok = amqp10_client:send_msg(Sender3, amqp10_msg:new(DTag4, <<"m4">>, false)),
123+
ok = wait_for_settlement(DTag4, released),
124+
%% 2. that the server closes the link, i.e. sends us a DETACH frame.
125+
ExpectedError = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_RESOURCE_DELETED},
126+
receive {amqp10_event, {link, Sender3, {detached, ExpectedError}}} -> ok
127+
after 5000 -> ct:fail({missing_event, ?LINE})
128+
end,
129+
130+
?assertEqual({ok, #{message_count => 3}},
99131
rabbitmq_amqp_client:purge_queue(LinkPair, QName)),
100132

101133
?assertEqual({ok, #{message_count => 0}},

0 commit comments

Comments
 (0)