Skip to content

Commit c54bb6d

Browse files
committed
Bind exchange
1 parent d4ff584 commit c54bb6d

File tree

3 files changed

+94
-25
lines changed

3 files changed

+94
-25
lines changed

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ process_http_request(<<"POST">>,
5959
Vhost,
6060
#user{username = Username},
6161
_) ->
62+
%%TODO
63+
%% If new queue / exchange gets created, return 201 with content.
64+
%% If queue with same fields already exists, return 200 including the queue content.
65+
%% If queue / exchange with other fields exists, return 409 with explanation about which fields diff.
6266
{Type, Id, Self1, Target} =
6367
case decode_entity(ReqPayload) of
6468
#{type := <<"queue">> = Type0,
@@ -113,22 +117,26 @@ process_http_request(<<"POST">>,
113117
{Props, AppProps, RespPayload};
114118

115119
process_http_request(<<"POST">>,
116-
<<"/$management/queues/", QNamePath/binary>>,
120+
<<"/$management/", Path0/binary>>,
117121
{symbol, <<"application/amqp-management+amqp", _OptionalType/binary>>},
118122
[ReqPayload],
119123
Vhost,
120124
#user{username = Username},
121125
_) ->
122-
[QNameBin, <<>>] = re:split(QNamePath, <<"/\\$management/entities$">>, [{return, binary}]),
123-
%% TODO add a top level binding key so that other entity types can be created in future by posting
124-
%% to the queue's $management/entities URI
125-
#{source_exchange := XNameBin,
126+
{DstKind, Kinds, Path} = case Path0 of
127+
<<"queues/", Path1/binary>> ->
128+
{queue, <<"queues">>, Path1};
129+
<<"exchanges/", Path1/binary>> ->
130+
{exchange, <<"exchanges">>, Path1}
131+
end,
132+
[DstNameBin, <<>>] = re:split(Path, <<"/\\$management/entities$">>, [{return, binary}]),
133+
#{source := SrcXNameBin,
126134
binding_key := BindingKey,
127135
arguments := Args} = decode_binding(ReqPayload),
128-
XName = rabbit_misc:r(Vhost, exchange, XNameBin),
129-
QName = rabbit_misc:r(Vhost, queue, QNameBin),
130-
Binding = #binding{source = XName,
131-
destination = QName,
136+
SrcXName = rabbit_misc:r(Vhost, exchange, SrcXNameBin),
137+
DstName = rabbit_misc:r(Vhost, DstKind, DstNameBin),
138+
Binding = #binding{source = SrcXName,
139+
destination = DstName,
132140
key = BindingKey,
133141
args = Args},
134142
%%TODO If the binding already exists, return 303 with location.
@@ -140,9 +148,10 @@ process_http_request(<<"POST">>,
140148
%% TODO URI encode, which is against the HTTP over AMQP spec [3.1].
141149
%% How else to escape "/" in exchange names and binding keys?
142150
ArgsHash = args_hash(Args),
143-
Self = {utf8, <<"/$management/queues/",
144-
QNameBin/binary, "/bindings/",
145-
XNameBin/binary, "/",
151+
Self = {utf8, <<"/$management/",
152+
Kinds/binary, "/",
153+
DstNameBin/binary, "/bindings/",
154+
SrcXNameBin/binary, "/",
146155
BindingKey/binary, "/",
147156
ArgsHash/binary>>},
148157
AppProps = #'v1_0.application_properties'{
@@ -246,8 +255,10 @@ decode_entity({map, KVList}) ->
246255

247256
decode_binding({map, KVList}) ->
248257
lists:foldl(
249-
fun({{utf8, <<"source_exchange">>}, {utf8, V}}, Acc) ->
250-
Acc#{source_exchange => V};
258+
fun({{utf8, <<"type">>}, {utf8, <<"binding">>}}, Acc) ->
259+
Acc;
260+
({{utf8, <<"source">>}, {utf8, V}}, Acc) ->
261+
Acc#{source => V};
251262
({{utf8, <<"binding_key">>}, {utf8, V}}, Acc) ->
252263
Acc#{binding_key => V};
253264
({{utf8, <<"arguments">>}, {map, List}}, Acc) ->

deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@
1010

1111
-export[attach_management_link_pair_sync/2,
1212
declare_queue/2,
13+
declare_exchange/2,
1314
bind_queue/5,
15+
bind_exchange/5,
1416
% unbind_queue/5,
17+
% unbind_exchange/5,
1518
purge_queue/2,
1619
delete_queue/2,
17-
declare_exchange/2,
1820
delete_exchange/2
1921
].
2022

@@ -149,7 +151,7 @@ bind_queue(#link_pair{outgoing_link = OutgoingLink,
149151
[{{utf8, Key}, TaggedVal} | L]
150152
end, [], BindingArguments),
151153
Body0 = {map, [
152-
{{utf8, <<"source_exchange">>}, {utf8, ExchangeName}},
154+
{{utf8, <<"source">>}, {utf8, ExchangeName}},
153155
{{utf8, <<"binding_key">>}, {utf8, BindingKey}},
154156
{{utf8, <<"arguments">>}, {map, KVList}}
155157
]},
@@ -182,6 +184,50 @@ bind_queue(#link_pair{outgoing_link = OutgoingLink,
182184
ok
183185
end.
184186

187+
-spec bind_exchange(link_pair(), binary(), binary(), binary(), #{binary() => {atom(), term()}}) ->
188+
ok | {error, term()}.
189+
bind_exchange(#link_pair{outgoing_link = OutgoingLink,
190+
incoming_link = IncomingLink},
191+
Destination, Source, BindingKey, BindingArguments) ->
192+
KVList = maps:fold(
193+
fun(Key, TaggedVal = {T, _}, L)
194+
when is_binary(Key) andalso is_atom(T) ->
195+
[{{utf8, Key}, TaggedVal} | L]
196+
end, [], BindingArguments),
197+
Body0 = {map, [
198+
{{utf8, <<"source">>}, {utf8, Source}},
199+
{{utf8, <<"binding_key">>}, {utf8, BindingKey}},
200+
{{utf8, <<"arguments">>}, {map, KVList}}
201+
]},
202+
Body = iolist_to_binary(amqp10_framing:encode_bin(Body0)),
203+
204+
MessageId = message_id(),
205+
HttpMethod = <<"POST">>,
206+
HttpRequestTarget = <<"/$management/exchanges/", Destination/binary, "/$management/entities">>,
207+
ContentType = <<"application/amqp-management+amqp;type=entity">>,
208+
Props = #{message_id => {binary, MessageId},
209+
to => HttpRequestTarget,
210+
subject => HttpMethod,
211+
reply_to => <<"$me">>,
212+
content_type => ContentType},
213+
Req0 = amqp10_msg:new(<<>>, Body, true),
214+
Req = amqp10_msg:set_properties(Props, Req0),
215+
216+
ok = amqp10_client:flow_link_credit(IncomingLink, 1, never),
217+
maybe
218+
ok ?= amqp10_client:send_msg(OutgoingLink, Req),
219+
{ok, Resp} ?= receive {amqp10_msg, IncomingLink, Message} -> {ok, Message}
220+
after ?TIMEOUT -> {error, response_timeout}
221+
end,
222+
#{correlation_id := MessageId,
223+
subject := <<"201">>
224+
} = amqp10_msg:properties(Resp),
225+
#{<<"http:response">> := <<"1.1">>,
226+
<<"location">> := _BindingRUI
227+
} = amqp10_msg:application_properties(Resp),
228+
ok
229+
end.
230+
185231
-spec purge_queue(link_pair(), binary()) ->
186232
{ok, map()} | {error, term()}.
187233
purge_queue(#link_pair{outgoing_link = OutgoingLink,

deps/rabbitmq_amqp_client/test/management_SUITE.erl

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ all() ->
2424

2525
groups() ->
2626
[
27-
{tests, [shuffle], [all_operations]}
27+
{tests, [shuffle], [all_management_operations]}
2828
].
2929

3030
init_per_suite(Config) ->
@@ -55,7 +55,7 @@ init_per_testcase(Testcase, Config) ->
5555
end_per_testcase(Testcase, Config) ->
5656
rabbit_ct_helpers:testcase_finished(Config, Testcase).
5757

58-
all_operations(Config) ->
58+
all_management_operations(Config) ->
5959
OpnConf = connection_config(Config),
6060
{ok, Connection} = amqp10_client:open_connection(OpnConf),
6161
receive {amqp10_event, {connection, C, opened}}
@@ -82,10 +82,10 @@ all_operations(Config) ->
8282
ok = amqp10_client:send_msg(Sender1, Msg1),
8383
ok = wait_for_accepted(DTag1),
8484

85-
RoutingKey = BindingKey = <<"key 1">>,
85+
RoutingKey1 = BindingKey1 = <<"key 1">>,
8686
SourceExchange = <<"amq.direct">>,
87-
?assertEqual(ok, rabbitmq_amqp_client:bind_queue(LinkPair, QName, SourceExchange, BindingKey, #{})),
88-
TargetAddr2 = <<"/exchange/", SourceExchange/binary, "/", RoutingKey/binary>>,
87+
?assertEqual(ok, rabbitmq_amqp_client:bind_queue(LinkPair, QName, SourceExchange, BindingKey1, #{})),
88+
TargetAddr2 = <<"/exchange/", SourceExchange/binary, "/", RoutingKey1/binary>>,
8989

9090
{ok, Sender2} = amqp10_client:attach_sender_link(Session, <<"sender 2">>, TargetAddr2),
9191
ok = wait_for_credit(Sender2),
@@ -115,19 +115,31 @@ all_operations(Config) ->
115115
ok = amqp10_client:send_msg(Sender3, Msg3),
116116
ok = wait_for_accepted(DTag3),
117117

118+
RoutingKey2 = BindingKey2 = <<"key 2">>,
119+
?assertEqual(ok, rabbitmq_amqp_client:bind_exchange(LinkPair, XName, SourceExchange, BindingKey2, #{})),
120+
TargetAddr4 = <<"/exchange/", SourceExchange/binary, "/", RoutingKey2/binary>>,
121+
122+
{ok, Sender4} = amqp10_client:attach_sender_link(Session, <<"sender 4">>, TargetAddr4),
123+
ok = wait_for_credit(Sender4),
124+
flush(credited),
125+
DTag4 = <<"tag 4">>,
126+
Msg4 = amqp10_msg:new(DTag4, <<"m4">>, false),
127+
ok = amqp10_client:send_msg(Sender4, Msg4),
128+
ok = wait_for_accepted(DTag4),
129+
118130
?assertEqual(ok, rabbitmq_amqp_client:delete_exchange(LinkPair, XName)),
119131
%% When we publish the next message, we expect:
120132
%% 1. that the message is released because the exchange doesn't exist anymore, and
121-
DTag4 = <<"tag 4">>,
122-
ok = amqp10_client:send_msg(Sender3, amqp10_msg:new(DTag4, <<"m4">>, false)),
123-
ok = wait_for_settlement(DTag4, released),
133+
DTag5 = <<"tag 5">>,
134+
ok = amqp10_client:send_msg(Sender3, amqp10_msg:new(DTag5, <<"m5">>, false)),
135+
ok = wait_for_settlement(DTag5, released),
124136
%% 2. that the server closes the link, i.e. sends us a DETACH frame.
125137
ExpectedError = #'v1_0.error'{condition = ?V_1_0_AMQP_ERROR_RESOURCE_DELETED},
126138
receive {amqp10_event, {link, Sender3, {detached, ExpectedError}}} -> ok
127139
after 5000 -> ct:fail({missing_event, ?LINE})
128140
end,
129141

130-
?assertEqual({ok, #{message_count => 3}},
142+
?assertEqual({ok, #{message_count => 4}},
131143
rabbitmq_amqp_client:purge_queue(LinkPair, QName)),
132144

133145
?assertEqual({ok, #{message_count => 0}},

0 commit comments

Comments
 (0)