Skip to content

Commit d6475c2

Browse files
committed
unbind exchange in AMQP Erlang client
1 parent 49fdae9 commit d6475c2

File tree

3 files changed

+34
-16
lines changed

3 files changed

+34
-16
lines changed

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -245,10 +245,10 @@ process_http_request(<<"DELETE">>,
245245
AppProps = #'v1_0.application_properties'{content = []},
246246
RespPayload = {map, []},
247247
{Props, AppProps, RespPayload};
248-
[XNameBin, <<"bindings">>, SrcXNameBin, BindingKey, ArgsHash] ->
248+
[DstXNameBin, <<"bindings">>, SrcXNameBin, BindingKey, ArgsHash] ->
249249
SrcXName = rabbit_misc:r(Vhost, exchange, SrcXNameBin),
250-
XName = rabbit_misc:r(Vhost, exchange, XNameBin),
251-
Bindings0 = rabbit_binding:list_for_source_and_destination(SrcXName, XName, true),
250+
DstXName = rabbit_misc:r(Vhost, exchange, DstXNameBin),
251+
Bindings0 = rabbit_binding:list_for_source_and_destination(SrcXName, DstXName, true),
252252
case lists:search(fun(#binding{key = Key,
253253
args = Args}) ->
254254
Key =:= BindingKey andalso

deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
bind_queue/5,
1515
bind_exchange/5,
1616
unbind_queue/5,
17-
% unbind_exchange/5,
17+
unbind_exchange/5,
1818
purge_queue/2,
1919
delete_queue/2,
2020
delete_exchange/2
@@ -232,13 +232,26 @@ bind_exchange(#link_pair{outgoing_link = OutgoingLink,
232232

233233
-spec unbind_queue(link_pair(), binary(), binary(), binary(), #{binary() => amqp10_prim()}) ->
234234
ok | {error, term()}.
235-
unbind_queue(LinkPair = #link_pair{outgoing_link = OutgoingLink,
236-
incoming_link = IncomingLink},
237-
QueueName, ExchangeName, BindingKey, BindingArguments) ->
235+
unbind_queue(LinkPair, QueueName, ExchangeName, BindingKey, BindingArguments) ->
236+
unbind(<<"queues">>, LinkPair, QueueName, ExchangeName, BindingKey, BindingArguments).
237+
238+
-spec unbind_exchange(link_pair(), binary(), binary(), binary(), #{binary() => amqp10_prim()}) ->
239+
ok | {error, term()}.
240+
unbind_exchange(LinkPair, DestinationExchange, SourceExchange, BindingKey, BindingArguments) ->
241+
unbind(<<"exchanges">>, LinkPair, DestinationExchange, SourceExchange, BindingKey, BindingArguments).
242+
243+
-spec unbind(binary(), link_pair(), binary(), binary(), binary(), #{binary() => amqp10_prim()}) ->
244+
ok | {error, term()}.
245+
unbind(Type,
246+
#link_pair{outgoing_link = OutgoingLink,
247+
incoming_link = IncomingLink} = LinkPair,
248+
Destination, Source, BindingKey, BindingArguments) ->
238249
MessageId = message_id(),
239250
HttpMethod = <<"GET">>,
240-
HttpRequestTarget = <<"/$management/queues/", QueueName/binary,
241-
"/$management/bindings?source=", ExchangeName/binary>>,
251+
HttpRequestTarget = <<"/$management/",
252+
Type/binary, "/",
253+
Destination/binary,
254+
"/$management/bindings?source=", Source/binary>>,
242255
Props = #{message_id => {binary, MessageId},
243256
to => HttpRequestTarget,
244257
subject => HttpMethod,
@@ -261,7 +274,7 @@ unbind_queue(LinkPair = #link_pair{outgoing_link = OutgoingLink,
261274
[{list, Bindings}] = amqp10_framing:decode_bin(RespBody),
262275
case binding_uri(BindingKey, BindingArguments, Bindings) of
263276
{ok, Uri} ->
264-
ok = delete_queue_binding(LinkPair, Uri);
277+
ok = delete_binding(LinkPair, Uri);
265278
not_found ->
266279
ok
267280
end
@@ -287,10 +300,10 @@ binding_uri(BindingKey, BindingArguments, [{map, KVList} | Bindings]) ->
287300
binding_uri(BindingKey, BindingArguments, Bindings)
288301
end.
289302

290-
-spec delete_queue_binding(link_pair(), binary()) ->
303+
-spec delete_binding(link_pair(), binary()) ->
291304
ok | {error, term()}.
292-
delete_queue_binding(#link_pair{outgoing_link = OutgoingLink,
293-
incoming_link = IncomingLink}, BindingUri) ->
305+
delete_binding(#link_pair{outgoing_link = OutgoingLink,
306+
incoming_link = IncomingLink}, BindingUri) ->
294307
MessageId = message_id(),
295308
HttpMethod = <<"DELETE">>,
296309
Props = #{message_id => {binary, MessageId},

deps/rabbitmq_amqp_client/test/management_SUITE.erl

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,12 +132,17 @@ all_management_operations(Config) ->
132132
ok = amqp10_client:send_msg(Sender4, Msg4),
133133
ok = wait_for_accepted(DTag5),
134134

135+
?assertEqual(ok, rabbitmq_amqp_client:unbind_exchange(LinkPair, XName, SourceExchange, BindingKey2, #{})),
136+
DTag6 = <<"tag 6">>,
137+
ok = amqp10_client:send_msg(Sender4, amqp10_msg:new(DTag6, <<"not routed">>, false)),
138+
ok = wait_for_settlement(DTag6, released),
139+
135140
?assertEqual(ok, rabbitmq_amqp_client:delete_exchange(LinkPair, XName)),
136141
%% When we publish the next message, we expect:
137142
%% 1. that the message is released because the exchange doesn't exist anymore, and
138-
DTag6 = <<"tag 6">>,
139-
ok = amqp10_client:send_msg(Sender3, amqp10_msg:new(DTag6, <<"not routed">>, false)),
140-
ok = wait_for_settlement(DTag6, released),
143+
DTag7 = <<"tag 7">>,
144+
ok = amqp10_client:send_msg(Sender3, amqp10_msg:new(DTag7, <<"not routed">>, false)),
145+
ok = wait_for_settlement(DTag7, released),
141146
%% 2. that the server closes the link, i.e. sends us a DETACH frame.
142147
ExpectedError = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_RESOURCE_DELETED},
143148
receive {amqp10_event, {link, Sender3, {detached, ExpectedError}}} -> ok

0 commit comments

Comments
 (0)