Skip to content

Commit a51e8eb

Browse files
committed
Unbind queue
1 parent c54bb6d commit a51e8eb

File tree

6 files changed

+193
-24
lines changed

6 files changed

+193
-24
lines changed

deps/amqp10_client/src/amqp10_client_types.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
link_event_detail()}.
6565
-type amqp10_event() :: {amqp10_event, amqp10_event_detail()}.
6666

67-
-type properties() :: #{binary() => tuple()}.
67+
-type properties() :: #{binary() => amqp10_binary_generator:amqp10_prim()}.
6868

6969
-export_type([amqp10_performative/0, channel/0,
7070
source/0, target/0, amqp10_msg_record/0,

deps/amqp10_common/src/amqp10_binary_generator.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
-export_type([
5353
amqp10_ctor/0,
5454
amqp10_type/0,
55+
amqp10_prim/0,
5556
amqp10_described/0
5657
]).
5758

deps/rabbit/src/mc_amqpl.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@
2626
message/4,
2727
message/5,
2828
from_basic_message/1,
29-
to_091/2
29+
to_091/2,
30+
from_091/2
3031
]).
3132

3233
-import(rabbit_misc,

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 88 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -186,21 +186,42 @@ process_http_request(<<"POST">>,
186186
{Props, AppProps, RespPayload};
187187

188188
process_http_request(<<"DELETE">>,
189-
<<"/$management/queues/", QNameBin/binary>>,
189+
<<"/$management/queues/", Path/binary>>,
190190
undefined,
191191
[],
192192
Vhost,
193193
#user{username = Username},
194194
ConnectionPid) ->
195-
QName = rabbit_misc:r(Vhost, queue, QNameBin),
196-
{ok, NumMsgs} = rabbit_amqqueue:delete_with(QName, ConnectionPid, false, false, Username, true),
197-
Props = #'v1_0.properties'{
198-
subject = {utf8, <<"200">>},
199-
content_type = {symbol, <<"application/amqp-management+amqp">>}
200-
},
201-
AppProps = #'v1_0.application_properties'{content = []},
202-
RespPayload = {map, [{{utf8, <<"message_count">>}, {ulong, NumMsgs}}]},
203-
{Props, AppProps, RespPayload};
195+
case re:split(Path, <<"/">>, [{return, binary}]) of
196+
[QNameBin] ->
197+
QName = rabbit_misc:r(Vhost, queue, QNameBin),
198+
{ok, NumMsgs} = rabbit_amqqueue:delete_with(QName, ConnectionPid, false, false, Username, true),
199+
Props = #'v1_0.properties'{
200+
subject = {utf8, <<"200">>},
201+
content_type = {symbol, <<"application/amqp-management+amqp">>}
202+
},
203+
AppProps = #'v1_0.application_properties'{content = []},
204+
RespPayload = {map, [{{utf8, <<"message_count">>}, {ulong, NumMsgs}}]},
205+
{Props, AppProps, RespPayload};
206+
[QNameBin, <<"bindings">>, SrcXNameBin, BindingKey, ArgsHash] ->
207+
SrcXName = rabbit_misc:r(Vhost, exchange, SrcXNameBin),
208+
QName = rabbit_misc:r(Vhost, queue, QNameBin),
209+
Bindings0 = rabbit_binding:list_for_source_and_destination(SrcXName, QName, true),
210+
case lists:search(fun(#binding{key = Key,
211+
args = Args}) ->
212+
Key =:= BindingKey andalso
213+
args_hash(Args) =:= ArgsHash
214+
end, Bindings0) of
215+
{value, Binding} ->
216+
ok = rabbit_binding:remove(Binding, Username);
217+
false ->
218+
ok
219+
end,
220+
Props = #'v1_0.properties'{subject = {utf8, <<"204">>}},
221+
AppProps = #'v1_0.application_properties'{content = []},
222+
RespPayload = {map, []},
223+
{Props, AppProps, RespPayload}
224+
end;
204225

205226
process_http_request(<<"DELETE">>,
206227
<<"/$management/exchanges/", XNameBin/binary>>,
@@ -221,6 +242,31 @@ process_http_request(<<"DELETE">>,
221242
Props = #'v1_0.properties'{subject = {utf8, <<"204">>}},
222243
AppProps = #'v1_0.application_properties'{content = []},
223244
RespPayload = {map, []},
245+
{Props, AppProps, RespPayload};
246+
247+
process_http_request(<<"GET">>,
248+
<<"/$management/", Path0/binary>>,
249+
undefined,
250+
[],
251+
Vhost,
252+
_User,
253+
_) ->
254+
{DstKind, Path} = case Path0 of
255+
<<"queues/", Path1/binary>> ->
256+
{queue, Path1};
257+
<<"exchanges/", Path1/binary>> ->
258+
{exchange, Path1}
259+
end,
260+
[DstNameBin, SrcXNameBin] = re:split(Path, <<"/\\$management/bindings\\?source=">>, [{return, binary}]),
261+
SrcXName = rabbit_misc:r(Vhost, exchange, SrcXNameBin),
262+
DstName = rabbit_misc:r(Vhost, DstKind, DstNameBin),
263+
Bindings0 = rabbit_binding:list_for_source_and_destination(SrcXName, DstName, true),
264+
RespPayload = encode_bindings(Bindings0),
265+
Props = #'v1_0.properties'{
266+
subject = {utf8, <<"200">>},
267+
content_type = {symbol, <<"application/amqp-management+amqp">>}
268+
},
269+
AppProps = #'v1_0.application_properties'{content = []},
224270
{Props, AppProps, RespPayload}.
225271

226272
decode_entity({map, KVList}) ->
@@ -267,6 +313,38 @@ decode_binding({map, KVList}) ->
267313
Acc#{arguments => Args}
268314
end, #{}, KVList).
269315

316+
encode_bindings(Bindings) ->
317+
Bs = lists:map(
318+
fun(#binding{source = #resource{name = SrcName},
319+
key = BindingKey,
320+
destination = #resource{kind = DstKind,
321+
name = DstName},
322+
args = Args091}) ->
323+
DstKindBin = case DstKind of
324+
queue -> <<"queue">>;
325+
exchange -> <<"exchange">>
326+
end,
327+
Args = [{{utf8, Key}, mc_amqpl:from_091(Type, Val)}
328+
|| {Key, Type, Val} <- Args091],
329+
ArgsHash = args_hash(Args091),
330+
Self = <<"/$management/",
331+
DstKindBin/binary, "s/",
332+
DstName/binary, "/bindings/",
333+
SrcName/binary, "/",
334+
BindingKey/binary, "/",
335+
ArgsHash/binary>>,
336+
KVList = [
337+
{{utf8, <<"type">>}, {utf8, <<"binding">>}},
338+
{{utf8, <<"source">>}, {utf8, SrcName}},
339+
{{utf8, DstKindBin}, {utf8, DstName}},
340+
{{utf8, <<"binding_key">>}, {utf8, BindingKey}},
341+
{{utf8, <<"arguments">>}, {map, Args}},
342+
{{utf8, <<"self">>}, {utf8, Self}}
343+
],
344+
{map, KVList}
345+
end, Bindings),
346+
{list, Bs}.
347+
270348
decode_req([], {Props, AppProps, DataRev}) ->
271349
#msg{properties = Props,
272350
application_properties = AppProps,

deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl

Lines changed: 87 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
declare_exchange/2,
1414
bind_queue/5,
1515
bind_exchange/5,
16-
% unbind_queue/5,
16+
unbind_queue/5,
1717
% unbind_exchange/5,
1818
purge_queue/2,
1919
delete_queue/2,
@@ -42,6 +42,8 @@
4242
internal => boolean(),
4343
arguments => x_args()}.
4444

45+
-type amqp10_prim() :: amqp10_binary_generator:amqp10_prim().
46+
4547
-export_type([link_pair/0]).
4648

4749
-spec attach_management_link_pair_sync(pid(), binary()) ->
@@ -140,7 +142,7 @@ declare_queue(#link_pair{outgoing_link = OutgoingLink,
140142
{ok, proplists:to_map(KVList)}
141143
end.
142144

143-
-spec bind_queue(link_pair(), binary(), binary(), binary(), #{binary() => {atom(), term()}}) ->
145+
-spec bind_queue(link_pair(), binary(), binary(), binary(), #{binary() => amqp10_prim()}) ->
144146
ok | {error, term()}.
145147
bind_queue(#link_pair{outgoing_link = OutgoingLink,
146148
incoming_link = IncomingLink},
@@ -184,7 +186,7 @@ bind_queue(#link_pair{outgoing_link = OutgoingLink,
184186
ok
185187
end.
186188

187-
-spec bind_exchange(link_pair(), binary(), binary(), binary(), #{binary() => {atom(), term()}}) ->
189+
-spec bind_exchange(link_pair(), binary(), binary(), binary(), #{binary() => amqp10_prim()}) ->
188190
ok | {error, term()}.
189191
bind_exchange(#link_pair{outgoing_link = OutgoingLink,
190192
incoming_link = IncomingLink},
@@ -228,6 +230,88 @@ bind_exchange(#link_pair{outgoing_link = OutgoingLink,
228230
ok
229231
end.
230232

233+
-spec unbind_queue(link_pair(), binary(), binary(), binary(), #{binary() => amqp10_prim()}) ->
234+
ok | {error, term()}.
235+
unbind_queue(LinkPair = #link_pair{outgoing_link = OutgoingLink,
236+
incoming_link = IncomingLink},
237+
QueueName, ExchangeName, BindingKey, BindingArguments) ->
238+
MessageId = message_id(),
239+
HttpMethod = <<"GET">>,
240+
HttpRequestTarget = <<"/$management/queues/", QueueName/binary,
241+
"/$management/bindings?source=", ExchangeName/binary>>,
242+
Props = #{message_id => {binary, MessageId},
243+
to => HttpRequestTarget,
244+
subject => HttpMethod,
245+
reply_to => <<"$me">>},
246+
Req0 = amqp10_msg:new(<<>>, <<>>, true),
247+
Req = amqp10_msg:set_properties(Props, Req0),
248+
249+
ok = amqp10_client:flow_link_credit(IncomingLink, 1, never),
250+
maybe
251+
ok ?= amqp10_client:send_msg(OutgoingLink, Req),
252+
{ok, Resp} ?= receive {amqp10_msg, IncomingLink, Message} -> {ok, Message}
253+
after ?TIMEOUT -> {error, response_timeout}
254+
end,
255+
#{correlation_id := MessageId,
256+
subject := <<"200">>,
257+
content_type := <<"application/amqp-management+amqp">>
258+
} = amqp10_msg:properties(Resp),
259+
#{<<"http:response">> := <<"1.1">> } = amqp10_msg:application_properties(Resp),
260+
RespBody = amqp10_msg:body_bin(Resp),
261+
[{list, Bindings}] = amqp10_framing:decode_bin(RespBody),
262+
case binding_uri(BindingKey, BindingArguments, Bindings) of
263+
{ok, Uri} ->
264+
ok = delete_queue_binding(LinkPair, Uri);
265+
not_found ->
266+
ok
267+
end
268+
end.
269+
270+
binding_uri(_, _, []) ->
271+
not_found;
272+
binding_uri(BindingKey, BindingArguments, [{map, KVList} | Bindings]) ->
273+
case maps:from_list(KVList) of
274+
#{{utf8, <<"binding_key">>} := {utf8, BindingKey},
275+
{utf8, <<"arguments">>} := {map, Args},
276+
{utf8, <<"self">>} := {utf8, Uri}} ->
277+
Args = lists:map(fun({{utf8, Key}, TypeVal}) ->
278+
{Key, TypeVal}
279+
end, Args),
280+
case maps:from_list(Args) =:= BindingArguments of
281+
true ->
282+
{ok, Uri};
283+
false ->
284+
binding_uri(BindingKey, BindingArguments, Bindings)
285+
end;
286+
_ ->
287+
binding_uri(BindingKey, BindingArguments, Bindings)
288+
end.
289+
290+
-spec delete_queue_binding(link_pair(), binary()) ->
291+
ok | {error, term()}.
292+
delete_queue_binding(#link_pair{outgoing_link = OutgoingLink,
293+
incoming_link = IncomingLink}, BindingUri) ->
294+
MessageId = message_id(),
295+
HttpMethod = <<"DELETE">>,
296+
Props = #{message_id => {binary, MessageId},
297+
to => BindingUri,
298+
subject => HttpMethod,
299+
reply_to => <<"$me">>},
300+
Req0 = amqp10_msg:new(<<>>, <<>>, true),
301+
Req = amqp10_msg:set_properties(Props, Req0),
302+
303+
ok = amqp10_client:flow_link_credit(IncomingLink, 1, never),
304+
maybe
305+
ok ?= amqp10_client:send_msg(OutgoingLink, Req),
306+
{ok, Resp} ?= receive {amqp10_msg, IncomingLink, Message} -> {ok, Message}
307+
after ?TIMEOUT -> {error, response_timeout}
308+
end,
309+
#{correlation_id := MessageId,
310+
subject := <<"204">>} = amqp10_msg:properties(Resp),
311+
#{<<"http:response">> := <<"1.1">> } = amqp10_msg:application_properties(Resp),
312+
ok
313+
end.
314+
231315
-spec purge_queue(link_pair(), binary()) ->
232316
{ok, map()} | {error, term()}.
233317
purge_queue(#link_pair{outgoing_link = OutgoingLink,

deps/rabbitmq_amqp_client/test/management_SUITE.erl

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ all_management_operations(Config) ->
9595
ok = amqp10_client:send_msg(Sender2, Msg2),
9696
ok = wait_for_accepted(DTag2),
9797

98+
?assertEqual(ok, rabbitmq_amqp_client:unbind_queue(LinkPair, QName, SourceExchange, BindingKey1, #{})),
99+
DTag3 = <<"tag 3">>,
100+
ok = amqp10_client:send_msg(Sender2, amqp10_msg:new(DTag3, <<"not routed">>, false)),
101+
ok = wait_for_settlement(DTag3, released),
102+
98103
XName = <<"my-fanout-exchange">>,
99104
X = #{name => XName,
100105
type => <<"fanout">>,
@@ -110,10 +115,10 @@ all_management_operations(Config) ->
110115
{ok, Sender3} = amqp10_client:attach_sender_link(Session, <<"sender 3">>, TargetAddr3),
111116
ok = wait_for_credit(Sender3),
112117
flush(credited),
113-
DTag3 = <<"tag 3">>,
114-
Msg3 = amqp10_msg:new(DTag3, <<"m3">>, false),
118+
DTag4 = <<"tag 4">>,
119+
Msg3 = amqp10_msg:new(DTag4, <<"m3">>, false),
115120
ok = amqp10_client:send_msg(Sender3, Msg3),
116-
ok = wait_for_accepted(DTag3),
121+
ok = wait_for_accepted(DTag4),
117122

118123
RoutingKey2 = BindingKey2 = <<"key 2">>,
119124
?assertEqual(ok, rabbitmq_amqp_client:bind_exchange(LinkPair, XName, SourceExchange, BindingKey2, #{})),
@@ -122,17 +127,17 @@ all_management_operations(Config) ->
122127
{ok, Sender4} = amqp10_client:attach_sender_link(Session, <<"sender 4">>, TargetAddr4),
123128
ok = wait_for_credit(Sender4),
124129
flush(credited),
125-
DTag4 = <<"tag 4">>,
126-
Msg4 = amqp10_msg:new(DTag4, <<"m4">>, false),
130+
DTag5 = <<"tag 5">>,
131+
Msg4 = amqp10_msg:new(DTag5, <<"m4">>, false),
127132
ok = amqp10_client:send_msg(Sender4, Msg4),
128-
ok = wait_for_accepted(DTag4),
133+
ok = wait_for_accepted(DTag5),
129134

130135
?assertEqual(ok, rabbitmq_amqp_client:delete_exchange(LinkPair, XName)),
131136
%% When we publish the next message, we expect:
132137
%% 1. that the message is released because the exchange doesn't exist anymore, and
133-
DTag5 = <<"tag 5">>,
134-
ok = amqp10_client:send_msg(Sender3, amqp10_msg:new(DTag5, <<"m5">>, false)),
135-
ok = wait_for_settlement(DTag5, released),
138+
DTag6 = <<"tag 6">>,
139+
ok = amqp10_client:send_msg(Sender3, amqp10_msg:new(DTag6, <<"not routed">>, false)),
140+
ok = wait_for_settlement(DTag6, released),
136141
%% 2. that the server closes the link, i.e. sends us a DETACH frame.
137142
ExpectedError = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_RESOURCE_DELETED},
138143
receive {amqp10_event, {link, Sender3, {detached, ExpectedError}}} -> ok

0 commit comments

Comments
 (0)